业务背景大概为:生产者在其他平台使用其他框架进行消息生产,每分钟约7000条数据,对应topic有30个分区,消费者使用内部框架封装的consumer(对kafka-go的v0.4.38版本做了简单封装)进行消费,消费者部署了6个pod(pod的机器规格是4u8g),每个pod有5个goroutine进行消费。
在消费过程中,发现CPU持续缓慢增加,consumer消费能力不断减小,跟不上生产者速度,导致数据消费不过来。经过大约一天时间后,对应pod的CPU已经跑满,消费能力基本为0,严重影响业务功能,只能进行pod重启,重启后积压数据被瞬间消费完成,持续观察发现一天后现象重复,CPU又被跑满。对应监控图为:
在测试环境上,在相应的代码中增加端口监听pprof采集数据。
import _ "net/http/pprof"
go func() {
ip := "0.0.0.0:6060"
if err := http.ListenAndServe(ip, nil); err != nil {
fmt.Printf("start pprof failed on %s\n", ip)
os.Exit(1)
}
}()
2.2.2 pprof具体分析
cpu(CPU Profiling):$HOST/debug/pprof/profile,默认进行30s的 CPU Profiling,得到一个分析用的profile文件;
goroutine:$HOST/debug/pprof/goroutine,查看当前所有运行的goroutines 堆栈跟踪。
本地通过go tool pprof -http=:8081 pprof.main.samples.cpu.003.pb 生成火焰图,发现CPU主要耗在kafka-go的CommitMessage和FetchMessage两个方法上:
通过list 查看CommitMessage信息,发现主要是在kafka-go的reader.go源码的ctx.Done()和errCh上。
结合使用go tool pprof http://127.0.0.1:6060/debug/pprof/goroutine查看,都是在runtime.gopark上,这个是代表协程处于休眠状态。
找到对应版本的kafka-go的源码进行分析。
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
if !r.useConsumerGroup() {
return errOnlyAvailableWithGroup
}
var errch <-chan error
creq := commitRequest{
commits: makeCommits(msgs...),
}
if r.useSyncCommits() {
ch := make(chan error, 1)
errch, creq.errch = ch, ch
}
select {
case r.commits <- creq:
case <-ctx.Done():
return ctx.Err()
case <-r.stctx.Done():
// This context is used to ensure we don't allow commits after the
// reader was closed.
return io.ErrClosedPipe
}
if !r.useSyncCommits() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errch:
return err
}
}
继续分析源码,看r.commits这个chan的大小,在reader.go文件中,chan的大小是由config.QueueCapacity决定的,这个参数在没有设置时,默认是100。
通过命令go tool pprof http://127.0.0.1:6060/debug/pprof/goroutine?debug=2查看各协程的运行情况,不存在大量长期未关闭的协程。该命令都是获取程序运行时的快照信息,在某次执行时的goroutine信息中发现:
了解到上述情况后,分析业务代码,发现每次在获取消息之前,都会在ctx中写入traceId的value,而写入的过程是类似于:
ctx = context.WithValue(ctx, traceIdKey, traceValue)
package main
import (
"context"
"fmt"
"time"
)
func main() {
ch := make(chan int, 100)
go func() {
for {
<-ch
time.Sleep(10 * time.Millisecond) // simulate business operation
}
}()
ctx := context.Background()
traceValue := 0
for {
traceValue++
ctx = context.WithValue(ctx, "traceId", traceValue) // value write to ctx
select {
case ch <- traceValue: // value write to chan
case <-ctx.Done():
fmt.Println("ctx done")
return
}
}
}
在linux上执行go run main.go,通过top命令查看cpu情况,发现CPU在缓慢增长:
通过以上问题分析,主要收获点有:
如何更好地使用pprof工具进行CPU/内存分析;
对于开源软件(不限于kafka-go),如果需要查找相关问题,最好是根据源码进行分析;