前言

Redis 为存储大量整数场景设计了 intset 结构,底层实现为变长的有序数组以使用二分查找。为了尽可能节约内存,还实现了数组类型的动态升级


Intset 结构

类似 SDS 的变长字符数组,Intset 也将整数存于变长数组中,类型声明为int8_t表示signed char单个字节,由encoding指示数组中多少连续字节对应一个完整的整数

1
2
3
4
5
6
7
8
9
10
11
// 带编码和长度元信息的变长整数数组
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[]; // raw bytes
} intset;

// 三种长度的整型及其编码值
#define INTSET_ENC_INT16 (sizeof(int16_t)) // 2
#define INTSET_ENC_INT32 (sizeof(int32_t)) // 4
#define INTSET_ENC_INT64 (sizeof(int64_t)) // 8

整型值的大小决定其编码:

1
2
3
4
5
static uint8_t _intsetValueEncoding(int64_t v) {
if (v < INT32_MIN || v > INT32_MAX) return INTSET_ENC_INT64; // int32 溢出
else if (v < INT16_MIN || v > INT16_MAX) return INTSET_ENC_INT32; // int16 溢出
else return INTSET_ENC_INT16;
}

Intset API

大端转小端

字节序不同,整数的字节存储顺序就不同。比如整数 63793306 被拍扁为字节后的内存分布:

CPU 会自动根据自己的字节序来解释内存,返回给用户的值都是 63793306;但是由我们自己从堆增长方向来解释内存时:

  • 小端序:读到 0x03cd689a,值为 63793306
  • 大端序:读到 0x9a68cd03,值为 2590559491,错误,要先做二进制翻转

为解决堆增长方向与大端顺序方向相反的问题,Redis 将大端字节分布做了翻转,解释为小端字节序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include<machine/endian.h>
#if (BYTE_ORDER == LITTLE_ENDIAN)
#define memrev16ifbe(p)
#define intrev16ifbe(v) (v)
#else
#define memrev16ifbe(p) memrev16(p) // 大端序做内存翻转
#define intrev16ifbe(v) intrev16(v)
#endif

void memrev16(void *p) {
unsigned char *x = p, t;
t = x[0];
x[0] = x[1]; // 按字节对称 swap
x[1] = t;
}

uint16_t intrev16(uint16_t v) {
memrev16(&v);
return v;
}

二分查找

查找是插入的前置操作,复杂度 O(logN)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 在 intset 中搜索 value,存在则将索引存入 pos 返回 1,不存在则将插入索引存入 pos 返回 0
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;

// 1. intset 为空
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0; // prepend
return 0;
} else {
// 2. value 比 intset 的最大值还大,则 append
if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0; // 比最小值还小,则 prepend
return 0;
}
}

// 3. 二分查找
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1; // 高效位操作求均值
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}

if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min; // 未找到则存入插入索引
return 0;
}
}

插入

流程:查找插入索引,先扩容,再将索引后的元素都向后挪 1 个位置,最后插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
if (valenc > intrev32ifbe(is->encoding)) {
return intsetUpgradeAndAdd(is,value); // 新值对于当前类型会溢出,则做类型升级
} else {
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0; // 已存在则插入失败
return is;
}
is = intsetResize(is,intrev32ifbe(is->length)+1); // 重新分配内存扩容
if (pos < intrev32ifbe(is->length)) // 非 append 则需移动内存
intsetMoveTail(is,pos,pos+1); // [pos,tail] 挪到 [pos+1,tail]
}
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
void *src, *dst;
uint32_t bytes = intrev32ifbe(is->length)-from;
uint32_t encoding = intrev32ifbe(is->encoding);
if (encoding == INTSET_ENC_INT64) {
src = (int64_t*)is->contents+from; // 将内存解释为 int64_t*
dst = (int64_t*)is->contents+to;
bytes *= sizeof(int64_t);
} // ...
memmove(dst,src,bytes); // 直接操作内存,复杂度 O(1)
}

类型升级:若待插入的整数,比当前编码类型的最小值还小、或比最大值还大,为保持插入后数组元素类型一致,需对数组中所有旧元素进行类型升级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 数组长度增减、编码发生变化,都要及时重新分配内存
static intset *intsetResize(intset *is, uint32_t len) {
uint32_t size = len*intrev32ifbe(is->encoding);
is = zrealloc(is,sizeof(intset)+size);
return is;
}

static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
// 由于 value 突破当前类型的最值才触发了升级,故为负则 prepend,为正则 append
int prepend = value < 0 ? 1 : 0;
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);

// 从后向前逐个升级类型,并为 prepend 操作提前预留 1 个位置
// prepend 变量一值两用:作为布尔值决定插入位置,作为整数值决定新旧索引的 gap,很精简
while(length--) // O(N)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

if (prepend)
_intsetSet(is,0,value); // prepend
else
_intsetSet(is,intrev32ifbe(is->length),value); // append
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

static void _intsetSet(intset *is, int pos, int64_t value) {
uint32_t encoding = intrev32ifbe(is->encoding);
if (encoding == INTSET_ENC_INT64) {
((int64_t*)is->contents)[pos] = value;
memrev64ifbe(((int64_t*)is->contents)+pos);
} // ...
}

注:若要最大限度节约内存,删除时可做类型降级,但是降级没有触发条件,必须遍历所有元素来确保值都能落到降级类型的区间内,这会导致删除操作复杂度从O(logN)涨到 O(N),得不偿失

删除

流程:查找元素,若存在则直接移动内存覆盖该值,最后重分配内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
intset *intsetRemove(intset *is, int64_t value, int *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 0;

if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
uint32_t len = intrev32ifbe(is->length);
if (success) *success = 1;

// 用 [pos+1,tail] 内存覆盖 [pos,tail],来删除 pos 处元素
if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
is = intsetResize(is,len-1);
is->length = intrev32ifbe(len-1);
}
return is;
}

总结

Intset 底层是堆上的变长整型数组,增删操作直接O(1)移动内存,为了实现二分查找,动态地维护了数组的有序性。为了尽可能减少内存占用,数组类型从int16开始,只有真的要存储会导致溢出的新值时,才将数组类型升级为int32, int64;为了升级时高效操作内存,还将整型数组拍扁了存储为原生字节数组,并解决了 BigEndian 存储顺序与内存增长方向冲突的问题