前言

dict 字典是 Redis 的核心数据结构,用在存储 kv 映射关系、key 过期时间等场景,本篇从 dict 定义及操作、iterator、scanner 三个方面分析其实现


Dict 定义

KV 结构:描述哈希表中 key 与 value 的映射关系

1
2
3
4
5
6
7
8
9
10
typedef struct dictEntry {
void *key; // 对 key, value 类型不做假设
union { // union 常用类型,避免过多 *(int*)val 式的类型转换
void *val;
uint64_t u64;
int64_t s64; // 场景:存储 key 过期毫秒时间戳
double d;
} v;
struct dictEntry *next; // 解决 hash 冲突的单链表指针
} dictEntry;

哈希表:存储 kv 链表数组,使用拉链法解决哈希冲突,单条 kv 链表称为 bucket 或 slot

1
2
3
4
5
6
typedef struct dictht {
dictEntry **table; // buckets 数组
unsigned long size; // buckets 长度
unsigned long sizemask; // & 二进制与运算快速取余
unsigned long used; // 现有节点数量,即各 bucket 中 entries 数量之和
} dictht;

dict 字典:封装 KV 操作,提供 rehash 和遍历功能的哈希表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct dict {
dictType *type; // 实现接口的字典类型,指导键值的操作
void *privdata; // 调用接口函数所需的公用的外部私有数据,一般为 NULL
dictht ht[2]; // 两张哈希表,0 存值,1 用作渐进式哈希
long rehashidx; // 指示 ht[0] 中下一个待迁移的 bucket 索引
int iterators; // 是否有安全迭代器
} dict;

// 字典类型接口,操作 void* 键值
typedef struct dictType {
unsigned int (*hashFunction)(const void *key); // 哈希算法
void *(*keyDup)(void *privdata, const void *key); // rehash 时拷贝 kv,避免指针参数的修改影响到字典内的键值
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2); // 搜索时比较 key
void (*keyDestructor)(void *privdata, void *key); // 删除时释放 kv
void (*valDestructor)(void *privdata, void *obj);
} dictType;

示意图:存储 3 个 kv 的 dict,k2 与 k3 发生哈希冲突


Dict API

1. 哈希函数

哈希算法通过位移、取反、异或等位操作,实现雪崩式生成哈希值,让 key 的连续性不影响哈希值的高度离散性。算法细节如murmurhash2的幻数并非关注重点

2. 扩缩容

dict 的哈希冲突程度可用 load factor 描述,即 bucket 的平均长度;为保证查询性能,过高需扩容,过低则缩容:

  • 扩容:ht[1]容量翻倍准备 rehash;默认 factor 超过 1 即触发扩容,大于 5 严重冲突会强制 rehash
  • 缩容:键值被删除,ht[0]内有大量空 bucket,需 rehash 后释放内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 释放空 bucket,缩容后 load factor 接近 1
int dictResize(dict *d) {
int minimal;
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE; // ht 最少要有 4 个 bucket
return dictExpand(d, minimal);
}

// ht[1] 分配为指定大小的哈希表,准备 rehash
int dictExpand(dict *d, unsigned long size) {
dictht n;
unsigned long realsize = _dictNextPower(size); // size 向上取 2 次方整数,简化掩码取余
if (dictIsRehashing(d) || d->ht[0].used > size) // 扩容后必定存在哈希冲突,缩容无意义
return DICT_ERR;
if (realsize == d->ht[0].size) return DICT_ERR; // 无需调整

n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;

if (d->ht[0].table == NULL) { // dict 初始化
d->ht[0] = n;
return DICT_OK;
}
d->ht[1] = n;
d->rehashidx = 0; // 标记 rehash 开始
return DICT_OK;
}

3. Rehash

  • 目的:将ht[0]内的所有键值,重新分布到大小更佳的ht[1]上,在内存用量与查询性能间保持动态平衡

  • 流程:对ht[0]以 bucket 为单位迁移,逐个遍历其内的键值,计算mod(hash(k), ht[1].size)即得新 bucket 索引,拷贝键值完成迁移;当ht[0]遍历完毕,对 2 张哈希表执行swap

corner case:ht[0]内有连续的大量空 bucket,为避免遍历耗时过长造成线程阻塞,需限制遍历次数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 对 dict 执行 n 步 rehash
int dictRehash(dict *d, int n) {
int empty_visits = n*10;
if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
// ht[0].used 显示还有 key 待迁移,rehashidx 不可能越界
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1; // 查找下一个非空 bucket 并检查遍历次数
}
de = d->ht[0].table[d->rehashidx];
// 对 bucket 内的所有 kv 执行 rehash
while(de) {
unsigned int h;
nextde = de->next;
h = dictHashKey(d, de->key) & d->ht[1].sizemask; // 计算在 ht[1] 中的 bucket idx
de->next = d->ht[1].table[h]; // prepend 插入
d->ht[1].table[h] = de;
d->ht[0].used--; // 更新哈希表长度
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL; // 此 bucket 迁移完毕置空
d->rehashidx++; // 更新 rehash 进度
}
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1]; // rehash 结束则 swap
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
return 1; // 未结束返回 1
}

// 渐进式的单步 rehash
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1); // 若当前有 safe iterator 正在遍历,则不进行 rehash
}

4. 增删改查

公共操作:执行单步 rehash 来均摊时间成本,并处理 rehash 中间状态

查找:操作 key 之前需定位 key,先计算 buckectIdx 再遍历 bucket 逐个比较 key,复杂度O(N)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 计算 key 插入 dict 的 bucketIdx,已存在则返回 -1
static int _dictKeyIndex(dict *d, const void *key) {
unsigned int h, idx, table;
dictEntry *he;
if (_dictExpandIfNeeded(d) == DICT_ERR) return -1;

h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
// 遍历 bucket 比较 key,存在即返回 -1
if (dictCompareKeys(d, key, he->key))
return -1;
he = he->next;
}
// 处理 rehash 中间态:继续遍历 ht[1]
if (!dictIsRehashing(d)) break;
}
return idx; // idx 即 key 的 bucketIdx(在 rehash 则为 ht[1] 索引,否则为 ht[0] 索引)
}

注意,每次查找都会尝试触发扩容(不是很合理),尽早避免哈希冲突

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int dict_can_resize = 1; // 默认启用自动扩容,可关闭来避免 BGSAVE 等命令 fork 的子进程进行过多的 COW 内存拷贝
static unsigned int dict_force_resize_ratio = 5; // 强制 rehash 边界,避免严重冲突导致查询性能下降
static int _dictExpandIfNeeded(dict *d) {
if (dictIsRehashing(d)) return DICT_OK;
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); // 首次扩容

// 若 load factor 超过 1,即必有哈希冲突发生
// 且启用了自动扩容,或超过了强制扩容的边界
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) {
return dictExpand(d, d->ht[0].used*2); // ht[1] 为 ht[0] 翻倍扩容
}
return DICT_OK;
}

新增:确认 key 不存在后,拷贝 key 和 value 再写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 由于 value 类型不定,故返回 dictEntry* 按类型来调用不同的值拷贝函数
dictEntry *dictAddRaw(dict *d, void *key) {
int index;
dictEntry *entry;
dictht *ht;

if (dictIsRehashing(d)) _dictRehashStep(d);
if ((index = _dictKeyIndex(d, key)) == -1)
return NULL;

// rehash 期间新 key 只能写到 ht[1],其 bucketIdx 已由 _dictKeyIndex 计算出
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index]; // prepend 插入 bucket
ht->table[index] = entry;
ht->used++;
dictSetKey(d, entry, key); // 拷贝 key
return entry;
}

int dictAdd(dict *d, void *key, void *val) { // mustAdd
dictEntry *entry = dictAddRaw(d,key);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val); // 拷贝 value
return DICT_OK;
}
#define dictSetSignedIntegerVal(entry, _val_) \
do { entry->v.s64 = _val_; } while(0) // 直接赋值

更新:需处理新值与旧值指向了同一对象的情况,避免错误地提前释放内存

1
2
3
4
5
6
7
8
9
10
11
12
int dictReplace(dict *d, void *key, void *val) {
dictEntry *entry, auxentry;
if (dictAdd(d, key, val) == DICT_OK) // 新 key
return 1;
entry = dictFind(d, key);

auxentry = *entry;
dictSetVal(d, entry, val); // 先设置新值,robj->refcount++
dictFreeVal(d, &auxentry); // 再释放旧值,robj->refcount-- // 无副作用,内存依旧有效
return 0;
}

删除:查找后直接更新前驱节点指针


Dict Iterator

关于迭代器:

  • 正确性:数据集内的所有元素都被遍历一次,不重复也不遗漏
  • 两种实现:
    • 生成只读视图遍历:完整拷贝数据集代价高,但能保证正确性;不适合会保存大量 kv 的 dict
    • 单步遍历:需额外约束数据集的操作,遍历期间保证元素分布不变动,才能保证正确性

dict 迭代器实现为后者,根据正确性约束条件的实现,又分为 2 类

  • safe iterator:由 dict 内部约束,迭代期间不进行 rehash,保证元素分布始终不变。见 _dictRehashStep

  • unsafe iterator:由 dict 使用方自行约束,迭代期间自觉地不执行增删,迭代前后对比 dict 快照指纹来检查正确性。用 dict 的各种属性来计算指纹:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    long long dictFingerprint(dict *d) {
    long long integers[6], hash = 0;
    integers[0] = (long) d->ht[0].table;
    integers[1] = d->ht[0].size;
    integers[2] = d->ht[0].used;
    // ...

    for (int j = 0; j < 6; j++) {
    hash += integers[j]; // integers 的值顺序也反映到指纹中
    // ... // 对 hash 值进行取反、位移等位操作,将碰撞概率降到极低
    }
    return hash;
    }

迭代器结构

1
2
3
4
5
6
7
8
9
10
11
typedef struct dictIterator {
dict *d;

int table; // 哈希表编号
long index; // bucket 索引,指示哈希表迭代进度。初值 -1 标记是否被使用过
dictEntry *entry; // 当前迭代元素,指示 bucket 迭代进度
dictEntry *nextEntry; // 保存下一个待迭代元素,避免 entry 被删除导致迭代进度丢失

int safe; // 指示迭代操作是否安全
long long fingerprint; // dict 快照指纹,用于检测迭代前后 dict 数据是否有更新
} dictIterator;

迭代操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void do_iterate(dict *d) {
it = dictGetIterator(d);
while ((de = dictNext(it)) != NULL)
handleEntry(de);
}

// 读取下一个待迭代元素
dictEntry *dictNext(dictIterator *iter) {
while (1) {
if (iter->entry == NULL) {
dictht *ht = &iter->d->ht[iter->table]; // 刷新哈希表,随时可能从 ht[0] 切到 ht[1]

if (iter->index == -1 && iter->table == 0) { // iter 首次迭代
if (iter->safe)
iter->d->iterators++; // 安全迭代,则 +1 标记当前迭代器,阻断 rehash
else
iter->fingerprint = dictFingerprint(iter->d); // 不安全迭代,则计算快照指纹
}
iter->index++;

if (iter->index >= (long) ht->size) { // 当前哈希表遍历完毕
// rehash 暂停期间,ht[0] 遍历完切到 ht[1] 继续遍历
if (dictIsRehashing(iter->d) && iter->table == 0) {
iter->table++;
iter->index = 0;
ht = &iter->d->ht[1];
} else {
break; // 迭代结束
}
}
iter->entry = ht->table[iter->index]; // 遍历下一个 bucket
} else {
iter->entry = iter->nextEntry; // bucket 内向后遍历
}

if (iter->entry) {
// 不对返回给调用方的 entry 做有效性假设(可被删除),暂存 nextEntry 作为下次迭代起点
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}

释放迭代器:注意 unsafe iterator 在释放前会做快照校验,若不匹配将直接结束 Redis 进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void dictReleaseIterator(dictIterator *iter) {
if (!(iter->index == -1 && iter->table == 0)) {
if (iter->safe)
iter->d->iterators--;
else
// 不安全的迭代,调用方必须保证 ht 在此期间不增删
assert(iter->fingerprint == dictFingerprint(iter->d));
}
zfree(iter);
}

#define assert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
void _redisAssert(char *estr, char *file, int line) {
*((char*)-1) = 'x'; // 发生严重错误,手动发送 SIGSEGV 结束进程
}

注意:dict 快照指纹仅只取 dict 属性计算哈希值,若迭代期间执行了等量的增删操作,则无法检测,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int main() {
struct dict *d = dictCreate(&dbDictType, NULL);
sds k1 = sdsnew("k1"), v1 = sdsnew("v1"), k2 = sdsnew("k2"), v2 = sdsnew("v2");
dictAdd(d, k1, v1); dictAdd(d, k2, v2);
sds k3 = sdsnew("k3"), v3 = sdsnew("v3");
dictIterator *it = dictGetIterator(d);
dictEntry*de;
int i=0;
while ((de = dictNext(it)) != NULL) {
if (++i == 2)
printf("add:%d, delete:%d\n", dictAdd(d, k3, v3), dictDelete(d, k1)); // 0, 0
printf("%s => %s\n", (char*)de->key, (char*)de->v.val);
}
dictReleaseIterator(it); // 指纹未变化,实际上遍历期间 k1 被删除,新增了 k3
}

鉴于 dict 仅在 Redis 内部使用,快照指纹逻辑尽可能简单为好。若要实现精确指纹,可遍历所有 KV 取哈希,也可以让 dict 动态维护类似文件 mtime 的字段。


Dict Scanner

使用

与面向 Redis 内部使用的 dict iterator 不同,scanner 通过 SCAN 系命令将遍历操作暴露给应用层。用法:

1
2
3
// 从游标 cursor 指向的位置开始,遍历匹配 patter 的 key,最多查找 count 个 bucket(不是 count 个元素)
// 返回新的游标,供下次继续扫描使用
SCAN cursor [MATCH pattern] [COUNT count]

问题

最简单直接的扫描是依次顺序遍历 2 张哈希表,但是 dict rehash 会使ht[0]的 bucket 重分布到ht[1],将导致 key 被重复遍历或被遗漏

  • 扩容 rehash 肯定会重复遍历元素

    ht[0] 的 bucket_0 先被遍历,但随后发生了扩容 rehash,其内的部分元素(绿色)重新分布到了 ht[1] 的 bucket_4,当遍历ht[1]时绿色元素将被遍历 2 次:

  • 缩容 rehash 可能遗漏元素:遍历ht[0]的 bucket_1,随后发生完整 rehash,继续从ht[1] bucket_2 遍历,将遗漏原ht[0] bucket_5 内的所有元素:

解决:为以最低成本解决上述问题,#579 讨论引入了无状态扫描算法,使 scanner 状态仅需用一个整数 cursor 来描述,保证了暴露给用户的接口足够简单,同时 scanner 也不占内存;但代价是算法逻辑复杂

Scan 算法分析

前置条件

  • 哈希表大小总是 2 的次方:rehash 操作时可用二进制表述 bucketIdx,如上:
    • 扩容 rehash:bucket_0 内的元素哈希值二进制低两位必定是00,根据第高位第三位是 0 还是 1 来决定分配到 bucket_0 和 bucket_4
    • 缩容 rehash:同理 bucket_1 和 bucket_5 内元素的哈希值二进制低两位都是 01,最终都汇聚到 bucket_1
  • rehash 操作单位是 bucket:降低复杂度

算法:设哈希表长度比例为 1:N,一次扫描会扫 2 张哈希表,遍历低位相同的 1+N 个 bucket;低位使用 cursor 描述,指示大表中下一个待扫描的 bucketIdx,屏蔽掉小表 sizemask 后递增,小表扫描完毕即结束

示例:扫描期间做扩容 rehash,详见代码注释

缺陷:对于扩容 rehash,所有元素都会被遍历且只遍历 1 次。但对于缩容 rehash,在大表中被扫描过的 bucket 由于聚合操作会被再次遍历。比如上图中ht[0]扫描到 bucket_4,缩容 rehash 会让已扫描的 bucket_1 和未扫描的 bucket_5 聚合到ht[0]的 bucket_1,最终导致重复

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 对字典 d、游标 v 处 bucket 内的所有元素,执行 fn(privdata) 函数
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata) {
dictht *t0, *t1;
const dictEntry *de;
unsigned long m0, m1;

if (dictSize(d) == 0) return 0;

if (!dictIsRehashing(d)) {
// 没有 rehash,直接遍历 ht[0] cursor 处的 bucket
t0 = &(d->ht[0]);
m0 = t0->sizemask;

de = t0->table[v & m0];
while (de) {
fn(privdata, de);
de = de->next;
}
} else {
t0 = &d->ht[0];
t1 = &d->ht[1];
if (t0->size > t1->size) {
t0 = &d->ht[1]; // 确保 t0 是小表,t1 是大表
t1 = &d->ht[0];
}
m0 = t0->sizemask;
m1 = t1->sizemask;

// 1. 扫描小表中 cursor 指向的 bucket
de = t0->table[v & m0];
while (de) {
fn(privdata, de);
de = de->next;
}

// 2. 扫描大表中索引低位为 cursor 的多个 buckets
do {
de = t1->table[v & m1];
while (de) {
fn(privdata, de);
de = de->next;
}

/* Increment bits not covered by the smaller mask */
// 固定小表的索引后缀,不断自增高位来遍历大表对应的多个 buckets
// (v|m0)+1 // 将高位+1
// & ~m0 // 只保留高位
// |(v&m0) // 还原 v 在小表的后缀
v = (((v | m0) + 1) & ~m0) | (v & m0);

// m0^m1 结果是将 m1 中 m0 的部分翻转为 0,留下全为 1 的高位
// 游标又进一位突破 m1 则子 buckets 遍历结束
} while (v & (m0 ^ m1));
}

// 3. 计算下一个 cursor
// 取消 sizemask 屏蔽 找到左侧最高位;如长为 4 的表,sizemask 为 3,取反为 1100
// 0: 0000 | 1100 = 1100
// 2: 0010 | 1100 = 1110
// 1: 0001 | 1100 = 1101
// 3: 0011 | 1100 = 1111
v |= ~m0;

// 翻转后自增再翻转
// 0: 1100 --rev--> 0011 --+1--> 0100 --rev--> 0010; 2
// 2: 1110 --rev--> 0111 --+1--> 1000 --rev--> 0001; 1
// 1: 1101 --rev--> 1011 --+1--> 1100 --rev--> 0011; 3
// 3: 1111 --rev--> 1111 --+1--> 0000 --rev--> 0000; 0 // 扫描结束
v = rev(v);
v++;
v = rev(v);

// 高位向低位做加法,得出的结果是同模的,1%4 == 5%4 == 1
// 同模的链式访问,保证了扩容后不会重复扫描已访问过的元素,实现每个原书扫描且仅扫一次
// 但缩容后同模的高位被聚合到小表的低位 bucket 中,将导致重复扫描原来低位 bucket
return v;
}

附常用位运算:

1
2
3
4
5
6
// 1. & 与:屏蔽和读取指定位; &0 清除特定位,&1 读取特定位
// 2. | 或:将指定位置 1; |1 置 1,|0 不变
// 3. ^ 异或:将指定位反转;^1 时 0->1,1->0;^0 时不变
// 4. ~ 取反:01 互转
// 5. << 左移:右侧补 0,x<<N 为 x*(2^N)
// 6. >> 右移:左侧补 0,x>>N 为 x/(2^N)

总结

Redis dict 在结构上对 kv 做了多态封装;并为哈希表实现了 rehash,动态扩缩容来保证 load factor 在 1 附近,进而保持哈希冲突与内存占用的动态平衡,此外 rehash 的时间成本以单步操作的形式均摊到高频的增删改查操作;由于 rehash 会影响迭代的正确性,故引入 safe iterator 计数器来中断单步 rehash,保证迭代安全;dict 的扫描算法基于位运算,以游标进位的方式保证扫描的完整性,十分巧妙。