GroupMetadataManager Analysis
GroupMetadataManager is the component responsible for managing GroupMetadata and the corresponding offset information. Under the hood it relies on the internal offsets topic (__consumer_offsets), storing each consumer group's GroupMetadata as well as the committed offset of every consumed partition as messages in that topic.
To speed up lookups, GroupMetadataManager also keeps an identical copy of each group's GroupMetadata and offset information in memory and updates it in step with the topic.
Component dependency diagram:
1. Core Fields
groupMetadataCache: Pool[String, GroupMetadata] maintains the mapping from each consumer group to its GroupMetadata
loadingPartitions: Set[Int] records the ids of the offsets topic partitions that are currently being loaded
ownedPartitions: Set[Int] records the ids of the offsets topic partitions that have already been loaded
groupMetadataTopicPartitionCount: the number of partitions of the offsets topic. It is first read from the partition assignment of the topic '__consumer_offsets' in ZooKeeper; if the topic exists there, the partition count reported by ZooKeeper is used, otherwise the default of 50
replicaManager: ReplicaManager. The offsets topic behaves like any other topic: its metadata is recorded in ZooKeeper, it has leader replicas, follower replicas and an AR set, and its leadership can migrate, so it too is managed by ReplicaManager
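The state above can be summarized in a small compilable sketch. This is not the real class: a mutable.Map stands in for Kafka's Pool and GroupMetadata is reduced to a stub, purely to show the shape of the fields.

import scala.collection.mutable

// Simplified stand-ins: a mutable.Map replaces Kafka's Pool and GroupMetadata is a stub.
final case class GroupMetadata(groupId: String)

class GroupMetadataManagerSketch(val groupMetadataTopicPartitionCount: Int = 50) {
  // groupId -> GroupMetadata: the in-memory copy of what is stored in __consumer_offsets
  private val groupMetadataCache = mutable.Map[String, GroupMetadata]()
  // ids of the offsets topic partitions that are currently being loaded
  private val loadingPartitions = mutable.Set[Int]()
  // ids of the offsets topic partitions that have already been loaded and are owned
  private val ownedPartitions = mutable.Set[Int]()
}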
2. Managing groupMetadataCache
We know that the offsets topic stores both GroupMetadata messages and the offset-commit messages of consumed partitions. But which offsets topic partition should a given message land in? The partitionFor method picks the partition within the offsets topic:
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
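A minimal, self-contained illustration of this mapping follows. Kafka's Utils.abs clears the sign bit (n & 0x7fffffff) rather than calling math.abs, and the group names below are arbitrary examples.

object PartitionForDemo {
  val groupMetadataTopicPartitionCount = 50 // the default partition count of __consumer_offsets

  // Utils.abs in Kafka masks the sign bit, so it is safe even when hashCode is Int.MinValue.
  private def abs(n: Int): Int = n & 0x7fffffff

  def partitionFor(groupId: String): Int =
    abs(groupId.hashCode) % groupMetadataTopicPartitionCount

  def main(args: Array[String]): Unit =
    Seq("my-group", "another-group").foreach { g =>
      println(s"$g -> __consumer_offsets partition ${partitionFor(g)}")
    }
}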
Therefore a group's GroupMetadata messages and its committed-offset messages always end up in the same offsets topic partition. The two kinds of messages, however, use different keys:
# groupMetadataKey builds the key of a message that records GroupMetadata; the key consists of the groupId alone
def groupMetadataKey(group: String): Array[Byte] = {
val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
key.set(GROUP_KEY_GROUP_FIELD, group)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
key.writeTo(byteBuffer)
byteBuffer.array()
}
# offsetCommitKey builds the key of a message that records a committed offset; the key consists of the groupId, the topic name and the partitionId
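The key-building code itself is not reproduced here; the following is a sketch of its likely shape, assuming the same Struct-plus-version-prefix pattern as groupMetadataKey above (the schema and field constants are assumptions and may differ slightly between Kafka versions):

private def offsetCommitKey(group: String, topic: String, partition: Int): Array[Byte] = {
  val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
  key.set(OFFSET_KEY_GROUP_FIELD, group)
  key.set(OFFSET_KEY_TOPIC_FIELD, topic)
  key.set(OFFSET_KEY_PARTITION_FIELD, partition)
  val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
  byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
  key.writeTo(byteBuffer)
  byteBuffer.array()
}

The value of an offset-commit message is built by offsetCommitValue: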
private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
// generate commit value with schema version 1
val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
value.writeTo(byteBuffer)
byteBuffer.array()
}
GroupMetadataManager also provides methods for managing the groupMetadataCache collection: getGroup(), addGroup(), putOffset() and getOffset().
def addGroup(group: GroupMetadata): GroupMetadata = {
val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
if (currentGroup != null) {
currentGroup
} else {
group
}
}
def getGroup(groupId: String): Option[GroupMetadata] = {
Option(groupMetadataCache.get(groupId))
}
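The important detail in addGroup is its put-if-absent semantics: if another thread registered the group first, the existing instance wins. A minimal stand-alone illustration, using ConcurrentHashMap as a hypothetical stand-in for Kafka's Pool:

import java.util.concurrent.ConcurrentHashMap

object AddGroupSemanticsDemo {
  final case class GroupMetadata(groupId: String)
  private val groupMetadataCache = new ConcurrentHashMap[String, GroupMetadata]()

  def addGroup(group: GroupMetadata): GroupMetadata = {
    // putIfAbsent returns the previously registered value, or null if this call won the race
    val currentGroup = groupMetadataCache.putIfAbsent(group.groupId, group)
    if (currentGroup != null) currentGroup else group
  }

  def main(args: Array[String]): Unit = {
    val first  = addGroup(GroupMetadata("my-group"))
    val second = addGroup(GroupMetadata("my-group")) // a second registration is ignored
    println(first eq second)                         // true: the first instance is kept
  }
}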
3. Locating the GroupCoordinator
Before a consumer interacts with its GroupCoordinator, it first sends a GroupCoordinatorRequest to a lightly loaded broker in the cluster to discover the network location of the GroupCoordinator responsible for its consumer group. The consumer then connects to that GroupCoordinator and sends the subsequent JoinGroupRequest, SyncGroupRequest and other requests to it.
The handleGroupCoordinatorRequest method in KafkaApis handles this request:
def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
// convert the request body into a GroupCoordinatorRequest
val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
val responseHeader = new ResponseHeader(request.header.correlationId)
if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
} else {
// determine, from the groupId, which offsets topic partition stores this consumer group's information
val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
// look up the offsets topic in the MetadataCache, creating it if it does not exist yet
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
// if there was an error, return a response immediately
val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
} else {
// find the Node hosting the leader of the partition this consumer group maps to
val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
.find(_.partition == partition)
.map(_.leader())
// build the GroupCoordinatorResponse
coordinatorEndpoint match {
case Some(endpoint) if !endpoint.isEmpty =>
new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
case _ =>
new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
}
}
trace("Sending consumer metadata %s for correlation id %d to client %s."
.format(responseBody, request.header.correlationId, request.header.clientId))
// put the response into the requestChannel, where it waits to be sent
requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
}
}
4. GroupCoordinator Migration
By default the offsets topic has 50 partitions with 3 replicas each. When the broker hosting a partition's leader replica fails, leadership migrates, and the consumer groups mapped to that partition are then managed by the GroupCoordinator running on the broker that hosts the new leader replica. How does this migration happen?
After KafkaApis#handleLeaderAndIsrRequest has processed a LeaderAndIsrRequest, it invokes the onLeadershipChange callback to carry out the migration:
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
val correlationId = request.header.correlationId
// convert the request body into a LeaderAndIsrRequest
val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
try {
// callback function
def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
// for each new leader or follower, let the GroupCoordinator handle the migration of consumer groups;
// the callback fires while replica state changes are applied, so the order of leadership changes is preserved
// iterate over the partitions for which this broker became leader
updatedLeaders.foreach { partition =>
// only the offsets topic is relevant here
if (partition.topic == Topic.GroupMetadataTopicName)
// load the groups of this partition via GroupCoordinator#handleGroupImmigration
coordinator.handleGroupImmigration(partition.partitionId)
}
// iterate over the partitions for which this broker became a follower
updatedFollowers.foreach { partition =>
// only the offsets topic is relevant here
if (partition.topic == Topic.GroupMetadataTopicName)
// unload the groups of this partition via GroupCoordinator#handleGroupEmigration
coordinator.handleGroupEmigration(partition.partitionId)
}
}
val responseHeader = new ResponseHeader(correlationId)
val leaderAndIsrResponse =
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
// delegate the actual state changes to ReplicaManager#becomeLeaderOrFollower, passing the callback
val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
} else {
// not authorized: mark every partition in the request with CLUSTER_AUTHORIZATION_FAILED
val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
// and return a LeaderAndIsrResponse carrying that error
new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
}
// add the response to the requestChannel
requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
fatal("Disk error during leadership change.", e)
Runtime.getRuntime.halt(1)
}
}
def handleGroupImmigration(offsetTopicPartitionId: Int) {
groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
}
When a broker becomes the leader replica of an offsets topic partition, GroupCoordinator#handleGroupImmigration is invoked to load that partition. It simply delegates to GroupMetadataManager#loadGroupsForPartition, which proceeds as follows:
First: check whether this offsets topic partition is already being loaded; if so, abort this load, otherwise add it to the loadingPartitions set to mark it as loading.
Second: obtain the partition's Log object through the ReplicaManager component.
Third: read the Log starting from its first log segment. During the scan, tombstone (delete-marker) messages must be handled differently from regular ones:
# an offset message carrying a tombstone puts its key on the removed-offsets list; an offset message with a value is parsed into OffsetAndMetadata and put on the loaded-offsets list
# a group metadata message carrying a tombstone puts the group on the removed-groups list; one with a value is parsed and put on the loaded-groups list
Fourth: load the collected GroupMetadata entries into groupMetadataCache, attach the loaded offsets to their GroupMetadata, and verify that none of the removed groups is still present in groupMetadataCache.
Fifth: move the offsets topic partition from loadingPartitions to ownedPartitions.
def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
// schedule loadGroupsAndOffsets to run asynchronously on the KafkaScheduler
scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
def loadGroupsAndOffsets() {
info("Loading offsets and group metadata from " + topicPartition)
inLock(partitionLock) {
// if this offsets topic partition is already in loadingPartitions, return immediately; otherwise add it to the set
if (loadingPartitions.contains(offsetsPartition)) {
info("Offset load from %s already in progress.".format(topicPartition))
return
} else {
loadingPartitions.add(offsetsPartition)
}
}
val startMs = time.milliseconds()
try {
// obtain the partition's Log object from the ReplicaManager
replicaManager.logManager.getLog(topicPartition) match {
case Some(log) =>
// start from the base offset of the first log segment
var currOffset = log.logSegments.head.baseOffset
// allocate the read buffer
val buffer = ByteBuffer.allocate(config.loadBufferSize)
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
val removedOffsets = mutable.Set[GroupTopicPartition]()
val loadedGroups = mutable.Map[String, GroupMetadata]()
val removedGroups = mutable.Set[String]()
// keep reading until the high watermark (the end of the committed log) is reached
while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
buffer.clear() // clear the buffer before each read
// log.read returns a FileMessageSet
val messages = log.read(currOffset, config.loadBufferSize, minOneMessage = true).messageSet.asInstanceOf[FileMessageSet]
// copy it into the buffer and wrap it in a ByteBufferMessageSet
messages.readInto(buffer, 0)
val messageSet = new ByteBufferMessageSet(buffer)
// iterate over the messages
messageSet.foreach { msgAndOffset =>
require(msgAndOffset.message.key != null, "Offset entry key should not be null")
val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
// the key identifies an offset-commit message
if (baseKey.isInstanceOf[OffsetKey]) {
// extract the (group, topic, partition) the offset belongs to
val key = baseKey.key.asInstanceOf[GroupTopicPartition]
// a null payload is a tombstone: drop any previously loaded offset for this key
if (msgAndOffset.message.payload == null) {
loadedOffsets.remove(key)
removedOffsets.add(key)
} else {
// otherwise parse the value into an OffsetAndMetadata
val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
// and record it in loadedOffsets
loadedOffsets.put(key, value)
removedOffsets.remove(key)
}
} else {
// the key identifies a GroupMetadata message, so load the group's metadata
val groupId = baseKey.key.asInstanceOf[String]
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
// handle tombstone (null payload) and regular messages differently
if (groupMetadata != null) {
trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
removedGroups.remove(groupId)
loadedGroups.put(groupId, groupMetadata)
} else {
loadedGroups.remove(groupId)
removedGroups.add(groupId)
}
}
currOffset = msgAndOffset.nextOffset
}
}
// split loadedOffsets by group: offsets whose group has an entry in loadedGroups vs. offsets whose group has none
val (groupOffsets, noGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)})
.partition(value => loadedGroups.contains(value._1))
// load the collected GroupMetadata entries, together with their offsets, into groupMetadataCache
loadedGroups.values.foreach { group =>
val offsets = groupOffsets.getOrElse(group.groupId, Map.empty)
loadGroup(group, offsets)
onGroupLoaded(group)
}
noGroupOffsets.foreach { case (groupId, offsets) =>
val group = new GroupMetadata(groupId)
loadGroup(group, offsets)
onGroupLoaded(group)
}
// verify that none of the groups marked for removal is still present in groupMetadataCache
removedGroups.foreach { groupId =>
if (groupMetadataCache.contains(groupId))
throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " +
s"loading partition ${topicPartition}")
}
if (!shuttingDown.get())
info("Finished loading offsets from %s in %d milliseconds."
.format(topicPartition, time.milliseconds() - startMs))
case None =>
warn("No log found for " + topicPartition)
}
}
catch {
case t: Throwable =>
error("Error in loading offsets from " + topicPartition, t)
}
finally {
// move this offsets topic partition from loadingPartitions to ownedPartitions
inLock(partitionLock) {
ownedPartitions.add(offsetsPartition)
loadingPartitions.remove(offsetsPartition)
}
}
}
}
When a broker becomes a follower replica of an offsets topic partition, GroupCoordinator#handleGroupEmigration is invoked; it delegates to GroupMetadataManager#removeGroupsForPartition, which performs the necessary cleanup:
First: remove the offsets topic partition from the ownedPartitions set, indicating that this GroupCoordinator no longer manages the consumer groups mapped to it.
Second: iterate over the GroupMetadata entries in groupMetadataCache.
Third: remove every GroupMetadata entry that maps to this partition.
def removeGroupsForPartition(offsetsPartition: Int, onGroupUnloaded: GroupMetadata => Unit) {
val topicPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
def removeGroupsAndOffsets() {
var numOffsetsRemoved = 0 // number of cached offsets removed
var numGroupsRemoved = 0 // number of cached groups removed
inLock(partitionLock) {
// remove the offsets topic partition from ownedPartitions: this GroupCoordinator no longer manages its consumer groups
ownedPartitions.remove(offsetsPartition)
// iterate over the GroupMetadata entries in groupMetadataCache
for (group <- groupMetadataCache.values) {
// and drop every group that maps to this partition
if (partitionFor(group.groupId) == offsetsPartition) {
onGroupUnloaded(group)
groupMetadataCache.remove(group.groupId, group)
numGroupsRemoved += 1
numOffsetsRemoved += group.numOffsets
}
}
}
if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
.format(numOffsetsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
.format(numGroupsRemoved, TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)))
}
}
5. Handling SyncGroupRequest
The leader consumer of a group sends a SyncGroupRequest carrying the partition assignment to the GroupCoordinator, which turns that assignment into a SyncGroupResponse returned to every consumer in the group. Each consumer parses the response to learn its own assignment.
In addition, the GroupCoordinator turns the assignment into a message and appends it to the corresponding offsets topic partition; GroupMetadataManager#prepareStoreGroup is responsible for exactly that:
def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Option[DelayedStore] = {
// determine the message format (magic value) and timestamp used by the offsets topic partition
val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
magicValueAndTimestampOpt match {
case Some((magicValue, timestamp)) =>
val groupMetadataValueVersion = {
if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
0.toShort
else
GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
}
// build the message recording the GroupMetadata; its value carries the partition assignment
val message = new Message(
key = GroupMetadataManager.groupMetadataKey(group.groupId),
bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion),
timestamp = timestamp,
magicValue = magicValue)
// the offsets topic partition that this consumer group maps to
val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
// map that partition to the message set to be appended
val groupMetadataMessageSet = Map(groupMetadataPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
val generationId = group.generationId
// invoked once the message above has been successfully appended to the offsets topic partition
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || !responseStatus.contains(groupMetadataPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, groupMetadataPartition))
// construct the error status in the propagated assignment response
// in the cache
val status = responseStatus(groupMetadataPartition)
val statusError = Errors.forCode(status.errorCode)
val responseError = if (statusError == Errors.NONE) {
Errors.NONE
} else {
debug(s"Metadata from group ${group.groupId} with generation $generationId failed when appending to log " +
s"due to ${statusError.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
statusError match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.GROUP_COORDINATOR_NOT_AVAILABLE
case Errors.NOT_LEADER_FOR_PARTITION =>
Errors.NOT_COORDINATOR_FOR_GROUP
case Errors.REQUEST_TIMED_OUT =>
Errors.REBALANCE_IN_PROGRESS
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
error(s"Appending metadata message for group ${group.groupId} generation $generationId failed due to " +
s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
Errors.UNKNOWN
case other =>
error(s"Appending metadata message for group ${group.groupId} generation $generationId failed " +
s"due to unexpected error: ${statusError.exceptionName}")
other
}
}
responseCallback(responseError)
}
// wrap the message set and the callback into a DelayedStore object
Some(DelayedStore(groupMetadataMessageSet, putCacheCallback))
case None =>
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
None
}
}
prepareStoreGroup contains no code that actually appends messages; it only creates a DelayedStore object wrapping the message set and the callback. The actual append is performed by GroupMetadataManager.store(), which calls ReplicaManager#appendMessages:
def store(delayedStore: DelayedStore) {
// call replica manager to append the group message
replicaManager.appendMessages(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
true, // allow appending to internal offset topic
delayedStore.messageSet,
delayedStore.callback)
}
When requiredAcks is -1, a DelayedProduce is created, and only after the required conditions are satisfied does the append complete and the callback fire. The callback here is the putCacheCallback defined in prepareStoreGroup above, and its argument is the result of the append.
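The prepare/store split can be modeled in isolation: preparation only packages the payload and the completion callback, while the store step performs the append (simulated here) and then invokes that callback. This is a simplified sketch with hypothetical types, not the real Kafka classes:

object PrepareThenStoreDemo {
  // Hypothetical stand-in for Kafka's DelayedStore: a payload plus a completion callback.
  final case class DelayedStore(messages: Map[String, String], callback: Map[String, String] => Unit)

  // "prepare" only builds the messages and the completion callback; nothing is written yet.
  def prepareStore(): DelayedStore = {
    val messages = Map("__consumer_offsets-12" -> "serialized group metadata") // partition name is illustrative
    DelayedStore(messages, status => println(s"append completed with status: $status"))
  }

  // "store" performs the append (simulated) and then invokes the callback with the result.
  def store(delayed: DelayedStore): Unit = {
    val appendStatus = delayed.messages.map { case (partition, _) => partition -> "NONE" }
    delayed.callback(appendStatus)
  }

  def main(args: Array[String]): Unit = store(prepareStore())
}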
6. Handling OffsetCommitRequest
During normal consumption, and before a rebalance, consumers commit offsets: the offset of every consumed partition is wrapped into a message and appended to the offsets topic partition.
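From the client side this is just a commitSync (or commitAsync) call on the consumer; a minimal sketch using the standard kafka-clients consumer API of the same era, where the broker address, topic, group name and committed offset are placeholders:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object ManualCommitDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "my-group")
    props.put("enable.auto.commit", "false") // commit explicitly instead of relying on auto-commit
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Seq("my-topic").asJava)
    try {
      val records = consumer.poll(1000L)
      records.asScala.foreach(r => println(s"${r.partition}/${r.offset}: ${r.value}"))
      // Commit the next offset to consume for one partition; this becomes an
      // OffsetCommitRequest that the broker-side code below ends up handling.
      consumer.commitSync(Map(
        new TopicPartition("my-topic", 0) -> new OffsetAndMetadata(42L)
      ).asJava)
    } finally {
      consumer.close()
    }
  }
}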
On the broker side, the prepareStoreOffsets method packages the offset messages into a DelayedStore,
and the store method then appends them to the offsets topic partition:
def prepareStoreOffsets(group: GroupMetadata, consumerId: String, generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
// first filter out partitions whose offset metadata exceeds the configured size limit
val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
// determine the message format (magic value) and timestamp used by the offsets topic partition
val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
magicValueAndTimestampOpt match {
case Some((magicValue, timestamp)) =>
// build one message per partition carrying the committed offset information
val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
new Message(
key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition),
bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
timestamp = timestamp,
magicValue = magicValue
)
}.toSeq
// the offsets topic partition that this consumer group maps to
val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
// map that partition to the message set to be appended
val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
// invoked once the messages have been successfully appended to the log
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
val status = responseStatus(offsetTopicPartition)
val statusError = Errors.forCode(status.errorCode)
val responseCode =
group synchronized {
if (statusError == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
}
}
Errors.NONE.code
} else {
if (!group.is(Dead)) {
filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
group.failPendingOffsetWrite(topicAndPartition, offsetAndMetadata)
}
}
debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
val responseError = statusError match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.GROUP_COORDINATOR_NOT_AVAILABLE
case Errors.NOT_LEADER_FOR_PARTITION =>
Errors.NOT_COORDINATOR_FOR_GROUP
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
Errors.INVALID_COMMIT_OFFSET_SIZE
case other => other
}
responseError.code
}
}
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicAndPartition, responseCode)
else
(topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
}
// finally trigger the callback logic passed from the API layer
responseCallback(commitStatus)
}
group synchronized {
group.prepareOffsetCommit(offsetMetadata)
}
// return a DelayedStore object wrapping the message set and the callback
Some(DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback))
case None =>
val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
(topicAndPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
}
responseCallback(commitStatus)
None
}
}
7. Handling OffsetFetchRequest
When the consumers of a group come back online after going down, they can send an OffsetFetchRequest to the GroupCoordinator to retrieve the most recently committed offsets and resume consumption from that position. On receiving the OffsetFetchRequest, the GroupCoordinator hands it to GroupMetadataManager, which looks up the OffsetAndMetadata entries for the groupId in the request and returns them to the consumer.
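On the client side this lookup corresponds to KafkaConsumer#committed (or to the automatic position reset when a consumer restarts); a minimal sketch with the standard kafka-clients API, where the broker address, topic and group name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object FetchCommittedOffsetDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "my-group")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      // committed() sends an OffsetFetchRequest for this partition to the GroupCoordinator.
      val committed = consumer.committed(new TopicPartition("my-topic", 0))
      if (committed == null) println("no offset committed yet for my-topic-0")
      else println(s"last committed offset: ${committed.offset()} (metadata: '${committed.metadata()}')")
    } finally {
      consumer.close()
    }
  }
}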
def handleFetchOffsets(groupId: String,
partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
if (!isActive.get) {
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
} else if (!isCoordinatorForGroup(groupId)) { // check whether this GroupCoordinator is the group's coordinator
debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
} else if (isCoordinatorLoadingInProgress(groupId)) { // check whether the group's metadata is still being loaded
partitions.map { case topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
} else {
// delegate to GroupMetadataManager
groupManager.getOffsets(groupId, partitions)
}
}
def getOffsets(groupId: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
trace("Getting offsets %s for group %s.".format(topicPartitions, groupId))
// look up the GroupMetadata entry for this groupId
val group = groupMetadataCache.get(groupId)
if (group == null) {
topicPartitions.map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
}.toMap
} else {
group synchronized {
if (group.is(Dead)) {
topicPartitions.map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
}.toMap
} else {
// an empty partition list means the request asks for the last committed offset of every partition
if (topicPartitions.isEmpty) {
group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
(topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
}
} else {
// look up the last committed offset of each requested partition from the group's cached offsets
topicPartitions.map { topicPartition =>
group.offset(topicPartition) match {
case None => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code))
case Some(offsetAndMetadata) =>
(topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
}
}.toMap
}
}
}
}
}