前言

得益于精简的数据结构设计,Redis 约 50% 的命令复杂度仅为O(1),15% 为O(logN),30% 为O(N),可见 Redis 是非 CPU 密集型应用,瓶颈在内存和网络 IO;因此多线程实现收益小,还要保证共享数据的并发安全,逻辑复杂且难以调试和维护,弊大于利;实现为单线程,可通过 IO 多路复用实现网络 IO 的并发读写,无需考虑多线程同步,利大于弊


ae 模块

不同平台实现 IO 多路复用的库不同,Linux 有 epoll,BSD 有 kqueue,其他 Unix 平台有 select。为了给上层提供统一的网络接口,Redis 集成了文件事件与时间事件,统一事件注册、等待与处理接口,封装成 ae (A simple event-driven programming libray) 模块

第一层封装:跨平台

封装 6 个 aeApi* 接口来统一调用底层的 IO 多路复用库,如封装 epoll:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/* ae_epoll.c */
// 带事件缓冲的 epoll fd
typedef struct aeApiState {
int epfd;
// server 根据 maxclients 配置能知道 epoll fd 最多要监听多少个 client socket fd
// 才能给 epoll fd 配置事件数组,与 client fd 一一对应,注册新事件不再需要先 EPOLL_CTL_GET
struct epoll_event *events;
} aeApiState; // attach 到 eventLoop->apidata

int aeApiCreate(aeEventLoop *eventLoop); // 创建 epoll fd 和事件池
int aeApiResize(aeEventLoop *eventLoop, int setsize); // 事件池扩容
void aeApiFree(aeEventLoop *eventLoop); // 关闭 epoll fd,释放内存
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask); // 屏蔽 delmask 事件

// 注册 mask 事件
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee.events = 0;
mask |= eventLoop->events[fd].mask; // 新增事件
if (mask & AE_READABLE) ee.events |= EPOLLIN; // 读写标记转换
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;

if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; // 注册新事件
return 0;
}

// 等待事件发生或超时
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); // tvp 为 NULL 则永久阻塞
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) { // 逐个处理事件
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE; // 可读
if (e->events & EPOLLOUT) mask |= AE_WRITABLE; // 可写
eventLoop->fired[j].fd = e->data.fd; // wrap 成 FireEvent 委托 eventloop 处理
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

再根据条件宏选定要使用的事件库实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* ae.h */
#ifdef __linux__
#define HAVE_EPOLL 1
#endif
#if (defined(__APPLE__) || defined(__OpenBSD__)
#define HAVE_KQUEUE 1
#endif

/* ae.c */
#ifdef HAVE_EPOLL
#include "ae_epoll.c" // 直接导入文件内容
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#endif
#endif

第二层封装:事件处理

ae 库要处理 2 类事件:

  • 描述 file fd 或 socket fd 的文件事件:在当前事件状态之上,封装处理读写的 handler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    typedef struct aeFileEvent {
    int mask; // AE_READABLE, AE_WRITABLE
    aeFileProc *rfileProc, *wfileProc; // rw handler
    void *clientData; // handler 实参,eventloop 对象
    } aeFileEvent;

    typedef struct aeFiredEvent {
    int fd, mask; // 描述 aeFileEvent 上已发生的事件
    } aeFiredEvent;

    // handler 函数签名:el 监听的 fd 发生了 mask 事件,附上实参,处理该事件
    typedef void aeFileProc(aeEventLoop *el, int fd, void *clientData, int mask);
  • 描述定时任务的时间事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    typedef struct aeTimeEvent {
    long long id; // 自增 id
    long when_sec, when_ms; // 毫秒级到期时间戳
    aeTimeProc *timeProc; // timer handler
    aeEventFinalizerProc *finalizerProc; // 本对象的析构函数 // 未使用
    void *clientData; // handler 实参 // 未使用
    struct aeTimeEvent *next; // 事件链表指针
    } aeTimeEvent;

    // handler 函数签名:处理 el 已到期的 id 时间事件,处理完毕返回 AE_NOMORE
    typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);

ae API

本节分析 ae 围绕 aeEventLoop 实现事件注册、监听与处理的 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
typedef struct aeEventLoop {
int maxfd; // 为 select 维护的 max fd
int setsize; // el 要监听的 fd 上限,等于 maxclients+预留 fd 之和(1000+32+96 = 1128)
long long timeEventNextId; // te.id 自增
time_t lastTime; // 解决系统时间后移导致 timeEvent 额外延期
aeFileEvent *events; // 已注册事件数组,client socket fd 为索引
aeFiredEvent *fired; // 待处理事件数组
aeTimeEvent *timeEventHead; // 时间事件链表
int stop; // 退出标记
void *apidata; // 跨平台事件模型具体实现,即 aeApiState
aeBeforeSleepProc *beforesleep; // 事件循环前的准备工作
} aeEventLoop;

// 创建 eventloop 对象
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); // 待处理事件数组
// ...
if (aeApiCreate(eventLoop) == -1) goto err; // 创建 epoll fd
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE; // AE_NONE 初始状态,不可读也不可写
return eventLoop;
err: // ...
return NULL;
}

部分 ae API 参与的部分事件循环示意图:

aeFileEvent API

aeCreateFileEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 向 eventloop 注册 fd 的 mask 事件,并指定该事件 handler 及实参
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) {
if (fd >= eventLoop->setsize) { // 连接数超出配额
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1) // 向 epoll 请求注册 fd 的 mask 事件
return AE_ERR;

fe->mask |= mask; // 同步更新 fe
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd) eventLoop->maxfd = fd;
return AE_OK;
}

// 用例:向 el 注册 client socket 的读事件,有数据可读时,调用 readQueryFromClient,参数为 redisClient
aeCreateFileEvent(server.el, client_sockfd, AE_READABLE, readQueryFromClient, redisClient);
aeDeleteFileEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 通知 eventloop 不再关注 fd 的 mask 事件,取消注册
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) {
if (fd >= eventLoop->setsize) return;
aeFileEvent *fe = &eventLoop->events[fd];
if (fe->mask == AE_NONE) return;

aeApiDelEvent(eventLoop, fd, mask); // 向 epoll 请求屏蔽 fd 的 mask 事件
fe->mask = fe->mask & (~mask); // 同样 fe 也删
if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { // maxfd 所有事件都被屏蔽,则重新计算
for (int j = eventLoop->maxfd-1; j >= 0; j--)
if (eventLoop->events[j].mask != AE_NONE) break;
eventLoop->maxfd = j;
}
}

aeTimeEvent API

aeCreateTimeEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 向 eventloop 注册 ms 后才执行 proc 的定时任务
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long ms, aeTimeProc *proc, /*..*/) {
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
if ((te = zmalloc(sizeof(*te))) == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(ms,&te->when_sec,&te->when_ms); // 将 now_ms+ms 转为时间戳
te->timeProc = proc;
// ...
te->next = eventLoop->timeEventHead; // prepend
eventLoop->timeEventHead = te;
return id;
}
// 用例:向 el 注册 1ms 后执行的 serverCron 的时间事件
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
processTimeEvents
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 处理链表中所有已过期的时间事件
int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);

// redis 主机时间先被快进 N 秒,一段时间后才恢复,会让这段时间注册的 te 被多延期 N 秒,导致错乱
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0; // 如此若发生偏移,则强制过期所有 te,早过期总比延迟过期好
te = te->next;
}
}
eventLoop->lastTime = now;

te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) { // 遍历链表,逐个处理已过期的 te
long now_sec, now_ms;
long long id;
if (te->id > maxId) { // 避免 te handler 插入新 te,导致无限循环
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) {
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData); // 过期则执行 te handler
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); // 延期则刷新过期时间
} else {
aeDeleteTimeEvent(eventLoop, id); // 执行成功则删除
}
te = eventLoop->timeEventHead; // 删除操作会导致链表中断,重新从头开始
} else {
te = te->next; // 还未过期,跳过
}
}
return processed;
}

aeProcessEvents

两种事件的等待和触发处理由 aeProcessEvents 实现,有 4 个选项控制处理行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#define AE_FILE_EVENTS 1                              // 0001:只处理 file event
#define AE_TIME_EVENTS 2 // 0010:只处理 time event
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) // 0011:两种事件都处理,无事件则阻塞等待
#define AE_DONT_WAIT 4 // 0100:无事件时不等待

// 处理已过期 time event,并等待 eventloop 监控的 file event 发生事件并处理
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
// 1. 计算 epoll_wait 超时时间
struct timeval tv, *tvp;

if ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop); // 链表搜索最快到期的 te
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv; // 当下距离 shortest 到期的时间间隔
tvp->tv_sec = shortest->when_sec - now_sec; // 类比两位数的减法,先取 diff sec
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; // 再取 diff ns
tvp->tv_sec--; // sec 退位 1s 给 us
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0; // sec 或 us 为负即已过期
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 没有 te
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0; // 不等待 fe
tvp = &tv;
} else {
tvp = NULL; // 阻塞等待 fe
}
}

// 2. epoll_wait 等待 fe 事件发生并处理
numevents = aeApiPoll(eventLoop, tvp); // 在 shortest 超时前返回
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; // fired->fd->fe
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 处理读事件
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc) // 读写 handler 相同则避免重复执行
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
// 3. 处理 te
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed;
}

至此,aeEventLoop 的创建,两种事件的创建、注册、屏蔽与等待处理都已实现,最终陷入处理即可

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

总结

Redis 的 ae 模块封装了 6 个操作系统透明的 IO 多路复用接口、为 epoll_event 搭配事件 handler 封装为 aeFileEvent、还将定时任务封装为 aeTimeEvent;在并发处理 IO 时,Redis 用最快超时 te 的剩余时间,作为 fe 等待事件发生的超时时间,同时保证了 te 和网络 IO 都能被及时处理,设计非常巧妙