前言

在分析 Redis 的五种数据类型及命令实现之前,需搞懂命令执行的三个阶段(请求,处理,响应)在 Redis 中是如何实现的,本文将从 Server 启动流程、命令的执行生命周期两个方面梳理清楚


结构简述

简要介绍与命令执行流程高度相关的结构及字段,详细信息可见源码注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
typedef struct redisObject {
unsigned type:4; // 五种数据类型:STRING,LIST,SET,ZSET,HASH
unsigned encoding:4; // 八种内部编码:RAW,INT,EMBSTR,HT,LINKEDLIST,ZIPLIST,INTSET,SKIPLIST
unsigned lru:REDIS_LRU_BITS; // 秒级 lru 时间戳
int refcount; // 引用次数,用作垃圾回收
void *ptr; // 编码存储的数据
} robj;

typedef struct redisDb {
int id;
dict *dict; // keyspace // sds->robj
dict *expires; // expire keyspace // sds->int64
dict *blocking_keys; // BLPOP 阻塞客户端的 key // key->client list
dict *ready_keys; // PUSH 唤醒客户端的 key // key set
dict *watched_keys; // 记录事务 watch 关系 // key->client_list
// ...
} redisDb;

struct redisServer {
redisDb *db; // database 数组,默认长度 16
aeEventLoop *el; // ae 事件循环
dict *commands; // 命令表
char *bindaddr[REDIS_BINDADDR_MAX]; // 配置绑定的目标地址
int ipfd[REDIS_BINDADDR_MAX]; // server socket fd 数组
// aof, rdb, replication, ...
};

typedef struct redisClient {
int fd; // client socket fd
redisDb *db; // select db
sds querybuf; // 读缓冲区
int argc; // 缓冲区数据按 redis 协议解析后的参数
robj **argv; //
struct redisCommand *cmd; // 当前待执行的命令
list *reply; // 二级缓冲的数据链表
unsigned long reply_bytes; // 二级缓冲中带写出字节数
int sentlen; // 两个缓冲区临时共用的已发送字节数
int bufpos; // 缓冲区的有效边界索引
char buf[REDIS_REPLY_CHUNK_BYTES]; // 长度固定的一级缓冲区
// name, ...
} redisClient;

Server 启动流程

参考入口函数main,Server 启动大体分三步

1
2
3
4
5
6
int main(int argc, char **argv) {
initServerConfig(); // 1. 初始化默认配置
loadServerConfig(configfile,options); // 解析命令行参数及配置文件
initServer(); // 2. 初始化数据结构
aeMain(server.el); // 3. 绑定端口并监听,陷入 eventloop
}

1. 解析配置

Redis 从配置文件、标准输入、命令行参数三个途径读到的配置项会合并为 k1 "v1"\nk2 "v2"...一个大字符串,最后统一校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void loadServerConfigFromString(char *config) {
lines = sdssplitlen(config,strlen(config),"\n",1,&totlines);
for (int i = 0; i < totlines; i++) { // 覆盖式逐行处理
int argc;
lines[i] = sdstrim(lines[i]," \t\r\n"); // 各种 sds 工具函数配上用场
if (lines[i][0] == '#' || lines[i][0] == '\0') continue; // 跳过注释行或空行
sds *argv = sdssplitargs(lines[i],&argc); // 分割配置行
if (!strcasecmp(argv[0],"port") && argc == 2) { // 多路判断,校验配置参数个数
server.port = atoi(argv[1]); // 配置存入 server
if (server.port < 0 || server.port > 65535) // 校验各配置值是否合法
err = "Invalid port"; goto loaderr; // 非法则结束进程
} else if (!strcasecmp(argv[0],"include") && argc == 2) {
loadServerConfig(argv[1],NULL); // 递归加载公共配置文件
} else if (...) // ...
return;
loaderr: exit(1);
}

特别地,解析途中会从全局命令表把命令转换并填充到 server 的命令字典,命令结构:

1
2
3
4
5
6
7
8
9
struct redisCommand {
char *name;
redisCommandProc *proc; // cmd handler
int arity; // 命令参数个数(包括命令本身),负数表示可变参数的最少个数
int flags; // binary flag,标记命令属性,便于做位运算
char *sflags; // string flag,易读
long long microseconds, calls; // 累计命令总执行次数、总耗时
// ...
};

命令表是全局数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, // r:read-only; F:fast cmd, O(1),O(logN)
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, // w:write; m: memory increase
// ...
}

#define REDIS_CMD_WRITE 1 // "w" flag // string flag 背后是独占 1 个二进制位 flag
#define REDIS_CMD_READONLY 2 // "r" flag
#define REDIS_CMD_DENYOOM 4 // "m" flag
// 8, 16...

// flag 用作快速判断命令行为,比如 'm' 标记的命令可能导致内存上涨,当 server 内存耗尽后将被拒绝执行
int retval = freeMemoryIfNeeded();
if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
addReply(c, shared.oomerr);
return REDIS_OK;
}

void populateCommandTable(); // 将 string flag 转为 binary flag,并填充 server.commands

2. 字段初始化

  • 日志处理

    默认写 stdout,可配置写 logfile 和 syslog,支持与 syslog 对应的 4 种级别:DEBUG,VERBOSE,NOTICE(默认),WARNING,日志格式为 <PID>:<ROLE><TIME><LEVEL><FMT_MSG>,参考 redisLogRaw

  • 信号捕捉

    启动时注册处理SIGSEGV,SIGBUS等致命信号的 sigsegvHandler,会在退出前将各种指标、失败断言、stack backtrace 都写入日志,用作 bug report

    1
    2
    3
    4
    38486:M 13 Dec 19:22:17.941 # Failed assertion: <no assertion failed> (<no file>:0)
    38486:M 13 Dec 19:22:17.941 # --- STACK TRACE
    ./redis-server(logStackTrace+0xb0)[0x557d192c1900]
    ./redis-server(main+0x1b9)[0x557d19285c89]... # addr2line
  • 资源限制检查

    检查maxclients是否超过RLIMIT_NOFILE配额,超过则 backoff 尽可能多申请 fd 资源

  • 共享对象:避免高频对象被频繁创建和销毁

    1
    2
    3
    4
    5
    6
    struct sharedObjectsStruct { // 使用前先 incrRef 避免被 decrRef 误释放
    robj *ok, // 高频 reply sds
    *integers[REDIS_SHARED_INTEGERS], // [0,100000] 的小整数 robj
    // err, bulkhdr, ...
    };
    shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
  • 初始化 DB:各字段创建对应类型的 dict

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    for (j = 0; j < server.dbnum; j++) {
    server.db[j].dict = dictCreate(&dbDictType,NULL);
    server.db[j].expires = dictCreate(&keyptrDictType,NULL);
    // blocking_keys,...
    }
    dictType dbDictType = { // keyspace 键类型
    dictSdsHash, // hash function
    NULL, NULL, // key, val dup
    dictSdsKeyCompare, // key comperator
    dictSdsDestructor, // key destructor
    dictRedisObjectDestructor // val destructor
    };

3. 监听端口并陷入 eventloop

listenToPort 默认监听本机所有网卡0.0.0.0:6379,可配置 bind 指定要绑定的网卡 ip 列表,完成绑定后为 server socket fd 设置 O_NONBLOCK;如上常见 TCP 操作被封装到了 anet 模块,不再赘述

创建 eventloop,并为各 server socket 注册可读事件,准备 accept 连接

1
2
3
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); // setsize 源于 maxclients
for (j = 0; j < server.ipfd_count; j++)
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);

也注册 Redis 的唯一 timeEvent:serverCron 定时任务

1
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)

最后陷入 eventloop(简称 el)循环处理各种事件

1
2
aeSetBeforeSleepProc(server.el,beforeSleep); // beforeSleep 主动删除过期 key
aeMain(server.el);

命令处理流程

1. 建立连接

如上给 server socket 注册读事件的处理逻辑acceptTcpHandler,会在 ae 检测到 server socket 可读后 accept 连接,并将 client socket fd 封装创建redisClient对象,同时给 client socket 注册读事件的处理逻辑readQueryFromClient,当有数据可读时按 Redis 协议进行解析

2. 数据读取

如上,当 client socket 可读时会调用readQueryFromClient执行网络 IO,读取数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
int nread, readlen;
size_t qblen;
readlen = REDIS_IOBUF_LEN; // 预设 16KB
// 大 bulk 上次只读了一部分,则争取本次把 bulk 剩下的数据读完,先扩容
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= REDIS_MBULK_BIG_ARG) {
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1)
if (errno == EAGAIN) nread = 0; // 可重试错误
else return; // 致命错误
else if (nread == 0)
return; // EOF
sdsIncrLen(c->querybuf,nread); // 校正 querybuf sds
processInputBuffer(c); // 按 Redis 协议解析缓冲区数据,并执行命令
// ...
}

void processInputBuffer(redisClient *c) {
while(sdslen(c->querybuf)) {
// ...
if (processMultibulkBuffer(c) != REDIS_OK) break; // 缓冲区数据不是完整的命令,返回继续读
if (c->argc == 0)
resetClient(c);
else if (processCommand(c) == REDIS_OK) // 命令和参数都完整则执行
resetClient(c);
}
}

3. 数据解析

Redis 协议的六个约定

  • bulk:普通数据单元称为bulk块,约定$首字节指示块长度,以实现 binary safe
  • multibluk:若单次传输多个 bulk,约定头部*首字节指示块个数

为提高可读性、实现安全边界,约定各数据单元用 CRLF \r\n 分割;如SET keyX valueX对应的协议表示:

1
*3\r\n$3\r\nSET$4\r\nkeyX\r\n$6\r\nvalueX\r\n

协议还约定 3 种常见短数据回复的首字节标记

首字节 数据类型 示例
+ Simple Strings,标识状态回复 +OK\r\n
- Errors,标识错误信息 -ERR unknown command 'x'\r\n
: Integers,标识整型 :2\r\n

执行解析

注:Redis 也支持 telnet 等工具直接进行命令交互,参数约定空格分割,需自行处理转义等问题,多用于调试;本小节仅分析标准协议的数据解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
int processMultibulkBuffer(redisClient *c) {
char *newline = NULL;
int pos = 0, ok;
long long ll;

// 1. 读 bulk 个数,单个请求只需读一次
if (c->multibulklen == 0) {
redisAssertWithInfo(c,NULL,c->argc == 0); // 上个请求必须读取完毕

// 1.1 CRLF 检查缓冲区的合法性
newline = strchr(c->querybuf,'\r');
if (newline == NULL) return REDIS_ERR;
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) return REDIS_ERR;
redisAssertWithInfo(c,NULL,c->querybuf[0] == '*'); // multibulk 首字节必须是 *

// 1.2 解析 bulk 个数
ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); // +1 跳过 *,直到 \r 前一个字符,视为整数
if (!ok || ll > 1024*1024) return REDIS_ERR; // 非整数则非法,bulk 个数不能超 2^20 个

pos = (newline-c->querybuf)+2; // 偏移量多加了 1 跳过 \n
if (ll <= 0) return REDIS_OK;
c->multibulklen = ll; // 标记 multibulk 读取开始
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*c->multibulklen); // 预分配参数的内存
}

// 2. 逐个读 bulk
while(c->multibulklen) {
// 2.1 读 bulk 大小
if (c->bulklen == -1) {
// 2.1.1 CRLF 检查缓冲区的合法性和完整性
newline = strchr(c->querybuf+pos,'\r');
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) return REDIS_ERR; // bulk 长度超 64KB,非法
break;
}
if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2)) break;

// 2.1.2 解析 bulk 大小
if (c->querybuf[pos] != '$') return REDIS_ERR; // bulk 首字节必须是 $
ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
if (!ok || ll < 0 || ll > 512*1024*1024) return REDIS_ERR; // 参数大小不能超 512MB

pos += newline-(c->querybuf+pos)+2; // ...
c->bulklen = ll;
}

// 2.2 读取 bulk 值
if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
break; // 缓冲区剩余可读数据少于 c->bulklen,继续读更多数据
} else {
// 读到完整 bulk,拷贝参数值
c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
pos += c->bulklen+2;
c->bulklen = -1; // 标记当前 bulk 读取完毕
c->multibulklen--;
}
}
if (pos) sdsrange(c->querybuf,pos,-1); // 丢弃已读部分
if (c->multibulklen == 0) return REDIS_OK; // multibulk 内的所有 bulk 都读完
return REDIS_ERR; // 数据不完整,继续读
}

至此,客户端当前待处理的这条请求,被读取、解析到了robj **client->argv数组中,长度由client->argc 指示,接下来执行这条完整的命令

4. 执行命令

大体分三步:命令校验,server 校验,执行命令等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
int processCommand(redisClient *c) {
// 1. 命令校验
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // 命令查找
if (!c->cmd) {
flagTransaction(c); // 执行失败,标记 client dirty 标识事务失败
addReplyErrorFormat(c,"unknown command '%s'", (char*)c->argv[0]->ptr);
return REDIS_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) { // 参数个数校验:为正则需 ==,为负则需 >=
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name);
return REDIS_OK;
}
// auth 校验等...

// 2. server 状态校验
// LRU 清理 key 失败内存超限则拒绝执行写命令
// slave 无法执行写命令,slave 个数不足时不执行写命令,master 持久化失败时不执行写命令等...

// 3. 执行命令
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) {
queueMultiCommand(c); // 事务 context 则排队
addReply(c,shared.queued);
} else call(c,REDIS_CALL_FULL); // 最终执行 cmd
return REDIS_OK;
}

void call(redisClient *c, int flags) {
long long dirty, start, duration;
// ...
start = ustime();
c->cmd->proc(c); // 执行 cmd handler
duration = ustime()-start;
if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) { // 记录 latency 和 slowlog
char *latency_event = (c->cmd->flags & REDIS_CMD_FAST) ? "fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
}
if (flags & REDIS_CALL_STATS) { // 记录命令指标
c->cmd->microseconds += duration;
c->cmd->calls++;
}
// aof, replication...
}

set k 1 为例,继续分析 set handler 的执行流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/* Set key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(redisClient *c) {
robj *expire = NULL;
int j, unit = UNIT_SECONDS, flags = REDIS_SET_NO_FLAGS;
for (j = 3; j < c->argc; j++) { // 检测 NX,XX 等 flag,标记过期时间
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
if ((a[0] == 'n' || a[0] == 'N') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
flags |= REDIS_SET_NX; // XX 同理
} else if ((a[0] == 'e' || a[0] == 'E') && (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
unit = UNIT_SECONDS; expire = next; j++; // PX 同理
} else {
addReply(c,shared.syntaxerr);
return;
}
}
c->argv[2] = tryObjectEncoding(c->argv[2]); // 值 "1" 会被转为整数返回 shared.integers[1]
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

// *GenericCommand 抽离公共逻辑,参数控制各种条件检查、参数转换等;SET, SETNX, SETEX, PSETEX 都会调用
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0;
if (expire) { // 1. 过期时间单位统一为 ms
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
// 2. flag 控制判断:NX 则 key 不能存在,EX 则 key 须存在
if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL)) {
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
setKey(c->db,key,val); // 3. dbAdd(c->db->dict,key,val); // 将 kv 添加到 db keyspace
server.dirty++;
if (expire) setExpire(c->db,key,mstime()+milliseconds); // dictFind(db->expires,key)->v.s64 = ms // 将 key 的新过期时间更新到 expire keyspace
addReply(c, ok_reply ? ok_reply : shared.ok);
}

至此,新 kv 被成功写入 db 的 keyspace, expire keyspace,接下来向客户端回复执行成功

5. 数据写出

写入数据分为 2 步:将数据写入一级缓存或二级缓存,注册可写事件等待可写时写出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void addReply(redisClient *c, robj *obj) {
if (prepareClientToWrite(c) != REDIS_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
_addReplyObjectToList(c,obj);
} // ...
}

int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (listLength(c->reply) > 0) return REDIS_ERR; // reply 链表有值即 c->buf 已满,不再写入
if (len > available) return REDIS_ERR;
memcpy(c->buf+c->bufpos,s,len); // 拷贝到 c->buf
c->bufpos+=len;
return REDIS_OK;
}

void _addReplyObjectToList(redisClient *c, robj *o) {
robj *tail;
if (listLength(c->reply) == 0) { // ...
} else {
tail = listNodeValue(listLast(c->reply)); // 复用 tail node
if (tail->ptr != NULL && tail->encoding == REDIS_ENCODING_RAW &&
sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) {
c->reply_bytes -= zmalloc_size_sds(tail->ptr);
tail = dupLastObjectIfNeeded(c->reply);
tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); // 追加合并到 tail 的数据部分
c->reply_bytes += zmalloc_size_sds(tail->ptr); // 同步更新待写数据大小
} else {
incrRefCount(o);
listAddNodeTail(c->reply,o); // 只能新增一个 node
c->reply_bytes += getStringObjectSdsUsedMemory(o);
}
}
}

至此,要写出的数据已被写入到 2 个缓冲区,当 socket 可写时,将执行网络 IO 写出数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
int prepareClientToWrite(redisClient *c) { // ...
if (c->bufpos == 0 && listLength(c->reply) == 0 /* &&...*/) // 注册可写事件
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c);
return REDIS_OK; // 调用方可向 c->buf 或 c->reply 中写入数据
}

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = privdata;
int nwritten = 0, objlen;
size_t objmem;
robj *o;

while(c->bufpos > 0 || listLength(c->reply)) {
if (c->bufpos > 0) {
// 写一级缓冲区 [buf+sentlen, buf+bufpos] 区间的字节
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
if (c->sentlen == c->bufpos) { // 写完
c->bufpos = 0; // 重置缓冲区有效边界
c->sentlen = 0; // 重置 setlen,将被 c->reply 复用
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o->ptr);
objmem = getStringObjectSdsUsedMemory(o);
// 跳过空节点...

// 写二级缓冲区 head node 的缓冲数据
nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;

if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply)); // 删除此 node
c->sentlen = 0;
c->reply_bytes -= objmem;
}
}
// 限制单次写数据上限,避免造成 client “饥饿”...
}
if (nwritten == -1) {
if (errno == EAGAIN) { // 可重试错误则等待下次重试
nwritten = 0;
} else { // 致命错误
redisLog(REDIS_VERBOSE, "Error writing to client: %s", strerror(errno));
freeClient(c);
return;
}
}

if (c->bufpos == 0 && listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); // 数据写完毕后不再关注可写事件,避免无用地检查缓冲区
}
}

总结

本节简述了 Server 的三个启动流程与相关结构,并用set举例简要说明命令执行的四个过程:数据读取到缓存、Multibulk 解析、命令表查找执行、从两个缓冲区将数据写出。限于篇幅仅配合部分源码做了阐述,省略了集群复制、AOF/RDB 相关逻辑,流程梳理清楚后,细节可以直接看源码