前言

Kafka 集群的高可用主要由 Replication、Controller 两个模块配合 zk 共同实现:前者负责副本复制,后者负责主从分配。本节分析 Replication 模块处理读写请求、执行主从切换及日志复制的实现,笔记仅供个人 Review


一:准备

1.1 Replica

描述 topic 分区的一个副本,维护 HW、LEO 等状态元信息;replica 既有 leader、follower 两种集群角色,也有 local、remote 两种位置属性(是否位于当前 broker)

主要维护的 3 个字段及其更新操作如下:

```scala
class Replica(val brokerId: Int,
              val partition: Partition,      // 所属分区
              time: Time,                    // 时间源,lastCaughtUpTime 依赖它
              initialHW: Long = 0L,
              val log: Option[Log] = None) { // local replica 则 log 不为空
  var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(initialHW)       // 1. HW
  var logEndOffsetMetadata: LogOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata // 2. LEO
  val lastCaughtUpTimeMsUnderlying = new AtomicLong(time.milliseconds)                  // 3. 上次全量复制时间戳

  def updateLogReadResult(logReadResult: LogReadResult) {  // 更新 follower 的复制进度
    logEndOffset = logReadResult.info.fetchOffsetMetadata
    if (logReadResult.isReadFromLogEnd)                    // 本次 Fetch 请求恰好从 leader 的 LEO 处读取
      lastCaughtUpTimeMsUnderlying.set(time.milliseconds)  // fully caught up
  }

  // local replica 推进 HW
  // 1) leader: ISR 中最慢 replica 的 LEO 超过了当前 HW
  // 2) follower: 取本地 LEO 与 leader HW 的较小值,与 leader 保持一致
  def highWatermark_=(newHighWatermark: LogOffsetMetadata) =  // 自定义 setter,支持赋值语法
    if (isLocal) highWatermarkMetadata = newHighWatermark
}
```
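
其中 HW、LEO 都用 LogOffsetMetadata 表示,它除消息 offset 外还记录所在 segment 的位置信息。下面是按 0.10 源码思路整理的简化示意(默认值与辅助方法以实际版本为准):

```scala
// 简化示意:一个日志位点;offsetDiff 用于比较两个位点的先后,例如 maybeExpandIsr 中 follower.LEO 与 leader.HW 的比较
case class LogOffsetMetadata(messageOffset: Long,                   // 消息 offset
                             segmentBaseOffset: Long = -1L,         // 所在 segment 的 baseOffset,未知时为 -1
                             relativePositionInSegment: Int = -1) { // 在 segment 文件内的物理位置,未知时为 -1
  def offsetDiff(that: LogOffsetMetadata): Long = this.messageOffset - that.messageOffset
}
```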

1.2 Partition

维护一个分区的 AR、ISR 等 replica 集合,负责 leader/follower 角色切换、ISR 的扩张与收缩,以及触发完成延迟任务等

```scala
class Partition(val topic: String, val partitionId: Int, replicaManager: ReplicaManager) {
  val logManager = replicaManager.logManager
  val assignedReplicaMap = new Pool[Int, Replica]        // AR: broker.id -> Replica
  var inSyncReplicas: Set[Replica] = Set.empty[Replica]  // ISR: 仅 leader replica 维护
  /*...*/
}
```

1. 主从切换

local replica 会根据 Controller 的分配结果来更新 AR, ISR 集合:

```scala
def makeLeader(/*...*/, partitionStateInfo: LeaderAndIsrRequest.PartitionState) { /*...*/
  val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
  allReplicas.foreach(replicaId => getOrCreateReplica(replicaId))
  (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))          // 刷新 AR
  inSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet  // 刷新 ISR
  leaderReplicaIdOpt = Some(localBrokerId)              // 成为新 leader
  assignedReplicas.filter(_.brokerId != localBrokerId)  // 将 follower replica 的 LEO 状态重置为未知(-1)
    .foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
}

def makeFollower(/*...*/, partitionStateInfo: LeaderAndIsrRequest.PartitionState) {
  /*...*/                                               // 同上刷新 AR
  inSyncReplicas = Set.empty[Replica]                   // follower 不维护 ISR
  leaderReplicaIdOpt = Some(partitionStateInfo.leader)  // 记录新 leader
}
```

2. ISR 扩张与收缩

(1)扩张:follower 发来 Fetch 请求时会更新其复制进度,若其 LEO 已不落后于 leader 的 HW,说明已基本追平,可加入 ISR

```scala
def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) { /*...*/
  getReplica(replicaId).updateLogReadResult(logReadResult)  // 更新 follower 的复制进度
  maybeExpandIsr(replicaId)                                 // 尝试将此 follower 加入 ISR
}

// 1. 实时扩张 ISR:若 follower 已追上 leader 的 HW,则可加入 ISR,并将扩张结果发布到 zk 通知 Controller
def maybeExpandIsr(replicaId: Int) {
  val leaderHWIncremented = leaderReplicaIfLocal() match {
    case Some(leaderReplica) =>
      val replica = getReplica(replicaId).get
      val leaderHW = leaderReplica.highWatermark
      if (!inSyncReplicas.contains(replica) &&                     // 1. follower 尚未在 ISR 中
          assignedReplicas.map(_.brokerId).contains(replicaId) &&  // 2. AR 中包含此 follower
          replica.logEndOffset.offsetDiff(leaderHW) >= 0) {        // 3. follower.LEO >= leader.HW
        val newInSyncReplicas = inSyncReplicas + replica           // 加入 ISR
        updateIsr(newInSyncReplicas)  // 发布到 zk /brokers/topics/<T>/partitions/<P>/state
      }
      maybeIncrementLeaderHW(leaderReplica)                        // ISR 变化,尝试推进 HW
    case None => false
  }
  // 若 leader.HW 被推进,则尝试触发此 tp 上的延迟任务
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}
```

由上可知,Fetch 请求会推进三类状态:ISR 的扩张、leader HW 的前移,以及此 tp 上延迟任务的完成。例如 ISR 中各 replica 的 LEO 分别为 10(leader)、8、7 时,新 HW 即取最小值 7:

```scala
// 2. 尝试推进 leader 的 HW
private def maybeIncrementLeaderHW(leaderReplica: Replica): Boolean = { /*...*/
  // 取 ISR 中所有 replica LEO 的最小值(即最慢 replica 的 LEO)作为新的 HW
  val newHW = inSyncReplicas.map(_.logEndOffset).min(new LogOffsetMetadata.OffsetOrdering)
  if (leaderReplica.highWatermark.messageOffset < newHW.messageOffset) {
    leaderReplica.highWatermark = newHW  // 推进 leader HW
    true
  } else false
}

// 3. 尝试完成此 tp 上的延迟任务
private def tryCompleteDelayedRequests() {
  val requestKey = new TopicPartitionOperationKey(this.topic, this.partitionId)
  replicaManager.tryCompleteDelayedProduce(requestKey)  // 尝试完成此 tpOpKey 上的 DelayedProduce 任务
  replicaManager.tryCompleteDelayedFetch(requestKey)
}

class ReplicaManager { /*...*/
  def tryCompleteDelayedProduce(key: DelayedOperationKey) {
    val completed = delayedProducePurgatory.checkAndComplete(key)  // 参考上篇 purgatory 的应用
    /*...*/
  }
}
```

(2)收缩:对于 ISR 中的任一 follower,若距上次全量追平已超过 replica.lag.time.max.ms,则将其踢出 ISR

```scala
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {  // replica.lag.time.max.ms,默认 10s
  val leaderHWIncremented = leaderReplicaIfLocal() match {
    case Some(leaderReplica) =>
      val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
      if (outOfSyncReplicas.nonEmpty) {
        updateIsr(inSyncReplicas -- outOfSyncReplicas)  // 将最新 ISR 发布到 zk
        maybeIncrementLeaderHW(leaderReplica)           // 最慢的 follower 被踢出 ISR,可继续推进 HW
      } else false
    case None => false
  }
  if (leaderHWIncremented)  // 同理
    tryCompleteDelayedRequests()
}

// 筛选出 ISR 中复制延迟过高的 replica,即距上次全量追平已超过 maxLagMs 的 replica
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] =
  (inSyncReplicas - leaderReplica).filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
```
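
扩张与收缩最终都通过 updateIsr 把最新的 ISR 写入 zk 的分区 state 节点,下面按 0.10 源码思路给出简化示意(zkVersion、leaderEpoch 等字段与 ReplicationUtils 的方法签名以实际版本为准):

```scala
// 简化示意:将新 ISR 写入 zk 并登记变更
private def updateIsr(newIsr: Set[Replica]) {
  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
  // 写入 /brokers/topics/<topic>/partitions/<partition>/state,zkVersion 作为乐观锁防止并发覆盖
  val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(
    zkUtils, topic, partitionId, newLeaderAndIsr, controllerEpoch, zkVersion)
  if (updateSucceeded) {
    replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))  // 供 isr-change-propagation 定时任务使用
    inSyncReplicas = newIsr
    zkVersion = newVersion
  }
}
```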

ISR 收缩是由 ReplicaManager 中的 isr-expiration 定时任务触发,同样会推进 HW,尝试完成延迟任务

综上,leader replica 会检查 ISR 的有效性,并将扩张、收缩的结果写入 zk,通知 Controller 做出切主的决策:

(图:ISR 扩张 / 收缩结果经 zk 通知 Controller 的流程示意)

3. 日志落盘

Partition 还负责将消息写入 leader 的本地日志,并尝试让等待新数据的 DelayedFetch 延迟任务及时完成

```scala
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) {
  val (info, leaderHWIncremented) = { /*...*/
    val log = localLeaderReplica.log.get
    // acks=-1 的写请求,要求 ISR 数量不少于 min.insync.replicas,保证高可用
    if (inSyncReplicas.size < log.config.minInSyncReplicas && requiredAcks == -1)
      throw new NotEnoughReplicasException(/*...*/)
    val info = log.append(messages, assignOffsets = true)  // leader 落盘
    // 有新消息写入,尝试让等待数据的 DelayedFetch 提前完成
    replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(topic, partitionId))
  } /*...*/
}
```

至此,分析了 Partition 类的功能:

  • 维护分区的各 AR, ISR 等 Replica 集合
  • local replica 的角色切换
  • ISR 的实时扩张、定时收缩,结果发布到 zk
  • 封装日志写入

二:Fetcher

(图:Fetcher 模块结构总览)

2.1 FetcherThread

负责执行面向单个 broker 的多个 tp 的 fetch 请求,并处理响应

1. AbstractFetcherThread

持有 fetch 任务池,并提供增删任务的方法,作为后台线程发送 Fetch 请求并处理响应,推进 fetch 进度

```scala
abstract class AbstractFetcherThread(clientId: String, sourceBroker: BrokerEndPoint, /*...*/) {
  // 维护各 tp 的 fetch 进度,是当前 fetcher 线程的任务注册中心
  val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState]

  // 增删一批 tp 的 fetch 任务,即新建或停止 fetch
  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) { /*...*/ }
  def removePartitions(topicAndPartitions: Set[TopicAndPartition]) { /*...*/ }

  // 核心逻辑:构建 Fetch 请求并发送,处理新消息,推进 fetch 进度
  override def doWork() {
    val fetchRequest = buildFetchRequest(partitionMap) /*...*/
    val responseData = fetch(fetchRequest)  // 1. 执行 fetch,出错则 backoff 等待
    responseData.foreach {                  // 2. 逐个 tp 处理 fetch 结果
      case (tp, partitionData) =>
        partitionMap.get(tp).foreach(curPartitionFetchState =>
          Errors.forCode(partitionData.errorCode) match {
            case Errors.NONE =>             // 2.1 无异常:推进 fetch 进度,处理消息
              val newMessages = partitionData.toByteBufferMessageSet
              val newOffset = newMessages.shallowIterator.toSeq.lastOption match {
                case Some(m: MessageAndOffset) => m.nextOffset
                case None => curPartitionFetchState.offset
              }
              partitionMap.put(tp, new PartitionFetchState(newOffset))               // 更新 fetch 进度
              processPartitionData(tp, curPartitionFetchState.offset, partitionData) // 处理新消息

            case Errors.OFFSET_OUT_OF_RANGE =>  // 2.2 请求的 offset 向上或向下溢出
              val newOffset = handleOffsetOutOfRange(tp)  // 触发日志截断或清空
              partitionMap.put(tp, new PartitionFetchState(newOffset))
          })
    } /* error handling... */
  }
}
```
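
其中 PartitionFetchState 只是对单个 tp 下一次 fetch 位置的简单封装,大致形态如下(简化示意,字段以实际版本为准):

```scala
// 简化示意:记录某个 tp 下一次 fetch 的起始 offset;delay 用于出错后的 backoff 等待
case class PartitionFetchState(offset: Long, delay: DelayedItem) {
  def this(offset: Long) = this(offset, new DelayedItem(0))
  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0  // 延迟到期后才会参与下一轮 fetch
}
```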

2. ReplicaFetcherThread

构建 FetchRequest,使用阻塞 IO 封装(NetworkClientBlockingOps)的 NetworkClient 发送请求并等待响应,特别处理 offset 溢出的错误:

(图:Fetch offset 向上 / 向下溢出的场景示意)

  • 向下溢出

    • 场景:follower 下线期间,leader 做了多次 retention 清理,follower 重新上线时其本地日志在 leader 上已全部过期
    • 解决:清空本地日志,从 leader.startOffset 开始全量复制
  • 向上溢出

    • 场景:

      • 副本复制存在延迟:broker0 上的 follower 落后于 broker2 上的 leader
      • 落后副本被选为新 leader:broker2 下线,且用户配置 unclean.leader.election.enable = true,Controller 将 broker0 上的 follower 选为新 leader
      • 产生溢出:broker2 重新上线后以 follower 身份向新 leader 发起 Fetch 请求,发现自身 LEO 超过了新 leader 的 LEO
    • 解决:截断本地日志与新 leader 的 LEO 对齐,再从该位置开始复制

    • 问题:被截断的消息就此丢失,新旧主之间的数据出现不一致

      (图:unclean leader election 导致上溢与消息丢失的示意)

实现:

```scala
class ReplicaFetcherThread(/*...*/) extends AbstractFetcherThread(/*...*/) {
  val networkClient = { /*...*/ }  // NetworkClient 对象,将被隐式转换为 NetworkClientBlockingOps 使用

  // 1. 正常 fetch:新消息直接写入本地 Log,并推进 HW 与 leader 同步
  override def processPartitionData(tp: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) {
    val replica = replicaManager.getReplica(tp.topic, tp.partitionId).get
    replica.log.get.append(partitionData.toByteBufferMessageSet, assignOffsets = false)
    // follower 的 HW = min(本地 LEO, leader 的 HW)
    replica.highWatermark = new LogOffsetMetadata(replica.logEndOffset.messageOffset.min(partitionData.highWatermark))
  }

  // 2. fetch 溢出:截断或清空日志
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaManager.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
    val leaderEndOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId)
    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // 2.1 向上溢出
      if (!LogConfig.fromProps(brokerConfig.originals, /* zk 上的 topic 级配置 */).uncleanLeaderElectionEnable) {
        Runtime.getRuntime.halt(1)  // 未开启 unclean leader election 却出现 leader 日志落后,状态异常,直接停机避免丢消息
      }
      // 截断本地日志,与 leader.LEO 对齐
      replicaManager.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
      leaderEndOffset
    } else {
      // 2.2 向下溢出
      val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, brokerConfig.brokerId)
      val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
      if (leaderStartOffset > replica.logEndOffset.messageOffset)  // 与 leader 完全无交集,才清空本地日志
        replicaManager.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
      offsetToFetch
    } /*...*/
  }

  // 3. 构建并发起 fetch 请求
  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest = {
    // 发往同一个 broker 的多个 tp 的 fetch 请求
    val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]
    partitionMap.foreach {
      case (TopicAndPartition(topic, partition), partitionFetchState) =>
        requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
    }
    new FetchRequest(new JFetchRequest(brokerConfig.brokerId, maxWait, minBytes, requestMap.asJava))
  }

  protected def fetch(fetchRequest: FetchRequest): Map[TopicAndPartition, PartitionData] = {
    val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
    new FetchResponse(clientResponse.responseBody).responseData.asScala.map {
      case (key, value) =>
        TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)  // 解析出各 tp fetch 到的数据
    }
  }

  // util: 构造指定类型的请求并发送,阻塞等待响应
  private def sendRequest(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest): ClientResponse = { /*...*/ }
  // util: 请求指定分区的 earliest/latest offset
  private def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = { /*...*/ }
}
```

至此,分析了副本复制的核心机制:每个 fetcher 线程维护发往同一个 broker 的多个 tp 的 fetch 任务池,在后台不断构建并发送 FetchRequest,收到响应后将新消息写入本地日志,遇到错误则处理 offset 溢出等问题


2.2 FetcherManager

维护全局 fetcher 线程池,并提供批量增删 fetch 任务的方法,对应关系:

(图:fetcher 线程与 broker、tp 的对应关系示意)

实现:

```scala
case class BrokerAndFetcherId(broker: BrokerEndPoint, fetcherId: Int)  // leader 所在 broker + fetcher id
case class BrokerAndInitialOffset(broker: BrokerEndPoint, initOffset: Long)

abstract class AbstractFetcherManager(/*...*/, numFetchers: Int = 1) {  // num.replica.fetchers=1
  // 全局 fetcher 线程池
  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]

  // 每个 tp 通过哈希取余唯一确定要使用的 fetcher 线程,对应关系见上图
  private def getFetcherId(topic: String, partitionId: Int): Int =
    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers

  // 1. 新增一批 fetch 任务
  def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
    // 以 fetcher 线程的视角,先将 tp 按 [broker, fetcherId] 分组
    val partitionsPerFetcher: Map[BrokerAndFetcherId, Map[TopicAndPartition, BrokerAndInitialOffset]] =
      partitionAndOffsets.groupBy {
        case (topicAndPartition, brokerAndInitialOffset) => BrokerAndFetcherId(
          brokerAndInitialOffset.broker,                                       // 1. 发往同一个 broker 的 tp
          getFetcherId(topicAndPartition.topic, topicAndPartition.partition))  // 2. 使用同一个 fetcher 线程的 tp
      }

    // 分配给不同的 fetcher 线程
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>  // 首次使用,lazy 创建 fetcher 线程
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start()  // 后台启动
      }

      // 提交新的一批 tp fetch 任务
      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
        case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }

  // 2. 删除一批 fetch 任务
  def removeFetcherForPartitions(partitions: Set[TopicAndPartition]) =
    for ((key, fetcher) <- fetcherThreadMap)  // 全量遍历
      fetcher.removePartitions(partitions)
  /*...*/
}
```
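
getFetcherId 的取余分配效果可以用一个小例子直观感受(独立可运行的演示,topic 名与 numFetchers 为假设值,用 math.abs 近似 Utils.abs):

```scala
// 演示:numFetchers = 2 时,各 tp 会被固定分配到哪个 fetcher 线程
object FetcherIdDemo extends App {
  val numFetchers = 2
  def fetcherId(topic: String, partitionId: Int): Int =
    math.abs(31 * topic.hashCode + partitionId) % numFetchers

  val topic = "topic-a"
  for (p <- 0 until 4)
    println(s"$topic / partition-$p -> fetcher-${fetcherId(topic, p)}")
  // 同一 tp 总是映射到同一个 fetcher;不同 tp 被散列到 numFetchers 个线程上
}
```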

至此,分析了 FetcherManager 通过持有全局 fetcher 线程池,来管理 fetch 任务的机制


三:ReplicaManager

职责:

  • 负责执行 hw-checkpoint、ISR 结果定时发布到 zk、ISR shrink 等定时任务
  • 负责执行来自 Controller 的 LeaderAndIsrRequest 请求,做主从切换
  • 处理 Produce 和 Fetch 请求,持有 Produce、Fetch 两个 purgatory,用于管理 DelayedProduce 和 DelayedFetch 延迟任务,参考上篇笔记不再赘述

主要字段:

```scala
class ReplicaManager(/*...*/) {
  // 当前 broker 作为 replica 的所有分区:(topic, partition) -> Partition
  val allPartitions = new Pool[(String, Int), Partition]
  // 负责管理与各 leader 之间的 fetch 消息同步
  val replicaFetcherManager = new ReplicaFetcherManager(/*...*/)
  // 各数据目录下的 hw checkpoint 文件
  val highWatermarkCheckpoints = config.logDirs.map(
    dir => (new File(dir).getAbsolutePath,
            new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
  // 2 个 purgatory:管理 DelayedProduce 和 DelayedFetch 延迟任务,使其可被提前完成
  val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
    purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
  val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
    purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
  /*...*/
}
```

3.1 scheduler

1. highwatermark-checkpoint

与之前 Log 模块中用 recovery-point-offset-checkpoint 文本文件加速 Log 状态重建的思路一样,各 local replica 也会将 HW 定期写入 replication-offset-checkpoint 文件,用于重启时恢复 Replica 状态

HW 作为主从同步的安全边界,表示在此之前的日志已被成功复制:

  • leader replica:HW 之前的日志已被 ISR 成功复制,可投递给 consumer
  • follower replica:HW 之前的日志已确认被提交,故障恢复时以它为界截断本地日志并继续 fetch

```scala
class ReplicaManager {
  // 每隔 replica.high.watermark.checkpoint.interval.ms(默认 5s)写入一次;broker 退出前也会再记录一次 HW
  def checkpointHighWatermarks() {
    // 将所有 local replica 的 HW 写入其所在数据目录的 replication-offset-checkpoint 文件
    val localReplicas = allPartitions.values.flatMap(_.getReplica(config.brokerId))
    val replicasByDir = localReplicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
    for ((dir, reps) <- replicasByDir) {
      val hwms: Map[TopicAndPartition, Long] = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap
      highWatermarkCheckpoints(dir).write(hwms)  // 写入文本行 [topic, partition, hw]
    }
  }
}

class Partition {
  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
    getReplica(replicaId) match {
      case Some(replica) => replica
      case None =>
        if (isReplicaLocal(replicaId)) {  // local replica 从 checkpoint 文件恢复 HW
          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
          // checkpoint 无 HW 记录则取 0;有记录但比 LEO 超前则以 LEO 为准,否则以记录的 HW 为准
          val offset = checkpoint.read.getOrElse(TopicAndPartition(topic, partitionId), 0L).min(log.logEndOffset)
          val localReplica = new Replica(replicaId, this, time, offset, Some(log))
          addReplicaIfNotExists(localReplica)
        } /*...*/
        getReplica(replicaId).get
    }
  }
}
```
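
replication-offset-checkpoint 是一个简单的文本文件,与 recovery-point-offset-checkpoint 格式相同:第一行为版本号,第二行为条目数,之后每行依次为 topic、partition、hw。下面的内容仅为虚构示例:

```
0
2
topic-a 0 128
topic-a 1 97
```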

2. ISR

  • isr-expiration:每隔 replica.lag.time.max.ms(默认 10s)触发一次,对所有 local leader replica 执行 ISR 收缩

    ```scala
    private def maybeShrinkIsr(): Unit =
      allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
    ```

  • isr-change-propagation:默认每隔 2.5s 检查一次,若 ISR 有变化则创建顺序 znode,通知 Controller 读取 zk 上最新的 ISR

    ```scala
    def maybePropagateIsrChanges() {
      if (/*...*/) {  // 限制频率:ISR 有变化,且距上次通知 zk 至少过去 5s
        ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)  // 将 ISR 有变化的 tp 写入 zk
        isrChangeSet.clear()
        lastIsrPropagationMs.set(now)
      }
    }

    def recordIsrChange(topicAndPartition: TopicAndPartition) {  // expand / shrink 时都会调用
      isrChangeSet += topicAndPartition
      lastIsrChangeMs.set(System.currentTimeMillis())
    }
    ```

    注意 expand/shrink 时实时写 zk,与上述定时任务写 zk 的路径并不一样:前者负责把最新的 ISR 集合写入分区的 state 节点,后者只负责通知 Controller 去读取这些变化,进而做出主从切换等决策(两个任务的注册方式见下方示意)
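
上述两个定时任务在 ReplicaManager 启动时注册,大致形态如下(简化示意,方法名与周期参数参考 0.10 源码,以所读版本为准):

```scala
// 简化示意:ReplicaManager.startup 中注册 ISR 相关定时任务
def startup() {
  // 周期 = replica.lag.time.max.ms,触发 ISR 收缩
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
  // 固定 2500ms 检查一次,把累积的 ISR 变化通知给 Controller
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}
```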


3.2 LeaderAndIsr

ReplicaManager 还负责执行 Controller 下发的 LeaderAndIsrRequest 主从分配决策,请求中包含了当前 broker 作为 replica 的所有分区的主从信息:

```java
public class LeaderAndIsrRequest {
    public static class PartitionState {  // 描述单个分区的 leader, AR, ISR 等主从信息
        public final int controllerEpoch; // 标记分配结果的有效性
        public final int leader;          // 决定 local replica 是 leader 还是 follower
        public final List<Integer> isr;
        public final Set<Integer> replicas;
        /*...*/
    }

    Map<TopicPartition, PartitionState> partitionStates;  // 当前 broker 作为 replica 的所有分区信息
}
```

处理流程:

  • 要切换为 leader 的分区:停止旧的 fetch 任务,更新 partition 状态(刷新 AR/ISR,重置 follower 的复制进度)
  • 要切换为 follower 的分区:停止旧的 fetch 任务,将本地日志截断到 HW,再向新 leader 开启新的 fetch 任务

```scala
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, metadataCache: MetadataCache, /*...*/) {
  val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
  // 为请求中的每个分区创建 local replica
  leaderAndISRRequest.partitionStates.asScala.foreach {
    case (tp, stateInfo) =>
      partitionState.put(getOrCreatePartition(tp.topic, tp.partition), stateInfo)
  }

  // 切换为 leader 的分区
  val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) => stateInfo.leader == config.brokerId }
  makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)

  // 切换为 follower 的分区
  val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
  makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
}

private def makeLeaders(partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState], /*...*/) {
  // 1. 停止要切换分区的旧 fetch 任务
  replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
  // 2. 更新 partition 信息,完成 leader 切换
  partitionState.foreach {
    case (partition, partitionStateInfo) =>
      partition.makeLeader(controllerId, partitionStateInfo, correlationId)
  }
}

private def makeFollowers(/*...*/) {
  // 1. 删除到旧 leader 的 fetch 任务(将本地日志截断到 HW 等步骤此处省略)
  replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
  // 对每个 partition,归类要从哪个 leader 的哪个 offset 开始 fetch
  val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
    new TopicAndPartition(partition) -> BrokerAndInitialOffset(
      metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
      partition.getReplica(localBrokerId).get.logEndOffset.messageOffset)
  ).toMap
  // 2. 启动到新 leader 的 fetch 任务
  replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
}
```

此处省略了 Controller/Leader epoch 的有效性检测、新 leader broker 的可用性检测等逻辑,具体参考源码
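
被省略的 Controller epoch 检查大致形态如下(简化示意,返回值封装等细节以实际源码为准):

```scala
// 简化示意:丢弃来自过期 Controller 的请求,避免旧决策覆盖新决策
if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
  // 返回 STALE_CONTROLLER_EPOCH 错误,忽略本次请求
  BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
  controllerEpoch = leaderAndISRRequest.controllerEpoch  // 记录最新 epoch,继续执行主从切换
  /*...*/
}
```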


总结

本文大致分析了:

  • 描述单个副本的 Replica 状态
  • 描述单个分区的 Partition 及其 ISR 扩缩容的机制、主从切换时的状态变更
  • Fetcher 后台线程的任务池实现,重点分析了 offset 上溢时导致的一致性问题
  • ReplicaManager 的三个定时任务,以及部分请求的处理

整个 Replication 模块最出彩的设计在于两个相互协作的 purgatory 组件:Produce 写入新消息后可让等待数据的 DelayedFetch 提前完成,Fetch 推进复制进度后又可让等待副本确认的 DelayedProduce 提前完成;另外 Fetcher 的任务池、FetcherManager 按哈希分配的线程池,都是优秀的并发设计