前言

简要分析 Redis Go 客户端 go-redis/redis 的设计与实现


demo client 及缺陷

实现了 Redis 协议的 demo client 执行命令 SET keyX valueX

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

var r *bufio.Reader
var w *bufio.Writer

func init() {
conn, err := net.Dial("tcp", "127.0.0.1:6379")
if err != nil { panic(err) }
r = bufio.NewReader(conn)
w = bufio.NewWriter(conn)
}

func write(format string, args ...interface{}) {
if _, err := w.WriteString(fmt.Sprintf(format, args...)); err != nil { panic(err) }
}

func read() string {
line, _, err := r.ReadLine()
if err != nil { panic(err) }
return string(line)
}

func set(k, v string) string {
args := []string{"SET", k, v}
write("*%d\r\n", len(args))
for _, arg := range args {
write("$%d\r\n%s\r\n", len(arg), arg)
}
_ = w.Flush()
return read()
}
// func get(k string) ... // other cmds

func main() {
fmt.Println(set("keyX", "valueX")) // +OK
}

如上实现明显存在的缺陷:

  • 连接管理:可用性低(连接断开未重连)、吞吐低(全局仅一条 TCP 连接)等
  • 逻辑混乱:模块间紧耦合(命令逻辑与协议细节未解耦)、可观测性低(未记录命令执行时长)等

为解决以上问题,go-redis 设计了连接池、将命令分类、把命令逻辑、协议实现与 IO 操作进行了解耦


连接池

典型的连接池分为三个模块

  • 初始化:建立多个可用的空闲连接

  • 关闭:关闭池中所有连接,释放资源

  • 连接管理

go-redis 连接池模块相关配置:

1
2
3
4
5
6
7
8
type Options struct {
PoolSize int // 总连接数上限,默认值为 CPU 数量的 10 倍(非计算密集型应用)
MinIdleConns int // 最少空闲连接数
IdleTimeout time.Duration // 连接空闲最大时长,默认 5min
IdleCheckFrequency time.Duration // 连接空闲检测周期,默认 1min
PoolTimeout time.Duration // 无可用连接的等待超时时间,默认 4s
MaxConnAge time.Duration // 连接最大存活时间,默认 0
}

结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
type ConnPool struct {
queue chan struct{} // 信号量实现
connsMu sync.Mutex // 实现 Get,Put 操作线程安全
idleConns []*Conn // 闲置连接队列,面向 Get,Put,reaper 操作
conns []*Conn // 全局连接队列,面向新建和关闭连接

poolSize, idleConnsLen int // 动态维护的连接池元信息
dialErrorsNum uint32 // 熔断边界
lastDialError atomic.Value // 最后一次连接失败的错误信息

stats Stats // 连接池命中率、错误率等统计信息
closedCh chan struct{} // 连接池关闭标记,避免独立 goroutine 泄漏
}

连接有三种状态:using 正在使用中、idle 闲置待命中、stale 已失效待回收(闲置时间过长、存活时间过长);conns队列中的连接按创建时间排序,而idleConns队列则面向 Get, Put, reaper 操作,顺序不定

初始化

  • 创建MinIdleConns个 idle 连接
  • 启动 reaper goroutine,定时检测并回收连接池中的 stale 连接

连接管理

1
2
3
4
5
type Pooler interface {
Get(context.Context) (*Conn, error) // 取 // 操作前 connsMu 加锁保证线程安全
Put(context.Context, *Conn) // 写 // 约定 Context 为第一个参数做超时控制
Remove(context.Context, *Conn, error) // 删
}

信号量

连接池是典型的信号量资源,Get 操作先到先得否则等待。但由于 sync.semaphore 仅支持 contenxt 控制超时,Get 操作还需实现 PoolTimeout 等待超时,故用缓冲 channel 实现了信号量的 accquire, release 逻辑;示意图:

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// accquire
func (p *ConnPool) waitTurn(ctx context.Context) error {
select {
case <-ctx.Done(): // 操作超时及时返回
return ctx.Err()
default:
}
select {
case p.queue <- struct{}{}: // 优化:尝试直接 accquire
return nil
default:
}

timer := timers.Get().(*time.Timer) // sync.Pool 优化过的高频临时对象
timer.Reset(p.opt.PoolTimeout)
select {
case <-ctx.Done(): // 1. 操作超时
if !timer.Stop() { <-timer.C }
timers.Put(timer)
return ctx.Err()
case p.queue <- struct{}{}: // 2. accquire 成功
if !timer.Stop() { <-timer.C }
timers.Put(timer)
return nil
case <-timer.C: // 3. 等待超时
timers.Put(timer)
return ErrPoolTimeout
}
}

// release
func (p *ConnPool) freeTurn() {
<-p.queue
}
Get

逻辑:先从idleConns队列取 idle 连接,若实为 stale 连接则回收,若无可用的 idle 连接则穿透新建连接

熔断机制:当所有连接都失败后,再穿透新建连接将直接返回错误;同时在单独的 goroutine 中轮询探测服务端可用性,若成功则及时终止熔断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
/*...*/
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) { // 熔断中
return nil, p.getLastDialError() // 所有连接都失败,直接返回最后一次连接错误的原因
}
netConn, err := p.opt.Dialer(ctx)
if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) { // 开始熔断,触发可用性探测
go p.tryDial()
}
return nil, err
}
cn := NewConn(netConn)
cn.pooled = pooled
return cn, nil
}

func (p *ConnPool) tryDial() {
for {
if p.closed() { return } // 避免连接池中各种独立 goroutine 泄漏
conn, err := p.opt.Dialer(context.Background())
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second) // 定时探测
continue
}
atomic.StoreUint32(&p.dialErrorsNum, 0) // server 恢复,终止熔断
_ = conn.Close()
return
}
}
Put, Remove
  • Put:完成命令请求并读取响应后,将连接放回idleConns 队列
  • Remove:请求超时后将连接从conns队列中移出(此连接先前已从idleConns出队)

优秀设计:用缓冲 channel 模拟信号量实现精确的超时控制、用连接错误数实现了穿透熔断、用 reaper goroutine 定时回收 stale 连接


通信协议

go-redis 为 TCP 连接装饰了读写缓冲,并将协议操作解耦到了 proto 模块:

proto.Writer

基于bufio.Writer实现 Redis 协议的 encoder,用于将命令及其参数 encode 到单个 multibulk 请求

  • encode 逻辑:将各种参数值进行类型断言,统一转为byte[]写入 buffer

  • 值缓冲优化:bulk len 和 multibulk size 等整数值会高频地转为byte[]并写入 buffer,故预分配字节缓冲区,使用前丢弃旧数据即可

1
2
3
4
5
6
7
8
9
10
11
type Writer struct {
writer // bufio.Writer
lenBuf []byte // 高频使用的字节缓冲区
numBuf []byte // 同理,初始大小为 64,足够
}
func (w *Writer) writeLen(n int) error {
w.lenBuf = strconv.AppendUint(w.lenBuf[:0], uint64(n), 10) // [:0] 复用同一个 byte[] slice
w.lenBuf = append(w.lenBuf, '\r', '\n') // 避免大量分配临时 byte[]
_, err := w.Write(w.lenBuf)
return err
}

proto.Reader

基于bufio.Reader实现 5 种 Redis 协议响应数据的 decoder,读取回复并按指定类型解释字节数据

  • 数据读取:普通数据按行\n分割读,bulk 数据长度已知直接io.ReadFull阻塞 IO 读

  • 接口设计:由于响应数据的层次不定(简单的如GET只有一层 bulk,而复杂的如XREADGROUP则有五层),故 Writer 把 multi bulk 的读取逻辑交由调用方自行组织

    1
    2
    3
    4
    5
    6
    7
    func (r *Reader) ReadIntReply() (int64, error); // :N // 读取整型回复
    func (r *Reader) ReadString() (string, error); // -ERR, +OK, $N // 读取错误回复、状态回复、bulk 数据

    // 开放 MultiBulkParse 给不同命令实现,来自行组织数据的读取顺序
    type MultiBulkParse func(*Reader, int64) (interface{}, error)
    func (r *Reader) ReadArrayReply(m MultiBulkParse) (interface{}, error); // *N // 读取 multibulk 数据
    func (r *Reader) ReadArrayLen() (int, error);

proto.Conn

主要实现net.Conn的读写超时控制

  • 接口设计:由于数据读写并非 Conn 的职责,故以闭包的形式,将 Reader, Writer 开放给调用方自行使用,并不关心到底读写的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type Conn struct {
netConn net.Conn // TCP Connection
rd *proto.Reader // read + decode
wr *proto.Writer // encode + write
bw *bufio.Writer // 暴露给 Conn 控制 write 完毕后触发 flush
Inited bool // 是否已初始化,管道执行 AUTH,SELECT 都成功后才是可使用的有效连接
usedAt int64 // stale 连接检测
/*...*/
}

func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return err
}
return fn(cn.rd) // reader 交由调用方自行按格式解释字节数据
}

func (cn *Conn) WithWriter(ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error) error {
if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
return err
}
if err := fn(cn.wr); err != nil { // writer 交由调用方自行写入一批格式化后的数据
return err
}
return cn.bw.Flush(); // 触发 flush
}

命令处理流程

三步流程:准备命令及参数表,执行网络 IO 读写数据,返回响应数据;go-redis 通过接口、组合、依赖等方式高度抽象了流程中的各模块,关系图:

Cmder

命令分类,分离回复数据的读取逻辑;Redis 有 200+ 个命令,为提高可维护性,go-redis 将命令按回复数据的格式分成 34 类,抽象成 Cmder 接口

1
2
3
4
type Cmder interface {
Args() []interface{} // 命令及其参数列表
readReply(rd *proto.Reader) error // 从 proto.Reader 读取回复数据
}

以 StatusCmd 为例,描述回复数据为单个状态字符串的命令,如 AUTH,SET,...

1
2
3
4
5
6
7
8
9
type StatusCmd struct {
baseCmd // 组合参数表 args []interface{}
val string // 读取到的单行回复,如 "OK"
}

func (cmd *StatusCmd) readReply(rd *proto.Reader) (err error) {
cmd.val, err = rd.ReadString() // 仅需读取一个 string
return err
}

最终 cmdable 在实现面向用户的各种命令函数时,会引用对应的 Cmder 实现:

1
2
3
4
5
6
7
type cmdable func(ctx context.Context, cmd Cmder) error

func (c cmdable) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *StatusCmd {
cmd := NewStatusCmd(ctx, "set", key, value) /* ex, px... */
_ = c(ctx, cmd)
return cmd
}

至此,流程第一步中命令及参数表已准备好,同时实现了回复数据的读取;注意 cmdable 是函数类型,签名为func(context.Context, Cmder) error,仅用于表示 cmder 的处理过程,也即第二步

baseClient

封装连接池执行网络 IO 读写数据;核心逻辑:

  • process:负责从连接池中获取连接,写出Cmder.Args命令参数表,并委托Cmder.readReply从连接中读取回复,最后归还连接
  • 管道操作pipelineProcessCmds:将[]Cmder的所有命令参数依次写出,再依次读取回复,做好错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
// 1. 从连接池获取连接,将读写操作以闭包形式传入执行
err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
// 2. 配置写超时后,写入命令及参数表
if err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd)
}) ; err != nil {
return err
}
// 3. 配置读超时后,执行 cmd.readReply 从连接中读取并解析数据到 cmd.val
return cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
});
/*...*/
}

注:process 重试逻辑实现为带随机抖动的二次规避算法,比线性二次更佳,参考 exponential-backoff-and-jitter;至此,流程第二步执行网络 IO 读写已完成。

Client

Client 组合了三个模块:cmdable 的参数表和回复读取、baseClient 的连接读写、hooks 执行命令回调

1
2
3
4
type Hook interface {
BeforeProcess(ctx context.Context, cmd Cmder) (context.Context, error)
AfterProcess(ctx context.Context, cmd Cmder) error // 常用于实现 tracing 记录命令参数和执行时长
}

组合的目的在于,将 cmdable 的职责委托给 baseClient.process,完成流程第三步

1
2
3
4
5
6
func NewClient(opt *Options) *Client {
c.cmdable = c.Process /*...*/
}
func (c *Client) Process(ctx context.Context, cmd Cmder) error {
return c.hooks.process(ctx, cmd, c.baseClient.process)
}

批处理

Pipeline

  • 场景:参考 redis.io/topics/pipelining,用于解决高吞吐场景 RTT(Round Trip Time)导致的高延迟问题

  • 原理:client 一次性将多个命令写出,再依次读取 server 回复的数据。因为 client 在写完最后一条命令前都不读连接,故 server 视角 client socket 一直未发生EPOLLOUT事件,所有命令的处理结果被依次缓冲在client.reply链表

接口及用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Pipeliner interface {
StatefulCmdable // 保证 pipeline 能直接执行面向用户的命令
Do(ctx context.Context, args ...interface{}) *Cmd // 命令排队
Exec(ctx context.Context) ([]Cmder, error) // 处理队列中的命令
Discard() error // 清理命令队列,复用 pipeline
}

func main(){
client := redis.NewClient(&redis.Options{})
p := client.Pipeline()
p.Set(ctx, "k", "v", 0)
p.Do(ctx, "get", "k")
cmds, _ := p.Exec(ctx)
for _, cmd := range cmds {
fmt.Println(cmd.Args(), cmd.Err())
}
}

管道逻辑实现

Pipeline 本质是命令队列,Do时 append,Exec时 pop,最后按入队顺序逐个读取回复数据返回给调用方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type pipelineExecer func(context.Context, []Cmder) error // 描述如何处理命令队列
type Pipeline struct {
cmdable, statefulCmdable // 都指向 p.Process
exec pipelineExecer // 命令队列的实际执行并不关心,Pipeline 只负责管道逻辑
cmds []Cmder /*...*/
}
func (p *Pipeline) Process(ctx context.Context, cmd Cmder) error { // 直接执行命令也是入队等着
p.cmds = append(p.cmds, cmd)
return nil
}
func (p *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
cmd := NewCmd(ctx, args...)
_ = p.Process(ctx, cmd)
return cmd
}
func (p *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
if len(p.cmds) == 0 { return nil, nil }
cmds, p.cmds := p.cmds, nil
return cmds, p.exec(ctx, cmds) // 命令执行委托给 pipelineExecer
}

pipelineExecer

管道中的命令队列最终会被 baseClient 读取执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (c *baseClient) pipelineProcessCmds(ctx context.Context, cn *pool.Conn, cmds []Cmder) (bool, error) {
cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) // 将管道中所有命令都写出
})
cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
return pipelineReadCmds(rd, cmds)
}) /*...*/
}
// 依次读取各命令的回复数据,设置对应的错误信息
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
for _, cmd := range cmds {
err := cmd.readReply(rd)
cmd.SetErr(err)
if err != nil && !isRedisError(err) { // 非逻辑错误(-Err, $-1, *-1)则提前返回
return err
}
} /*...*/
}

优秀设计:优雅的闭包;管道默认是一次性的,Pipeline 提供了闭包让调用方自行入队命令,自动执行后关闭队列

1
2
3
4
5
6
7
8
func (p *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
if err := fn(p); err != nil {
return nil, err
}
cmds, err := p.Exec(ctx)
_ = p.Close()
return cmds, err
}

Transaction

  • 场景:参考 redis.io/topics/transactions,用于在 server 端原子地执行一组命令
  • 命令:WATCH,UNWATCH监控 key,MULTI开启事务,DISCARD放弃事务,EXEC执行事务
  • 特性:事务不支持回滚,有 2 类错误用法会导致事务失败
    • 语法错误:会导致整个事务被放弃,Exec 时没有命令会被执行
    • 键类型错误:比如INCR string 值类型的 key,Exec 中途遇到此错误不会中断事务,也不回滚之前执行成功的命令;anteriz 认为这是用法问题,测试期间就应发现解决,不要等到生产环境事务执行失败才发现,故 Redis 并未实现事务内各 key 类型动态跟踪功能(增加了实现复杂度、增大了事务执行延迟)
  • ACID:Redis 事务只实现了 ACID 中的 Isolation(事务由单线程原子执行) 和 Durability(事务记录会被持久化),不支持回滚,因而不满足 Atomicity 和 Consistency

结构及用法

transaction 可认为是在 server 端执行的 pipeline,只多了WATCH来指定事务执行的前置条件,若不满足则不执行事务,类似乐观锁的 CAS 操作,不满足再重试;go-redis 将事务操作与WATCH通过闭包形式强绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Tx struct {
baseClient, hooks
cmdable, statefulCmdable // 和 Client 一样组合 hooks + baseClient.process 操作
ctx context.Context
}

func main() {
client := redis.NewClient(&redis.Options{})
ctx := context.Background()
_ = client.Watch(ctx, func(tx *redis.Tx) error {
// tx.Set(ctx, "k", "v", 0) // 修改 watch 的 key,将导致事务失败
_, err := tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
p.Set(ctx, "k", "v", 0)
p.Do(ctx, "set", "k", "vv")
return nil
})
return err
}, "k")
}

事务逻辑实现

函数 Client.Watch 作为事务入口,先执行WATCH keys命令,再把Tx.TxPipeline以闭包形式传给用户,自行往 TxPipeline 中加入事务命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (c *Client) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
tx := c.newTx(ctx)
tx.Watch(ctx, keys...) /* handle errors...*/
return fn(tx)
}
func (c *Tx) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.TxPipeline().Pipelined(ctx, fn)
}
func (c *Tx) TxPipeline() Pipeliner {
return Pipeline{
exec: func(ctx context.Context, cmds []Cmder) error {
return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline)
},
} /*...*/
}

注意 TxPipeline 和 Pipeline 共用一套 Pipeliner 接口,区别是pipelineExecer执行逻辑有 3 处不同:

  • TxPipeline 比 Pipeline 多了预先 WATCH
  • TxPipeline 比 Pipeline 在执行 hook 前向命令队列 prepend MULTI 命令,append EXEC 命令
  • TxPipeline 比 Pipeline 多了读取 QUEUED 回复,处理 *-1 事务执行失败的错误

最终事务执行会也是批量写命令,批量读:

1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *baseClient) txPipelineProcessCmds(ctx context.Context, cn *pool.Conn, cmds []Cmder) (bool, error) {
cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
})
cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
multiCmd := cmds[0].(*StatusCmd)
cmds = cmds[1 : len(cmds)-1] //
if err := txPipelineReadQueued(rd, multiCmd, cmds); err != nil { // 提前处理语法错误导致的事务取消
return err
}
return pipelineReadCmds(rd, cmds) // 分解 EXEC 的数组回复依次作为各命令回复
}) /*...*/
}

StickyConnPool

为确保事务相关命令都走同一条连接读写,go-redis 设计了只含 1 个 “有状态” 连接的连接池StickyConnPool;相比之下,SingleConnPool则包含 1 条 “无状态” 连接,用于配合 Pipeline 初始化物理连接

至此,SinglePool 接口有三种实现,在三种不同场景下为baseClient提供了统一的接口,十分优雅


发布订阅

概念

参考 redis.io/topics/pubsub,其实现了内存态消息队列,接口非常简洁:

  • 发布消息:publish
  • 消费消息:(p)subscribe, (p)unsubcribe

缺点:消息不会被持久化,消息直接以单个连接为单位进行下发;常用于高吞吐、消息需被异步处理的场景

回复格式

  • (p)subscribe:”subscribe” + 本次订阅 channel + 已订阅 channel 数

    1
    *3\r\n$9\r\nsubscribe\r\n$3\r\nCHA\r\n:3\r\n
  • (p)unsubscribe:标记变为 “unsubcribe”,其他同上

  • 消息格式:”message” + 消息源 channel + 消息体 bulk

    1
    *3\r\n$7\r\nmessage\r\n$3\r\nCHX\r\n$3\r\nMSG\r\n

订阅限制(p)subscribe的下文,只允许执行(p)subscribe/(p)unsubcribe/ping 等命令,避免回复干扰

示例:

1
2
3
4
5
6
7
8
9
10
func main() {
cli := redis.NewClient(&redis.Options{})
ctx := context.Background()
ps := cli.Subscribe(ctx, "chx")
replay, _ := ps.Receive(ctx)
fmt.Println(replay) // subscribe: chx // subscribe 命令的回复
for msg := range ps.Channel() { /* redis-cli> PUBLUSH chx MSGX */
fmt.Println(msg.Payload) // MSGX
} /* blocked... */
}

面向 IO 实现

逻辑:发送完(p)subscribe命令后,阻塞读 reply,同时支持发送 (p)unsubscribe命令;额外功能:

  • 连接闪断重连并重新 subscribe
  • 可选带超时限制地消费:ReceiveTimeout
  • 将回复分为Subscription,Message,Pong三类,ReceiveMessage向调用方只返回真正的消息
1
2
3
4
5
type PubSub struct {
newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
cn *pool.Conn // conn provider 与 conn
channels, patterns map[string]struct{} // 名字去重,避免无意义的重复订阅 /*...*/
}

Go Style 实现

PubSub 的 Receive 直接操作 IO,可导致调用方阻塞,消息处理能力受限于此,故用 go 的 channel 在此之上进一步解耦,实现消息的高速读取

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
type channel struct {
pubSub *PubSub // 负责 IO 操作
msgCh chan *Message // 缓冲 Message
allCh chan interface{} // 缓冲 Message,Subscription,Pong 所有上下文回复

checkInterval time.Duration // 定时 Ping check health,若失败则及时重连
chanSendTimeout time.Duration //
}

type ChannelOption func(c *channel) // channel 各字段的 setter
func (p *PubSub) Channel(opts ...ChannelOption) <-chan *Message {
p.chOnce.Do(func() {
p.msgCh = newChannel(p, opts...)
p.msgCh.initMsgChan() // 后台 Receive 消息到 msgCh
})
return p.msgCh.msgCh
}

func (c *channel) initMsgChan() {
c.msgCh = make(chan *Message, c.chanSize) /*...*/
go func() {
timer := time.NewTimer(time.Minute)
for {
msg, err := c.pubSub.Receive(ctx)
if err != nil { /* error handle, backoff retry...*/ }
switch msg := msg.(type) {
case *Subscription, *Pong: // 直接忽略
case *Message:
timer.Reset(c.chanSendTimeout) // c.msgCh 满后要等多久才丢弃消息
select {
case c.msgCh <- msg:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
// c.msgCh 已满,且 1min 都未读取,则本次读到的消息不再入队,直接丢弃 /* logging */
}
}
}
}()
}

Stream, Cluster

Stream 由 5.0 引入的高可用消息队列,其操作命令XADD,XREADGROUP等和SET,GET一样,都是单次执行、单次回复,所以并未和 Pipeline 一样抽象成独立的模块,仅仅增加了XMessageSliceCmd,XStreamSliceCmd等一系列Cmder来读取 Stream 特有格式的回复

Cluster #TODO


总结

从 high-level 看,go-redis 可分为六个模块:

  • 连接池:Pooler 接口规范了连接管理;ConnPool 实现了真的连接池,并封装出无状态的 SingleConnPool、有状态的 StickyConnPool
  • 协议 IO:负责执行带缓冲的 TCP 连接读写 IO;Writer 将命令参数表转为 []byte 写出,Reader 则实现五种协议数据的单元读取,最终超时控制聚合到 proto.Conn
  • 命令抽象:将面向用户的命令,按回复数据的读取过程是否相同进行分类,抽象出 Cmder 接口便于 baseClient 统一处理
  • 批处理:批量执行一组命令;Pipeline 先在本地追加一批命令,提交时批量写出再依次读取回复;Transaction 只比 Pipeline 是多了三个命令,拆成了 tx 模块便于处理上下文
  • 发布订阅:负责收发消息;在 (un)subscribe 基础上,将订阅回复分为三类,并提供带超时的消费,最后结合 Go channel 机制进行了解耦优化,降低了读延迟
  • 集群操作:#TODO

从实现看,go-redis 主要有两大亮点:

  • 抽象:通过接口、聚合、组合等 OOP 方式,将六个模块解耦;比如用 cmdable 函数类型描述命令的处理流程,最终组合到 Client,Pipeline,Transaction,为上层提供了统一的命令函数表
  • 闭包:将函数视为值进行赋值和传参,分离了预处理和使用过程,最终串联配合,完成模块解耦;比如 ConnPool 不关心写入的数据,让 withWriter 通过闭包将 writer 暴露给调用方,再比如 hook 机制的实现等等

此外,很多机制的实现也值得学习,如 pubsub 中用 timer 做流控,连接池中用缓冲 channel 实现带超时的信号量等