Preface

This post analyzes how the Controller module uses ZooKeeper to drive its state machines through state transitions, and thereby manages partitions and replicas.


1. Preparation

1.1 Subcomponents

The Controller's subcomponents are shown below. Each will be analyzed in turn, along with the execution logic of tasks such as topic creation/deletion, reassignment, and preferred-replica election:

(Figure: Controller subcomponents)


First, the znodes involved. Assume a cluster with brokers 1, 2, 3 and a single-partition, three-replica topic `topicx`; the ZooKeeper layout might look like this:

├─ admin
│   ├─ delete_topics                # topics pending deletion
│   ├─ preferred_replica_election   # JSON list of partitions awaiting preferred-replica election
│   └─ reassign_partitions          # JSON of the target partition assignment for a pending reassign
├─ brokers
│   ├─ ids
│   │   ├─ 1                        # {"endpoints":["PLAINTEXT://mesa:9092"],"host":"mesa","version":3,"port":9092}
│   │   ├─ 2                        # connection info of the live brokers
│   │   └─ 3
│   └─ topics
│       └─ topicx                   # {"version":1,"partitions":{"0":[1,2,3]}} # records the AR
│           └─ partitions
│               └─ 0                # records leader and ISR info
│                   └─ state        # {"controller_epoch":1,"leader":1,"leader_epoch":0,"isr":[1,2,3]}
├─ controller                       # {"version":1,"brokerid":1}
├─ controller_epoch                 # 1 # monotonically increasing controller generation
└─ isr_change_notification/isr_change_0000000001  # JSON list of partitions whose ISR changed
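
As a concrete reference, the sketch below shows how the payload of the state znode above could be assembled. It is a minimal sketch under stated assumptions: a simplified LeaderAndIsr model and a hand-rolled stateJson helper, not the actual kafka.utils.ZkUtils code.

// Minimal sketch (not kafka.utils.ZkUtils) of the /brokers/topics/<T>/partitions/<P>/state payload.
object StateZnodeSketch {
  // Hypothetical, simplified model of the fields stored in the state znode
  case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])

  def stateJson(li: LeaderAndIsr, controllerEpoch: Int): String =
    s"""{"controller_epoch":$controllerEpoch,"leader":${li.leader},""" +
      s""""leader_epoch":${li.leaderEpoch},"isr":[${li.isr.mkString(",")}]}"""

  def main(args: Array[String]): Unit =
    // prints: {"controller_epoch":1,"leader":1,"leader_epoch":0,"isr":[1,2,3]}
    println(stateJson(LeaderAndIsr(leader = 1, leaderEpoch = 0, isr = List(1, 2, 3)), controllerEpoch = 1))
}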

1.2 ControllerChannelManager

Responsibility: handles communication with the other brokers for the three request types LeaderAndIsr, StopReplica, and UpdateMetadata. Three key design points:

  • Decoupling: a task queue is maintained per broker, decoupling the main thread from each broker's request-sending thread

  • Asynchrony: after a request is sent, its response is handled asynchronously via a callback

  • Batching: cluster-wide UpdateMetadata broadcasts and AR-wide LeaderAndIsr broadcasts both send requests to several brokers at once, so a batch abstraction centralizes such operations

    (Figure: ControllerChannelManager structure)

Its main fields:

class ControllerChannelManager(ctx: ControllerContext, config: KafkaConfig, /*..*/) {
  // Core structure: one network connection per remote broker
  val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
  ctx.liveBrokers.foreach(addBroker)

  // Enqueue the request for the target broker and return immediately; the callback handles the response asynchronously
  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest, callback /*..*/) =
    brokerStateInfo(brokerId).messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))

  def addBroker(broker: Broker) {  // a broker newly registered under /brokers/ids
    val queue = new LinkedBlockingQueue[QueueItem]  // a fresh task queue
    val networkClient = { /*..*/ new NetworkClient(/*..*/) }
    val requestThread = new RequestSendThread(brokerId, ctx, queue, networkClient, /*..*/)
    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, queue, requestThread))
    startRequestSendThread(broker.id)  // start the new background RequestSend thread
  }
  /*..*/
}

1. RequestSendThread

Responsibility: communicates with a single broker; takes a request off the task queue, sends it, blocks waiting for the response, then runs the corresponding callback.

case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short],
                     request: AbstractRequest, callback: AbstractRequestResponse => Unit)

class RequestSendThread(val queue: BlockingQueue[QueueItem], val client: NetworkClient, /*..*/) extends ShutdownableThread {
  override def doWork(): Unit = {
    val QueueItem(apiKey, apiVersion, request, callback) = queue.take() /*..backoff*/
    import NetworkClientBlockingOps._  // implicit conversions providing blocking network IO
    val requestHeader = apiVersion.fold(client.nextRequestHeader(apiKey))(/*...*/)
    val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
    val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
    val clientResponse = client.blockingSendAndReceive(clientRequest)(time)  // send the request and wait for the response
    if (clientResponse != null) {
      val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
        case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
        /* ApiKeys.STOP_REPLICA, ApiKeys.UPDATE_METADATA_KEY */
      }
      if (callback != null) callback(response)  // run the callback to process the response
    }
  }
}

2. ControllerBrokerRequestBatch

Responsibility: manages sending multiple requests to multiple brokers in one round; new requests are first buffered in the corresponding request map:

case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
case class PartitionStateInfo(leaderAndIsr: LeaderIsrAndControllerEpoch, allReplicas: Set[Int])

case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)

class ControllerBrokerRequestBatch {
  val leaderAndIsrRequestMap = Map.empty[Int, Map[TopicPartition, PartitionStateInfo]]
  val updateMetadataRequestMap = Map.empty[Int, Map[TopicPartition, PartitionStateInfo]]
  val stopReplicaRequestMap = Map.empty[Int, Seq[StopReplicaRequestInfo]]
}

The inputs and construction of the three request types:

// 1. Broadcast LeaderAndIsr within the AR
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                     li: LeaderIsrAndControllerEpoch, replicas: Seq[Int]) {
  val tp = new TopicPartition(topic, partition)
  brokerIds.foreach(brokerId =>  // generate one LeaderAndIsr request per broker, using the latest assignment
    leaderAndIsrRequestMap(brokerId).put(tp, PartitionStateInfo(li, replicas.toSet)))
  // required: the whole cluster must also refresh its metadata
  addUpdateMetadataRequestForBrokers(ctx.liveOrShuttingDownBrokerIds.toSeq, Set(TopicAndPartition(topic, partition)))
}

// 2. Broadcast UpdateMetadata cluster-wide
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], partitions: Set[TopicAndPartition]) {
  def updateMetadataRequestMapFor(tp: TopicAndPartition, beingDeleted: Boolean) {
    val li = ctx.partitionLeadershipInfo.get(tp) /*..*/
    val partitionStateInfo = PartitionStateInfo(li, ctx.partitionReplicaAssignment(tp).toSet)
    brokerIds.foreach(brokerId => updateMetadataRequestMap(brokerId).put(tp, partitionStateInfo))
  } /*..*/ // for a topic being deleted, the leader is set to -2; after the broadcast the other brokers drop that topic's metadata
  // run the function for each tp to build a fresh partitionStateInfo and add it to the request map
  val filteredPartitions = if (partitions.isEmpty) ctx.partitionLeadershipInfo.keySet else partitions
  filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
}

// 3. Broadcast StopReplica to a set of brokers
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                    deletePartition: Boolean, callback /*resp => ..*/) {
  brokerIds.foreach(brokerId =>
    stopReplicaRequestMap(brokerId) :+= StopReplicaRequestInfo(
      PartitionAndReplica(topic, partition, brokerId), deletePartition, r => callback(r, brokerId)))
}

def sendRequestsToBrokers(controllerEpoch: Int) {
  /*..*/ // build ApiRequests from the three request maps and enqueue them on each broker's RequestSend thread
}

So far we have seen how the Controller buffers broadcast requests in a batch, uses per-broker task queues to decouple itself from the background RequestSend threads so that requests to multiple brokers proceed in parallel, and processes responses asynchronously through callbacks.
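
To make the decoupling and asynchrony concrete, here is a minimal self-contained sketch of the same pattern under stated assumptions: Request, Response, QueueItem and BrokerSender are hypothetical types, and a fake in-thread reply stands in for NetworkClient's blocking IO. One blocking queue plus one daemon sender thread per broker, with responses delivered through callbacks:

import java.util.concurrent.LinkedBlockingQueue

object ChannelManagerSketch {
  case class Request(payload: String)
  case class Response(payload: String)
  case class QueueItem(request: Request, callback: Response => Unit)

  class BrokerSender(brokerId: Int) {
    private val queue = new LinkedBlockingQueue[QueueItem]()
    private val thread = new Thread(() => {
      while (!Thread.currentThread().isInterrupted) {
        val item = queue.take()                             // block until there is work
        val resp = Response(s"ack:${item.request.payload}") // stand-in for blocking network IO
        item.callback(resp)                                 // asynchronous completion for the caller
      }
    }, s"request-send-thread-$brokerId")
    thread.setDaemon(true); thread.start()

    def send(request: Request)(callback: Response => Unit): Unit =
      queue.put(QueueItem(request, callback))               // enqueue and return immediately
  }

  def main(args: Array[String]): Unit = {
    val senders = Map(1 -> new BrokerSender(1), 2 -> new BrokerSender(2))
    senders.foreach { case (id, s) => s.send(Request(s"LeaderAndIsr->broker$id"))(r => println(r)) }
    Thread.sleep(200)
  }
}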


2. PartitionStateMachine

Responsibility: the partition state machine (PSM) maintains the state of every topic partition in the cluster. Adding/removing partitions, running a preferred-replica election, or an ISR expansion/shrink modifies znodes, which notifies the PSM to run leader election and send the appropriate LeaderAndIsr and UpdateMetadata requests to complete the state transition.

Its main fields:

class PartitionStateMachine(controller: KafkaController) {
  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)  // state changes send requests
  val partitionState: Map[TopicAndPartition, PartitionState] = Map.empty  // every tp in the cluster and its state
  // the 3 zk listeners that drive partition state changes in the PSM
  val topicChangeListener = new TopicChangeListener()    // /brokers/topics
  val deleteTopicsListener = new DeleteTopicsListener()  // /admin/delete_topics
  val partitionModificationsListeners: Map[String, PartitionModificationsListener] = Map.empty  // /brokers/topics/<T>
}

// The Controller's global context, kept up to date by the listeners; effectively a zk cache (referenced as ctx below)
class ControllerContext(val zkUtils: ZkUtils) {
  // all topics in the cluster, plus per-partition AR and LeaderIsrAndControllerEpoch info
  var allTopics: Set[String] = Set.empty
  var partitionReplicaAssignment: Map[TopicAndPartition, Seq[Int]] = Map.empty
  var partitionLeadershipInfo: Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = Map.empty
  // partitions currently being reassigned, and partitions undergoing preferred-replica election
  val partitionsBeingReassigned: Map[TopicAndPartition, ReassignedPartitionsContext] = new HashMap
  val partitionsUndergoingPreferredReplicaElection: Set[TopicAndPartition] = new HashSet
}

This section first covers how the transitions themselves are implemented (sending requests, triggering leader election); the zk listeners that drive the transitions are covered together in Section 4.


2.1 State transitions

A partition has four states:

trait PartitionState
object NonExistentPartition extends PartitionState  // does not exist, or has been deleted
object NewPartition extends PartitionState          // newly created (new topic or partition expansion); no leader/ISR yet
object OnlinePartition extends PartitionState       // a leader has been elected
object OfflinePartition extends PartitionState      // the leader is unavailable

Transition flow:

(Figure: partition state transition diagram)
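
The legal transitions in the figure can be written down as a small lookup; the sketch below (an assumed helper, not Kafka code) encodes which previous states are valid for each target state, consistent with the handleStateChange cases shown next:

object PartitionTransitionsSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition         extends PartitionState
  case object OnlinePartition      extends PartitionState
  case object OfflinePartition     extends PartitionState

  // Which previous states may move to each target state
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
    NewPartition         -> Set(NonExistentPartition),
    OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> Set(NewPartition, OnlinePartition, OfflinePartition),
    NonExistentPartition -> Set(OfflinePartition)
  )

  def isLegal(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates(to).contains(from)

  def main(args: Array[String]): Unit = {
    println(isLegal(NewPartition, OnlinePartition))          // true
    println(isLegal(NonExistentPartition, OnlinePartition))  // false
  }
}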


Implementation:

// Move each of the given partitions to targetState; if needed, elect a new leader with the given leaderSelector strategy
def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                       leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
  brokerRequestBatch.newBatch()  // the standard batch usage pattern; the green arrows in the figure above send requests
  partitions.foreach(tp => handleStateChange(tp.topic, tp.partition, targetState, leaderSelector))
  brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
}

private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector) {
  val tp = TopicAndPartition(topic, partition)
  targetState match {
    case NewPartition =>  // 1. NonExistent --> New: mark only; the AR was already loaded from zk into ctx by the listener
      partitionState.put(tp, NewPartition)
    case OnlinePartition =>
      partitionState(tp) match {
        case NewPartition =>      // 2.1 New --> Online: brand-new partition, regular election
          initializeLeaderAndIsrForPartition(tp)
        case OfflinePartition =>  // 2.2 Offline --> Online: failover, elect a new leader
          electLeaderForPartition(topic, partition, leaderSelector)
        case OnlinePartition =>   // 2.3 Online --> Online: the leader is alive but must be re-elected
          electLeaderForPartition(topic, partition, leaderSelector)
      }
      partitionState.put(tp, OnlinePartition)
    case OfflinePartition =>      // 3. New, Online, Offline --> Offline: the leader went down; mark only
      partitionState.put(tp, OfflinePartition)
    case NonExistentPartition =>  // 4. Offline --> NonExistent: the topic was deleted; mark only
      partitionState.put(tp, NonExistentPartition)
  }
}

As shown above, the core of the PSM is the leader election performed when a partition is moved to Online, implemented as follows:

  • New -> Online: run the default Offline election strategy

    // 2.1 NewPartition --> OnlinePartition // default leader election strategy
    def initializeLeaderAndIsrForPartition(tp: TopicAndPartition) {
      val ar = ctx.partitionReplicaAssignment(tp)
      val liveAR = ar.filter(r => ctx.liveBrokerIds.contains(r))
      liveAR.size match {
        case 0 => throw new StateChangeFailedException(failMsg)  // the entire AR is unavailable
        case _ =>
          val leader = liveAR.head  // the first live replica in the AR becomes leader
          val li = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAR.toList), controller.epoch)
          // publish the leaderAndIsr JSON to /brokers/topics/<T>/partitions/<P>/state
          zkUtils.createPersistentPath(getTopicPartitionLeaderAndIsrPath(tp.topic, tp.partition),
            zkUtils.leaderAndIsrZkData(li.leaderAndIsr, controller.epoch))
          ctx.partitionLeadershipInfo.put(tp, li)
          // broadcast LeaderAndIsr within the AR
          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAR, tp.topic, tp.partition, li, ar)
      }
    }
  • Offline, Online -> Online: run the specified election strategy

    def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
      val tp = TopicAndPartition(topic, partition)
      val currentLeaderAndIsr = getLeaderIsrAndEpochOrThrowException(topic, partition).leaderAndIsr
      // run the given election strategy to pick the new leader and ISR
      val (leaderAndIsr, liveAR) = leaderSelector.selectLeader(tp, currentLeaderAndIsr)
      // publish the election result to zk
      val (_, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, leaderAndIsr,
        controller.epoch, currentLeaderAndIsr.zkVersion)
      // refresh the ctx cache and broadcast LeaderAndIsr within liveAR
      val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(leaderAndIsr, controller.epoch)
      ctx.partitionLeadershipInfo.put(tp, newLeaderIsrAndControllerEpoch)
      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAR, topic, partition,
        newLeaderIsrAndControllerEpoch, ctx.partitionReplicaAssignment(tp))
    }

This is how the PSM, when changing a partition's state, refreshes the ctx cache, publishes the result to zk, and broadcasts the corresponding requests so the transition takes effect.


2.2 Leader election strategies

The PSM above delegates leader election and ISR filtering to the PartitionLeaderSelector interface:

trait PartitionLeaderSelector {
  // input:  tp - the partition to elect a leader for // curLeaderAndIsr - the current leaderAndIsr stored in zk
  // output: leaderAndIsr - the election result // replicas - the brokers that should receive this result
  def selectLeader(tp: TopicAndPartition, curLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

The logic of the four strategies is outlined below (code omitted):

1. Offline strategy: prefer a live replica from the ISR; a live replica from the AR is the fallback, depending on configuration

(Figure: Offline leader selection flow)


2. Reassigned strategy: a new leader can be chosen only if the RAR and the ISR share at least one live replica

(Figure: Reassigned leader selection flow)


3. Preferred strategy: the preferred replica can be elected only if it is alive and already in the ISR

(Figure: Preferred leader selection flow)


4. ControlledShutdown strategy: after removing the shutting-down replicas, a live replica remaining in the ISR is elected

(Figure: ControlledShutdown leader selection flow)

These four strategies cover, respectively, the default case, a manually triggered reassign, a manually triggered preferred-replica election, and a graceful broker shutdown.
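
To illustrate strategy 1, here is a minimal sketch of the Offline selection decision under stated assumptions: simplified types, with uncleanElectionEnabled standing in for the unclean.leader.election.enable check; it is not the real OfflinePartitionLeaderSelector code:

object OfflineSelectorSketch {
  // Returns Right((newLeader, newIsr)) or Left(error). Decision order:
  // live ISR replicas first, then (optionally) live AR replicas, otherwise fail.
  def selectLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int],
                   uncleanElectionEnabled: Boolean): Either[String, (Int, Seq[Int])] = {
    val liveIsr = isr.filter(liveBrokers.contains)
    val liveAr  = assignedReplicas.filter(liveBrokers.contains)
    if (liveIsr.nonEmpty)
      Right((liveIsr.head, liveIsr))           // new leader from the ISR; dead members drop out of the ISR
    else if (uncleanElectionEnabled && liveAr.nonEmpty)
      Right((liveAr.head, Seq(liveAr.head)))   // unclean election: ISR shrinks to the new leader, data may be lost
    else
      Left("no live replica eligible for leadership")
  }

  def main(args: Array[String]): Unit = {
    println(selectLeader(Seq(1, 2, 3), isr = Seq(1, 2), liveBrokers = Set(2, 3), uncleanElectionEnabled = false))
    // Right((2, List(2)))
    println(selectLeader(Seq(1, 2, 3), isr = Seq(1), liveBrokers = Set(3), uncleanElectionEnabled = true))
    // Right((3, List(3)))
  }
}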


Finally, the PSM startup flow:

class PartitionStateMachine(controller: KafkaController) {
  // Started once this broker has been elected Controller
  def startup() {
    initializePartitionState()
    triggerOnlinePartitionStateChange()
  }

  // Read zk and rebuild the state of every tp
  def initializePartitionState() {
    for ((tp, replicaAssignment) <- ctx.partitionReplicaAssignment) {
      ctx.partitionLeadershipInfo.get(tp) match {
        case Some(currentLeaderIsrAndEpoch) =>  // a leader exists
          ctx.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
            case true  => partitionState.put(tp, OnlinePartition)   // the leader is alive
            case false => partitionState.put(tp, OfflinePartition)  // the leader is down
          }
        case None => partitionState.put(tp, NewPartition)  // no leader info yet; an election will produce one
      }
    }
  }

  // Try to move every New or Offline partition to Online, triggering leader elections where needed
  def triggerOnlinePartitionStateChange() {
    brokerRequestBatch.newBatch()
    for ((tp, state) <- partitionState
         if state.equals(OfflinePartition) || state.equals(NewPartition))
      handleStateChange(tp.topic, tp.partition, OnlinePartition, controller.offlinePartitionSelector)
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  }
}

This completes the picture of how the PSM loads and maintains the global partition state.


3. ReplicaStateMachine

Responsibility: the replica state machine (RSM) maintains the state of every partition replica in the cluster. When brokers come up or go down, or a reassign runs, znodes are modified, which notifies the RSM to send the appropriate LeaderAndIsr, UpdateMetadata, and StopReplica requests to carry out the state transition.

A replica has seven states:

trait ReplicaState
object NonExistentReplica extends ReplicaState        // the replica does not exist, or has been fully deleted
object NewReplica extends ReplicaState                // new replica from topic creation or reassign; can only be a follower
object OnlineReplica extends ReplicaState             // has joined the AR; may serve as leader or follower
object OfflineReplica extends ReplicaState            // the broker hosting the replica is down
object ReplicaDeletionStarted extends ReplicaState    // deletion has started
object ReplicaDeletionSuccessful extends ReplicaState // deletion succeeded
object ReplicaDeletionIneligible extends ReplicaState // cannot be deleted for now; marker state

Transition flow:

(Figure: replica state transition diagram)
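
As with partitions, the legal previous states per target replica state (as implied by handleStateChange below) can be captured in a small sketch (an assumed helper, not Kafka code):

object ReplicaTransitionsSketch {
  sealed trait ReplicaState
  case object NonExistentReplica        extends ReplicaState
  case object NewReplica                extends ReplicaState
  case object OnlineReplica             extends ReplicaState
  case object OfflineReplica            extends ReplicaState
  case object ReplicaDeletionStarted    extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Which previous states may move to each target state
  val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica                -> Set(NonExistentReplica),
    OnlineReplica             -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    OfflineReplica            -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    ReplicaDeletionStarted    -> Set(OfflineReplica),
    ReplicaDeletionIneligible -> Set(ReplicaDeletionStarted),
    ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
    NonExistentReplica        -> Set(ReplicaDeletionSuccessful)
  )

  def isLegal(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates(to).contains(from)

  def main(args: Array[String]): Unit =
    println(isLegal(OfflineReplica, ReplicaDeletionStarted))  // true
}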

Implementation:

def handleStateChange(pr: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) {
  val topic = pr.topic; val partition = pr.partition; val tp = TopicAndPartition(topic, partition)
  val replicaId = pr.replica
  val replicaAssignment = ctx.partitionReplicaAssignment(tp)

  targetState match {
    case NewReplica =>  // 1. NonExistent --> New: send LeaderAndIsr to the new replica
      val li = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
      batch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, li, replicaAssignment)
      replicaState.put(pr, NewReplica)

    case ReplicaDeletionStarted =>     // 2.1 Offline --> DeletionStarted: send StopReplica
      replicaState.put(pr, ReplicaDeletionStarted)
      batch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true, callbacks.stopReplicaResponseCallback)
    case ReplicaDeletionIneligible =>  // 2.2 DeletionStarted --> DeletionIneligible: cannot delete for now; mark only
      replicaState.put(pr, ReplicaDeletionIneligible)
    case ReplicaDeletionSuccessful =>  // 2.3 DeletionStarted --> DeletionSuccessful: this replica was deleted
      replicaState.put(pr, ReplicaDeletionSuccessful)
    case NonExistentReplica =>         // 2.4 DeletionSuccessful --> NonExistent: every replica is gone; the topic is fully deleted
      val curAR = ctx.partitionReplicaAssignment(tp)
      ctx.partitionReplicaAssignment.put(tp, curAR.filterNot(_ == replicaId))
      replicaState.remove(pr)

    case OnlineReplica =>
      replicaState(pr) match {
        case NewReplica =>  // 3.1 New --> Online: record that it joined the AR
          ctx.partitionReplicaAssignment.put(tp, ctx.partitionReplicaAssignment(tp) :+ replicaId)
        case _ =>           // 3.2 Online, Offline, DeletionIneligible --> Online
          ctx.partitionLeadershipInfo.get(tp) match {
            case Some(li) =>  // a leader exists: tell this replica to come online
              batch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, li, replicaAssignment)
              replicaState.put(pr, OnlineReplica)
            case None =>      // the partition never had a leader; nothing to send
          }
      }
      replicaState.put(pr, OnlineReplica)

    // 4. New, Online, Offline, DeletionIneligible --> Offline: send StopReplica
    case OfflineReplica =>
      /*..*/ // send StopReplica(deletePartition = false), remove the replica from the ISR, and broadcast the new LeaderAndIsr to the AR
  }
}

This is how the RSM sends the appropriate requests while transitioning replica states.


4. Zookeeper

The PSM and RSM state transitions above are driven by the zk listeners below, which fall into two kinds.

(1). Child listeners: watch for child-node changes

watched znode              listener                        trigger
/brokers/topics            TopicChangeListener             topic created
/brokers/ids               BrokerChangeListener            broker goes online / offline
/isr_change_notification   IsrChangeNotificationListener   some partition's ISR changed
/admin/delete_topics       DeleteTopicsListener            topic deletion requested

(2). Data listeners: watch for data changes on a node

watched znode                              listener                                trigger
/brokers/topics/<T>                        PartitionModificationsListener          topic partition count increased
/admin/preferred_replica_election          PreferredReplicaElectionListener        preferred-replica election requested
/brokers/topics/<T>/partitions/<P>/state   ReassignedPartitionsIsrChangeListener   drive an in-flight reassign forward
/admin/reassign_partitions                 PartitionsReassignedListener            reassign requested
/controller                                LeaderChangeListener                    controller election

Each listener's implementation is analyzed below, with particular attention to topic deletion and partition reassignment.


4.1 Child Listener

1. TopicChangeListener

When a topic is created with the kafka-topics.sh script, the underlying admin module uses rack information to spread the new topic's partitions and replicas as evenly as possible across the brokers, describes the resulting assignment as JSON, and publishes it to /brokers/topics/<T>.
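
A minimal sketch of such an assignment, assuming no rack awareness and a plain round-robin layout; this is a simplification for illustration, not the actual AdminUtils.assignReplicasToBrokers algorithm (which also randomizes the start index and handles racks):

object AssignmentSketch {
  // Assign `partitions` partitions with `replicationFactor` replicas over `brokers`,
  // rotating the starting broker per partition so the AR heads (preferred replicas) spread evenly.
  def assign(brokers: Seq[Int], partitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    require(replicationFactor <= brokers.size)
    (0 until partitions).map { p =>
      val ar = (0 until replicationFactor).map(r => brokers((p + r) % brokers.size))
      p -> ar
    }.toMap
  }

  def main(args: Array[String]): Unit =
    // e.g. Map(0 -> Vector(1, 2, 3)) for a single-partition, three-replica topic on brokers 1, 2, 3
    println(assign(brokers = Seq(1, 2, 3), partitions = 1, replicationFactor = 3))
}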

The PSM watches /brokers/topics; when it detects a newly created topic, it drives the new partitions through their state transitions:

class PartitionStateMachine(controller: KafkaController) {
  class TopicChangeListener extends IZkChildListener with Logging {
    override def handleChildChange(parentPath: String, children: java.util.List[String]) {
      val newTopics = children.toSet -- ctx.allTopics  // java List implicitly converted; load admin's assignment into the AR cache in ctx
      val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
      ctx.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
      controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
    }
  }
}

class Controller {
  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
    topics.foreach(topic => psm.registerPartitionChangeListener(topic))  // register a partition-expansion listener per topic
    // mark only
    psm.handleStateChanges(newPartitions, NewPartition)  // NonExistent -> New
    // no leaderAndIsr info yet; mark only
    rsm.handleStateChanges(ctx.replicasForPartition(newPartitions), NewReplica)

    // run the default strategy: AR.head becomes leader, the result is published to zk,
    // LeaderAndIsr is broadcast within the AR, UpdateMetadata is broadcast cluster-wide
    psm.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)  // New -> Online
    rsm.handleStateChanges(ctx.replicasForPartition(newPartitions), OnlineReplica)    // no op
  }
}

This is how, on topic creation, the PSM moves the new partitions through their states, runs the election, publishes the result to zk, and broadcasts it, at which point the topic becomes usable.


2. BrokerChangeListener

The RSM watches the children of /brokers/ids; when a broker comes up or goes down, it promptly drives the state transitions of that broker's partitions and replicas:

class BrokerChangeListener() extends IZkChildListener {
  override def handleChildChange(parentPath: String, currentBrokerList: List[String]) {
    val curBrokers: Set[Broker] = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
    val curBrokerIds = curBrokers.map(_.id)
    val liveOrShuttingDownBrokerIds = ctx.liveOrShuttingDownBrokerIds

    val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds   // brokers that just came online
    val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds  // brokers that went offline
    val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))

    newBrokers.foreach(ctx.controllerChannelManager.addBroker)        // start a RequestSend thread
    deadBrokerIds.foreach(ctx.controllerChannelManager.removeBroker)  // stop its RequestSend thread
    if (newBrokerIds.size > 0) controller.onBrokerStartup(newBrokerIds)
    if (deadBrokerIds.size > 0) controller.onBrokerFailure(deadBrokerIds)
  }
}

class Controller {
  def onBrokerStartup(newBrokersSet: Set[Int]) {
    val allReplicasOnNewBrokers = ctx.replicasOnBrokers(newBrokersSet)
    // 1. RSM: Offline -> Online: send LeaderAndIsr
    rsm.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
    // 2. PSM: New/Offline -> Online: trigger any needed leader elections
    psm.triggerOnlinePartitionStateChange()
    // 3. resume reassigns that involve the new brokers
    val partitionsWithReplicasOnNewBrokers = ctx.partitionsBeingReassigned.filter {
      case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
    }
    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
    // 4. resume deletion
    val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    if (replicasForTopicsToBeDeleted.size > 0)
      deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }

  def onBrokerFailure(deadBrokersSet: Set[Int]) {
    val partitionsWithoutLeader: Set[TopicAndPartition] = ctx.partitionLeadershipInfo.filter { case (_, li) =>
      deadBrokersSet.contains(li.leaderAndIsr.leader)  // partitions whose leader was on a dead broker
    }.keySet.toSet
    // 1. PSM: Online -> Offline -> Online: fail over, elect a new leader, send LeaderAndIsr
    psm.handleStateChanges(partitionsWithoutLeader, OfflinePartition)  // mark only: the leader is gone
    psm.triggerOnlinePartitionStateChange()

    val allReplicasOnDeadBrokers = ctx.replicasOnBrokers(deadBrokersSet)  // replicas hosted on the dead brokers
    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    // 2. RSM: Online -> Offline: remove from the ISR, send LeaderAndIsr
    rsm.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)

    // 3. replicas pending deletion on the dead brokers are marked DeletionIneligible: deletion is stalled for now
    val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    if (replicasForTopicsToBeDeleted.size > 0) deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  }
}

That is how the Controller reacts when other brokers come online or go offline:

  • Online: send LeaderAndIsr to restore the broker's replicas, and resume any pending reassign and deletion work
  • Offline: fail over the partitions it led by electing new leaders, remove it from the ISR of partitions it followed, and pause deletion

3. IsrChangeNotificationListener

Background: in the replication module, when a leader replica expands or shrinks its ISR, it publishes the result to zk:

  • the latest leaderAndIsr is written immediately to /brokers/topics/<T>/partitions/<P>/state
  • every 2.5 s, the isr-change-propagation task publishes the list of partitions whose ISR changed to a persistent sequential znode /isr_change_notification/isr_change_xxx

(Figure: ISR change notification znodes)

The Controller watches /isr_change_notification: it first reads the notification children (green) to learn which partitions' ISRs changed, and only then reads the latest ISR from the corresponding state znodes (blue). The reason: when many replicas in the cluster are replicating unstably, the state znodes are updated very frequently, so they are not watched directly; the notification znodes act as a buffer.

The handling logic:

class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener {
  override def handleChildChange(parentPath: String, children: List[String]): Unit = {
    val topicAndPartitions: Set[TopicAndPartition] = children.map(x => getTopicAndPartition(x)).flatten.toSet
    // for each tp, read the latest leaderAndIsr from zk into the ctx cache
    controller.updateLeaderAndIsrCache(topicAndPartitions)
    // broadcast UpdateMetadata
    processUpdateNotifications(topicAndPartitions)
    // delete the processed /isr_change_xxx znodes promptly
    children.map(x => controller.ctx.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
  }
}

This is how the Controller promptly picks up the latest ISR reported by each leader replica and broadcasts it to the cluster. Note that no RSM transition is involved here: a replica kicked out of the ISR for lagging remains in the OnlineReplica state.


4. DeleteTopicsListener

The PSM watches the /admin/delete_topics znode; once it learns which topics should be deleted, it hands them to the TopicDeletionManager, which deletes them asynchronously in the background:

class DeleteTopicsListener() extends IZkChildListener {
  override def handleChildChange(parentPath: String, topicsToBeDeleted: List[String]) {
    // topics currently undergoing preferred-replica election or reassign are marked by the deletion manager as not deletable for now
    topicsToBeDeleted.foreach { topic => /*..*/ }
    // hand over to deleteTopicManager, which enqueues them for asynchronous deletion
    controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
  }
}

Back to the TopicDeletionManager. Topic deletion works by sending StopReplica to every replica of every partition; once all of them report success, the topic's znodes are deleted. Its main fields:

class TopicDeletionManager(controller: KafkaController, /*..*/) {
  // Set-based queues decoupling the TopicDeletionManager from the deletion thread
  val topicsToBeDeleted: Set[String] = Set.empty[String] ++ initialTopicsToBeDeleted
  val partitionsToBeDeleted: Set[TopicAndPartition] = topicsToBeDeleted.flatMap(ctx.partitionsForTopic)
  // topics temporarily not deletable (e.g. a broker is down); once it returns, the Controller calls resumeDeletionForTopics to push deletion forward
  val topicsIneligibleForDeletion: Set[String] = Set.empty[String] ++ (initialTopicsIneligibleForDeletion /*..*/)
  // the background deletion thread
  var deleteTopicsThread: DeleteTopicsThread = null
}

When a new topic needs to be deleted it is added to topicsToBeDeleted and the deletion thread is woken up. The deletion runs in six steps:

class DeleteTopicsThread() extends ShutdownableThread() {
  override def doWork() {
    awaitTopicDeletionNotification()  // wait until new topics are enqueued in the two Set queues
    topicsToBeDeleted.foreach { topic =>
      // Step 4. all replicas of all partitions of the topic have reached DeletionSuccessful
      if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
        completeDeleteTopic(topic)  // delete the topic's znodes and unregister its listeners
      } else {
        if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
          // Step 5. some replica is still in DeletionStarted, i.e. its StopReplicaResponse has not arrived yet; keep waiting
        } else
          // Step 6. the topic cannot be deleted right now: mark its replicas OfflineReplica again and retry later
          if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible))
            markTopicForDeletionRetry(topic)
      }
      if (isTopicEligibleForDeletion(topic))
        onTopicDeletion(Set(topic))  // Step 1
    }
  }
}

// Step 1. start deleting a topic
def onTopicDeletion(topics: Set[String]) {
  // broadcast UpdateMetadata with the partitions' leader set to -2, so the other brokers stop serving requests for these tps
  val partitions = topics.flatMap(ctx.partitionsForTopic)
  controller.sendUpdateMetadataRequest(ctx.liveOrShuttingDownBrokerIds.toSeq, partitions)
  // delete all tps, topic by topic
  val partitionReplicaAssignmentByTopic = ctx.partitionReplicaAssignment.groupBy(p => p._1.topic)
  topics.foreach(topic => onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet))
}

// Step 2. delete all tps of one topic
def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
  val replicasPerPartition: Set[PartitionAndReplica] = ctx.replicasForPartition(partitionsToBeDeleted)
  replicasPerPartition.groupBy(_.topic).foreach { case (topic, replicas) =>
    val aliveReplicasForTopic = ctx.allLiveReplicas().filter(p => p.topic.equals(topic))
    val deadReplicasForTopic = replicasPerPartition -- aliveReplicasForTopic

    val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas  // first attempt, or retry
    // 2.1 dead replicas: mark DeletionIneligible
    replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
    // 2.2 live replicas: move to Offline, sending StopReplica(deletePartition = false)
    replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
    // 2.3 live replicas: move to DeletionStarted, sending StopReplica(deletePartition = true)
    replicaStateMachine.handleStateChanges(replicasForDeletionRetry,
      ReplicaDeletionStarted,  // Step 3. carries a callback that processes the StopReplicaResponse
      new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build
    )
    if (deadReplicasForTopic.size > 0)
      markTopicIneligibleForDeletion(Set(topic))  // some replica is down, so this topic cannot be deleted for now
  }
}

// Step 3. callback that processes the StopReplica responses
private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
  val resp = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
  val responseMap = resp.responses /* ..converted to a Scala map */
  val partitionsInError = responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
  val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
  // 3.1 StopReplica failed: mark the replica DeletionIneligible
  failReplicaDeletion(replicasInError)
  if (replicasInError.size != responseMap.size) {
    val deletedReplicas = responseMap.keySet -- partitionsInError
    // 3.2 StopReplica succeeded: mark DeletionSuccessful
    completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
  }
}

That is how topic deletion works: broadcast StopReplica and track the responses. Because topics must be de-duplicated, a Set guarded by a condition variable is a better fit here than a blocking queue for decoupling the two threads.
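
A minimal sketch of that "Set plus condition variable" hand-off, with hypothetical names rather than the TopicDeletionManager code: producers add topics and signal, while the worker waits until the set is non-empty and then drains a de-duplicated snapshot:

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object DeletionQueueSketch {
  private val lock = new ReentrantLock()
  private val nonEmpty = lock.newCondition()
  private val pending = mutable.Set[String]()     // a Set gives de-duplication for free

  def enqueue(topic: String): Unit = {
    lock.lock()
    try { pending += topic; nonEmpty.signal() }   // duplicate adds collapse into one entry
    finally lock.unlock()
  }

  def awaitAndDrain(): Set[String] = {
    lock.lock()
    try {
      while (pending.isEmpty) nonEmpty.await()    // analogous to awaitTopicDeletionNotification()
      val snapshot = pending.toSet
      pending.clear()
      snapshot
    } finally lock.unlock()
  }

  def main(args: Array[String]): Unit = {
    enqueue("topicx"); enqueue("topicx"); enqueue("topicy")
    println(awaitAndDrain())  // contains topicx and topicy: "topicx" was queued twice but will be deleted once
  }
}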


4.2 Data Listener

1. PartitionModificationsListener

The PSM watches the data of each /brokers/topics/<T> znode, which changes when a topic's partition count is increased:

class PartitionModificationsListener(topic: String) extends IZkDataListener {
  def handleDataChange(dataPath: String, data: Object) {
    val ar = zkUtils.getReplicaAssignmentForTopics(List(topic))
    val partitionsToBeAdded = ar.filter(p => !ctx.partitionReplicaAssignment.contains(p._1))
    if (partitionsToBeAdded.size > 0) {  // newly added partitions are handled exactly like the partitions of a new topic
      ctx.partitionReplicaAssignment.++=(partitionsToBeAdded)
      controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
    }  // a decrease in partition count is ignored
  }
}

2. PreferredReplicaElectionListener

The preferred replica is the first replica in the AR; it is made leader when the partition is created, and the admin module spreads preferred replicas as evenly as possible across brokers so that cluster load is balanced. As brokers come and go, leadership migrates to other ISR members and broker traffic becomes uneven. There are two ways to correct this:

  • Automatic: with auto.leader.rebalance.enable on, every broker is checked every 5 min by default; if its imbalance ratio (partitions for which it is the preferred replica but not the leader, divided by the partitions it hosts as preferred replica) exceeds 10%, a preferred-replica election is forced
  • Manual: write the partition list to /admin/preferred_replica_election

The Controller watches the data of /admin/preferred_replica_election and promptly runs a preferred-replica election for the listed partitions.

class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener {
  override def handleDataChange(dataPath: String, data: Object) {
    val partitions: Set[TopicAndPartition] = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
    controller.onPreferredReplicaElection(partitions)  // re-elect the preferred replica as leader for these partitions
  }
}

def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
  ctx.partitionsUndergoingPreferredReplicaElection ++= partitions
  // PSM: Online --> Online: run the Preferred election strategy for these tps and publish the result to zk
  partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
  // remove the record from ctx; when the election was requested via the znode (second argument false), the znode is also cleaned up
  removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
}

In addition, after winning the election the Controller starts the partition-rebalance-thread background task to perform automatic leader balancing:

def checkAndTriggerPartitionRebalance(): Unit = {
  // group every partition's AR by its preferred replica // broker -> Map[tp, AR]
  val preferredReplicasForTopicsByBrokers = ctx.partitionReplicaAssignment.groupBy {
    case (_, assignedReplicas) => assignedReplicas.head }
  // compute the imbalance ratio broker by broker
  preferredReplicasForTopicsByBrokers.foreach {
    case (leaderBroker, topicAndPartitionsForBroker) =>
      // partitions where this broker is the preferred replica but not the leader count as imbalance
      val topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (tp, _) =>
        ctx.partitionLeadershipInfo(tp).leaderAndIsr.leader != leaderBroker
      }
      val imbalanceRatio = topicsNotInPreferredReplica.size.toDouble / topicAndPartitionsForBroker.size
      // if the ratio exceeds the default 10%, force the Preferred strategy to make this broker leader again
      if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100))
        topicsNotInPreferredReplica.foreach { case (tp, _) => onPreferredReplicaElection(Set(tp), isTriggeredByAutoRebalance = true) }
  }
}

Note that Kafka balances load in units of replicas, not actual throughput. Whether the cluster ends up balanced therefore often depends on whether the partitions of the highest-throughput topics are spread evenly, and keeping them spread evenly requires timely manual reassigns whenever the cluster is scaled out, which is tedious.


3. PartitionsReassignedListener

After new brokers are added to scale the cluster out, high-throughput topics should be reassigned so the new brokers share the load. kafka-reassign-partitions.sh generates a migration plan as JSON; executing it publishes the plan to /admin/reassign_partitions. The Controller watches this znode's data and triggers the reassign for the listed partitions. There are two main steps, implemented as follows:

class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                  var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)

class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener {
  override def handleDataChange(dataPath: String, data: Object) {
    // read the target RAR assignment and kick off the reassign for each tp
    val partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]] = zkUtils.parsePartitionReassignmentData(data)
    partitionsToBeReassigned.foreach(partitionToBeReassigned =>
      controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1,
        ReassignedPartitionsContext(partitionToBeReassigned._2)))
  }
}

// Step 1. preparation: validate the RAR and register the ISR listener
def initiateReassignReplicasForTopicPartition(tp: TopicAndPartition, reassignCtx: ReassignedPartitionsContext) {
  val newReplicas = reassignCtx.newReplicas  // the new AR (RAR)
  val aliveNewReplicas = newReplicas.filter(r => ctx.liveBrokerIds.contains(r))
  if (ctx.partitionReplicaAssignment(tp) == newReplicas) {
    throw new KafkaException(/*..*/)  // 1.1 the RAR is identical to the current AR: abort the reassign
  } else if (aliveNewReplicas != newReplicas) {
    throw new KafkaException(/*..*/)  // 1.2 the RAR contains an unavailable replica: abort the reassign
  } else {  // 1.3 register an ISR-change listener for this tp; once RAR-OAR has joined the ISR it drives the reassign to completion
    watchIsrChangesForReassignedPartition(tp.topic, tp.partition, reassignCtx)
    ctx.partitionsBeingReassigned.put(tp, reassignCtx)  // record the progress
    onPartitionReassignment(tp, reassignCtx)  // Step 2. start the reassign
  }
}

// Step 2. reassign a single partition
// - send LeaderAndIsr to RAR-OAR and wait for all new replicas to join the ISR
// - send StopReplica to OAR-RAR and wait for all old replicas to be deleted
// - publish the reassign result to zk
def onPartitionReassignment(tp: TopicAndPartition, reassignCtx: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignCtx.newReplicas
  areReplicasInIsr(tp.topic, tp.partition, reassignedReplicas) match {
    case false =>  // RAR-OAR (brand-new replicas), RAR+OAR (all replicas)
      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- ctx.partitionReplicaAssignment(tp).toSet
      val newAndOldReplicas = (reassignCtx.newReplicas ++ ctx.partitionReplicaAssignment(tp)).toSet
      // 2.1 publish RAR+OAR to zk
      updateAssignedReplicasForPartition(tp, newAndOldReplicas.toSeq)
      // 2.2 bump the leader epoch and send LeaderAndIsr to RAR+OAR
      updateLeaderEpochAndSendRequest(tp, ctx.partitionReplicaAssignment(tp), newAndOldReplicas.toSeq)
      // 2.3 move RAR-OAR to NewReplica
      startNewReplicasForReassignedPartition(tp, reassignCtx, newReplicasNotInOldReplicaList)
      /*... wait for RAR-OAR to catch up and join the ISR; the isr-change-listener registered in Step 1 will call onPartitionReassignment again */
    case true =>
      // 2.4 the whole RAR is in the ISR: replication has fully caught up
      val oldReplicas = ctx.partitionReplicaAssignment(tp).toSet -- reassignedReplicas.toSet  // OAR-RAR
      // 2.5 move RAR to Online
      reassignedReplicas.foreach(replica =>
        replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(tp.topic, tp.partition, replica)), OnlineReplica))
      // 2.6 elect a new leader if needed, e.g. when the current leader is not in the RAR
      moveReassignedPartitionLeaderIfRequired(tp, reassignCtx)
      // 2.7 move OAR-RAR through Online -> Offline -> NonExistent: the old replicas are taken down for good
      stopOldReplicasOfReassignedPartition(tp, reassignCtx, oldReplicas)
      // 2.8 publish RAR as the new AR to zk
      updateAssignedReplicasForPartition(tp, reassignedReplicas)
      removePartitionFromReassignedPartitions(tp)  // remove this tp from /admin/reassign_partitions
      ctx.partitionsBeingReassigned.remove(tp)     // the reassign is done
      // 2.9 broadcast UpdateMetadata
      sendUpdateMetadataRequest(ctx.liveOrShuttingDownBrokerIds.toSeq, Set(tp))
  }
}

// the isr-change-listener that asynchronously drives the reassign to completion
class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int, rar: Set[Int])
    extends IZkDataListener {
  override def handleDataChange(dataPath: String, data: Object) {
    val tp = TopicAndPartition(topic, partition)
    ctx.partitionsBeingReassigned.get(tp) match {
      case Some(reassignCtx) =>
        zkUtils.getLeaderAndIsrForPartition(topic, partition) match {
          case Some(leaderAndIsr) =>  // intersect the RAR with the ISR
            if ((rar & leaderAndIsr.isr.toSet) == rar)  // the whole RAR has joined the ISR: finish the reassign in one go
              controller.onPartitionReassignment(tp, reassignCtx)
        }
    }
  }
}

An example for Step 2: with OAR = {1,2,3} and RAR = {4,5,6}, the leader and ISR recorded in zk evolve as follows:

AR              leader/isr
{1,2,3}         1/{1,2,3}        // before the reassign starts
{1,2,3,4,5,6}   1/{1,2,3}        // 2.2 send LeaderAndIsr to OAR+RAR
{1,2,3,4,5,6}   1/{1,2,3,4,5,6}  // 2.4 wait for the whole RAR to join the ISR
{1,2,3,4,5,6}   4/{1,2,3,4,5,6}  // 2.6 the current leader is not in the RAR, so a new election is triggered
{1,2,3,4,5,6}   4/{4,5,6}        // 2.7 take OAR-RAR offline and finally delete it
{4,5,6}         4/{4,5,6}        // 2.8 publish the reassign result to zk
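
The set arithmetic behind those steps (RAR+OAR, RAR-OAR, OAR-RAR) can be checked with a tiny sketch using plain Scala sets and assumed variable names:

object ReassignSetsSketch {
  def main(args: Array[String]): Unit = {
    val oar = Set(1, 2, 3)           // original assigned replicas
    val rar = Set(4, 5, 6)           // reassigned (target) replicas

    println(oar ++ rar)              // RAR+OAR, the transient AR: {1,2,3,4,5,6}
    println(rar -- oar)              // RAR-OAR, brand-new replicas to start: {4,5,6}
    println(oar -- rar)              // OAR-RAR, old replicas to stop and delete: {1,2,3}

    val isr = Set(1, 2, 3, 4, 5, 6)  // once replication has caught up
    println((rar & isr) == rar)      // the condition that lets the ISR listener finish the reassign: true
  }
}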

So Kafka migrates replicas by sending LeaderAndIsr to the new replicas and StopReplica to the old ones, with the isr-change-listener driving the whole reassign to completion asynchronously.

  Note that overall reassign progress is bounded by the slowest replica in the RAR, and how long a new replica takes to join the ISR depends on how much data it has to copy: its first Fetch starts at offset 0, so it copies everything from the leader's earliest message until it catches up with the LEO. In practice, before a reassign we check the topic's consumption, confirm there is no need to rewind, temporarily shorten the retention as much as possible, and only start the reassign after the old segments have been marked .deleted, which keeps the migration time short.

   Kuaishou's Kafka team modified the reassign mechanism for smooth scale-out: a new replica starts copying from the HW and joins the ISR only after all consumers have advanced past that offset. This avoids the slow, IO-heavy full copy, but introduces leader/follower data divergence: if such a replica becomes leader, the older data can no longer be read back, so the community did not accept the approach; see KAFKA-8328.


4. LeaderChangeListener

Every broker watches the data of /controller after startup so it always knows the current Controller's address. If the znode is deleted, the brokers race to recreate it; the winner becomes the new Controller. Implementation:

// Generic leader election by creating a znode
class ZookeeperLeaderElector(onBecomingLeader: () => Unit,            // callback when elected as the new controller
                             onResigningAsLeader: () => Unit, /*..*/) {  // callback when controllership is lost
  val leaderChangeListener = new LeaderChangeListener
  def elect() { /*..*/ }  // called at startup and whenever /controller is deleted

  class LeaderChangeListener extends IZkDataListener {
    def handleDataChange(dataPath: String, data: Object) {
      val amILeaderBeforeDataChange = amILeader
      leaderId = KafkaController.parseControllerId(data.toString)
      if (amILeaderBeforeDataChange && leaderId != brokerId)  // controllership lost
        onResigningAsLeader()  // run the resignation callback
    }

    def handleDataDeleted(dataPath: String) {
      if (amILeader) onResigningAsLeader()
      elect()
    }
  }
}

After winning the election, the new Controller starts the two state machines and registers all the zk listeners described above:

def onControllerFailover() {
  readControllerEpochFromZookeeper()      // 1. read the epoch, increment it, and publish it to /controller_epoch
  incrementControllerEpoch(zkUtils.zkClient)

  // 2. register the zk listeners
  registerReassignedPartitionsListener()       // /admin/reassign_partitions
  registerIsrChangeNotificationListener()      // /isr_change_notification
  registerPreferredReplicaElectionListener()   // /admin/preferred_replica_election
  partitionStateMachine.registerListeners()    // /brokers/topics, /admin/delete_topics // PSM
  replicaStateMachine.registerListeners()      // /brokers/ids // RSM

  initializeControllerContext()           // 3. load the controller ctx from zk
  replicaStateMachine.startup()           // 4. start the RSM and PSM, initializing every partition's and replica's state
  partitionStateMachine.startup()         //    and register a partition-count listener for every topic
  ctx.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))

  maybeTriggerPartitionReassignment()     // 5. resume interrupted reassigns, preferred-replica elections, etc.
  maybeTriggerPreferredReplicaElection()
  sendUpdateMetadataRequest(ctx.liveOrShuttingDownBrokerIds.toSeq)

  if (config.autoLeaderRebalanceEnable) {
    autoRebalanceScheduler.startup()      // 6. start the auto-rebalance scheduled task
    autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, /*..*/)
  }
  deleteTopicManager.start()              // 7. start the background topic-deletion thread
}

def onControllerResignation() {
  // Losing controllership runs the reverse of the steps above; omitted
}

Summary

Mechanics:

  • The Controller keeps one connection per remote broker, manages bulk requests through the batch mechanism, and uses per-broker task queues to decouple the main thread from the RequestSend threads, so requests to multiple brokers proceed in parallel
  • It owns two state machines; zk listeners tell it about admin-initiated tasks (topic creation/deletion, preferred-replica election, reassign) and about events such as ISR expansion/shrink and brokers going up or down, and it drives the state transitions and sends the corresponding requests to complete each task
  • Four leader election strategies, plus the Controller election itself

Design:

  • Modeling partition and replica changes as state machines keeps the handling of each well-defined transition explicit and the logic clear
  • zk decouples the admin tooling from the state machines and stores the progress of state changes (such as a reassign), enabling reliable Controller failover