前言

在存储海量的小整数、短字符串时,若用链表存储,每个节点都要多占用 2*sizeof(listNode*) 共 8 字节。为省去这部分开销,Redis 设计了内存连续的存储结构 ziplist,可简单理解为类型不定、长度不定的数组


Ziplist 结构

内存分布:

ziplist

表信息由 4 个字段描述

1
2
3
4
uint32_t zlbytes; // 整张表所占字节数,不包括标记表尾的 1B,用于重分配内存
uint32_t zltail; // 最后一个元素相对表起始地址的 offset,用于反向遍历
uint16_t zllen; // 表内元素个数,O(1) 读表长,但有上限 UINT16_MAX 65525,个数超过则需遍历整张表取表长
unsigned char zllend = 0xFF; // 值为 255 的表尾标记,表操作的安全边界

初始化:分配内存创建空表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
unsigned char *ziplistNew(void) {
unsigned int bytes = ZIPLIST_HEADER_SIZE+1; // 表尾 OxFF 占 1B
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); // 参考 intset,把整型拍扁为字节需兼容大端字节序
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); // 展开宏得整型指针,修改值
ZIPLIST_LENGTH(zl) = 0;
zl[bytes-1] = ZIP_END;
return zl;
}

#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) // <zlbytes><zltail><zllen>
// 表头地址加各字段的 offset 来定位字段
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))

注:若将 ziplist 定义为 struct 结构,那存储元素的柔性数组将作为最后一个字段,zllend 紧随其后,这种布局会导致柔性数组长度变化时要对应移动 zllend ,过于复杂。故 ziplist 定义为字节类型直接操作内存,通过 offset 读写其他字段

zlentry

元素用 zlentry 结构描述:

1
2
3
4
5
6
7
typedef struct zlentry {
unsigned int prevrawlensize, prevrawlen; // 前一个 entry 的长度,用于反向遍历
unsigned char encoding; // 1B,标记 string/int 类型及其长度
unsigned int lensize, len; // 后 ?B 存储 data 长度
unsigned int headersize; // prevrawlensize + lensize
unsigned char *data; // data 连续内存
} zlentry;

注:zlentry 不是所有字节都会存入内存,如内存分布所示,只有 prevrawlen,len,data 三个字段的值会写入内存,其他字段仅用作辅助计算


Ziplist 插入

1. 计算新元素的字节数

1.1 prevrawlen

存储前一个元素大小所需字节数

a. 头部或中间插入

实现:直接复用指定元素的 prevrawlen

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 读取 ptr 元素的 prevrawlen,字段大小存于 prevlensize,字段值存于 prevlen
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {
ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);
if ((prevlensize) == 1) {
(prevlen) = (ptr)[0];
} else if ((prevlensize) == 5) {
assert(sizeof((prevlensize)) == 4);
memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); // 跳过 0xFE
memrev32ifbe(&prevlen);
}
} while(0);

#define ZIP_BIGLEN 254 // 单字节值上限为 UINT8_MAX/255,但与边界标记字节 0xFF 冲突,避免使用
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {
if ((ptr)[0] < ZIP_BIGLEN) {
(prevlensize) = 1; // 首字节小于 0xFE,说明值存储在紧邻的 uint8
} else {
(prevlensize) = 5; // 首字节是 0xFE,则紧邻 int
}
} while(0);

b. 尾部插入

实现:从表头通过 zltail offset 定位到 tail 元素,同样分 3 个模块计算 tail 的总长度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 计算 p 指向的元素所占用的内存大小
unsigned int zipRawEntryLength(unsigned char *p) {
unsigned int prevlensize, encoding, lensize, len;
ZIP_DECODE_PREVLENSIZE(p, prevlensize); // 读 prevrawlen 前一元素的大小
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len); // 读 len, data 二者的大小
return prevlensize + lensize + len;
}


// 读一级 encoding
#define ZIP_STR_MASK 0xc0 // &1100,0000
#define ZIP_ENTRY_ENCODING(ptr, encoding) do {
(encoding) = (ptr[0]);
if ((encoding) < ZIP_STR_MASK) (encoding) &= ZIP_STR_MASK; // 前 2b 区分 str,int
} while(0)


// 读二级 encoding 对应的 len 和 lensize
// len: 存储 data 所需字节数
// lensize: 存储 len 所需字节数
#define ZIP_STR_06B (0 << 6) // 0000,0000
#define ZIP_STR_14B (1 << 6) // 0100,0000
#define ZIP_STR_32B (2 << 6) // 1000,0000
#define ZIP_DECODE_LENGTH(ptr, encoding, lensize, len) do {
ZIP_ENTRY_ENCODING((ptr), (encoding));
if ((encoding) < ZIP_STR_MASK) {
if ((encoding) == ZIP_STR_06B) {
(lensize) = 1;
(len) = (ptr)[0] & 0x3f; // &0011,1111 // 后 6b 即字符串长度
} else if ((encoding) == ZIP_STR_14B) {
(lensize) = 2;
(len) = (((ptr)[0] & 0x3f) << 8) | (ptr)[1];
} else if (encoding == ZIP_STR_32B) {
(lensize) = 5;
(len) = ((ptr)[1] << 24) | ((ptr)[2] << 16) | ((ptr)[3] << 8) | ((ptr)[4]);
}
} else {
(lensize) = 1;
(len) = zipIntSize(encoding); // 整数数据占用的字节数
}
} while(0);

#define ZIP_INT_16B (0xc0 | 0<<4) // 1100,0000 // 11__,____ 前缀固定
#define ZIP_INT_32B (0xc0 | 1<<4) // 1101,0000
#define ZIP_INT_64B (0xc0 | 2<<4) // 1110,0000
#define ZIP_INT_24B (0xc0 | 3<<4) // 1111,0000
#define ZIP_INT_8B 0xfe // 1111,1110
unsigned int zipIntSize(unsigned char encoding) {
switch(encoding) {
case ZIP_INT_8B: return 1;
case ZIP_INT_16B: return 2;
case ZIP_INT_24B: return 3;
case ZIP_INT_32B: return 4;
case ZIP_INT_64B: return 8;
default: return 0; /* 4 bit immediate */
}
}
1.2. encoding

存储数据的长度所需字节数:数据的字节数固定,先尝试将数据解析为整数,成功则按整数编码,失败则按字符串编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 尝试将字符串 entry 解析为整型,若成功则将值存入 v,值的长度编码存入 encoding
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
long long value;

if (entrylen >= 32 || entrylen == 0) return 0;
if (string2ll((char*)entry,entrylen,&value)) {
if (value >= 0 && value <= 12) {
// [0,12] 编码为 [1111,[1,13]],避免 0 值与 ZIP_INT_24B 的 1111,0000 冲突
*encoding = ZIP_INT_IMM_MIN+value;
} else if (value >= INT8_MIN && value <= INT8_MAX) {
*encoding = ZIP_INT_8B;
} else if (value >= INT16_MIN && value <= INT16_MAX) {
*encoding = ZIP_INT_16B;
} else if (value >= INT24_MIN && value <= INT24_MAX) {
*encoding = ZIP_INT_24B;
} else if (value >= INT32_MIN && value <= INT32_MAX) {
*encoding = ZIP_INT_32B;
} else {
*encoding = ZIP_INT_64B;
}
*v = value;
return 1;
}
return 0; // 无法解析为整数,按字符串处理
}

#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
// 将数据长度 rawlen 按 encoding 指定的类型编码,encoding 都写入 p 指向的地址
unsigned int zipEncodeLength(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
unsigned char len = 1, buf[5];
if (ZIP_IS_STR(encoding)) {
// 1. 字符串的 encoding 按长度分级
if (rawlen <= 0x3f) { // 字符串长度 <= 2^6-1
if (!p) return len;
buf[0] = ZIP_STR_06B | rawlen;
} else if (rawlen <= 0x3fff) { // <= 2^14-1
len += 1;
if (!p) return len;
buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f); // 高 6 位写入 buf[0][2:8]
buf[1] = rawlen & 0xff; // 低 8 位存入 buf[1]
} else {
len += 4;
if (!p) return len;
buf[0] = ZIP_STR_32B; // <= 2^31-1
buf[1] = (rawlen >> 24) & 0xff; // 右移、屏蔽其他位,取完整字节
buf[2] = (rawlen >> 16) & 0xff;
buf[3] = (rawlen >> 8) & 0xff;
buf[4] = rawlen & 0xff;
}
} else {
// 2. 整数 encoding 仅需 1 字节
if (!p) return len;
buf[0] = encoding;
}
memcpy(p,buf,len);
return len;
}
1.3. data length

存储数据所需字节数:数据固定,长度就固定,strlen 计算即可

2. 执行插入

至此,新元素所需字节数已知,分配内存写入三个模块的值,在表头或表尾插入:

1
2
3
4
5
6
7
8
9
#define ZIPLIST_HEAD 0 // prepend push
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_TAIL 1 // append
#define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
unsigned char *p;
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
return __ziplistInsert(zl,p,s,slen);
}

特别的,如果发生内存缩容,示意图如下:

插入实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 在 ziplist 中元素 p 的位置,插入长为 slen 值为 s 的字符串 
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789;
zlentry tail;

// 1. 计算新元素所需字节数
// 1.1. prevrawlen:前一个元素的长度
if (p[0] != ZIP_END) {
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); // 表头或中间插入,则复用 p 元素的 prevrawlen
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); // 表尾插入,计算 tail 节点长度
if (ptail[0] != ZIP_END)
prevlen = zipRawEntryLength(ptail);
}
reqlen = zipPrevEncodeLength(NULL,prevlen);

// 1.2. datalen:尝试将字符串解析为最合适的整型,成功则返回整型长度,失败则使用字符串长度
if (zipTryEncoding(s,slen,&value,&encoding))
reqlen += zipIntSize(encoding); // 顺手计算了整型值的 encoding
else
reqlen += slen;

// 1.3 encoding:计算整型或字符串的 encoding 所需字节数
reqlen += zipEncodeLength(NULL,encoding,slen);


// 2. 标记 next.prevlen 扩缩容
// 新元素插入后,下一个元素的 prevlen 的值要对应更新
// reqlen>=254 && next.prevlensize==1:溢出,next.prevlen 从 1B 扩容到 5B
// reqlen< 254 && next.prevlensize==5:浪费,next.prevlen 从 5B 缩容到 1B
int forcelarge = 0;
// reqlensize - next.prevlensize // 4: 扩容,-4: 缩容
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) { // bugfix: github.com/redis/redis/commit/c495d095
nextdiff = 0; // 让 next.prev 继续保持浪费,不缩容
forcelarge = 1; // bug 触发: 若 next.prevlen 缩容额外保留了 4B,新插入极短元素会触发重新计算长度,导致表的总长度不涨反缩,最终导致 tail 的数据被 0xFF 破坏掉。为避免此情况,强制扩容
}


// 3. 内存扩容,移动内存
offset = p-zl;
zl = ziplistResize(zl,curlen+reqlen+nextdiff); // 重分配内存后尾端也标记 0xFF
p = zl+offset;
if (p[0] != ZIP_END) {
// 后移 next 及其之后的内存
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff); // -1 是去掉 ZIP_END
if (forcelarge)
zipPrevEncodeLengthForceLarge(p+reqlen,reqlen); // 直接设置值,继续浪费
else
zipPrevEncodeLength(p+reqlen,reqlen); // 扩缩容都要更新 next.prevlen

// 维护表的元信息,同步递增 tail offset
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
tail = zipEntry(p+reqlen);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); // 表尾插入重算 tail offset
}


// 4. 连锁升级
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen); // next 已升级
p = zl+offset;
}


// 5. 依次写入 prevrawlen, len 和 data
p += zipPrevEncodeLength(p,prevlen);
p += zipEncodeLength(p,encoding,slen);
if (ZIP_IS_STR(encoding))
memcpy(p,s,slen);
else
zipSaveInteger(p,value,encoding); // int24 左移升级为 int32,再右移拷贝内存,避免丢失负号
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}

连锁扩容

增删元素后,next.prevlen从 1B 扩到 5B,next本身大小也增长 4B,如果恰好导致next的大小突破了 254B,将会导致next.next.prevlen同样要从 1B 扩到 5B,如此向后套娃进行扩容。注意没有连锁缩容

示意图:

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 从 p 的 next 开始判断升级,直到中断或到表尾
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;

while (p[0] != ZIP_END) {
cur = zipEntry(p);
rawlen = cur.headersize + cur.len;
rawlensize = zipPrevEncodeLength(NULL,rawlen);

if (p[rawlen] == ZIP_END) break; // 升级结束
next = zipEntry(p+rawlen);
if (next.prevrawlen == rawlen) break; // next.prevlen 能容纳下,升级中断

if (next.prevrawlensize < rawlensize) { // 1. 1B 扩容到 5B
offset = p-zl;
extra = rawlensize-next.prevrawlensize; // 4B
zl = ziplistResize(zl,curlen+extra);

p = zl+offset; // 重分配内存后重定位
np = p+rawlen;
noffset = np-zl;

if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) { // 同样要维护 tail offset
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}

memmove(np+rawlensize, // 后移 next.prevrawlen 后的内存
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
zipPrevEncodeLength(np,rawlen);

p += rawlen; // 处理下一节点
curlen += extra;
} else {
// 2. 5B 到 1B,注意不缩容
if (next.prevrawlensize > rawlensize) {
// rawlen 用 1B 足矣,但为了性能考虑不缩容,还是用 5B 存
zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
} else {
// 3. 无需扩缩容
zipPrevEncodeLength(p+rawlen,rawlen);
}
break; // 升级中断*2
}
}
return zl;
}

Ziplist 删除

以删除区间后,新元素要扩容为例,示意图:

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 从 p 处开始删除 num 个元素
unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
unsigned int i, totlen, deleted = 0;
size_t offset;
int nextdiff = 0;
zlentry first, tail;

first = zipEntry(p);
for (i = 0; p[0] != ZIP_END && i < num; i++) {
p += zipRawEntryLength(p); // 向后定位第一个不被删的元素,用其覆盖 first 内存
deleted++;
}

totlen = p-first.p;
if (totlen > 0) {
if (p[0] != ZIP_END) { // 表头或表中删除
nextdiff = zipPrevLenByteDiff(p,first.prevrawlen); // p.prevlen 扩缩容
p -= nextdiff;
zipPrevEncodeLength(p,first.prevrawlen); // 覆盖写

// 同样维护 tail offset
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
tail = zipEntry(p);
if (p[tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}

// 将 p 之后的内存移到 first 上,覆盖删除
memmove(first.p, p, intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
} else {
// 表尾删除,直接截断
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe((first.p-zl)-first.prevrawlen);
}

offset = first.p-zl; // 重分配内存缩容
zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
ZIPLIST_INCR_LENGTH(zl,-deleted);
p = zl+offset;

if (nextdiff != 0) zl = __ziplistCascadeUpdate(zl,p); // p.prevlen 变化,尝试连锁扩容
}
return zl;
}

总结

ziplist 是 Redis 实现细节最多的数据结构,需要动态维护表的 3 个元信息、各元素的 3 个模块,来模拟数组的增删改查操作,用连续内存存储,尽可能减少内存占用;实现中用到了大量的宏展开、位移、内存移动等操作,由于涉及指针计算,最好梳理流程并画示意图