近期有同事问了一个问题:
使用 go-redis 作为 client 访问 redis cluster。ReadTimeout 配置为 1 ms,但请求整体耗时 76 ms,并且成功返回(没有超时)。
为什么 ReadTimeout 没有生效?
超时控制
弄清楚这个问题,最简单的做法是查看源码。go-redis 命令处理的逻辑在 func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error)
方法。
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
// ...
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
// 写入请求
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd)
}); err != nil {
atomic.StoreUint32(&retryTimeout, 1)
return err
}
// 读取响应
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), cmd.readReply); err != nil {
if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1)
} else {
atomic.StoreUint32(&retryTimeout, 0)
}
return err
}
return nil
}); err != nil {
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
return retry, err
}
return false, nil
}
该方法负责根据配置控制重试次数:
c.withConn
:从连接池获取链接。cn.WithWriter
:发送请求到连接。即,实际使用opt.WriteTimeout
的地方cn.WithReader
:接收连接上的响应。即,实际使用opt.ReadTimeout
的地方由于 Redis 存在阻塞式命令,因此首先调用
c.cmdTimeout
判断是否存在命令维度的读超时时间(优先级:命令维度 > Client 维度)。
打开 WithReader 可以看到 cn.deadline
计算读取的截止时间并设置给 conn
(注意:截止时间为绝对时间,因此连接复用时,需要在每次调用前更新截止时间) 。
func (cn *Conn) WithReader(
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
if timeout >= 0 {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return err
}
}
return fn(cn.rd)
}
func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
tm := time.Now()
cn.SetUsedAt(tm)
if timeout > 0 {
tm = tm.Add(timeout)
}
if ctx != nil {
deadline, ok := ctx.Deadline()
if ok {
if timeout == 0 {
return deadline
}
if deadline.Before(tm) {
return deadline
}
return tm
}
}
if timeout > 0 {
return tm
}
return noDeadline
}
计算逻辑比较简单,取最小的截止时间 min(Context Deadline, Read Deadline)
- Context Deadline = context.Deadline()
- Read Deadline = time.Now().Add(opt.ReadTimeout)
协程调度
不妨再深入底层,net
包的调用直接向下传递给 netFD
type conn struct {
fd *netFD
}
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Read(b)
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func (c *conn) SetReadDeadline(t time.Time) error {
if !c.ok() {
return syscall.EINVAL
}
return c.fd.setReadDeadline(t)
}
netFD
是最终调用 poll.FD
相关的函数。从 poll.FD
的名字可以看出,它是调度器的一部分,也是文件描述符(fd)的封装。
poll.FD
通过 syscall.Read
读取数据,该调用为非阻塞的。如果 I/O 就绪,则将数据从内核缓存区拷贝到用户缓冲区,并返回拷贝的字节数。如果发生错误,则判断错误类型:
- EAGAIN 类型错误,说明内核缓冲区为空,未读取到任何数据,则将
goroutine
自身挂起 - 其他错误,则返回给调用者
// Network file descriptor.
type netFD struct {
pfd poll.FD
// ...
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
type FD struct {
}
func (fd *FD) Read(p []byte) (int, error) {
// ...
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 通过 syscall.Read 读取数据
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
// 如果发生错误,则判断错误类型:
// - EAGAIN 类型错误,内核缓冲区为空,未读取到任何数据
// - 其他错误,则返回给调用者
if err != nil {
n = 0
// 挂起前检查
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) }
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
// 挂起协程
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
在 I/O 就绪或超时,Golang 调度器将挂起的 goroutine
重新调入执行。
func convertErr(res int, isFile bool) error {
switch res {
case pollNoError: // I/O 就绪
return nil
case pollErrClosing: // 连接关闭
return errClosing(isFile)
case pollErrTimeout: // 读写超时
return ErrDeadlineExceeded
case pollErrNotPollable:
return ErrNotPollable
}
println("unreachable: ", res)
panic("unreachable")
}
调度器相关细节,后续再深入探讨。
Buffer Reader/Writer
conn
可读就会执行 go-redis 的 cmd.readReply
。连接创建时,conn
的读写操作被封装为 bufio.Reader
。
// ---------- internal/pool/conn.go--------
type Conn struct {
usedAt int64 // atomic
netConn net.Conn
rd *proto.Reader
bw *bufio.Writer
wr *proto.Writer
Inited bool
pooled bool
createdAt time.Time
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
createdAt: time.Now(),
}
cn.rd = proto.NewReader(netConn)
cn.bw = bufio.NewWriter(netConn) // buffer writer
cn.wr = proto.NewWriter(cn.bw)
cn.SetUsedAt(time.Now())
return cn
}
// ---------- proto/reader.go--------
package proto
type Reader struct {
rd *bufio.Reader
}
func NewReader(rd io.Reader) *Reader {
return &Reader{
rd: bufio.NewReader(rd), // buffer reader
}
}
在超过截止时间之前,内核缓冲区内的 reply
数据已就绪,cmd.readReply
就可以借助 bufio.Reader
通过一次或多次 Read
调用,将已就绪的数据从内核换冲突拷贝到用户缓冲区。否则, Read
调用就会因为超过截止时间返回 ErrDeadlineExceeded
。
分析验证
最后,可以猜测为什么会出现本文开头的现象:
Read
所需的数据就绪并没有超过 1 ms- 76 ms 可能包含了其他耗时,包括但不限于:Goroutine 调度、排队等待、DNS 解析、TCP 握手…
后续同事配合一起调整 min idle conn 大小之后,相关延迟毛刺消失。
总结
go-redis 超时控制说复杂也复杂,说简单也简单。相关参数集中起来,可以汇总成以下这张图:
本文作者 : cyningsun
本文地址 : https://www.cyningsun.com/08-20-2023/go-redis-connection-timeout.html
版权声明 :本博客所有文章除特别声明外,均采用 CC BY-NC-ND 3.0 CN 许可协议。转载请注明出处!