Preface

A brief analysis of how the Coordinator handles consumer group partition assignment, offset commits, heartbeat checking, and of how it relies on the offsets topic for its own high availability. The main flow is as follows:


[Figure: Coordinator main flow]


1: Preparation

1.1 Subcomponents

GroupMetadataManager maintains the caches the coordinator needs, such as group metadata and committed offsets, and is responsible for persisting the corresponding messages. Its relationship to each kind of metadata:

[Figure: GroupMetadataManager and the metadata it caches]

The main fields are as follows:

// Metadata for a single group member
class MemberMetadata(val memberId: String,   // uuid assigned by the coordinator
                     val groupId: String,    // the group.id it belongs to
                     val sessionTimeoutMs: Int, // the consumer's session.timeout.ms
                     var supportedProtocols: List[(String, Array[Byte])]) { // supported assignment strategies
  var awaitingJoinCallback: JoinGroupResult => Unit = null      // response callback; non-null means waiting for a JoinGroupResponse
  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null // null means this consumer is not waiting for a SyncGroupResponse
  var latestHeartbeat: Long = -1 // timestamp of the last heartbeat
}

// Metadata for a whole group
class GroupMetadata(val groupId: String, val protocolType: String) {
  val members = new mutable.HashMap[String, MemberMetadata] // all consumers in the group
  var state: GroupState = Stable // current group state (the core field)
  var generationId = 0           // monotonically increasing generation, used to detect stale requests
  def add(memberId: String, member: MemberMetadata) {
    if (leaderId == null) leaderId = memberId // the first member to join becomes the group leader
    members.put(memberId, member)
  }
}

// One committed consumption position
case class OffsetMetadata(offset: Long, /*..*/)

class GroupMetadataManager(replicaManager: ReplicaManager, /*..*/) { // manages group/offset state for the offsets topic partitions led by this broker
  // caches for committed offsets and group membership
  val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] // [group, tp] -> offset
  val groupsCache = new Pool[String, GroupMetadata]                   // group.id -> [memberId -> MemberMetadata, ...]

  // delegates to replicaManager to replay offsets topic partition logs locally; partition ids being replayed and fully replayed
  val loadingPartitions: Set[Int] = Set()
  val ownedPartitions: Set[Int] = Set()

  val scheduler = new KafkaScheduler(threads = 1)
  scheduler.schedule(name = "delete-expired-consumer-offsets", fun = deleteExpiredOffsets, /*..*/)
}

This is a brief listing of each subcomponent's main fields; they will be used when analyzing request handling in section 2.
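To make the cache keying concrete, here is a minimal, self-contained sketch using plain Scala collections instead of Kafka's Pool; the case classes are simplified stand-ins, not the real internal types:

// Simplified stand-ins for Kafka's internal types, for illustration only
case class GroupTopicPartition(group: String, topic: String, partition: Int)
case class OffsetAndMetadata(offset: Long, metadata: String = "")

object OffsetsCacheSketch extends App {
  // offsetsCache: one entry per (group, topic, partition)
  val offsetsCache = scala.collection.mutable.Map[GroupTopicPartition, OffsetAndMetadata]()

  // an OffsetCommit from group "g1" for my-topic-0 updates exactly one key
  offsetsCache(GroupTopicPartition("g1", "my-topic", 0)) = OffsetAndMetadata(42L)

  // an OffsetFetch for the same (group, topic, partition) reads it back
  println(offsetsCache.get(GroupTopicPartition("g1", "my-topic", 0))) // Some(OffsetAndMetadata(42,...))
}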


1.2 Group State Machine

Following the source and its comments, GroupMetadata holds four group states, each with its own coordination logic for the six request types:

trait GroupState

/************** 1. Preparing rebalance **************/
object PreparingRebalance extends GroupState
// HeartbeatRequest    // stale request, return REBALANCE_IN_PROGRESS
// SyncGroupRequest    // stale request, return REBALANCE_IN_PROGRESS
// LeaveGroupRequest   // remove the departing member
// JoinGroupRequest    // accepted but not answered yet; wait until the remaining old members rejoin or the delayedJoin expires, then respond
// OffsetCommitRequest // members of the previous generation may still commit their progress
// OffsetFetchRequest  // allowed

/*************** 2. Performing rebalance: waiting for the leader's assignment ***************/
object AwaitingSync extends GroupState
// HeartbeatRequest    // stale request, return REBALANCE_IN_PROGRESS
// SyncGroupRequest    // followers' requests are held; respond only once the leader delivers the assignment
// LeaveGroupRequest   // an old member left, trigger another rebalance
// JoinGroupRequest    // a new member joined, or an old member rejoined with new protocols: trigger another rebalance
// OffsetCommitRequest // the prepare phase is over, stale commits are rejected with REBALANCE_IN_PROGRESS
// OffsetFetchRequest  // allowed

/*************** 3. Rebalance finished: all requests handled normally ***************/
object Stable extends GroupState
// HeartbeatRequest    // complete the previous heartbeat round and schedule the next delayedHeartbeat task
// SyncGroupRequest    // respond with the partitions assigned to this member
// LeaveGroupRequest   // trigger a rebalance
// JoinGroupRequest    // if it is the leader, or a follower with new protocols, trigger a rebalance
// OffsetCommitRequest // only members of the current generation may commit progress
// OffsetFetchRequest  // allowed

/*************** 4. All members are gone ***************/
object Dead extends GroupState
// *Request            // this group is no longer coordinated by this broker, return UNKNOWN_MEMBER_ID
// OffsetFetchRequest  // allowed

As you can see, any request that would invalidate the current assignment triggers a rebalance: old members going offline or rejoining, new members joining, old members updating their protocols, and so on. Different requests also drive the group state machine between states:

[Figure: Group state transition diagram]
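A minimal sketch of the transitions implied above, written against the four states from 1.2; the validation helper is illustrative and not the actual Kafka code:

// Illustrative only: encodes the transitions described in this section
object GroupStateTransitions {
  sealed trait GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync       extends GroupState
  case object Stable             extends GroupState
  case object Dead               extends GroupState

  // target state -> states it may legally be entered from
  val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
    PreparingRebalance -> Set(Stable, AwaitingSync),       // maybePrepareRebalance
    AwaitingSync       -> Set(PreparingRebalance),         // initNextGeneration, once members have rejoined
    Stable             -> Set(AwaitingSync),               // leader's assignment persisted and propagated
    Dead               -> Set(PreparingRebalance, AwaitingSync, Stable) // group emptied or coordinator moved
  )

  def canTransition(from: GroupState, to: GroupState): Boolean =
    validPreviousStates(to).contains(from)
}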


2: Request Handling

2.1 GroupCoordinator

  • Sending the request: on startup, a member sends this request to the least-loaded broker (from the member's point of view, the one with the fewest in-flight requests) to locate the Coordinator responsible for this group

  • Handling the request: the broker looks up the leader of the group's offsets topic partition directly in its local MetadataCache

def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  // the group.id uniquely determines the offsets topic partition id
  val partition = coordinator.partitionFor(request.body.asInstanceOf[GroupCoordinatorRequest].groupId)
  // ask the admin module to lazily create the __consumer_offsets topic if needed
  val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
  val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
    .find(_.partition == partition).map(_.leader()) // the partition leader is the coordinator

  val responseBody = coordinatorEndpoint match {
    case Some(endpoint) if !endpoint.isEmpty =>
      new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
    case _ => /* Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code */
  }
  requestChannel.sendResponse(responseBody, /*..*/)
}

class GroupMetadataManager { // modulo offsets.topic.num.partitions: uniquely determined
  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
}

This is why the offsets topic's partition count must never be increased: keeping it fixed guarantees that a group's offsets and related state are not remapped to a different partition and effectively lost.
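A quick sketch of the mapping; the bit mask stands in for Utils.abs, and 50 (the default offsets.topic.num.partitions) is an assumption here:

object PartitionForSketch extends App {
  // same idea as GroupMetadataManager.partitionFor: mask the hash to a non-negative value,
  // then take it modulo the offsets topic partition count
  val offsetsTopicNumPartitions = 50 // default offsets.topic.num.partitions

  def partitionFor(groupId: String): Int =
    (groupId.hashCode & 0x7fffffff) % offsetsTopicNumPartitions

  println(partitionFor("my-group")) // always lands on the same partition...
  println(partitionFor("my-group")) // ...as long as the partition count never changes
}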


2.2 JoinGroup

  • Sending the request: the member asks the coordinator to join the specified group, carrying key data such as member_id and protocol

  • Handling the request: the coordinator needs to collect membership information first, so it does not reply immediately. It creates a single delayedJoin per group and hands it to joinPurgatory; when the last old member has rejoined, the operation completes early and responses go out to all members, with the leader additionally receiving the full membership of the group. The same logic runs when the delayedJoin simply expires.

class KafkaApi {
  def handleJoinGroupRequest(request: RequestChannel.Request) {
    val joinGroupRequest = request.body.asInstanceOf[JoinGroupRequest]
    def sendJoinGroupResponseCallback(res: JoinGroupResult) {
      val members = res.members map { case (memberId, metadata) => (memberId, ByteBuffer.wrap(metadata)) }
      val responseBody = new JoinGroupResponse(res.errorCode, res.generationId, res.subProtocol,
        res.memberId, res.leaderId, members)
      requestChannel.sendResponse(responseBody, /*..*/)
    }

    val protocols: List[(String, Array[Byte])] = joinGroupRequest.groupProtocols().map(protocol =>
      (protocol.name, Utils.toArray(protocol.metadata))).toList
    coordinator.handleJoinGroup(
      joinGroupRequest.groupId,        // group.id
      joinGroupRequest.memberId,       // empty for a new member, otherwise the previously assigned uuid
      joinGroupRequest.sessionTimeout, // session.timeout.ms, 30s by default
      joinGroupRequest.protocolType,   // "consumer"
      protocols,                       // assignor class names, subscribed topics, etc.
      sendJoinGroupResponseCallback)   // not invoked immediately; the coordinator calls it when it is time to respond
  }
}

On receiving this request, the coordinator first validates the group's state:

def handleJoinGroup(groupId: String, memberId: String, /*..*/ responseCallback: JoinCallback) {
  if (!groupManager.ownedPartitions.contains(partitionFor(groupId))) {
    // this coordinator does not own the corresponding offsets topic partition
    responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
  } else if (groupManager.loadingPartitions.contains(partitionFor(groupId))) {
    // still replaying the local log, not available yet
    responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))
  } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
             sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
    // session.timeout.ms must fall within the broker's [group.min.session.timeout.ms, group.max.session.timeout.ms] range
    responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
  } else {
    var group = groupManager.getGroup(groupId)
    if (group == null) // a brand new group gets a fresh GroupMetadata
      group = groupManager.addGroup(new GroupMetadata(groupId, protocolType))
    doJoinGroup(group, memberId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols, responseCallback)
  }
}

When a new member joins, or an old member rejoins after updating its metadata, the existing assignment becomes invalid; the group transitions to PreparingRebalance and a new round of rebalancing begins:

def doJoinGroup(group: GroupMetadata, memberId: String, /*..*/ responseCallback: JoinCallback) {
  group.currentState match {
    case Dead =>
      // 1. the coordinator may have failed over and no longer serves this group, return an error
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
    case PreparingRebalance =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID)
        // 2.1 a new member joins: assign a uuid as its memberId and trigger a rebalance
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback)
      else
        // 2.2 an old member rejoins, trigger a rebalance
        updateMemberAndRebalance(group, group.get(memberId), protocols, responseCallback)
    case AwaitingSync =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
        addMemberAndRebalance(/*..*/) // 3.1 new member, same as above
      } else {
        val member = group.get(memberId)
        if (member.matches(protocols)) // 3.2 an old member may have missed its JoinGroupResponse, resend it
          responseCallback(JoinGroupResult(/*..*/))
        else // 3.3 an old member updated its metadata, trigger a rebalance
          updateMemberAndRebalance(/*..*/)
      }
    case Stable =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
        addMemberAndRebalance(/*..*/) // 4.1 new member, same as above
      } else {
        val member = group.get(memberId)
        if (memberId == group.leaderId || !member.matches(protocols))
          // 4.2 the old leader rejoins, or an old member rejoins with updated metadata: force a rebalance
          updateMemberAndRebalance(group, member, protocols, responseCallback)
        else
          responseCallback(JoinGroupResult(/*..*/)) // 4.3 old member, same as above
      }
  }
}

// a new member joins
def addMemberAndRebalance(/*..*/, group: GroupMetadata, callback: JoinCallback) = {
  val memberId = group.generateMemberIdSuffix // generate a uuid
  val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols, /*..*/)
  member.awaitingJoinCallback = callback // the callback that will send the JoinGroupResponse
  group.add(member.memberId, member)
  maybePrepareRebalance(group)
}

// an old member rejoins, possibly with updated metadata
def updateMemberAndRebalance(group: GroupMetadata, member: MemberMetadata, protocols: List[(String, Array[Byte])], callback: JoinCallback) {
  member.supportedProtocols = protocols
  member.awaitingJoinCallback = callback
  maybePrepareRebalance(group)
}

// kick off a rebalance
def maybePrepareRebalance(group: GroupMetadata) {
  if (group.state == Stable || group.state == AwaitingSync) {
    if (group.is(AwaitingSync)) // the AwaitingSync phase is aborted: members in the SyncGroup phase get an error and must rejoin
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)

    group.transitionTo(PreparingRebalance)
    val rebalanceTimeout = group.rebalanceTimeout // the largest session.timeout.ms among the members
    val delayedJoin = new DelayedJoin(this, group, rebalanceTimeout)
    val groupKey = GroupKey(group.groupId)

    // create one delayedJoin delayed operation for this round of rebalancing
    joinPurgatory.tryCompleteElseWatch(delayedJoin, Seq(groupKey))
  }
}

So every time a rebalance is triggered, the coordinator creates a delayedJoin operation and waits for as many members as possible to rejoin, ending the rebalance when the operation completes or its timeout fires:

class DelayedJoin(coordinator: GroupCoordinator, group: GroupMetadata, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) {
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
  override def onExpiration() = coordinator.onExpireJoin() // no-op
  override def onComplete() = coordinator.onCompleteJoin(group)
}

class GroupCoordinator {
  def onCompleteJoin(group: GroupMetadata) {
    val failedMembers = group.notYetRejoinedMembers
    if (group.isEmpty || !failedMembers.isEmpty) {
      failedMembers.foreach(m => group.remove(m.memberId)) // kick out old members that did not rejoin
      if (group.isEmpty) {
        group.transitionTo(Dead) // all members are gone, the group becomes Dead
        groupManager.removeGroup(group) // the group is gone: write a tombstone message to the offsets topic
      }
    }
    // a new round of rebalancing completed successfully
    if (!group.is(Dead)) {
      group.initNextGeneration() // bump the generation
      for (member <- group.allMemberMetadata) {
        val joinResult = JoinGroupResult(/*..*/)
        member.awaitingJoinCallback(joinResult) // finally send the JoinGroupResponse
        member.awaitingJoinCallback = null
        completeAndScheduleNextHeartbeatExpiration(group, member)
      }
    }
  }
}

class GroupMetadata { // which old members have not rejoined yet
  def notYetRejoinedMembers: List[MemberMetadata] = members.values.filter(_.awaitingJoinCallback == null).toList
  def initNextGeneration() = {
    assert(notYetRejoinedMembers == List.empty[MemberMetadata])
    generationId += 1          // bump the generation
    protocol = selectProtocol  // pick the assignment strategy with the most votes as the group-wide strategy
    transitionTo(AwaitingSync) // move on to phase three
  }
}

It is worth noting that in the old consumer predating 0.10, partition assignment was completed on the broker side: changing the global assignment strategy required restarting the broker, and different groups could not even use different strategies. KAFKA-2652 therefore reworked the new consumer around the two-phase flow described here, where JoinGroup collects member metadata and SyncGroup distributes the leader's assignment decision, which makes the logic somewhat more involved.
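To make "client-side assignment" concrete, here is a minimal sketch of what a leader could compute between receiving the JoinGroupResponse (member list plus subscriptions) and sending its SyncGroupRequest. It mimics a range-style split over a single topic and is illustrative only, not Kafka's actual RangeAssignor; topic name and counts are made up:

// Illustrative leader-side assignment: just the idea, not Kafka's RangeAssignor
object LeaderAssignSketch extends App {
  // from the JoinGroupResponse: the members of the new generation (the leader sees all of them)
  val memberIds = List("consumer-a-uuid", "consumer-b-uuid", "consumer-c-uuid").sorted
  val topicPartitions = (0 until 8).map(p => ("orders", p)) // 8 partitions of topic "orders"

  // range-style split: consecutive chunks of partitions per member
  val perMember = math.ceil(topicPartitions.size.toDouble / memberIds.size).toInt
  val assignment: Map[String, Seq[(String, Int)]] =
    memberIds.zip(topicPartitions.grouped(perMember).toSeq).toMap

  // this map, serialized, is what the leader puts into its SyncGroupRequest
  assignment.foreach { case (member, tps) => println(s"$member -> $tps") }
}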


2.3 SyncGroup

Just as a JoinGroup request is not answered until enough assignment inputs have been collected, a SyncGroup request is not answered until the leader delivers its assignment. In the sequence diagram below, m1's early SyncGroup request is not answered until m2 sends the decision:

[Figure: SyncGroup sequence diagram, m1's early request waits for leader m2's assignment]

Implementation:

class KafkaApi {
  def handleSyncGroupRequest(request: RequestChannel.Request) {
    val req = request.body.asInstanceOf[SyncGroupRequest]
    def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
      val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
      requestChannel.sendResponse(responseBody, /*..*/)
    }
    coordinator.handleSyncGroup(req.groupId(), req.generationId(), req.memberId(),
      req.groupAssignment().mapValues(Utils.toArray(_)),
      sendResponseCallback) // as before, the callback runs only once the response conditions are met, returning the SyncGroupResponse
  }
}

The coordinator again handles the request differently depending on the group's current state:

def doSyncGroup(group: GroupMetadata, generationId: Int, memberId: String, groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback) {
  var delayedGroupStore: Option[DelayedStore] = None
  group.currentState match {
    case Dead => // 1. the coordinator has moved elsewhere
      responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)

    case PreparingRebalance => // 2. stale SyncGroup request
      responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)

    case AwaitingSync => // 3. hold the response until the leader's assignment arrives
      group.get(memberId).awaitingSyncCallback = responseCallback // remember the callback that sends the SyncGroupResponse
      completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) // refresh the heartbeat
      // only the leader carries the assignment decision
      if (memberId == group.leaderId) {
        delayedGroupStore = Some(groupManager.prepareStoreGroup(group, groupAssignment, (errorCode: Short) => { /*..*/
          setAndPropagateAssignment(group, assignment) // invoke each member's syncGroupCallback
          group.transitionTo(Stable)                   // transition to Stable
        }))
      }

    case Stable => // 4. later members just fetch the result directly
      val memberMetadata = group.get(memberId)
      responseCallback(memberMetadata.assignment, Errors.NONE.code)
      completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
  }
  // hand off to groupManager: replicate the assignment held inside delayedGroupStore to the replicas
  delayedGroupStore.foreach(groupManager.store)
}

Both the assignment result and committed offsets must be written to the __consumer_offsets topic so that, after a coordinator failover, the new coordinator can rebuild group state and consumption progress by replaying its local log. This write is carried out through a DelayedStore object:

class DelayedStore(messageSet: Map[TopicPartition, MessageSet], callback: Map[TopicPartition, PartitionResponse] => Unit)

class GroupMetadataManager {
  // a message built from the assignment + a callback handling the replication result = a DelayedStore object
  def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Short => Unit): DelayedStore = {
    // the group.id is the message key, the assignment is the value
    val message = new Message(key = GroupMetadataManager.groupMetadataKey(group.groupId),
      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment), /*..*/)

    val tp = new TopicPartition("__consumer_offsets", partitionFor(group.groupId))
    val groupMetadataMessageSet = Map(tp -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))

    // build the callback: it fires once this message has been replicated to all of tp's ISR
    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
      val responseCode =
        if (responseStatus(tp).errorCode != Errors.NONE.code)
          Errors.NOT_COORDINATOR_FOR_GROUP.code // replication errors (timeout, tp leadership moved, ...) are mapped to coordinator-level errors
        else
          Errors.NONE.code
      responseCallback(responseCode) // run the callback that answers the SyncGroupResponse
    }
    DelayedStore(groupMetadataMessageSet, putCacheCallback)
  }

  def store(delayedStore: DelayedStore) {
    replicaManager.appendMessages(
      config.offsetCommitTimeoutMs.toLong, // offsets.commit.timeout.ms, 5s by default
      config.offsetCommitRequiredAcks,     // offsets.commit.required.acks, -1 by default: every ISR replica must ack before the write counts
      delayedStore.messageSet,
      delayedStore.callback)               // run the delayedStore callback once replication finishes
  }
}

The logic above nests three layers of callbacks so that a single delayedStore completion can fire every member's awaitingSyncCallback at once and return their SyncGroupResponses:

[Figure: nested callbacks that fan out the SyncGroupResponse]
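A minimal sketch of this three-layer chain with simplified types; it is illustrative only (replication is faked and error codes are placeholders):

// Illustrative callback chain: store -> putCacheCallback -> each member's awaitingSyncCallback
object CallbackChainSketch extends App {
  // layer 3: each member's awaitingSyncCallback (in Kafka, it sends that member's SyncGroupResponse)
  val awaitingSyncCallbacks: Map[String, (Array[Byte], Int) => Unit] = Map(
    "member-a" -> { (data: Array[Byte], err: Int) => println(s"member-a: ${data.length} bytes, error=$err") },
    "member-b" -> { (data: Array[Byte], err: Int) => println(s"member-b: ${data.length} bytes, error=$err") }
  )
  val assignment = Map("member-a" -> Array[Byte](1, 2), "member-b" -> Array[Byte](3))

  // layer 2: the putCacheCallback equivalent, invoked once replication reports its result
  def putCacheCallback(replicationOk: Boolean): Unit = {
    val err = if (replicationOk) 0 else 1 // 0 = no error, non-zero = some coordinator error (placeholder)
    // fan out: one replication result answers every waiting member at once
    awaitingSyncCallbacks.foreach { case (id, cb) => cb(assignment(id), err) }
  }

  // layer 1: the "store" step; replication is faked as an immediate success here
  def store(onReplicated: Boolean => Unit): Unit = onReplicated(true)

  store(putCacheCallback)
}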

To summarize the SyncGroup handling flow:

  • Requests from members that arrive early are parked until the leader's request comes in
  • The leader's assignment is turned into a message keyed by group.id with the assignment as value, and handed to the replicaManager module to be replicated across the ISR
  • Once the ISR has replicated it, the callbacks run in one batch and every member receives a SyncGroupResponse containing its assigned partitions

The purpose of the second step: when the coordinator fails over and an ISR replica becomes the new coordinator, replaying the local log recovers each member's metadata (the topics it subscribes to) and the previous assignment, so that when members reconnect with their memberIds the rebalance can complete ahead of time.


2.4 OffsetCommit

As with SyncGroup, a member submits its consumption progress offsetX to the coordinator, which turns it into a message replicated across the ISR. The point is that when a member goes offline, the partitions assigned to it are reassigned to other members during the rebalance, and those members fetch offsetX via an OffsetFetch request and continue consuming from there.
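For context, this is roughly what a member does on the client side to produce an OffsetCommitRequest; it uses the Java client API from Scala, and the broker address, topic name, and offset are made up:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object CommitSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "my-group")
  props.put("enable.auto.commit", "false") // commit explicitly instead of auto-committing
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List("orders").asJava)
  consumer.poll(1000)

  // commit offset 42 for orders-0: this becomes an OffsetCommitRequest to the coordinator
  consumer.commitSync(Map(new TopicPartition("orders", 0) -> new OffsetAndMetadata(42L)).asJava)
  consumer.close()
}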

Example sequence diagram:

[Figure: OffsetCommit sequence diagram]

Implementation: analogous to SyncGroup

class GroupCoordinator {
  def handleCommitOffsets(groupId: String, memberId: String, generationId: Int, offsetMetadata: Map[TopicPartition, OffsetAndMetadata],
                          responseCallback: Map[TopicPartition, Short] => Unit) {
    val member = groupManager.getGroup(groupId).get(memberId) /*..*/ // various error-state checks omitted
    var delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback))
    delayedOffsetStore.foreach(groupManager.store)
  }
}

class GroupMetadataManager {
  // messages describing the committed offsets + a callback handling the replication result = a DelayedStore object
  def prepareStoreOffsets(groupId: String, consumerId: String, generationId: Int, offsetMetadata: Map[TopicPartition, OffsetAndMetadata],
                          responseCallback: Map[TopicPartition, Short] => Unit): DelayedStore = {
    // the [group.id, topic, partition] triple is the message key, the committed offset is the value
    val messages = offsetMetadata.map { case (tp, offsetAndMetadata) =>
      new Message(key = GroupMetadataManager.offsetCommitKey(groupId, tp.topic, tp.partition),
        bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata), /*..*/) }.toSeq
    val offsetsTp = new TopicPartition("__consumer_offsets", partitionFor(groupId))
    val offsetsAndMetadataMessageSet = Map(offsetsTp -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))

    // this callback runs once the ISR replication call completes
    def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
      val responseCode =
        if (responseStatus(offsetsTp).errorCode == Errors.NONE.code) {
          offsetMetadata.foreach { case (tp, offsetAndMetadata) =>
            putOffset(GroupTopicPartition(groupId, tp), offsetAndMetadata) // replication succeeded, update the offset cache
          }
          Errors.NONE.code
        } else Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code // replication failed, map to the corresponding error
      responseCallback(offsetMetadata.map { case (tp, _) => tp -> responseCode }) // run the callback that answers the OffsetCommitResponse
    }
    DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)
  }
}
}

Note that after successful replication the GroupMetadataManager's offsetsCache is updated; it serves OffsetFetchRequest lookups, and it is also rebuilt by replaying the local log after a coordinator failover.


2.5 OffsetFetch

  • Sending the request: after receiving its assigned partitions, the member sends an OffsetFetch request to the coordinator to look up the group's last committed position on each partition

  • Handling the request: query the offset cache directly (cache rebuilding is covered in section 3)

class GroupCoordinator {
  def handleFetchOffsets(groupId: String, partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
    // the offsetsCache is still being rebuilt from the log
    if (isCoordinatorLoadingInProgress(groupId)) {
      partitions.map { case topicPartition =>
        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
    } else
      groupManager.getOffsets(groupId, partitions)
  }
}

class GroupMetadataManager {
  def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
    if (isGroupLocal(group)) { /*..*/
      topicPartitions.map { tp => // read the offsets cache directly
        val meta = offsetsCache.get(GroupTopicPartition(group, tp))
        (tp, new OffsetFetchResponse.PartitionData(meta.offset, meta.metadata, Errors.NONE.code)) // return the committed offset for this tp
      }.toMap
    }
  }
}

2.6 Heartbeat

  • Sending the request: the member sends heartbeats periodically to keep its assignment valid

  • Handling the request: the coordinator creates a delayed heartbeat task managed by the global heartbeatPurgatory. Normally the next heartbeat arrives before the deadline, completes the task early, and a new delayed task is scheduled for the next heartbeat, round after round; if the heartbeat does time out, the member is kicked out of the group and a new rebalance is triggered

class GroupCoordinator {
  def handleHeartbeat(groupId: String, memberId: String, generationId: Int, responseCallback: Short => Unit) {
    if (!group.is(Stable)) { /*..*/
      responseCallback(Errors.REBALANCE_IN_PROGRESS.code) // heartbeats only make sense while the group is in its normal state
    } else {
      completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) // refresh the heartbeat
      responseCallback(Errors.NONE.code)
    }
  }
  // refresh the heartbeat: complete the previous delayedHeartbeat task early and schedule the next one
  def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
    member.latestHeartbeat = time.milliseconds() // record the heartbeat time
    val memberKey = MemberKey(member.groupId, member.memberId) // this pair uniquely identifies the member
    heartbeatPurgatory.checkAndComplete(memberKey) // complete the previous heartbeat task
    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs // schedule the next round
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
  }

  // the delayed task expired: the heartbeat timed out
  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    if (!shouldKeepMemberAlive(member, heartbeatDeadline)) onMemberFailure(group, member)
  }
  def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) = {
    member.awaitingJoinCallback != null ||  // 2.1 still waiting for a JoinGroupResponse
    member.awaitingSyncCallback != null ||  // 2.2 still waiting for a SyncGroupResponse
    member.latestHeartbeat > heartbeatDeadline - member.sessionTimeoutMs // 2.3 the heartbeat has advanced
  }
  def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
    group.remove(member.memberId) // the member is gone
    group.currentState match {
      case Dead =>
      case Stable | AwaitingSync => maybePrepareRebalance(group) // trigger a rebalance
      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) // a delayedJoin may still be waiting for this old member, try to complete it
    }
  }
}

class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata,
                       heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout) {
  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
  override def onComplete() = coordinator.onCompleteHeartbeat() // completed early, no-op
  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete) // cancel the task
}

Besides explicit Heartbeat requests, the JoinGroup, SyncGroup, and OffsetCommit requests also count as heartbeats: each of them calls completeAndScheduleNextHeartbeatExpiration to refresh the heartbeat time.
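On the client side, the knobs behind this mechanism are session.timeout.ms (the deadline enforced by delayedHeartbeat above) and heartbeat.interval.ms (how often the consumer sends Heartbeat requests). The values below are illustrative:

import java.util.Properties

// Illustrative values only: heartbeat.interval.ms should be a small fraction of session.timeout.ms,
// so several heartbeats can be missed before the coordinator expires the member.
object HeartbeatConfigSketch extends App {
  val props = new Properties()
  props.put("group.id", "my-group")
  props.put("session.timeout.ms", "30000")   // the deadline enforced by the coordinator's delayedHeartbeat
  props.put("heartbeat.interval.ms", "3000") // how often the member sends HeartbeatRequests
  println(props)
}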


2.7 LeaveGroup

  • Sending the request: when a member shuts down cleanly (close), it asks to leave the group explicitly

  • Handling the request: the coordinator removes the member and starts a rebalance as soon as possible

def handleLeaveGroup(groupId: String, consumerId: String, responseCallback: Short => Unit) {
  val group = groupManager.getGroup(groupId)
  val member = group.get(consumerId)
  removeHeartbeatForLeavingMember(group, member) // drop the heartbeat task
  onMemberFailure(group, member) // start the rebalance immediately
  responseCallback(Errors.NONE.code)
}

Compared with waiting for the heartbeat to time out, leaving proactively improves availability: partitions are reassigned sooner and less consumer lag builds up.
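On the client side this simply means closing the consumer cleanly. A minimal, illustrative shutdown pattern (assuming a consumer configured as in the earlier commit example, and a shutdown hook that flips `running` and calls consumer.wakeup()):

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.errors.WakeupException

object GracefulShutdownSketch {
  def runLoop(consumer: KafkaConsumer[String, String], running: AtomicBoolean): Unit = {
    try {
      while (running.get()) {
        val records = consumer.poll(1000)
        // ... process records, commit offsets ...
      }
    } catch {
      case _: WakeupException => // expected when wakeup() interrupts a blocking poll()
    } finally {
      consumer.close() // clean shutdown: the consumer leaves the group instead of waiting for a session timeout
    }
  }
}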


3: Coordinator High Availability

After the replication module has processed a LeaderAndIsr request and completed the role switch, it runs the onLeadershipChange callback:

class KafkaApi {
  def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
      updatedLeaders.foreach(partition =>   // follower promoted to leader
        if (partition.topic == "__consumer_offsets") coordinator.handleGroupImmigration(partition.partitionId))
      updatedFollowers.foreach(partition => // leader demoted to follower
        if (partition.topic == "__consumer_offsets") coordinator.handleGroupEmigration(partition.partitionId))
    }
    replicaManager.becomeLeaderOrFollower(/*..*/, onLeadershipChange)
    /*..*/
  }
}

In particular, after being promoted to leader the broker must replay its local log to rebuild groupsCache and offsetsCache, which respectively speed up rebalancing and serve OffsetFetch requests. The switch is implemented as follows:

class GroupCoordinator {
  def onGroupLoaded(group: GroupMetadata) { // once replay finishes, start the heartbeat clocks
    group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
  }

  def handleGroupImmigration(offsetTopicPartitionId: Int) {
    groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
  }
}

The log replay itself is delegated to GroupMetadataManager:

def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
  val tp = TopicAndPartition("__consumer_offsets", offsetsPartition)
  scheduler.schedule(tp.toString, loadGroupsAndOffsets) // replay the log asynchronously

  def loadGroupsAndOffsets() {
    loadingPartitions.add(offsetsPartition) /*..*/ // mark as loading
    val log = replicaManager.logManager.getLog(tp)
    var currOffset = log.logSegments.head.baseOffset // replay from the earliest segment
    val buffer = ByteBuffer.allocate(config.loadBufferSize) // offsets.load.buffer.size, 5MB by default
    val loadedGroups = Map[String, GroupMetadata]()

    // scan the segments from start to end, collecting regular and tombstone messages
    // if leadership changes the HW is reset to -1; otherwise keep reading messages up to the HW
    while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
      log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet].readInto(buffer, 0)
      val messageSet = new ByteBufferMessageSet(buffer)
      messageSet.foreach { msgAndOffset =>
        val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key) // read the message key
        if (baseKey.isInstanceOf[OffsetKey]) {
          // 1. an offset commit message
          val key = baseKey.key.asInstanceOf[GroupTopicPartition]
          if (msgAndOffset.message.payload == null)
            offsetsCache.remove(key) // 1.1 tombstone: drop this group's offset entry
          else
            putOffset(key, GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)) // 1.2 restore the group's committed offset
        } else {
          // 2. a group metadata message
          val groupId = baseKey.key.asInstanceOf[String]
          val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
          if (groupMetadata == null)
            loadedGroups.remove(groupId) // 2.1 tombstone: drop this group's metadata entry
          else
            loadedGroups.put(groupId, groupMetadata) // 2.2 the group (re)registered, cache it
        }
        currOffset = msgAndOffset.nextOffset
      }
    }
    loadedGroups.values.foreach(group => onGroupLoaded(group)) // start the heartbeat clocks for the old members

    // formally take over coordination of every group on this offsets partition: failover complete
    ownedPartitions.add(offsetsPartition)
    loadingPartitions.remove(offsetsPartition)
  }
}

This is how the coordinator, on startup or after taking over leadership, rebuilds group metadata and each group's committed offsets by replaying the log.
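The replay loop above only needs "latest value per key" semantics, which fits the tombstone-based writes described earlier. A minimal illustration (simplified string keys, not Kafka's binary key format):

// Illustrative: later records override earlier ones, and a null payload (tombstone) removes the key,
// which is exactly what the replay loop reconstructs into offsetsCache
object ReplaySketch extends App {
  val records: Seq[(String, Option[Long])] = Seq(   // (key, payload) in log order
    "g1/orders/0" -> Some(10L),
    "g1/orders/0" -> Some(42L),                     // a newer commit overrides the older one
    "g2/orders/0" -> Some(7L),
    "g2/orders/0" -> None                           // tombstone: group g2's entry is removed
  )

  val offsetsCache = scala.collection.mutable.Map[String, Long]()
  records.foreach {
    case (key, Some(offset)) => offsetsCache(key) = offset
    case (key, None)         => offsetsCache.remove(key)
  }
  println(offsetsCache) // Map(g1/orders/0 -> 42)
}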


Summary

This post briefly analyzed the coordination service of the coordinator module: how the seven request types spanning a member's lifecycle are handled in each of the group's four states, and how high-availability failover is achieved through metadata and offsets messages. Along the way, JoinGroup and Heartbeat reuse the purgatory for delayed waiting, while SyncGroup and OffsetCommit build nested callbacks to wait for replica synchronization; these mechanisms are all worth studying.