前言

Kafka Server 可分为七个大模块:Network, API, Log, Replication, Controller, Coordinator, ACL,参考《Kafka权威指南》§5.5,本文着重梳理 Log 模块各种机制的实现逻辑,笔记仅供个人 Review


一:准备

1.1 概念

日志模块的文件概念较多,对应关系如下:

image-20200606182125385

1.2 配置

可参考全局配置KafkaConfig.scala,日志模块独有的配置会被合并到LogConfig.scala,可分为四类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 配置项 [后备配置项]                # 含义                             # 默认值
## 1. 基础配置
log.dirs [log.dir] 存放日志数据的目录列表或单个目录 null [/tmp/kafka-logs]
log.segment.bytes 单个.log文件大小上限 1GB
log.index.size.max.bytes 单个.index文件大小上限 10MB
log.index.interval.bytes 向.log写多少数据后,写一条索引 4KB
log.roll.ms [.hours] 时间维度(mtime)rolloverlog的周期 null [7d]
log.roll.jitter.ms [.hours] 提前触发rollover的抖动时间 null [0]
log.preallocate 创建新segment时是否直接预分配整个文件 FALSE
log.segment.delete.delay.ms 文件被标记为.deleted后,异步删除的延迟 60s
num.recovery.threads.per.data.dir 启动时单个log.dir用于重建Log的线程池大小 1
log.message.format.version broker 指定的日志消息版本 KAFKA_0_10_0_IV1 (v1)
log.message.timestamp.type 消息的时间戳类型 CreateTime
compression.type broker 指定的消息压缩方式 producer

## 2. flush 策略
log.flush.interval.messages Log 新写入多少条消息,触发 [reocveryPoint, LEO] 间的 segments 执行 fsync 强制落盘 Long.MaxValue
log.flush[.scheduler].interval.ms 后台线程执行 flush 任务的周期 null; Long.MaxValue
log.flush.offset.checkpoint.interval.ms 后台线程对各 log.dir 下 recovery-point-offset-checkpoint 文本文件的更新周期 1min

## 3. retention 策略
log.retention.check.interval.ms 是否有 log 要清理的检查周期 5min
log.retention.ms [.minutes, .hours] 时间维度: mtime+retention.ms < now 的日志将被删除 null [null, 7d]
log.retention.bytes 磁盘空间维度:从最老的 segment 开始删除,直到用量低于此阈值 -1

## 4. compaction 策略
log.cleaner.enable 是否启用 cleaner 线程做 compaction true
log.cleanup.policy delete 删除超出 retention 策略的日志;compact 则做去重 delete
log.cleaner.min.cleanable.ratio 触发 compact 的最低 dityLogSize/logSize 占比 50%
log.cleaner.backoff.ms 无日志需 compact 时 cleaner 线程的休眠时间 15s
log.cleaner.threads 全局执行日志 compact 的后台线程数 1
log.cleaner.io.max.bytes.per.second 所有 cleaner 线程的磁盘 IO 吞吐的总上限,超过此阈值则线程将被限流 Double.MaxValue
log.cleaner.dedupe.buffer.size 所有 cleaner 线程的 OffsetMap 所能占用的最大内存 128MB
log.cleaner.io.buffer.size 单个 cleaner 线程读、写 log 数据时能使用的缓冲区内存大小 512KB
log.cleaner.io.buffer.load.factor 限制 OffsetMap 的负载上限 0.9
log.cleaner.delete.retention.ms firstDirtyOffset 之前的 clean log,若 mtime 在阈值内则暂不删除 24h


# 勿混淆 retention 和 cleaner 配置,前者会直接删除 log,后者只做 compaction 但不删除
# 某些配置项间隐含对应关系,如单个 log 上限默认 1GB,按每写 4KB 日志就写 1 条索引计算
>>> KB=1024; MB=1024*KB; GB=1024*MB
>>> (1*GB)/(4*KB)*8/MB # 每条索引占 8 字节
2.0 # 1GB 的 .log 最终至多对应 2MB .index 数据

二:MessageSet

2.1 Message

描述一条消息。参考文档:A Guide To The Kafka Protocol # Message sets,消息有以下字段:

字段 解释 v0 备注 v1 备注
CRC 余下字段字节数据的 CRC32 Checksum
Magic 标识消息版本做兼容 0 1
Attributes 消息属性,低 3 位标识压缩类型,第 4 位标识时间戳类型 只用低 3 位 低 4 位都用
[Timestamp] 消息时间戳:CreateTime, LogAppendTime 无此字段 v1 新增
KeyLength 消息 Key 的字节数,无 key 则为 -1
[Key] 可选
ValueLength 消息 Value 的字节数,空消息为 -1
[Value] 可选

每条消息都会有 Offset 和 MessageSize 两个前缀字段,称为 LOG_OVERHEAD,来标识消息的偏移量:

image-202005028175617052


2.2 Offset

在不同场景下 offset 字段的含义不同:

  • 在 producer 侧,offset 仅用于表示消息顺序
    • Inner Offset:batch 会为内部消息从 0 递增分配 offset,仅在 batch 内有意义
    • Wrapper Offset:若启用压缩,则压缩后的 batch 数据被写入一条 wrapper 消息的 value 字段,其 offset 等于 batch 中最后一条消息的 Inner Offset
  • 在 broker 侧,写日志前会重写 offset
    • 无压缩:batch 消息的 Inner Offset 会被逐条、有序地更新为 Absolute Offset(分区唯一 offset)
      image-202005028154137678
    • 有压缩:仅 Wrapper Offset 会被更新为最后一条内层消息的 Absolute Offset
      image-202005028154257376

验证:单分区的 topic1, topic2,分别不带压缩、带 gzip 压缩地写 5*10000 条消息

1
2
3
4
5
6
7
8
// props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // 仅 topic2 使用

final List<String> msgs = Arrays.asList("AAA", "BBB", "CCC", "DDD", "EEE");
for (int i = 0; i < 10000; i++) {
for (String msg : msgs)
future = producer.send(new ProducerRecord<Object, String>("topic1", msg));
future.get();
}

写入完毕后,日志分布如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# server.properties # log.dirs=/data1,/data2; log.segment.bytes=1048576
➜ ~ tree -sh -a /data1 /data2
/data1
├── [ 0] cleaner-offset-checkpoint # 记录各 tp 已做 compaction 的最新 offset
├── [ 0] .lock # 数据目录锁文件
├── [ 57] meta.properties # 存储 broker.id,标记此数据文件所属 broker
├── [ 19] recovery-point-offset-checkpoint # 记录各 tp 已落盘的最新 offset
├── [ 19] replication-offset-checkpoint # TODO
└── [4.0K] topic2-0 # topic2 分区 0 的 Log 目录
├── [2.0K] 00000000000000000000.index # 第一个 segment 的索引文件
├── [1024K] 00000000000000000000.log # 第一个 segment 的日志文件,已满 100MB
├── [ 10M] 00000000000000037385.index # 开启预分配,故为 10MB
└── [341K] 00000000000000037385.log # 正在写入消息的日志文件 # 文件名:内部第一条消息的绝对 offset
/data2
├── [ 0] cleaner-offset-checkpoint
├── [ 0] .lock
├── [ 57] meta.properties
├── [ 0] recovery-point-offset-checkpoint
├── [ 19] replication-offset-checkpoint
└── [4.0K] topic1-0 # Log 会分布在各数据目录中
├── [1.9K] 00000000000000000000.index
├── [1024K] 00000000000000000000.log
├── [ 10M] 00000000000000028335.index
└── [783K] 00000000000000028335.log

脚本读取 .log 文件中的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def parse_uint(ba: bytearray, offset: int, size: int) -> int:
return int.from_bytes(ba[offset:offset + size], byteorder='big', signed=False)

class MessageV1:
def __init__(self, ba: bytearray): # IndexError ignored #
self._offset = parse_uint(ba, 0, 8)
self._msg_size = parse_uint(ba, 8, +4)
self._crc = '{:x}'.format(parse_uint(ba, 12, +4))
self._magic = parse_uint(ba, 16, +1)
self._attr = parse_uint(ba, 17, +1)
self._timestamp = parse_uint(ba, 18, +8)
self._key_len = parse_int(ba, 26, +4)
self._key = None if self._key_len == -1 else ba[30:30 + self._key_len]
vo = 30 if self._key is None else 30 + self._key_len # value offset
self._val_len = parse_int(ba, vo, +4)
self.size = vo + 4 + self._val_len
self._val = None if self._val_len == -1 else ba[vo + 4:self.size]
def __str__(self):
return 'offset: {}, key: {}, value: {}'.format(self._offset, self._key, self._val)

def main():
with open('/data2/topic1-0/00000000000000028335.log', 'rb') as f: # 读非压缩消息
ba = bytearray(f.read()); i = 0
while len(ba) > 0 and i < 6: # 读一个 batch 的 5 条消息、下一个 batch 的第 1 条消息
msg = MessageV1(ba); ba = ba[msg.size:]; i+=1; print(msg)

with open('/data1/topic2-0/00000000000000037385.log', 'rb') as f: # 读压缩消息
ba = bytearray(f.read())
wrapper_msg = MessageV1(ba)
print('\nwrapper offset: ', wrapper_msg._offset) # 外层消息的 offset
ba = gzip.decompress(wrapper_msg._val) # 需手动解压
while len(ba) > 0:
msg = MessageV1(ba); ba = ba[msg.size:]; print(msg)

解析结果:
image-202005028175014676

至此,分析了消息中各字段的含义,尤其是 offset 字段分别在无压缩消息、压缩消息的存储方式


client 只有内存消息,用 MemoryRecords 表示,但在 broker 端,消息需在内存中重写 offset,也需从文件中读写,故抽象出 MessageSet 类表示消息集合:

1
2
3
4
5
6
7
abstract class MessageSet extends Iterable[MessageAndOffset] {
def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int // 消息写出
def iterator: Iterator[MessageAndOffset] // 逐条读消息
}

class ByteBufferMessageSet(val b: ByteBuffer) extends MessageSet // 內存消息集,类似 MemoryRecords
class FileMessageSet (var file: File, /*...*/) extends MessageSet // 文件消息集,读写 .log 中的消息

2.3 FileMessageSet

使用 java.io.File 表示 .log 日志文件或文件分片;负责日志文件创建、消息写入读出、消息查找、外层迭代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class FileMessageSet (@volatile var file: File,             // .log 日志文件
private[log] val channel: FileChannel, //
private[log] val start: Int, // [start, end) 位置区间的文件分片
private[log] val end: Int,
isSlice: Boolean) extends MessageSet with Logging {
private val _size = /*...*/ // 文件大小,用作 rollover

override def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
val position = start + writePosition
val count = math.min(size, sizeInBytes)
val bytesTransferred = (destChannel match {
case tl: TransportLayer => tl.transferFrom(channel, position, count) // 写出:零拷贝,高性能
case dc => channel.transferTo(position, count, dc) // disk -> kernal buffer -> socket buffer
}).toInt
}

override def iterator = iterator(Int.MaxValue)
def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] { // 带状态标识的简化版迭代器
override def makeNext(): MessageAndOffset = {/*...*/} // 读取文件数据,记录读取位置,返回下一条外层消息,不会重写
}
}

def append(messages: ByteBufferMessageSet) { // 写入:messages 是已被重写的消息,可直接追加写入
val written = messages.writeTo(channel, 0, messages.sizeInBytes)
}
}

object FileMessageSet { // 创建或打开文件,准备读写
def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {/*...*/}
}

2.4 ByteBufferMessageSet

使用 ByteBuffer 表示一批内存消息;提供解压校验,分配 offset,更新 timestamp,压缩方式转换,外层消息迭代、内层消息迭代等功能

1
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {/*...*/}

内层迭代:输入一条外层消息,返回解压后的、重写后的、内层消息的迭代器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
object ByteBufferMessageSet {  
def deepIterator(wrapperMessageAndOffset: MessageAndOffset): Iterator[MessageAndOffset] = {
new IteratorTemplate[MessageAndOffset] {
val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset // case class exacter

// 1. magic:为 1 则统一使用 wrapper msg 的时间戳、时间戳类型
val wrapperMessageTimestampOpt: Option[Long] = if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None
val wrapperMessageTimestampTypeOpt: Option[TimestampType] = if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None

// 2. codec:加载 compressor 做解压读
val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) // 持有压缩后所有消息的 ByteBuffer
val compressed = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream))
val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) {
val innerMessageAndOffsets = new util.ArrayDeque[MessageAndOffset]()
while (true)
innerMessageAndOffsets.add(readMessageFromStream()) // 一次性解压所有内层消息,放入队列
Some(innerMessageAndOffsets)
} else None

var lastInnerOffset = -1L
private def readMessageFromStream(): MessageAndOffset = { /* 1). 重写时间戳,与 wrapper 整体消息对齐 */
// 从 compressed 中解压读取一条消息返回
}

// 迭代 messageAndOffsets 队列读消息
override def makeNext(): MessageAndOffset = {
messageAndOffsets match {
case Some(innerMessageAndOffsets) =>
innerMessageAndOffsets.pollFirst() match {
case null => allDone()
case MessageAndOffset(message, offset) =>
val relativeOffset = offset - lastInnerOffset // 负值,相对于最后一条内层消息的负偏移量
val absoluteOffset = wrapperMessageOffset + relativeOffset // 得出该内层消息的绝对 offset
new MessageAndOffset(message, absoluteOffset) /* 2). 重写 offset */
}
case None =>
readMessageFromStream()
}
}
}
}
}

对于 Producer 发来的 MemoryRecords,需做三件事

  • 消息校验:
    • 协议校验:消息版本需与 log.message.format.version 一致,否则重写兼容
    • key 校验:若 broker 配置 cleanup.policy=compact,则消息须有 key
    • 时效性校验:使用 CreateTime 且 broker 超过 message.timestamp.difference.max.ms 才收到消息,则认为该消息已失效
    • CRC 校验:对收到的消息计算 CRC,若结果不等于 crc 字段值,则消息已损坏
  • 消息重写:
    • 分配 offset:无压缩消息的 offset 字段逐条重写为 AO,压缩消息只有外层消息会重写为 AO
    • 更新 timestamp
      • CreateTime:若使用压缩,则外层消息的 timestamp 是所有内层消息 timestamp 的最大值
      • LogAppendTime:重写为当前时间戳
    • 重新压缩:若 producer 与 broker 使用的压缩算法不同,则会在解压后重新压缩
  • 协议兼容:v0 消息会被重写 magic, attribute 字段,并插入 timestamp 字段,重新计算 CRC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
now: Long,
sourceCodec: CompressionCodec, // producer 的 ompression.type
targetCodec: CompressionCodec, // broker 的 compression.type
compactedTopic: Boolean = false,
messageFormatVersion: Byte = Message.CurrentMagicValue,
messageTimestampType: TimestampType,
messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
// 1. 无压缩
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
// 1.1 重写兼容 v0 消息
(convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs, messageFormatVersion), true)
} else {
// 1.2 直接遍历消息,校验、重写
(validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType, messageTimestampDiffMaxMs), false)
}
} else {
// 2. 压缩消息
var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0 // 是否需重新压缩
val validatedMessages = new mutable.ArrayBuffer[Message]
this.internalIterator(isShallow = false).foreach {/*...*/} // 解压、校验内层消息,收集到 validatedMessages 队列

if (!inPlaceAssignment) {
/*...*/
(new ByteBufferMessageSet(compressionCodec = targetCodec, // 2.1 重新压缩,返回新的 ByteBufferMessageSet
offsetCounter = offsetCounter,
wrapperMessageTimestamp = wrapperMessageTimestamp,
timestampType = messageTimestampType,
messages = validatedMessages: _*), true) // 将 validatedMessages 重新压缩
} else {
buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1) // 最后一条内层消息的 AO
/*...*/ // 2.2 无需重新压缩,重写外层消息即可
(this, false)
}
}
}

至此,分析了消息集在内存、文件中的表示,外层、内层迭代的实现,消息的校验、重写和协议兼容

三:Log

3.1 LogSegment

由 .log 和 .index 组成的单个日志段。前者由 FileMessageSet 读写,后者由 OffsetIndex 读写,文件名都是FileMessageSet 中第一条消息的 AO

1
2
3
4
5
6
7
class LogSegment(/*...*/) {
this(
new FileMessageSet(file = Log.logFilename(dir, startOffset), // 在 log 目录下创建 .log 文件
fileAlreadyExists, initFileSize, preallocate = preallocate),
new OffsetIndex(Log.indexFilename(dir, startOffset), startOffset, maxIndexSize), // .index
startOffset, indexIntervalBytes, rollJitterMs, time)
}

1. OffsetIndex

为了在 .log 中快速查找指定 offset 的消息,采用空间换时间策略,设计了稀疏索引:每向 .log 写一定量的消息后,向 .index 写入一条长度固定为 8 字节的 (offset:int, position:int) 索引项,查找时先对 .index 做二分查找,再从 .log 指定位置向后遍历,复杂度从 O(N) 降低到了 O(logN)

1
2
3
4
5
6
7
8
9
class OffsetIndex(var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) {
private[this] var mmap: MappedByteBuffer = {/*...*/} // 生成 .index 的 mmap 内存映射,加速读写

def append(offset: Long, position: Int) {
mmap.putInt((offset - baseOffset).toInt) // 消息 offset
mmap.putInt(position) // 消息在 .log 文件中的 position
}
def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int={/*...*/} // 二分查找向右最靠近的索引项
}

示例:查找 1004 消息,先从 .index 中二分查找到 1002 消息的 position,再向右遍历 2 条消息即可

image-20200606113108153

回到 LogSegment,有 3 个关键方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class LogSegment {
// 1. 写入:每向 .log 写 log.index.interval.bytes 字节的消息后,写一条索引项
def append(offset: Long, messages: ByteBufferMessageSet) {
if (messages.sizeInBytes > 0) {
if(bytesSinceLastIndexEntry > indexIntervalBytes) {
index.append(offset, log.sizeInBytes()) // 追加写 .log,索引值即 log 当前大小
this.bytesSinceLastIndexEntry = 0 // 重置计数
}
// 此处 ByteBufferMessageSet.writeTo 会将消息数据写入到 FileMessageSet 底层的 FileChannel
// 此刻还未落盘,而是等待 kafka-log-flusher 定期做 flush,或 rollover 时后台线程 flush
log.append(messages)
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
// 2. 查找
def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
val leftest = index.lookup(offset) // 读取索引文件,二分查找 offset
log.searchFor(offset, max(leftest.position, startingFilePosition)) // 再向后逐条查找日志
}
// 3. 重建索引:重启后需读取 recoveryPoint 之外的 .log,逐条消息校验 CRC,重建索引
def recover(maxMessageSize: Int):Int = {/*...*/} // 若检查到损坏消息则停止,返回失效消息字节数
}

至此,分析了日志的读写逻辑,索引原理


3.2 Log

描述一个 tp 的多个日志段,负责读写消息,触发 rollover 删除旧 segment,维护 recoveryPoint 描述落盘进度,维护 tp 的 LEO 递增为消息分配 AO;相关文件较多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 相关文件
object Log {
val LogFileSuffix = ".log" // 1.1 日志文件,存储消息 /* segment */
val IndexFileSuffix = ".index" // 1.2 索引文件,存储稀疏索引 /* segment */
val DeletedFileSuffix = ".deleted" // 2. 被标记为删除的 segment
val CleanedFileSuffix = ".cleaned" // 3.1 正在做 compaction 的临时 segment
val SwapFileSuffix = ".swap" // 3.2 已完成 compaction 的结果 segment
val CleanShutdownFile = ".kafka_cleanshutdown" // 4. broker 退出前是否已将该数据目录下各 log 的所有 segment 都落盘
}

// 主要字段
class Log(val dir: File, // tp 的 log 目录,如 /data1/topic2-0
var config: LogConfig, // 日志配置,比如 log.segment.bytes
var recoveryPoint: Long = 0L, // 第一条还未被 flush 的消息的 offset
scheduler: Scheduler) // 对 juc.ScheduledThreadPoolExecutor 的易用性封装
// segment 存储在跳跃表中实现快速查找,key 是其 baseOffset,最后一个负责写新消息,称为 activeSegment
val segments = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
// 只读的递增 LEO
var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
}

1. append

Produce 请求的消息数据交由 tp 的 Log 做校验、重写,最后写入 activeSegment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
val appendInfo = analyzeAndValidateMessageSet(messages) // 1. 初步校验消息
val offset = new LongRef(nextOffsetMetadata.messageOffset) // 2. 从当前 LEO 开始分配 offset
val (validatedMessages, messageSizesMaybeChanged) =
validMessages.validateMessagesAndAssignOffsets(offset, now, // 3. 由 ByteBufferMessageSet 分配 offset,重写消息,重新压缩
appendInfo.sourceCodec, appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.messageFormatVersion,
config.messageTimestampType, onfig.messageTimestampDifferenceMaxMs)
validMessages = validatedMessages
if (messageSizesMaybeChanged) {/*...*/ } // 由于协议转换、重新压缩,有消息可能已超过 max.message.bytes,需重新检查
val segment = maybeRoll(validMessages.sizeInBytes) // 4. rollover:若当前 activeSegment 即将写满,则 rollover 生成新的 activeSegment
segment.append(appendInfo.firstOffset, validMessages) // 5. 写 log 文件,如有必要则写 index 文件
updateLogEndOffset(appendInfo.lastOffset + 1) // 6. 更新 LEO
}

private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = {
var validBytesCount = 0
var firstOffset, lastOffset = -1L
var sourceCodec: CompressionCodec = NoCompressionCodec
for(messageAndOffset <- messages.shallowIterator) { // 迭代外层消息
/* 校验消息大小、CRC */
validBytesCount += messageSize
sourceCodec = m.compressionCodec // producer 使用的压缩方式
}
val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) // topic 的压缩方式
LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
}

2. rollover

触发生成新 activeSegment 的条件有三个:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private def maybeRoll(messagesSize: Int): LogSegment = {
// 1. 再 append 新日志,将超出了 segment.bytes
if (segment.size > config.segmentSize - messagesSize ||
// 2. 空闲时间已超过 segment.ms
segment.size > 0 && (time.milliseconds - segment.created) > (config.segmentMs - segment.rollJitterMs) ||
// 3. index 文件已超出 segment.index.bytes
segment.index.isFull) {
roll() /*...*/
}

def roll(): LogSegment = {
val newOffset = logEndOffset // LEO 作为文件名
val logFile = logFilename(dir, newOffset) // 创建 LEO.log, LEO.index 文件名
val indexFile = indexFilename(dir, newOffset)
segments.lastEntry() match {
case entry => {
entry.getValue.index.trimToValidSize() // 截断当前 activeSegment,避免预分配的零字节数据作为无效字节
entry.getValue.log.trim()
}
}
val segment = new LogSegment(/*...*/) // 创建新的 activeSegment 并添加到跳表
addSegment(segment)
scheduler.schedule("flush-log", ()=>flush(newOffset), delay=0L) // 异步将 newOffset 前的 segments 落盘
segment
}

3. scheduler

Log 模块有 2 个定时任务需要处理
flush-log:当发生 rollover 时,触发旧的、还未落盘的 segment 写入磁盘

1
2
3
4
5
6
def flush(offset: Long) : Unit = {
// 将 [recoveryPoint, offset) 之间的 segment 通通执行 fysnc 落盘
for(segment <- logSegments(this.recoveryPoint, offset))
segment.flush() // 执行 .log/.index 的 fileChannel.force()
this.recoveryPoint = offset // 更新 recoveryPoint // 保证 recoveryPoint AO 前的日志都已持久化
}

delete-file:当 segment 保存时间超 retention 策略后,LogManager 会请求 Log 删除指定的 segment

1
2
3
4
5
def deleteSegment(segment: LogSegment) {
segments.remove(segment.baseOffset) // 1. 从 segments 跳表中移除
segment.changeFileSuffixes("", Log.DeletedFileSuffix) // 2. 追加 .deleted 后缀,标记文件已作废,等待删除
scheduler.schedule("delete-file", segment.delete(), delay = config.fileDeleteDelayMs) // 延迟 file.delete.delay.ms 后删除物理文件
}

4. recover

当 broker 重启后,各 Log 模块需根据现有的 segment 文件重建状态,流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private def loadSegments() {
var swapFiles = Set[File]()

// 1. 删除 .deleted 文件,筛选出 .swap 文件
for(file <- dir.listFiles if file.isFile) {
val filename = file.getName
if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
// 1.1 .deleted 待删除文件、.cleaned 失效的 compaction 中间态文件,都直接删除
file.delete()
} else if(filename.endsWith(SwapFileSuffix)) {
// 1.2 .swap 文件存储了 compaction 的可用结果,读取恢复
val baseName = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, "")) // .log.swap -> .log
if(baseName.getPath.endsWith(IndexFileSuffix)) {
file.delete() // 1.2.1 删除 .index.swap,等待重建索引
} else if(baseName.getPath.endsWith(LogFileSuffix)){
val index = new File(CoreUtils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
index.delete() // 1.2.2 .index 可能失效,删除等待重建
swapFiles += file
}
}
}

// 2. 加载 segment,重建索引
for(file <- dir.listFiles if file.isFile) {
val filename = file.getName
// 2.1 只有 .index,则 .log 可能已人为删除,索引失效,直接删除
if(filename.endsWith(IndexFileSuffix)) {
val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
if(!logFile.exists) file.delete()
} else if(filename.endsWith(LogFileSuffix)) {
// 2.2 加载 .log,文件名就是 baseOffset
val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
val indexFile = Log.indexFilename(dir, start)
val segment = new LogSegment(dir, start, /*...*/) // 在 dir 下创建 start.log 的 segment 对象

if(indexFile.exists()) {
try {
segment.index.sanityCheck() // 对 .index 做完整性校验:最后一条索引项的 offset 必须比 baseOffset 大
} catch {
case e: java.lang.IllegalArgumentException =>
indexFile.delete()
segment.recover(config.maxMessageSize) // 删除重建
}
} else segment.recover(config.maxMessageSize) // 不存在也重建
segments.put(start, segment)
}
}

// 3. 恢复 .swap,删除对应的源 compected segments
for (swapFile <- swapFiles) {
val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, ""))
val startOffset = logFile.getName.substring(0, logFile.getName.length - LogFileSuffix.length).toLong
val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), index, baseOffset = startOffset, /*...*/)
swapSegment.recover(config.maxMessageSize)
val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
// 删除 swapSegment 的第一条、最后一条消息的 offset 区间,对应的源 segments
replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
}

// 4. 校验 recoveryPoint 之后的 segment,落盘被中断可能导致文件损坏
if(logSegments.isEmpty)
segments.put(0L, new LogSegment(dir = dir, startOffset = 0, /*...*/) // 首次启动,创建 activeSegment,从零开始分配 offset
else
recoverLog()
}

// recoveryPoint 之后的 segment 落盘状态不确定,逐个确认
private def recoverLog() {
if(hasCleanShutdownFile) { // broker 退出时,已成功将所有 segment 落盘,无需再次检查
this.recoveryPoint = activeSegment.nextOffset
return
}
val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
while(unflushed.hasNext) {
val truncatedBytes = unflushed.next.recover(config.maxMessageSize)
if(truncatedBytes > 0)
// 中途有 segment 损坏,则删除之后的所有 segments(合理:flush 也是顺序执行,后边的消息多半也是损坏的)
unflushed.foreach(deleteSegment)
}
}

至此,分析了 Log 模块的消息写入流程,对 segment 的管理机制(rollover、异步删除、异步 flush),以及重启时加载日志的逻辑


四:LogManager

维护 tp 到 Log 的映射,在启动时并发加载数据目录,退出时并发 flush,还负责触发 cleaner 线程做 compaction,以及维护 3 个定时任务;主要字段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class LogManager(val logDirs: Array[File], // 数据目录列表,如 /data1,/data2
val topicConfigs: Map[String, LogConfig], // 各 topic 自己的日志配置
val defaultConfig: LogConfig, // broker 的默认配置
val cleanerConfig: CleanerConfig, // compaction 线程配置
ioThreads: Int, // num.recovery.threads.per.data.dir // 1
val flushCheckMs: Long, // log.flush.scheduler.interval.ms // Long.MaxValue
val flushCheckpointMs: Long, // log.flush.offset.checkpoint.interval.ms // 1min
val retentionCheckMs: Long, // log.retention.check.interval.ms // 5min
scheduler: Scheduler){ // 负责执行 3 个定时任务
/*...*/ // 确保数据目录可读写,可对 .lock 文件加锁
val logs = new Pool[TopicAndPartition, Log]() // 跨数据目录的 tp -> Log 关系
// 每个数据目录都有 recovery-point-offset-checkpoint 文本文件,由 OffsetCheckpoint 对象描述,记录各 Log 持久化进度
val recoveryPointCheckpoints: Map[File, OffsetCheckpoint] =
logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
val cleaner: LogCleaner = new LogCleaner(cleanerConfig, logDirs, logs, time) // compaction

def startup() {
if(scheduler != null) {
scheduler.schedule("kafka-log-retention", cleanupLogs, period = retentionCheckMs, /*...*/)
scheduler.schedule("kafka-log-flusher", flushDirtyLogs, period = flushCheckMs, /*...*/)
scheduler.schedule("kafka-recovery-point-checkpoint", checkpointRecoveryPointOffsets, period = flushCheckpointMs, /*...*/)
}
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
}

4.1 kafka-log-retention

负责定期清理各 Log 中不符合 retention 策略的 segment,两个维度判定日志过期:

  • retention.ms:只读 segment 的最长保留时间,超出即删除
  • retention.bytes:单个 Log 的所有现存 segment 磁盘用量上限,超出则删除最旧的 segment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def cleanupLogs() {
for(log <- allLogs; if !log.config.compact) { // cleanup.policy=delete 才能删日志
cleanupExpiredSegments(log)
cleanupSegmentsToMaintainSize(log)
}
}

// 时间维度:删除所有 mtime + retention.ms < now 的 segment
private def cleanupExpiredSegments(log: Log): Int = /*...*/
log.deleteOldSegments(time.milliseconds - _.lastModified > log.config.retentionMs)

// 空间维度:从最老的日志开始删除 segment,直到磁盘用量少于 retention.bytes
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
var diff = log.size - log.config.retentionSize
def shouldDelete(segment: LogSegment) = {
if(diff - segment.size >= 0) {
diff -= segment.size
true
} else false // 清理完毕
}
log.deleteOldSegments(shouldDelete) // 入参是一个带状态的闭包函数
}

4.2 kafka-log-flusher

负责定期触发各 Log 执行 flush 落盘

1
2
3
4
5
private def flushDirtyLogs() = {
for ((topicAndPartition, log) <- logs)
if(time.milliseconds - log.lastFlushTime >= log.config.flushMs) // 距上次落盘间隔已超过 flush.ms
log.flush // 将 [recoveryPoint, LEO) 之间的新 segment 全部落盘
}

4.3 kafka-recovery-point-checkpoint

负责定期将各 Log 最新的 recoveryPoint 更新到各自数据目录的 recovery-point-offset-checkpoint 文本文件中

1
2
3
4
5
6
7
8
def checkpointRecoveryPointOffsets() = this.logDirs.foreach(checkpointLogsInDir)

def checkpointLogsInDir(dir: File): Unit = {
val recoveryPoints: Option[Map[TopicAndPartition, Log]] = this.logsByDir.get(dir.toString)
if (recoveryPoints.isDefined)
// 将 log.dir 下的所有 Log 的 recoveryPoint 逐个写入文件 // log -> log.recoveryPoint
this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}

4.4 reload, shutdown

在 §3.2.4 中已分析过,如何从各种后缀状态的 segment 文件、recoveryPoint 落盘安全边界,来重建 Log 状态。当 broker 重启时,每个数据目录有 num.recovery.threads.per.data.dir 个线程,并发地重建其下的多个 Log 的状态,实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private def loadLogs(): Unit = {
val jobs = mutable.Map.empty[File, Seq[Future[_]]] // log.dir/.kafka_cleanshutdown -> 其内各 Log 的异步重建结果

for (dir <- this.logDirs) { // 1. 第一层:遍历数据目录
val pool = Executors.newFixedThreadPool(ioThreads) // 一个数据目录独占一个线程池
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
var recoveryPoints = Map[TopicAndPartition, Long]()
recoveryPoints = this.recoveryPointCheckpoints(dir).read // 读取 log.dir/recovery-point-offset-checkpoint

val jobsForDir: List[Runnable] = for {
dirContent <- Option(dir.listFiles).toList
logDir <- dirContent if logDir.isDirectory // 2. 第二层:遍历内部所有 log 目录
} yield {
CoreUtils.runnable {
val topicPartition = Log.parseTopicPartitionName(logDir)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) // 读取该 log 的 recoveryPoint
// 创建 Log 对象,会执行 Log.recover() 加载各 segment 重建 Log 状态
val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
this.logs.put(topicPartition, current)
}
}
// 3. 向 pool 提交内部所有 log 目录的重建任务,异步执行
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
}

for ((cleanShutdownFile, dirJobs) <- jobs) { // 等待各个数据目录下的所有 Log 重建完成
dirJobs.foreach(_.get)
cleanShutdownFile.delete() // 删除 shutdown 标记文件
}
}

同样的,当 graceful shutdown 时,每个数据目录都会并发地为其下的多个 Log 执行 flush,确保消息被持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def shutdown() {
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
CoreUtils.swallow(cleaner.shutdown()) // 先停止 compaction 线程,避免写入

for (dir <- this.logDirs) {
val pool = Executors.newFixedThreadPool(ioThreads) // 同样也是每个数据目录一个线程池
val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
val jobsForDir = logsInDir map { log =>
CoreUtils.runnable { log.flush(); log.close() } // 执行 flush
}
jobs(dir) = jobsForDir.map(pool.submit).toSeq
}

for ((dir, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
checkpointLogsInDir(dir) // 和 4.3 一样,将各 log 目录最新的 recoveryPoint 更新到 checkpoint 文件
CoreUtils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()) // 标记文件都已落盘
}
}

至此,分析了 LogManager 全局视角的三个定时任务:retention 策略清理日志、flush 策略持久化日志、recoveryPoint 进度更新,还分析了 broker 重启时各数据目录使用线程池并发重建 Log 状态,退出前并发 flush 等机制


五:Compaction

参考文档 #design_compaction,会对 key 相同的消息做去重,只保留最新的消息,删除旧消息,流程:

  • 找出 dirtest Log:从所有数据目录中找出 dirty/(clean+dirty) 占比最高,也即最需要做 compact 的 Log
  • 生成最新的 key 分布情况:遍历此 Log 的 dirty segments 生成 OffsetMap(大小有上限)
  • Group:从头开始,将大小不一的 segment 按 segment.bytes 标准大小为目标进行分组
  • Compact, Copy:逐个遍历分组内的 segments,通过检查消息是否带 key、若带 key 值是否已被覆盖、若是tombstone 消息是否已过期等条件,来判断是否保留,最终将要保留的消息逐条拷贝写入 .cleaned
  • Swap:compact 执行完毕,将中间状态 .cleaned 更新为结束状态 .swap,为各个 compacted segments 提交异步删除任务后,去掉 .swap 后缀得到最终可用一个的 clean segment
  • Checkpoint:整个 Log 本次 compact 执行完毕,将 endOffset 更新到 cleaner-offset-checkpoint 持久化进度

image-20200613100613707

5.1 LogCleanerManager

维护各 Log 当前的 compaction 状态:与 recovery 线程池不同,cleaner 线程池与数据目录无对应关系,而是针对全局执行 compaction,故需维护 Log 状态避免重复执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log])  {
// 每个数据目录都有 cleaner-offset-checkpoint 文本文件,由 OffsetCheckpoint 对象描述,记录 compaction 进度
private val checkpoints: Map[File, OffsetCheckpoint] =
logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()

// 找出最急需做 compaction 的 Log
def grabFilthiestLog(): Option[LogToClean] = {
val lastClean = allCleanerCheckpoints()

val dirtyLogs = logs.filter {
// 1. 过滤出 cleanup.policy=compact 策略的 Log
case (topicAndPartition, log) => log.config.compact
}.filterNot {
// 2. 过滤掉正在做 compaction 的 Log
case (topicAndPartition, log) => inProgress.contains(topicAndPartition)
}.map {
// 3. 读取 checkpoint 文件得到 clean,dirty 边界,用 LogToClean 对象描述
case (topicAndPartition, log) =>
val logStartOffset = log.logSegments.head.baseOffset
val firstDirtyOffset = {
val offset = lastClean.getOrElse(topicAndPartition, logStartOffset) // 首次做 clean
if (offset < logStartOffset)
logStartOffset // 进度失效:若 checkpoint offset 所在的 segment 被人为删除,则重头做 compaction
else
offset // 上次 compaction 结束的 endOffset 还有效
}
LogToClean(topicAndPartition, log, firstDirtyOffset)
// 4. 过滤掉空的 Log
}.filter(ltc => ltc.totalBytes > 0)
this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
// 5. 过滤出 dirty 占比超过 min.cleanable.dirty.ratio 的 Log
val cleanableLogs = dirtyLogs.filter(ltc => ltc.cleanableRatio > ltc.log.config.minCleanableRatio) // >0.5
if(cleanableLogs.isEmpty) {
None
} else {
val filthiest = cleanableLogs.max // 排序得出 dirty 占比最高的 Log
inProgress.put(filthiest.topicPartition, LogCleaningInProgress) // 避免其他 cleaner 线程重复执行 compaction
Some(filthiest)
}
}
}

5.2 LogCleaner

LogCleaner 负责维护 cleaner 线程池,统一 startup 和 shutdown,还持有全局的 throttler 对 IO 进行限流,避免影响正常的消息读写

1
2
3
4
5
6
7
class LogCleaner(val config: CleanerConfig, val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]){
val cleanerManager = new LogCleanerManager(logDirs, logs)
private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, /*...*/)
private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
def startup() = cleaners.foreach(_.start())
def shutdown() = cleaners.foreach(_.shutdown())
}

CleanerThread 实现了无符合 compaction 条件的 Log 时,等待 log.cleaner.backoff.ms 后重新检查的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private class CleanerThread(threadId: Int)
extends ShutdownableThread(name = "kafka-log-cleaner-thread-" + threadId, isInterruptible = false) {
val cleaner = new Cleaner(/*...*/)

private val backOffWaitLatch = new CountDownLatch(1)
override def doWork() = cleanOrSleep()
override def shutdown() = backOffWaitLatch.countDown()
private def cleanOrSleep() {
cleanerManager.grabFilthiestLog() match {
case None => // 等待 15s 超时,或线程被 interrupt
backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
case Some(cleanable) =>
var endOffset = cleanable.firstDirtyOffset
endOffset = cleaner.clean(cleanable) // 执行 compaction,结束后更新到 cleaner checkpoint
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
}
}
}

5.3 Cleaner

负责实现 compaction 流程,注意其 IO 操作有内存用量限制

1. SkimpyOffsetMap

问题:消息 key 长度不定,使用Map<key, latestOffset>表示带 key 消息的最新 offset,内存用量不可控

解决:将 key 转用定长的 hash(key) 表示,用探测法解决哈希冲突。哈希函数默认 md5,哈希值长度为 16 字节,offset 为 long 类型长度 8 字节,使用 ByteBuffer 重新实现哈希表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extends OffsetMap {
private val bytes = ByteBuffer.allocate(memory) // log.cleaner.dedupe.buffer.size / log.cleaner.threads,默认 128MB/1
private val digest = MessageDigest.getInstance(hashAlgorithm) // md5 加密函数,作为默认的哈希函数

val bytesPerEntry = hashSize + 8 // 16+8 = 24B
val slots: Int = memory / bytesPerEntry // 默认 128MB 能容纳 560 万个 key

// 探测解决哈希冲突
// 原理:发生冲突后,先分解 hash 字节数组,4 个字节为一组转为整数重试,最多生成 20 个整数,冲突概率已经低足够低
// 此处有冲突溢出的 bug,已在 KAFKA-3682 中修复:https://github.com/apache/kafka/pull/1352
private def positionOf(hash: Array[Byte], attempt: Int): Int = {
val probe = CoreUtils.readInt(hash, math.min(attempt, hashSize-4)) + math.max(0, attempt-hashSize+4)
val slot = Utils.abs(probe) % slots
slot * bytesPerEntry
}
private def isEmpty(position: Int): Boolean = // FIXME: 硬编码长度
bytes.getLong(position) == 0 && bytes.getLong(position + 8) == 0 && bytes.getLong(position + 16) == 0 // 长 24 的零值字节数组

override def put(key: ByteBuffer, offset: Long) {
hashInto(key, hash1)
var attempt = 0
var pos = positionOf(hash1, attempt)
while(!isEmpty(pos)) {
bytes.position(pos)
bytes.get(hash2)
if(Arrays.equals(hash1, hash2)) {
bytes.putLong(offset) // key 已存在则覆盖
return
}
attempt += 1
pos = positionOf(hash1, attempt) // 哈希冲突,尝试下一个 slot
}
bytes.position(pos)
bytes.put(hash1)
bytes.putLong(offset)
}

override def get(key: ByteBuffer): Long = {/*...*/} // 检查哈希位置、冲突位置上是否存在该 key
}

2. Cleaner

如下分析 compaction 的两个关键步骤:Group 和 Swap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Group:为 Log 建立最新的 OffsetMap,并对 endOffset 之前的所有 clean,dirty 日志进行分组,按组进行 compaction
private[log] def clean(cleanable: LogToClean): Long = {
val log = cleanable.log
val upperBoundOffset = log.activeSegment.baseOffset // activeSegment 不参与 compaction

// 1. 遍历 [firstDirtyOffset, upperBoundOffset) 区间内的 segment,逐条读取消息,若有 key 则记录到 OffsetMap
// 由于 offsetMap 大小有限,endOffset <= upperBoundOffset
val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap) + 1

val deleteHorizonMs = // 最后一个 clean segment 的 mtime,往前数 24h,再之前的 tombstone 消息将被删除
log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
case None => 0L
case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
}

// 2. [0, endOffset) 区间的 segments 重新分组
for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)) {
// 3. 对组内 segments 执行 compaction
cleanSegments(log, group, offsetMap, deleteHorizonMs)
}
endOffset
}

// Swap:对一组 segments 执行 compaction
private[log] def cleanSegments(log: Log, segments: Seq[LogSegment], map: OffsetMap, deleteHorizonMs: Long) {
// 1. 用第一个 logSegment 的文件名,添加 .cleaned 后缀
val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)

// 创建新的 logSegment
val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)

// 逐个遍历组内的 segment,执行 compact,将无 key 的消息、有 key 的最新消息拷贝到 cleaned 新日志
for (old <- segments) {
val retainDeletes = old.lastModified > deleteHorizonMs // true: 此日志 24h 内被修改过,tombstone 消息需要保留
cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
}
index.trimToValidSize()
cleaned.flush()
// cleaned 的 mtime 保持在组内最后一个 segment 的 mtime,而不是当前时间 // 非常关键:避免误删 tombstone 消息
val modified = segments.last.lastModified
cleaned.lastModified = modified

// .cleaned -> .swap,并将 segments 添加 .deleted 后缀异步删除
log.replaceSegments(cleaned, segments)
}

至此,分析了后台 cleaner 线程实现的 compaction 的流程,特别是 Log 优先级排序,定长 key-value 场景下 OffsetMap,IO 限流逻辑,以及分组合并的机制


总结

本文分为五个小节,分别介绍了:

  1. Kafka 日志相关的配置项目,含义及默认值
  2. 消息字段解释及版本差异,内外层消息的区别,消息集在文件、内存中的表示
  3. Log 的重点操作如状态重建、rollover 机制、定时清理和落盘,稀疏索引的写入和查找原理
  4. LogManager 的 retention、flusher、recoveryPoint 定时任务,启动时并发重建,退出前并发落盘
  5. Compaction 的六个步骤,OffsetMap 等原理

整个日志模块中,个人认为有三点最值得学习

  1. checkpoint 文本文件持久化任务进度,搭配后台线程定时更新,加速状态重建

    • Log 模块用 recoveryPoint 作为日志落盘的安全边界,由后台线程定时更新到 log.dir/recovery-point-offset-checkpoint 文件。重建 Log 状态时,只需检查边界后的日志是否在落盘过程中损坏即可
    • Compaction 模块用 endOffset 作为 clean,dirty 的安全边界,每次做完 compaction 都更新到 log.dir/cleaner-offset-checkpoint 文件。再次 compaction 时,只需对边界后的 segment 建立 OffsetMap
  2. compaction 机制与流程:Kafka 日志模块是典型的 append-only 的 key-value 日志系统,compaction 涉及的进度记录 、key-value 的筛选与复制、.cleaned 中间态与 .swap 可用态的切换、.deleted 失效文件的异步删除等都是设计亮点

  3. 若特殊场景下标准库中无最佳数据结构,可权衡时间(算法复杂度)和空间(内存、磁盘用量)自行设计,充分利用 NIO 的 mmap, ByteBuffer 等工具

    • OffsetIndex:索引项长度固定,将索引文件 mmap 到内存中,二分搜索
    • OffsetMap:key 和 value 长度都固定的哈希表,用 ByteBuffer 实现快速读取和更新

    当然,同时会引入更高的逻辑复杂度、线程安全等问题,需针对场景做出权衡

至此,单机版的 Kafka 存储原理已经分析完毕,之后将分析集群相关的 Replication 模块