前言

由于 Replication 模块处理读写请求都使用了延迟任务,本文先捋清楚延迟任务是如何组织和执行的


一:TimeWheel

Kafka 中生产、消费都会涉及延迟任务:在未来某个时间点,执行某个操作

  • Produce 请求:要求在 request.timeout.ms 时限内,完成该消息在 acks 个副本之间的同步,若无法及时完成则返回超时错误
  • Fetch 请求:要求在 fetch.max.wait.ms 时限内,尽可能多地收集 fetch.min.bytes 大小的消息,若超时则有多少消息就返回多少

问题:普通的延迟任务,可直接使用 JUC 的 ScheduledExecutorService,但任务由内置线程池执行,用户无法干涉任务的生命周期(如提前删除未到期的任务);或使用 DelayedQueue 配合自定义线程池,但基于最小堆导致增删任务复杂度都为 O(logN),消息量过大的场景会降低吞吐

解决:时间轮,出队复杂度 O(1),入队复杂度 O(M),其中 M 是 bucket 数量

1.1 普通时间轮

两年前曾实现过简单的时间轮:wuYin/timewheel,逻辑如下:

  • 组件:设延迟精度为 N ms,则创建 1000/N 个 slot 数组,每个 slot 都是一个双向链表,存放延迟时间取余后落在此区间的任务
  • 任务执行:主 goroutine 起一个 1ms 的 Ticker 定时遍历 slot,由新的 goroutine1 去遍历 task 链表,若到期则从链表删除,放入 goroutine2 去执行,否则继续留在链表等待下一轮遍历

image-20200710222540463

现在看,如上设计存在 3 个明显缺陷:

  • goroutine 数量不可控,有泄露风险:goroutine 数量与任务到期密集程度成正相关,且无 Context 超时机制保证及时结束任务
  • 无任务执行时,Ticker 会轮询空转浪费 CPU 资源:特别是精度越小如微妙级,浪费越严重
  • slot 链表过长会导致出队复杂度退化为 O(N/M) ≈ O(N):双向链表的遍历复杂度为 O(N)

Kafka 的 TimingWheel 使用大小固定的线程池、大小可控的 DelayedQueue、时间轮分层等优秀设计解决了以上问题


1.2 TimingWheel

下图是现实时钟版的时间轮,假设启动时间为零时间戳,相关组件如下:

image-20200712103424323

关系说明:

  • task 和 entry 一对一绑定,而task.cancel() 会解除 task -> entry 的单向绑定,但 entry 还绑定到 task
  • entry 会在 list 间迁移复用:从旧 list 中删除,插入到新 list(升级或降级都会迁移)

上图涉及的相关组件如下:


1. TimerTask:用绝对时间戳描述执行时间点的 Runnable,子类将与 TimerTaskEntry 一一绑定

1
2
3
4
5
6
trait TimerTask extends Runnable {
val delayMs: Long
var entry: TimerTaskEntry = null
def setTimerTaskEntry(e: TimerTaskEntry) = { entry = e } // task <--> entry
def cancel() = { entry.remove() } // 提前取消任务,将 entry 从 list 中删除,task <-- entry
}

2. TimeTaskEntry:一对一为 TimerTask 封装链表节点的结构

1
2
3
4
5
6
7
private[timer] class TimerTaskEntry(val task: TimerTask,  val expirationMs: Long) {
var list: TimerTaskList = null
var next, prev: TimerTaskEntry = null
if (task != null) task.setTimerTaskEntry(this) // task <--> entry
def cancelled: Boolean = task.getTimerTaskEntry != this // true: task <-- entry
def remove() = { list.remove(this) } // 并发 cancel 时乐观重试,从 list 中删除自己
}

3. TimeTaskList:即一个 bucket,带哑节点的双向循环链表,由到期时间参差不一、但都在同一格子范围内的 entry 元素组成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
val expiration = new AtomicLong(-1L) // getDelayed(): expiration 先到期的 list 先出队
// bucket 的过期时间被成功更新,说明时钟被推进
def setExpiration(expirationMs: Long) = expiration.getAndSet(expirationMs) != expirationMs

def foreach(f: (TimerTask)=>Unit): Unit = {/*...*/} // 遍历检查 task<-->entry 则执行 f(task)
def flush(f: (TimerTaskEntry)=>Unit): Unit = {/*...*/} // 逐个执行 f(entry) 后删除 entry

def add(entry: TimerTaskEntry): Unit = { // 将 entry 迁移到此 list
entry.remove()
synchronized { entry.synchronized { // 顺序加锁防止死锁
if (entry.list == null) {/* append...*/}
}}
}
def remove(timerTaskEntry: TimerTaskEntry) = {/*...*/} // 重置 entry 的链表指针,从 list 中删除
}

4. TimingWheel:描述时间轮的精度、格子数、起始时间戳,维护不断递增的、整齐的当前时间指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long,  
taskCounter: AtomicInteger, // 全局任务数量
queue: DelayQueue[TimerTaskList]) { // 全局 list 延迟队列
val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
var currentTime = startMs - (startMs % tickMs) // 修剪时间轮的指针,后续按照 tickMs 推进
var overflowWheel: TimingWheel = null // 上层时间轮,精度更宽

// 参数是 queue.poll() 出队的到期 list 中的 entry
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) { // 1. 被提前取消
false
} else if (expiration < currentTime + tickMs) { // 2. 将在本次过期
false
} else if (expiration < currentTime + interval) { // 3.1 未过期,但还在本层时间轮范围内
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// 指针前进,可能已经进入下一轮,尝试更新 list 过期时间,CAS 成功则添加到 queue 重新等待过期
if (bucket.setExpiration(virtualId * tickMs))
queue.offer(bucket)
true
} else
overflowWheel.add(timerTaskEntry) // 3.2 任务未过期,但时间溢出,由上层处理
}

def advanceClock(timeMs: Long): Unit = { // queue.pol() 到了过期的 list,则推进指针
if (timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs) // 此处可能会越过多个连续的 bucket
if (overflowWheel != null) overflowWheel.advanceClock(currentTime) // 尝试推进上层时间轮
}
}
}

至此,分析了时间轮的各子组件:描述延迟任务的 TimerTask、封装 task 作为链表节点的 TimerTaskEntry、记录同一次 tick 的所有任务的 TimerTaskList、维护所有 bucket 的 TimingWheel

并且,整个 TimingWheel,没有后台推进指针的机制、也没有线程池执行 TimerTask,仅仅只是维护时间轮这种数据结构的状态。将数量有限的 bucket 放入 DelayedQueue 中,由调用方自行从队列 poll() 拉取到期的 bucket,自行处理其中的 entry


1.3 SystemTimer

Timer 接口描述了定时器的两个核心操作:

1
2
3
4
trait Timer {
def add(timerTask: TimerTask): Unit // 添加新的延迟任务
def advanceClock(timeoutMs: Long): Boolean // 将时钟推进到指定时间点
}

SystemTimer 封装了时间轮和线程池,实现了上述操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class SystemTimer(executorName: String,
tickMs: Long = 1, wheelSize: Int = 20, // 时间轮由下到上:20*1ms,20*20ms...
startMs: Long = System.currentTimeMillis) extends Timer {
val taskExecutor = Executors.newFixedThreadPool(1, /*...*/) // 使用无界队列异步执行到期的延迟任务
val delayQueue = new DelayQueue[TimerTaskList]() // 全局的延迟队列,各层级都在用
val taskCounter = new AtomicInteger(0)
val timingWheel = new TimingWheel(tickMs,wheelSize,startMs,taskCounter,delayQueue) // 第一层

// 1. 入队:计算延迟的绝对时间戳,封装成 TimerTaskEntry 再入队
override def add(timerTask: TimerTask): Unit =
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis)

def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) // 入队失败:任务被取消,或已到期
if (!timerTaskEntry.cancelled) // 任务到期则提交执行
taskExecutor.submit(timerTaskEntry.timerTask)
}

// 2. 等待任务到期并执行,推进时间轮指针
val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
override def advanceClock(timeoutMs: Long): Boolean = {
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) // 最快到期的 TimeTaskList
if (bucket != null) {
while (bucket != null) {
timingWheel.advanceClock(bucket.getExpiration()) // 用当前到期任务的时间戳,推进时间轮的指针
bucket.flush(reinsert) // 清空 list 中的任务:到期执行,否则降级到下层(实则从第一层逐层溢出)
bucket = delayQueue.poll() // 不是 take 不阻塞
}
true
} else false
}
}

注意,SystemTimer 本来必须要有 tick 逻辑,即要有后台线程一直执行 advanceClock 保证到期的任务及时出队,同时保证此线程在 SystemTimer.taskExecutor.shutdown() 前停止,保证不再提交任务。这部分工作挪到了 SystemTimer 的使用方 DelayedOperationPurgatory 去实现,先停止此线程再停止线程池,代码逻辑更清晰

1
2
3
4
5
6
class DelayedOperationPurgatory[T <: DelayedOperation] {
def shutdown() {
expirationReaper.shutdown() // SystemTimer
timeoutTimer.shutdown()
}
}

至此,分析了如何向时间轮提交新任务、如何及时执行到期的任务、如何推进时间轮指针


二:purgatory

2.1 DelayedOperation

TimerTask 精确描述了某个操作要延迟多久之后才执行,并初步提供 TimerTask.cancel() 方法来取消执行。但这还不够,如开始处理 Produce 请求时,提交了 request.timeout.ms 延迟的响应任务,若 acks 条件被提前满足,需要能立刻执行该延迟任务,不要等到超时,尽可能降低写延迟。因此,DelayedOperation 在 TimerTask 上扩展了立刻执行的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)

// 执行延迟任务,执行成功返回 true
// 1. 经过 tryComplete() 验证,可被提前执行
// 2. 任务正常过期
def forceComplete(): Boolean = {
if (completed.compareAndSet(false, true)) { // CAS 保证任务只执行一次
cancel() // 从时间轮中删除
onComplete()
true
} else false
}

def tryComplete(): Boolean // 尝试提前执行
def onComplete(): Unit // 延迟任务的内容

def onExpiration(): Unit // 到期执行的回调
override def run(): Unit = // 任务到期,由 SystemTimer.taskExecutor 线程执行
if (forceComplete()) onExpiration()
}

2.2 Watchers

维护一组 DelayedOperation,都 watch 一个任意类型的 Key,二者是多对多的关系:一个 delayedOp 会 watch 多个 key,当任一 key 有事件发生时,都会尝试完成 delayedOp (场景见 3.1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DelayedOperationPurgatory[T <: DelayedOperation](/*...*/) {
private class Watchers(val key: Any) {
val operations = new util.LinkedList[T]() // 一个 tp 可有序绑定多个 delayedProduce 延迟任务
def watch(t: T) = operations synchronized operations.add(t) // 有新 Produce 请求到达,绑定新任务

def tryCompleteWatched(): Int = { // 遍历所有 dealyedOp,还未执行的尝试手动提前执行
operations synchronized {
val iter = operations.iterator()
while (iter.hasNext) {
val delayedOp = iter.next()
if (delayedOp.isCompleted) // 此 delayedOp 也绑定到了其他 key,已被其他 key 触发执行完毕
iter.remove()
else if (delayedOp synchronized delayedOp.tryComplete()) // 尝试提前执行
iter.remove()
}
}
if (operations.size == 0) removeKeyIfEmpty(key, this) // 删除 key
}
}
}

2.3 DelayedOperationPurgatory

持有 Timer 及推进 Timer 时钟的后台线程,维护 Key 与 Watchers 的对应关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class DelayedOperationPurgatory[T <: DelayedOperation](timeoutTimer: Timer, /*...*/) {
val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
private class ExpiredOperationReaper extends ShutdownableThread(/*...*/) {
override def doWork() = advanceClock(200L) // 后台 reaper 线程负责推进时钟
}
new ExpiredOperationReaper().start()

def tryCompleteElseWatch(op: T, watchKeys: Seq[Any]): Boolean = { // 建立 watch 关系
var isCompletedByMe = op synchronized op.tryComplete()
if (isCompletedByMe) return true // 尽量不将 delayedOp 放入时间轮
for(key <- watchKeys)
watchForOperation(key, op)
timeoutTimer.add(op) // 放入时间轮等待
}
def checkAndComplete(key: Any) = watchersForKey.get(key).tryCompleteWatched() // 调用时机见下节
}

如上分析了 purgatory 的实现,以下分析其应用


三:purgatory 应用

3.1 DelayedProduce

背景:对于 acks=-1 的 Produce 请求,broker 先把消息写入本地 Log,再等待所有 ISR Replicas 将此次请求的多个 tp 的消息都 Fetch 后,才会响应 producer,或上述流程超过 request.timeout.ms 后返回超时错误

问题:当最后一个 tp 的消息被复制完毕后如何触发响应?又如何检测超时?

解决

  • 使用 DelayedProduce 维护一次 Produce 请求的多个 tp 复制任务,带超时地放入 purgatory 管理
  • 对某个 tp,当 ISR Replicas 都复制完毕后推进 HW,有序触发该 tp 上的所有 DelayedProduce 检查复制状态
  • 对某个 DelayedProduce,当其上所有 tp 的复制状态结束后,提前执行回调响应 Produce 请求

image-20200725141609144

实现

handleProduce API 开始分析,其封装了 DelayedProduce 延迟任务要执行的 callback

1
2
3
4
5
6
7
8
9
def handleProducerRequest(request: RequestChannel.Request) { /*...*/
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { /*...*/
if (produceRequest.acks == 0) //1. acks=0:超时等错误则断开连接、无错误则无需响应
/*...*/
else // 2. acks=1 或 -1:写出响应
requestChannel.sendResponse(new RequestChannel.Response(/*...*/, responseStatus))
}replicationManager
replicaManager.appendMessages(/*...*/, sendResponseCallback)
}

随后 replicaManager 将请求中各 tp 的消息 append 到对应的 Log 中,当acks=-1时生成 DelayedProduce 延迟任务,放入 purgatory 中管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def appendMessages(timeout: Long, requiredAcks: Short,
messagesPerPartition: Map[TopicPartition, MessageSet],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {
// 各 tp 都将消息写本地的 Log activeSegment,拿到 firstOffset,lastOffset 等写入结果
val localResults: Map[TopicPartition, LogAppendResult] = appendToLocalLog(messagesPerPartition)
val produceStatus = localResults.map {case (topicPartition, result) =>
topicPartition -> ProducePartitionStatus(
result.info.lastOffset + 1, // 复制进度的目标,当 HW 推进至此即复制完成
new PartitionResponse( // leader 写本地后就提前生成 produceResponse
result.errorCode, result.info.firstOffset, result.info.timestamp))
}
if (delayedRequestRequired(requiredAcks, messagesPerPartition, localResults)) { // acks=-1
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
// 多个 tp 对应多个 tpOpKey,一个 delayedProduce 则要 watch 多个 tpOpKey
val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_))
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else { // acks=1 or 0,无需等待复制
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
responseCallback(produceResponseStatus) // 直接执行 callback 响应
}
}

回到 DelayedProduce 延迟任务本身,其记录了各 tp 消息的复制目标、完成状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: PartitionResponse) {
var acksPending = false // 描述一个 tp 的响应数据、复制目标及完成状态
}
case class ProduceMetadata(produceRequiredAcks: Short,
produceStatus: Map[TopicPartition, ProducePartitionStatus]) {}

// 管理一个 Produce 请求下,各 tp 的复制目标、完成状态
class DelayedProduce(responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
produceMetadata: ProduceMetadata,/*...*/) extends DelayedOperation(delayMs){
// 将各 tp 的复制完成状态 ackPending 预设为 true,错误码预设为 REQUEST_TIMED_OUT
produceMetadata.produceStatus.foreach {/*...*/}

// 尝试完成当前 delayedProduce 请求
override def tryComplete(): Boolean = {
produceMetadata.produceStatus.foreach { // 逐个检查 tp 的复制状态
case (topicAndPartition, status) =>
if (status.acksPending) {
val partitionOpt = replicaManager.getPartition(topic, topicAndPartition.partition)
val (hasEnough, errorCode) =
partitionOpt.get.checkEnoughReplicasReachOffset(status.requiredOffset) // 见下一节
if (hasEnough) { // HW 已推进覆盖 requiredOffset,表明同步任务已完成
status.acksPending = false
status.responseStatus.errorCode = Errors.NONE.code // 清除 errorCode,无需补充其他信息
}
}
}
if (!produceMetadata.produceStatus.values.exists(p => p.acksPending))
forceComplete() // 所有 tp 都已复制成功,
else false
}

// 副本复制提前完成,强制执行;或到期后超时执行
override def onComplete() = // 最终执行 callback 响应
responseCallback(produceMetadata.produceStatus.mapValues(status=>status.responseStatus))
}

至此,分析了 Produce 请求的处理逻辑:

  • 及时处理:DelayedProduce 通过 watch 到多个 tpOpKey,当某个 tp 的 HW 因为 Fetch 请求被推进时会尝试完成其下的多个 DelayedProduces,如此多个 tp 最终推进该 DelayedProuce 被及时完成,提前响应
  • 处理超时:purgatory 的 timer 定时执行,最终未完成复制任务的 tp 返回超时错误

3.2 DelayedFetch

背景:对 Fetch 请求,在 fetch.max.wait.ms 时限内,尽可能多地读 fetch.min.bytes 大小的消息后返回

问题:当收集到足够多数据时,如何提前响应?如何检测超时?

解决

  • 使用 DelayedFetch 延迟任务管理多个 tp 的 Fetch 任务,带超时地放入 purgatory 管理
  • 当某个 tp 有新消息可读时,触发其上的多个 DelayedFetch 任务继续累积消息
  • 对某个 DelayedFetch,当各 tp 累积到的消息大小足够时,提前执行回调响应 Fetch 请求

image-20200725215544307

实现

handleFetch API 开始分析,其封装了 DelayedFetch 延迟任务要执行的 callback

1
2
3
4
5
6
7
8
def handleFetchRequest(request: RequestChannel.Request) { /*...*/
// 已累积到足够多的消息,现在返回
def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
val response = FetchResponse(fetchRequest.correlationId, responsePartitionData, fetchRequest.versionId, delayTimeMs)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
}
replicaManager.fetchMessages( fetchRequest.maxWait.toLong, fetchRequest.minBytes, /*...*/, sendResponseCallback)
}

随后 replicaManager 将为各 tp,从本地对应 Log 中从指定 fetch_offset 处开始读取至多 max_bytes 大小的消息,若消息不足则生成 DelayedFetch 延迟任务,交由 purgatory 管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def fetchMessages(timeout: Long,      // fetch.max.wait.ms 
replicaId: Int, // replica broker.id
fetchMinBytes: Int, // fetch.min.bytes
fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo], // max.partition.fetch.bytes
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId) // replica or consumer?
val logReadResults = readFromLocalLog(fetchOnlyFromLeader, fetchOnlyCommitted, fetchInfo)
if(Request.isValidBrokerId(replicaId)) { // fetch 请求来自 replica 而非 consumer
// 1. 更新对应 replica 的 LEO, caughtUpTime
// 2. 尝试扩张 ISR、推进 HW
// 3. 尝试完成 delayProducePurgatory 中该 tp 的任务 // 实现两种请求的协作,非常关键
updateFollowerLogReadResults(replicaId, logReadResults)
}
val bytesReadable = logReadResults.values.map(_.info.messageSet.sizeInBytes).sum
val errorReadingData: Boolean = logReadResults.values.foldLeft(false)((errorIncurred, readResult) =>
errorIncurred || (readResult.errorCode != Errors.NONE.code))

// fetch.max.wait.ms<0、fetch.min.bytes 提前满足、读日志发生错误
if(timeout <= 0 || fetchInfo.size <= 0 || bytesReadable >= fetchMinBytes || errorReadingData) {
val fetchPartitionData = logReadResults.mapValues(
result => FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
responseCallback(fetchPartitionData)
} else {
val fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus] = logReadResults.map {
case (topicAndPartition, result) => // 为各 tp 封装读取结果
(topicAndPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo(topicAndPartition)))
}
// 生成 delayedFetch 延迟任务
val fetchMetadata = FetchMetadata(/*...*/, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionOperationKey(_)).toSeq

// watch 多个 tpOpKey 后,放入 purgatory 管理
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}

def readFromLocalLog(/*...*/): Map[TopicAndPartition, LogReadResult] = {
readPartitionInfo.map { // 逐个分区读日志
case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
val partitionDataAndOffsetInfo = {
val localReplica = getLeaderReplicaIfLocal(topic, partition)
val initialLogEndOffset = localReplica.logEndOffset // leader LEO
val maxOffsetOpt = if (readOnlyCommitted)
Some(localReplica.highWatermark.messageOffset) // 限制 consumer 只能读到 HW 之前的消息
else None
val fetchDataInfo = localReplica.log.read(offset, fetchSize, maxOffsetOpt) // 读取指定区间的消息

// replica 开始从 LEO 读取,表明复制进度已完全赶上,即 in-sync-replica
val readToEndOfLog = initialLogEndOffset.messageOffset <= fetchDataInfo.fetchOffsetMetadata.messageOffset

LogReadResult(fetchDataInfo, localReplica.highWatermark.messageOffset, fetchSize, readToEndOfLog, None)
(TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
}
}

回到 DelayedFetch 任务本身,维护了各 tp 要返回的状态、各自累积消息的字节数、各自的累积目标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 描述一个 tp 的 fetch 目标、已累积到的消息集
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo)
case class FetchMetadata(fetchMinBytes: Int, /*...*/
fetchOnlyCommitted: Boolean, // 记录每个 tp 的当前 fetch 进度
fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {

class DelayedFetch(delayMs: Long, // fetch.max.wait.ms
fetchMetadata: FetchMetadata,
replicaManager: ReplicaManager,
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
extends DelayedOperation(delayMs) {

override def tryComplete() : Boolean = {
var accumulatedSize = 0
// 逐个 tp 检查是否有新消息可读
fetchMetadata.fetchPartitionStatus.foreach {
case (topicAndPartition, fetchStatus) =>
val lastFetchOffset = fetchStatus.startOffsetMetadata
if (lastFetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
// 查找 tp 的 leader
val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
val endOffset =
if (fetchMetadata.fetchOnlyCommitted)
replica.highWatermark // consumer 只能读到 HW
else
replica.logEndOffset // 而 replica 可读到 LEO
// 距上次 fetch 后,又有新消息可读
if (endOffset.messageOffset != lastFetchOffset.messageOffset) {
if (endOffset.onOlderSegment(lastFetchOffset)) {
// 1. offset 向上溢出:leader 的 activeSegment 日志发生了截断
return forceComplete() // 最多读到 LEO,即溢出的 offset 实际无效
} else if (lastFetchOffset.onOlderSegment(endOffset)) {
// 2. offset 落后:在不同 segment 上,已生成新 activeSegment
// 忽略 fetch.min.size 的目标提前返回,保证在同一个 FileMessageSet,才能使用 zero copy 技术
return forceComplete()
} else if (lastFetchOffset.messageOffset < endOffset.messageOffset) {
// 3 依旧在同一个 segment 上,可继续累加消息
accumulatedSize += math.min(endOffset.positionDiff(lastFetchOffset), fetchStatus.fetchInfo.fetchSize)
}
}
}
if (accumulatedSize >= fetchMetadata.fetchMinBytes)
forceComplete() // 消息已足够,提前响应
else
false
}

override def onComplete() {
// 最终读取各 tp 的文件区间日志
val logReadResults = replicaManager.readFromLocalLog(
fetchMetadata.fetchOnlyLeader,
fetchMetadata.fetchOnlyCommitted,
fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo)
)
val fetchPartitionData = logReadResults.mapValues(
result => // 进一步包装 response 后返回
FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
responseCallback(fetchPartitionData)
}
}

至此,分析了 Fetch 请求的处理逻辑:

  • 及时处理:DelayedFetch 通过 watch 多个 tpOpKey,当某个 tp 有新消息可读时,会尝试完成其下的多个 DelayedFetch 任务,如此多个 tp 最终推进该 DelayedFetch 被及时完成,最终提前响应
  • 处理超时:purgatory 的 timer 定时执行,各 tp 有多少消息就返回多少

总结

本文梳理了 Kafka 时间轮,相关组件如下:

  • TimerTask 与 TimerTaskEntry 绑定实现任务的提前取消
  • TimerTaskList 由 DelayedQueue 管理实现任务的到期检测
  • TimingWheel 分层存放不同过期时间区间的 TimerTaskList,虽然不同层但都放入同一个 DelayedQueue

对于时间轮的使用方 SystemTimer,持有执行延迟任务的线程池,负责从 DelayedQueue 中 poll 到最快到期的 TimerTaskList,再检查其中的 TimerTaskEntry 是否过期,过期则交由线程池执行,未过期则尝试降级到下一层时间轮。可见其中 DelayedQueue 是关键设计:

  • 实现任务降级:所有层时间轮的 List 都交由 DelayedQueue 管理,如此 SystemTimer 才能将任务逐级降层,直到最底层,出队提交给线程池执行
  • 长度可控:按默认设计 20ms,400ms,8s 的三层时间轮,整个 DelayedQueue 长度上限为 pow(20,3) = 8000

介绍 DelayedOperation 对 TimerTask 增强可提前完成的功能,引出 DelayedOperationPurgatory 类,其持有 SystemTimer 和后台推进时钟的 reaper 线程,通过 tpOpKey 绑定到 Watchers 的机制,管理这大量的延迟任务。特别是 replicaManager 持有的 producerPurgatory 负责维护全局的 DelayedProduce 延迟任务,当各 tp 的 HW 被推进时触发执行,最终实现提前响应,亦或等待时间轮超时执行返回错误,consumerPurgatory 同理


综上,时间轮的优秀设计值得借鉴学习,purgatory 作为时间轮的使用方,维护着多个 delayedOperation 的进度并负责提前执行,对应关系较多需仔细梳理