cover_image

Go连接池复用踩坑

立源 三七互娱技术团队
2023年10月23日 10:00
01

背景

图片
图片

服务发生大量请求dns解析,高峰QPS  2.4k/s。高峰期导致服务偶尔出现dns解析超时问题(连接5秒超时),但是内存没有明显上涨。

02

排查历程

图片
图片

是否http.Client未设置单例导致连接未复用?

全局唯一变量,不会有这个问题:

var (   doOption = &httpreq.HttpDoOption{      DisableLog: true,   }    triggerHandlerClient = &http.Client{      Timeout: 30 * time.Minute, // 30分钟兜底 实际超时由handle时设置      Transport: &http.Transport{         Proxy:               http.ProxyFromEnvironment,         DialContext:         knet.WrapTcpDialer(time.Second*5, time.Minute, nil),         ForceAttemptHTTP2:   true,         MaxIdleConns:        256,         MaxIdleConnsPerHost: 256,         MaxConnsPerHost:     512,         IdleConnTimeout:     time.Minute,         TLSHandshakeTimeout: 10 * time.Second,      },   })


是否并发控制问题,导致连接池不够用?

确认并发代码使用channel进行控制,且下游没有明显并发请求上升迹象,排除这个原因。

是否为keepalive没有维护导致?

看到代码配置:

knet.WrapTcpDialer(time.Second*5, time.Minute, nil) ;

这里设置 keepAlive为 time.Minute,客户端是有维护的。

服务端只有可能是lb层没有维护导致,但是这个可能性太小了,如果lb没有维护keepAlive很多服务的连接复用都有问题才对。

是否回收代码逻辑错误?

最后在这里确认了,response.Body 没有读取完就进行close会导致连接不复用。
这里200就直接返回了,没有先读取就close,代码如下:(左边为有问题代码

图片
03

复现与分析

图片
图片

复现代码

package main import (   "fmt"   "io/ioutil"   "net/http"   "net/http/httptrace"   "sync"   "testing") func TestAAA(t *testing.T) {   c := make(chan struct{}, 5)   wg := sync.WaitGroup{}   for i := 0; i < 20; i++ {      c <- struct{}{}      go func() {         wg.Add(1)         defer wg.Done()         HttpGet()         <-c      }()   }   wg.Wait()   fmt.Println("getConnCount", getConnCount)   fmt.Println("getDNSCount", getDNSCount)} var getConnCount = 0var getConnCountLock = sync.Mutex{} var getDNSCount = 0var getDNSCountLock = sync.Mutex{} var httpTrace = &httptrace.ClientTrace{   ConnectDone: func(network, addr string, err error) {      getConnCountLock.Lock()      defer getConnCountLock.Unlock()      getConnCount++   },   DNSDone: func(info httptrace.DNSDoneInfo) {      getDNSCountLock.Lock()      defer getDNSCountLock.Unlock()      getDNSCount++   },} func HttpGet() {   req, _ := http.NewRequest("GET", "https://www.baidu.com", nil)   req = req.WithContext(httptrace.WithClientTrace(req.Context(), httpTrace))   resp, err := http.DefaultTransport.RoundTrip(req)   if err != nil {      fmt.Println(err)      return   }   defer resp.Body.Close()} func HttpGet2() {   req, _ := http.NewRequest("GET", "https://www.baidu.com", nil)   req = req.WithContext(httptrace.WithClientTrace(req.Context(), httpTrace))   resp, err := http.DefaultTransport.RoundTrip(req)   if err != nil {      fmt.Println(err)      return   }   defer resp.Body.Close()   _, _ = ioutil.ReadAll(resp.Body)}


执行结果

复现代码里  HttpGet 与 HttpGet2 只相差了 

_, _ = ioutil.ReadAll(resp.Body)

一句,但HttpGet没有进行连接复用。

两个执行结果截图如下:

图片
图片

原理分析

httpclient 每个连接会创建读写协程两个协程,分别使用 reqch 和 writech 来跟 roundTrip 通信。上层使用的response.Body 其实是经过多次封装的,一次封装的 body 是直接跟 net.conn 进行交互读取,二次封装的 body 则是加强了 close 和 eof 处理的 bodyEOFSignal。

当未读取 body 就进行 close 时,会触发 earlyCloseFn() 回调,看 earlyCloseFn 的函数定义,在 close 未见 io.EOF 时才调用。自定义的 earlyCloseFn 方法会给 readLoop 监听的 waitForBodyRead 传入 false, 这样引发 alive 为 false 不能继续循环的接收新请求,只能是退出调用注册过的 defer 方法,关闭连接和清理连接池。

func (pc *persistConn) readLoop() {   alive := true   for alive {           waitForBodyRead := make(chan bool, 2)      body := &bodyEOFSignal{         body: resp.Body,        // 非读取完关闭,输入false         earlyCloseFn: func() error {            waitForBodyRead <- false            <-eofc // will be closed by deferred call at the end of the function            return nil          },        // 正常读取完后关闭,输入 isEOF = true         fn: func(err error) error {            isEOF := err == io.EOF            waitForBodyRead <- isEOF            if isEOF {               <-eofc // see comment above eofc declaration            } else if err != nil {               if cerr := pc.canceled(); cerr != nil {                  return cerr               }            }            return err         },      }         // 赋值resp的body,加强了 close 和 eof 处理的,后续调用resp.Body.Close() = bodyEOFSignal.Close()      resp.Body = body      ......       // Before looping back to the top of this function and peeking on      // the bufio.Reader, wait for the caller goroutine to finish      // reading the response body. (or for cancellation or death)      select {      case bodyEOF := <-waitForBodyRead:         replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool         alive = alive &&            // 非读取完,设置alive是false,标识连接不能继续使用            bodyEOF &&            !pc.sawEOF &&            pc.wroteRequest() &&            replaced && tryPutIdleConn(trace)         if bodyEOF {            eofc <- struct{}{}         }      case <-rc.req.Cancel:         alive = false         pc.t.CancelRequest(rc.req)      case <-rc.req.Context().Done():         alive = false         pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())      case <-pc.closech:         alive = false      }    }} type bodyEOFSignal struct {   body         io.ReadCloser   mu           sync.Mutex        // guards following 4 fields   closed       bool              // whether Close has been called   rerr         error             // sticky Read error   fn           func(error) error // err will be nil on Read io.EOF   earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen} func (es *bodyEOFSignal) Close() error {   es.mu.Lock()   defer es.mu.Unlock()   if es.closed {      return nil   }   es.closed = true    // 如果非io.EOF,也就是读取完数据的情况,调用earlyCloseFn方法   if es.earlyCloseFn != nil && es.rerr != io.EOF {      return es.earlyCloseFn()   }   err := es.body.Close()   return es.condfn(err)} // caller must hold es.mu.func (es *bodyEOFSignal) condfn(err error) error {   if es.fn == nil {      return err   }   err = es.fn(err)   es.fn = nil   return err}


04

上线后效果

图片
图片

可以看到在 21:00 上线后dns请求QPS急剧下降

图片
05

建议

图片
图片

http服务与客户端最好都接入httptrace,采集数据并上报面板,完整的trace监控可以帮助快速定位问题,之前单从dns解析超时没有确认到是连接复用问题导致排查历程比较曲直。

图片

参考链接

图片


  • Go http client 连接池不复用的问题: 

    https://blog.csdn.net/qcrao/article/details/111570114




三七互娱技术团队

扫码关注 了解更多

图片


继续滑动看下一个
三七互娱技术团队
向上滑动看下一个