go-redis 超时机制


近期有同事问了一个问题:

使用 go-redis 作为 client 访问 redis cluster。ReadTimeout 配置为 1 ms,但请求整体耗时 76 ms,并且成功返回(没有超时)。

为什么 ReadTimeout 没有生效?

超时控制

弄清楚这个问题,最简单的做法是查看源码。go-redis 命令处理的逻辑在 func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) 方法。

func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
	// ...
	if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
        // 写入请求
		if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
			return writeCmd(wr, cmd)
		}); err != nil {
			atomic.StoreUint32(&retryTimeout, 1)
			return err
		}

        // 读取响应
		if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
			if cmd.readTimeout() == nil {
				atomic.StoreUint32(&retryTimeout, 1)
			} else {
				atomic.StoreUint32(&retryTimeout, 0)
			}
			return err
		}

		return nil
	}); err != nil {
		retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
		return retry, err
	}

	return false, nil
}

该方法负责根据配置控制重试次数:

  • c.withConn:从连接池获取链接。
  • cn.WithWriter:发送请求到连接。即,实际使用 opt.WriteTimeout 的地方
  • cn.WithReader:接收连接上的响应。即,实际使用 opt.ReadTimeout 的地方

    由于 Redis 存在阻塞式命令,因此首先调用 c.cmdTimeout 判断是否存在命令维度的读超时时间(优先级:命令维度 > Client 维度)。

打开 WithReader 可以看到 cn.deadline 计算读取的截止时间并设置给 conn (注意:截止时间为绝对时间,因此连接复用时,需要在每次调用前更新截止时间) 。


func (cn *Conn) WithReader(
	ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
	if timeout >= 0 {
		if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
			return err
		}
	}
	return fn(cn.rd)
}

func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
	tm := time.Now()
	cn.SetUsedAt(tm)

	if timeout > 0 {
		tm = tm.Add(timeout)
	}

	if ctx != nil {
		deadline, ok := ctx.Deadline()
		if ok {
			if timeout == 0 {
				return deadline
			}
			if deadline.Before(tm) {
				return deadline
			}
			return tm
		}
	}

	if timeout > 0 {
		return tm
	}

	return noDeadline
}

计算逻辑比较简单,取最小的截止时间 min(Context Deadline, Read Deadline)

  • Context Deadline = context.Deadline()
  • Read Deadline = time.Now().Add(opt.ReadTimeout)

协程调度

不妨再深入底层,net 包的调用直接向下传递给 netFD

type conn struct {
    fd *netFD
}

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Read(b)
}

// SetReadDeadline implements the Conn SetReadDeadline method.
func (c *conn) SetReadDeadline(t time.Time) error {
    if !c.ok() {
        return syscall.EINVAL
    }
    return c.fd.setReadDeadline(t)
}

netFD 是最终调用 poll.FD 相关的函数。从 poll.FD 的名字可以看出,它是调度器的一部分,也是文件描述符(fd)的封装。

poll.FD 通过 syscall.Read 读取数据,该调用为非阻塞的。如果 I/O 就绪,则将数据从内核缓存区拷贝到用户缓冲区,并返回拷贝的字节数。如果发生错误,则判断错误类型:

  • EAGAIN 类型错误,说明内核缓冲区为空,未读取到任何数据,则将 goroutine 自身挂起
  • 其他错误,则返回给调用者
// Network file descriptor.
type netFD struct {
	pfd poll.FD
	// ...
}

func (fd *netFD) Read(p []byte) (n int, err error) {
	n, err = fd.pfd.Read(p)
	runtime.KeepAlive(fd)
	return n, wrapSyscallError(readSyscallName, err)
}

type FD struct {
}

func (fd *FD) Read(p []byte) (int, error) {
	// ...
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
        // 通过 syscall.Read 读取数据
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) 
        // 如果发生错误,则判断错误类型:
        // - EAGAIN 类型错误,内核缓冲区为空,未读取到任何数据
        // - 其他错误,则返回给调用者
		if err != nil {	 
			n = 0
            // 挂起前检查
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) }

func (pd *pollDesc) wait(mode int, isFile bool) error {
	if pd.runtimeCtx == 0 {
		return errors.New("waiting for unsupported file type")
	}
    // 挂起协程
	res := runtime_pollWait(pd.runtimeCtx, mode)
	return convertErr(res, isFile)
}

在 I/O 就绪或超时,Golang 调度器将挂起的 goroutine 重新调入执行。

 func convertErr(res int, isFile bool) error {
	switch res {
	case pollNoError: // I/O 就绪
		return nil
	case pollErrClosing: // 连接关闭
		return errClosing(isFile)
	case pollErrTimeout: // 读写超时
		return ErrDeadlineExceeded
	case pollErrNotPollable:
		return ErrNotPollable
	}
	println("unreachable: ", res)
	panic("unreachable")
}

调度器相关细节,后续再深入探讨。

Buffer Reader/Writer

conn 可读就会执行 go-redis 的 cmd.readReply。连接创建时,conn 的读写操作被封装为 bufio.Reader

// ---------- internal/pool/conn.go--------
type Conn struct {
	usedAt  int64 // atomic
	netConn net.Conn
	
	rd *proto.Reader
	bw *bufio.Writer
	wr *proto.Writer
	
	Inited    bool
	pooled    bool
	createdAt time.Time
}

func NewConn(netConn net.Conn) *Conn {
	cn := &Conn{
		netConn:   netConn,
		createdAt: time.Now(),
	}
	cn.rd = proto.NewReader(netConn)
	cn.bw = bufio.NewWriter(netConn) // buffer writer
	cn.wr = proto.NewWriter(cn.bw)
	cn.SetUsedAt(time.Now())
	return cn
}

// ---------- proto/reader.go--------
package proto

type Reader struct {
	rd *bufio.Reader
}

func NewReader(rd io.Reader) *Reader {
	return &Reader{
		rd: bufio.NewReader(rd), // buffer reader
	}
}

在超过截止时间之前,内核缓冲区内的 reply 数据已就绪,cmd.readReply 就可以借助 bufio.Reader 通过一次或多次 Read 调用,将已就绪的数据从内核换冲突拷贝到用户缓冲区。否则, Read 调用就会因为超过截止时间返回 ErrDeadlineExceeded

分析验证

最后,可以猜测为什么会出现本文开头的现象:

  1. Read 所需的数据就绪并没有超过 1 ms
  2. 76 ms 可能包含了其他耗时,包括但不限于:Goroutine 调度、排队等待、DNS 解析、TCP 握手…

后续同事配合一起调整 min idle conn 大小之后,相关延迟毛刺消失。

总结

go-redis 超时控制说复杂也复杂,说简单也简单。相关参数集中起来,可以汇总成以下这张图:

本文作者 : cyningsun
本文地址https://www.cyningsun.com/08-20-2023/go-redis-connection-timeout.html
版权声明 :本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!

# 数据库