前言

Redis 的有序集合,具有双重属性:集合说明内部元素不重复,有序说明元素是带权重有序排列的。不重复属性可用 dict 哈希表实现,有序结构可用平衡树或跳跃表实现,为了方便实现区间操作而选用后者。


SkipList 论文阅读

原文:《Skip Lists: A Probabilistic Alternative to Balanced Trees》– William Pugh

平衡树现存问题

平衡树为了实现O(logN)的快速查找,增删元素后需保持平衡:左子节点值比父节点小,右子节点值比父节点大。但绝对的平衡是有代价的:

  • 二叉查找树:插入大量有序元素时会退化为单链表,查找复杂度从O(logN)涨到O(N)
  • AVL:在插入时及时调整树结构避免退化,旋转操作实现较为复杂

分层有序链表

为了绕开树结构的先天限制,论文作者设计了等效的分层有序链表:上一层的元素个数是下一层的 1/2,查找时逐层向下定位元素,类似有序数组的二分查找过程,查找复杂度和平衡树一样只有O(logN)

但是为了保证向下相邻层的元素个数比为 1:2,链表每次插入、删除元素后,都必须重新调整周边元素的高度。比如插入 6 后,7 和 8 的高度都有调整,调整操作需要遍历链表,复杂度仍为O(N)

随机高度

由于O(N)增删操作无法接受,论文作者尝试让新插入元素的高度随机,不再维护相邻层元素个数 1:2 ,如此直接查找插入位置、更新前驱节点指针即可,插入操作的复杂度降到了 O(logN)

由于要在大体上要保证高层元素少、低层元素多,才能实现二分查找的效果。所以取随机高度有 2 个限制:

  • 高度有最值:下限为 1 来存储元素;要有上限,避免层数太高增加不必要的下沉判断

  • 随机到高层的难度呈指数级上升:设增加一层的概率为 P,则连续增加 N 层的概率为 pow(P,N),伪代码:

    1
    2
    3
    4
    5
    6
    randomLevel()
    level := 1
    // random() 函数生产 [0,1)
    while random() < p and level < MaxLevel do
    level++
    return level

    比如 P 为 0.5,新元素高度取 2,3,4 的概率分别为 50%,25%,12.5%,以此类推。指数算法有 2 个好处:

    • 通过增减 P 值,能调整低层、高层的分布
    • 能保证平均的查找复杂度为O(logN),证明过程见论文

Redis 取随机高度的算法:

1
2
3
4
5
6
7
8
#define ZSKIPLIST_MAXLEVEL 32 // 等效于 32 层的平衡树 2^32 elements */
#define ZSKIPLIST_P 0.25 // 25%, 6.25%, 1.5625%, ...
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) // 类似 random()/LONG_MAX < 1/4
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; // MAX_LEVEL 高度上限
}

Skiplist 结构

Skiplist 主要用于存储大 zset 中有序的 score 到 member 的映射关系,结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 单个节点
typedef struct zskiplistNode {
robj *obj; // member,实为 sds
double score; // 排序权重
struct zskiplistNode *backward; // 单个后继节点,用作倒序取范围
struct zskiplistLevel {
struct zskiplistNode *forward; // 层级链表指针
unsigned int span;
} level[]; // 描述层级关系的多个前驱节点
} zskiplistNode;

// 跳跃表
typedef struct zskiplist {
struct zskiplistNode *header, *tail; // 头尾节点
unsigned long length; // 表长度
int level; // 截止目前随机到的最高层数,无需每次都从最高层下沉
} zskiplist;

跳跃表初始化:header 节点具有最高高度,全部初始化为哑节点,仅做指示方向使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl = zmalloc(sizeof(*zsl));
zsl->level = 1; // 截止目前的最高层为 1,header 的 32 层不算
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // header 各层的哑节点仅用作索引
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL; // 指向 NULL
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}

Skiplist API

插入

参考论文,插入操作分 2 步

  • 查找插入位置:从顶层出发,逐个向前比较 key,相等则更新 value 并返回,更大则下沉,更小则在本层前进,直到下沉到最底层搜索链表。在插入新节点时,其左视图 RightMostNodes 的前驱指针将要指向新节点,所以在下沉途中需暂存这些节点

  • 生成随机高度,在各层链表中插入节点,并更新 span

实现:查找操作与论文中有两点不同

  • 会忽略已存在的 key,skiplist 中 key 可重复,value 的唯一性由 zset 搭配 dict 来保证
  • 不单比较 key,还会比较 sds value 的字典序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // level -> RightMostNode,简称 rmNode
unsigned int rank[ZSKIPLIST_MAXLEVEL]; // 节点在第一层的水平索引,取 diff 计算 span
int i, level;
redisAssert(!isnan(score));
x = zsl->header;

// 1. 下沉查找插入层,每层需记录:
// RightMostNode:可能要在 rmNode 与 rmNode->forawrd 之间插入新节点
// RightMostNode Rank:插入节点要更新 rmNode 和新节点的 span
for (i = zsl->level-1; i >= 0; i--) {
// 最顶层的 rank 为 0,下沉时只向右移动,故保留上层 rank
rank[i] = (i == (zsl->level-1)) ? 0 : rank[i+1];
// 在本层向前查找第一个权重 score 大(字典序比 obj 大)的节点
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
// 累加右移的 span 到 rank
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 记录本层的 RightMostNode,准备下沉到更下一层
// 此处即使 key 已存在也要下沉,查找插入位置
update[i] = x;
}

// 2. 生成随机高度
level = zslRandomLevel();
if (level > zsl->level) {
// 若刷新了高度记录,则将 header 指定为刷新记录的各层的 RightMostNode
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length; // header 到 tail 的距离从 0 变为有意义的 len
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);

// 3. 插入新节点
for (i = 0; i < level; i++) {
// before: rmNode --> forward
// insert: newNode --> forward
// rmNode --> newNode
// after: rmNode --> newNode --> forward
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

// 用 rmNode->newNode 和 newNode->forward 的新距离更新各自的 span
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

// 由于插入了新节点,比新节点更高的 rmNode 的 span 都要自增 1
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

// 4. 更新 newNode,forward 二者的后继节点
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

删除

作为插入的反向操作,删除节点要断开 RightMostNodes 与待删除节点间的指针,复杂度 O(N)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 从 skiplist 中删除 key 为 score,value 为 obj 的节点
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

// 和插入一样,先进行下沉查找
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
// 由于 key 会重,需遍历比较 value
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);
return 1;
}
return 0; // 未找到
}

// 从 skiplist 中删除节点 x,其 rmNodes 已存于 update 数组
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
// 1. 断开 forward 指针,递减 span
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
// 直连 x 的 rmNode 断开连接,合并 span
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
// 未直连则 span-1
update[i]->level[i].span -= 1;
}
}

// 2. 断开 backward 指针
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}

// 3. 若删除了最高节点,则刷新高度记录
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

区间操作

zset 基于 skiplist 实现了 RANGE* 系列命令,用户指定的 score 区间由 zrangespec 描述:

1
2
3
4
5
6
7
8
typedef struct {
double min, max;
int minex, maxex; // exclude,标记区间是否闭合
} zrangespec;

static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min); // 若不闭合,则不做等值比较
}

操作前需对区间做合法性、相交性检测:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 检查 range 是否与 skiplist 存在交集
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
// 区间有效性检查
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;

// 2 个区间的相交性检查
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score,range)) // range 最小值,大于 skiplist 的最大值
return 0;
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score,range)) // range 最大值,小于 skiplist 的最小值
return 0;
return 1;
}

注意,区间有的三个维度

  • *BYSCORE:score 维度,面向 skiplist 节点的 key
  • *BYRANK:rank 维度,面向 skiplist 节点的序号(排名)
  • *BYLEX:lexicographical 字典序维度,面向 score 相同的子区间内节点的 value(score 不同则无意义)

以下从 score 维度简述区间操作

区间读

ZRANGEBYSCORE:在 skiplist 中向后查找第一个出现在 range 区间的节点,再向后遍历直到区间上限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
if (!zslIsInRange(zsl,range)) return NULL; // 无交集

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 找到首个出现在 range 区间内的节点
while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
x = x->level[0].forward;
redisAssert(x != NULL); // 验证过有交集,x 节点必定存在

// 检查 x 是否在 range 范围内,不在则说明 range 区间恰好没有节点
if (!zslValueLteMax(x->score,range)) return NULL;
return x;
}

区间删除

ZREMRANGEBYSCORE:找出首个出现在区间的节点后,向后遍历操作删除,同时也从 dict 中彻底删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
x = zsl->header;
// 和区间读一样,先查找首个出现在 range 区间的节点 x,顺带记录 rmNodes
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score, range))
x = x->level[i].forward;
update[i] = x;
}
x = x->level[0].forward;
while (x && zslValueLteMax(x->score, range)) { // 删除区间内的所有 node
zskiplistNode *next = x->level[0].forward;
zslDeleteNode(zsl,x,update); // 从 skiplist 中删除
dictDelete(dict,x->obj); // 同时也从 zset 的 dict 中彻底删除
zslFreeNode(x);
removed++;
x = next;
}
return removed;
}

总结

在有序集合的使用场景中,元素的权重 score 不可避免地会重复,所以 Redis 为原生的 skiplist 配套了 dict 去实现集合的去重属性,使得 skiplist 中的 key 可重复;对于 skiplist 本身,Redis 还进行多个改造:加入 backward 指针做反向遍历、将节点的 value 也纳入比较范围、引入 rank 的概念丰富集合操作等等,对应设计了三个维度的区间操作,功能更丰富