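Learn Kafka Source Code with Me: Producer Analysis
This chapter analyzes the business logic of Kafka's Producer. Both the dispatch (partitioning) logic and the load-balancing logic are maintained in the Producer.
I. Overall Kafka Architecture Diagram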
(Figure: overall Kafka architecture; image not available)
II. Producer Source Code Analysis
```scala
package kafka.producer

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import kafka.common.{AppInfo, QueueFullException}
import kafka.metrics._
import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
import kafka.serializer.Encoder
import kafka.utils._

class Producer[K,V](val config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V])  // only for unit testing
  extends Logging {

  private val hasShutdown = new AtomicBoolean(false)
  // Queue used for asynchronous sending
  private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

  private var sync: Boolean = true
  // Background thread that drains the queue in async mode
  private var producerSendThread: ProducerSendThread[K,V] = null
  private val lock = new Object()

  // config is a ProducerConfig built from the settings loaded from the configuration file.
  // Check whether sending is sync or async; in async mode, start the background send thread.
  config.producerType match {
    case "sync" =>
    case "async" =>
      sync = false
      producerSendThread =
        new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                    queue,
                                    eventHandler,
                                    config.queueBufferingMaxMs,
                                    config.batchNumMessages,
                                    config.clientId)
      producerSendThread.start()
  }

  private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

  KafkaMetricsReporter.startReporters(config.props)
  AppInfo.registerInfo()

  def this(config: ProducerConfig) =
    this(config,
         new DefaultEventHandler[K,V](config,
                                      Utils.createObject[Partitioner](config.partitionerClass, config.props),
                                      Utils.createObject[Encoder[V]](config.serializerClass, config.props),
                                      Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),
                                      new ProducerPool(config)))

  /**
   * Sends the data, partitioned by key to the topic using either the
   * synchronous or the asynchronous producer
   * @param messages the producer data object that encapsulates the topic, key and message data
   */
  def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get)
        throw new ProducerClosedException
      recordStats(messages)
      sync match {
        case true => eventHandler.handle(messages)
        case false => asyncSend(messages)
      }
    }
  }

  private def recordStats(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()
      producerTopicStats.getProducerAllTopicsStats.messageRate.mark()
    }
  }

  // Asynchronous path: put the messages on the queue for the send thread to pick up.
  // queueEnqueueTimeoutMs: 0 = fail at once if the queue is full, < 0 = block until there is room,
  // > 0 = wait up to that many milliseconds.
  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0 =>
          queue.offer(message)
        case _ =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
              case true =>
                queue.put(message)
                true
              case _ =>
                queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      } else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

  /**
   * Close API to close the producer pool connections to all Kafka brokers. Also closes
   * the zookeeper client connection if one exists
   */
  def close() = {
    lock synchronized {
      val canShutdown = hasShutdown.compareAndSet(false, true)
      if(canShutdown) {
        info("Shutting down producer")
        val startTime = System.nanoTime()
        KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)
        if (producerSendThread != null)
          producerSendThread.shutdown
        eventHandler.close
        info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")
      }
    }
  }
}
```
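Notes:
I have added comments to many of the methods above. The constructor first initializes a set of fields: the async message queue `queue`, the `sync` flag, and the async send thread ProducerSendThread. The key class is ProducerSendThread: it takes messages off the queue and has kafka.producer.async.EventHandler send them to the broker.
The code is short but covers a lot. config.producerType decides whether messages are sent synchronously or asynchronously, and each mode is backed by its own classes. Below we look at these two modes in more detail.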
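The async hand-off can be pictured as a batching loop: the send thread polls the queue and flushes a batch to the handler when it is full (batchNumMessages) or when the buffering interval (queueBufferingMaxMs) has run out. The sketch below is a simplified, hypothetical model of that loop; the names BatchingSketch and drain are made up, and unlike a real send thread it simply returns once a whole interval passes with nothing to flush instead of running until shutdown.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of an async send thread's batching loop (not Kafka code).
object BatchingSketch {
  /** Drains `queue`, calling `handle` with batches of at most `batchSize` items.
    * A batch is flushed when it is full or when `maxWaitMs` has passed since the
    * last flush. Returns once a full interval passes with nothing to flush. */
  def drain[T](queue: LinkedBlockingQueue[T], batchSize: Int, maxWaitMs: Long)
              (handle: Seq[T] => Unit): Unit = {
    val batch = ArrayBuffer.empty[T]
    var lastFlush = System.currentTimeMillis
    var done = false
    while (!done) {
      // Wait only for what is left of the current interval; poll returns null on timeout.
      val waitMs = math.max(0L, lastFlush + maxWaitMs - System.currentTimeMillis)
      val item = queue.poll(waitMs, TimeUnit.MILLISECONDS)
      if (item != null) batch += item
      val full = batch.size >= batchSize
      val expired = System.currentTimeMillis - lastFlush >= maxWaitMs
      if (full || expired) {
        if (batch.isEmpty) done = true   // a whole interval went by with no data
        else handle(batch.toList)        // copy before the buffer is cleared
        batch.clear()
        lastFlush = System.currentTimeMillis
      }
    }
  }
}
```

Flushing on either condition bounds both the size of each request and the extra latency a message can pick up while it waits in the queue.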
The messages we send have the following form:

```scala
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V)
```
Notes:
When the three-argument constructor is used, partKey equals key. partKey is used to choose the partition, but it is not stored as part of the message.
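A minimal sketch of that rule, assuming the convenience constructors simply forward the key as partKey. The class name KeyedMessageSketch is hypothetical and this is not Kafka's own KeyedMessage; the partitionKey accessor models "partKey if set, otherwise key", which is the value partitionAndCollate later hands to getPartition.

```scala
// Hypothetical re-creation for illustration only (not Kafka's KeyedMessage).
case class KeyedMessageSketch[K, V](topic: String, key: K, partKey: Any, message: V) {
  // No key at all: nothing to partition on.
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  // Three-argument form: the key doubles as the partition key.
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  // What the partitioner would see: partKey if set, otherwise the key, otherwise null.
  def partitionKey: Any =
    if (partKey != null) partKey
    else if (key != null) key
    else null
}
```

Because partKey lives only on this wrapper object, it can steer partitioning without ever being serialized into the stored message.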
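1. Synchronous Sending
In sync mode, send calls eventHandler.handle directly. The core of DefaultEventHandler's sending path is dispatchSerializedData: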
```scala
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  // Partition the messages and group (collate) them by leader broker
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }
      } catch {
        case t: Throwable => error("Failed to send messages", t)
      }
      failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}
```
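Notes:
This method involves two important pieces. The first is partitionAndCollate, which works out the topic, partition and broker for each message; it is important and is analyzed below. The second is groupMessagesToSet, which applies the compression settings to the outgoing data: if no compression codec is configured, no topic's message set is compressed; if a codec is configured and no specific topics are listed for compression, every topic is compressed; otherwise only the listed topics are compressed.
groupMessagesToSet itself contains no compression code. It only chooses the codec and returns a ByteBufferMessageSet built with it; the messages are compressed when that set is constructed.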
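The per-topic decision above fits in one small function. This is a hedged sketch with hypothetical names (Codec, NoCompression, Gzip, Snappy, CompressionChoice.codecFor); Kafka uses its own CompressionCodec types, and in the 0.8 producer the two inputs come from the compression.codec and compressed.topics settings.

```scala
// Hypothetical codec type for illustration; Kafka has its own CompressionCodec classes.
sealed trait Codec
case object NoCompression extends Codec
case object Gzip extends Codec
case object Snappy extends Codec

object CompressionChoice {
  /** Picks the codec for one topic's message set. */
  def codecFor(topic: String, configured: Codec, compressedTopics: Set[String]): Codec =
    configured match {
      case NoCompression                     => NoCompression // compression disabled globally
      case codec if compressedTopics.isEmpty => codec         // no topic list: compress everything
      case codec if compressedTopics(topic)  => codec         // topic is on the list
      case _                                 => NoCompression // topic not on the list
    }
}
```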
Before looking at partitionAndCollate, let's look at the following class structure:
TopicMetadata → PartitionMetadata (a TopicMetadata holds one PartitionMetadata per partition of the topic)

```scala
case class PartitionMetadata(partitionId: Int,
                             val leader: Option[Broker],
                             replicas: Seq[Broker],
                             isr: Seq[Broker] = Seq.empty,
                             errorCode: Short = ErrorMapping.NoError)
```
```scala
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
  val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
  try {
    for (message <- messages) {
      // Get the partition list for the message's topic
      val topicPartitionsList = getPartitionListForTopic(message)
      // Decide which partition the message goes to (partitioner hash, or a cached random pick for a null key)
      val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
      val brokerPartition = topicPartitionsList(partitionIndex)

      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
      val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

      var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
      ret.get(leaderBrokerId) match {
        case Some(element) =>
          dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
        case None =>
          dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
          ret.put(leaderBrokerId, dataPerBroker)
      }

      val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
      var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
      dataPerBroker.get(topicAndPartition) match {
        case Some(element) =>
          dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
        case None =>
          dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
          dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
      }
      dataPerTopicPartition.append(message)
    }
    Some(ret)
  } catch { // Swallow recoverable exceptions and return None so that they can be retried.
    case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
    case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
    case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
  }
}
```
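Notes:
Calling partitionAndCollate groups the messages by topic and partition and distributes them into dataPerBroker maps (one per broker). Then, for each broker, SyncProducer.send is called to send that broker's messages in a batch; SyncProducer wraps the NIO network operations.
The main job of partitionAndCollate is to find, for each message, the leaderBrokerId (the broker on which the leader of the message's partition lives), to build a HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]], and to group the messages by brokerId, preparing the data that each SyncProducer will send. Before it can decide which broker a message goes to, it must first decide which partition the message goes to; only then can it use the partitionId to find the broker that hosts that partition's leader.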
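The two-level grouping (leader broker id first, then topic-partition) can be expressed with two nested groupBy calls. This is a simplified, hypothetical sketch: TopicPartition and Msg stand in for Kafka's TopicAndPartition and KeyedMessage, and the partition and leader lookups are passed in as plain functions instead of being read from cached metadata. As in the original, a partition without a leader lands under broker id -1, so the failure only surfaces later, in send.

```scala
// Hypothetical stand-ins for TopicAndPartition and KeyedMessage.
final case class TopicPartition(topic: String, partition: Int)
final case class Msg(topic: String, key: String)

object CollateSketch {
  /** Groups messages by leader broker id, then by (topic, partition).
    * `leaderOf` should return -1 for a partition that has no leader. */
  def collate(msgs: Seq[Msg],
              partitionOf: Msg => Int,
              leaderOf: TopicPartition => Int): Map[Int, Map[TopicPartition, Seq[Msg]]] =
    msgs
      .map(m => (TopicPartition(m.topic, partitionOf(m)), m)) // attach (topic, partition)
      .groupBy { case (tp, _) => leaderOf(tp) }              // level 1: leader broker id
      .map { case (broker, pairs) =>                          // level 2: topic-partition
        broker -> pairs.groupBy(_._1).map { case (tp, ps) => tp -> ps.map(_._2) }
      }
}
```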
Let's step into getPartitionListForTopic to see what it does.

```scala
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
  val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
  debug("Broker partitions registered for topic: %s are %s"
    .format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
  val totalNumPartitions = topicPartitionsList.length
  if(totalNumPartitions == 0)
    throw new NoBrokersForPartitionException("Partition key = " + m.key)
  topicPartitionsList
}

def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {
  debug("Getting broker partition info for topic %s".format(topic))
  // check if the cache has metadata for this topic
  val topicMetadata = topicPartitionInfo.get(topic)
  val metadata: TopicMetadata =
    topicMetadata match {
      case Some(m) => m
      case None =>
        // refresh the topic metadata cache
        updateInfo(Set(topic), correlationId)
        val topicMetadata = topicPartitionInfo.get(topic)
        topicMetadata match {
          case Some(m) => m
          case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)
        }
    }
  val partitionMetadata = metadata.partitionsMetadata
  if(partitionMetadata.size == 0) {
    if(metadata.errorCode != ErrorMapping.NoError) {
      throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))
    } else {
      throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
    }
  }
  partitionMetadata.map { m =>
    m.leader match {
      case Some(leader) =>
        debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))
        new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
      case None =>
        debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))
        new PartitionAndLeader(topic, m.partitionId, None)
    }
  }.sortWith((s, t) => s.partitionId < t.partitionId)
}
```
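Notes:
This method is important. First, look at the topicPartitionInfo object: it is a HashMap[String, TopicMetadata] whose key is the topic name and whose value is that topic's metadata.
The topic metadata is looked up in this map and pattern-matched. If there is an entry (Some(m)), it is assigned to metadata; if not (None), updateInfo connects to the brokers over NIO to refresh the topic metadata, and the lookup is retried once. If the topic is still missing after the refresh, a KafkaException is thrown.
(Flowchart of this lookup: image not available)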
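The lookup is a cache-aside pattern: read the map, refresh on a miss, and give up if the refresh did not help. A minimal sketch under simplifying assumptions: TopicMeta, MetadataCacheSketch and the fetch function are hypothetical stand-ins for TopicMetadata, BrokerPartitionInfo and the metadata request sent by updateInfo, error codes are ignored, and the refreshes counter exists only to make the behavior observable.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TopicMetadata: just the topic and its partition ids.
final case class TopicMeta(topic: String, partitions: Seq[Int])

/** Cache-aside lookup: serve from the map, otherwise refresh once and retry. */
class MetadataCacheSketch(fetch: Set[String] => Seq[TopicMeta]) {
  private val cache = mutable.HashMap.empty[String, TopicMeta]
  var refreshes = 0 // for illustration only

  def get(topic: String): TopicMeta =
    cache.getOrElse(topic, {
      refresh(Set(topic)) // cache miss: ask the brokers
      cache.getOrElse(topic,
        throw new RuntimeException("Failed to fetch topic metadata for topic: " + topic))
    })

  private def refresh(topics: Set[String]): Unit = {
    refreshes += 1
    fetch(topics).foreach(tm => cache.put(tm.topic, tm))
  }
}
```

Only the first lookup of a topic pays for a network round trip; later messages for the same topic are resolved from memory.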
Next, the source of updateInfo:

```scala
def updateInfo(topics: Set[String], correlationId: Int) {
  var topicsMetadata: Seq[TopicMetadata] = Nil
  // Ask the brokers for the latest metadata of these topics
  val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
  // Read the topic and partition metadata from the response; error-free topics are cached below
  topicsMetadata = topicMetadataResponse.topicsMetadata
  // throw partition specific exception
  topicsMetadata.foreach(tmd => {
    trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
    if(tmd.errorCode == ErrorMapping.NoError) {
      topicPartitionInfo.put(tmd.topic, tmd)
    } else
      warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
    tmd.partitionsMetadata.foreach(pmd => {
      if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
        warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
          ErrorMapping.exceptionFor(pmd.errorCode).getClass))
      } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
    })
  })
  producerPool.updateProducer(topicsMetadata)
}
```
getPartition then picks the target partition for each message:

```scala
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  val partition =
    if(key == null) {
      // If the key is null, we don't really need a partitioner
      // So we look up in the send partition cache for the topic to decide the target partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        case Some(partitionId) =>
          // directly return the partitionId without checking availability of the leader,
          // since we want to postpone the failure until the send operation anyways
          partitionId
        case None =>
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
          partitionId
      }
    } else
      partitioner.partition(key, numPartitions)
  if(partition < 0 || partition >= numPartitions)
    throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
      "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
  trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
  partition
}
```
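When the key is not null, the partition method of the partitioner passed to the handler decides, from partKey and numPartitions, which partition the message is sent to. Note that numPartitions comes from topicPartitionList.size, so some partitions may have no available leader; that problem is left for send to handle. When it happens, partitionAndCollate assigns the message to the broker with brokerId -1. (When the key is null, getPartition instead picks a random partition that has a leader and caches that choice per topic in sendPartitionPerTopicCache.) The code below is the default partitioning formula: Utils.abs(key.hashCode) % numPartitions.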
```scala
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  private val random = new java.util.Random

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}
```
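Putting getPartition and DefaultPartitioner together: a null key gets a random partition that has a leader, remembered per topic, while a non-null key gets a non-negative hash modulo the partition count. Below is a hedged sketch with hypothetical names (Part, PartitionChooser, safeAbs). The safeAbs helper clears the sign bit, which is one common way to keep the hash non-negative; it is an assumption, not a copy of Kafka's Utils.abs. Plain math.abs would not be enough, because math.abs(Int.MinValue) is still negative.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical stand-in for PartitionAndLeader: partition id plus optional leader broker id.
final case class Part(id: Int, leader: Option[Int])

class PartitionChooser(random: Random = new Random) {
  // Remembers the partition chosen for null-key messages, per topic.
  private val stickyPartition = mutable.HashMap.empty[String, Int]

  // Clearing the sign bit keeps the value non-negative, even for Int.MinValue.
  private def safeAbs(n: Int): Int = n & 0x7fffffff

  def choose(topic: String, key: Any, parts: Seq[Part]): Int = {
    // Kafka throws UnknownTopicOrPartitionException here; require is a simplification.
    require(parts.nonEmpty, s"Topic $topic doesn't exist")
    if (key == null) {
      stickyPartition.getOrElseUpdate(topic, {
        val available = parts.filter(_.leader.isDefined)
        if (available.isEmpty)
          throw new IllegalStateException(s"No leader for any partition in topic $topic")
        available(random.nextInt(available.size)).id
      })
    } else {
      safeAbs(key.hashCode) % parts.size
    }
  }
}
```

Caching the null-key choice keeps unkeyed messages on one partition per topic until the cache is cleared, so they still batch well.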
Finally, send, which dispatchSerializedData calls once per broker:

```scala
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
  if(brokerId < 0) {
    warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
    messagesPerTopic.keys.toSeq
  } else if(messagesPerTopic.size > 0) {
    val currentCorrelationId = correlationId.getAndIncrement
    val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
      config.requestTimeoutMs, messagesPerTopic)
    var failedTopicPartitions = Seq.empty[TopicAndPartition]
    try {
      val syncProducer = producerPool.getProducer(brokerId)
      debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
        .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
      val response = syncProducer.send(producerRequest)
      debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
        .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
      // response is null when request.required.acks = 0 (the broker sends no acknowledgement)
      if(response != null) {
        if (response.status.size != producerRequest.data.size)
          throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
        if (logger.isTraceEnabled) {
          val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
          successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
            trace("Successfully sent message: %s".format(if(message.message.isNull) null else Utils.readString(message.message.payload)))))
        }
        val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
        failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
        if(failedTopicPartitions.size > 0) {
          val errorString = failedPartitionsAndStatus
            .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
                                  (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
            .map{
              case(topicAndPartition, status) =>
                topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
            }.mkString(",")
          warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
        }
        failedTopicPartitions
      } else {
        Seq.empty[TopicAndPartition]
      }
    } catch {
      case t: Throwable =>
        warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
          .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
        messagesPerTopic.keys.toSeq
    }
  } else {
    List.empty
  }
}
```
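The essential bookkeeping in send is how a response becomes a list of partitions to retry: everything fails for broker id -1 or when the request throws, and otherwise only partitions whose status carries an error code fail; dispatchSerializedData then maps those partitions back to their messages. A hedged sketch of just that bookkeeping, with hypothetical names (Tp, RetrySketch, failedPartitions, messagesToRetry), error codes as plain Shorts where 0 plays the role of ErrorMapping.NoError, and the network call left out.

```scala
// Hypothetical stand-in for TopicAndPartition.
final case class Tp(topic: String, partition: Int)

object RetrySketch {
  val NoError: Short = 0

  /** Partitions to retry after one produce request to `brokerId`.
    * `status` is None when the request threw; otherwise it maps each partition to its error code. */
  def failedPartitions(brokerId: Int, sent: Seq[Tp], status: Option[Map[Tp, Short]]): Seq[Tp] =
    if (brokerId < 0) sent           // no leader: every partition fails, as in send
    else status match {
      case None => sent              // the request failed outright
      case Some(s) =>
        s.collect { case (tp, err) if err != NoError => tp }.toSeq
          .sortBy(tp => (tp.topic, tp.partition)) // sorted only for deterministic output
    }

  /** Messages belonging to the failed partitions, ready to be sent again. */
  def messagesToRetry[M](byPartition: Map[Tp, Seq[M]], failed: Seq[Tp]): Seq[M] =
    failed.flatMap(tp => byPartition.getOrElse(tp, Seq.empty))
}
```

Failing per partition rather than per request means one bad partition does not force messages that were already accepted elsewhere to be resent.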
http://flychao88.iteye.com/blog/2266611