cover_image

kafka-go消费者代码分析(一)

三七互娱技术团队
2022年08月01日 09:39

kafka环境

为方便测试,创建一个名为`test-topic`的主题,4个分区。

主题已经保存了7条消息,随机分布在4个分区里,如下:

注: 我使用的是kafka gui工具`kowl`,你也可以使用`kafka/bin`里的命令行工具。

接下来编写消费代码,如下:

func main() {   r := kafka.NewReader(kafka.ReaderConfig{      GroupTopics:    []string{"test-topic"},       //消费topic      Brokers:        []string{"localhost:9092"},   //连接bootstrap server      GroupID:        "test-consumer-group",        //指定消费组`test-consumer-group`      SessionTimeout: time.Second * 6,              //测试期间为降低重平衡耗时,把session过期时间调低   })   ctx := context.Background()   for {      m, err := r.FetchMessage(ctx)      if err != nil {         log.Fatal(err)      }      log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))   }}

执行代码输出以下结果:

2022/06/07 15:13:19 message at topic/partition/offset test-topic/2/0: Key-C = Value-C2022/06/07 15:13:19 message at topic/partition/offset test-topic/2/1: Key-G = Value-G2022/06/07 15:13:19 message at topic/partition/offset test-topic/0/0: Key-A = Value-A2022/06/07 15:13:19 message at topic/partition/offset test-topic/0/1: Key-E = Value-E2022/06/07 15:13:19 message at topic/partition/offset test-topic/1/0: Key-B = Value-B2022/06/07 15:13:19 message at topic/partition/offset test-topic/1/1: Key-F = Value-F2022/06/07 15:13:19 message at topic/partition/offset test-topic/3/0: Key-D = Value-D



消费组与消费者

从上面测试代码可以看到,已经把`test-topic`的7条消息都读出来了。

消费组的名称是`test-consumer-group`,那么这个消费组里有多少个消费者?4个 or 1个 or N个?

打开`kowl`可以清晰看到只有1个消费者,如下:

图片


此消费者标识为`2e0a5cf5-1a52-4d31-aa1c-9de2d31688fa`, topic的4个分区全部交由它负责。

在消费代码我们创建了一个`kafka.Reader{}`实例,一个`kafka.Reader{}`就代表一个消费者

稍微调整程序,创建多个`kafka.Reader{}`实例,那么就可以看到多个消费者,代码如下:

...var wg sync.WaitGroupwg.Add(2)r1 := kafka.NewReader(kafka.ReaderConfig{   GroupTopics:    []string{"test-topic"},   Brokers:        []string{"localhost:9092"},   GroupID:        "test-consumer-group",   SessionTimeout: time.Second * 6,})r2 := kafka.NewReader(kafka.ReaderConfig{   GroupTopics:    []string{"test-topic"},   Brokers:        []string{"localhost:9092"},   GroupID:        "test-consumer-group",   SessionTimeout: time.Second * 6,})ctx := context.Background()go func() {   defer wg.Done()   for {      m, err := r1.FetchMessage(ctx)      ...   }}()go func() {   defer wg.Done()   for {      m, err := r2.FetchMessage(ctx)      ...   }}()wg.Wait()...

`kowl`展示的消费者如下:

图片


经过重平衡,组内的两个消费者均匀分配了两个分区。

在`kafka-go`里是如何创建消费者?从源码可以看到,其实就是一个重平衡里的`join`过程:

https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L798

图片

后面还会重点说重平衡,现在只需知道创建了`kafka.Reader{}`实例后,会请求`join`到消费组,然后就成了一个消费者。




消费者Poll数据

消费者从Kafka获取数据的过程叫`poll`,即`拉取`。

创建的`test-topic`有4个分区,但消费组`test-consumer-group`里只有一个消费者, 那么消费者是怎么poll数据?轮询每个分区poll or 并行poll ?

这时候就要借助Goland IDE的协程分析工具,它可以在debug模式下打印当前时刻的所有协程和对应的调用栈。

消费程序的协程信息如下:

图片


程序一共开了14个协程,其中有4个`pollWait`协程,看名字是跟poll数据有关,数量也跟分区数一致。因此初步猜想是开了4个协程,对应4个分区,各协程并行poll数据。

为了证实想法,还是要看代码:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L1179


图片



果然,`kafka-go`循环遍历该消费者(`kafka.Reader{}`)分配到的所有分区,每个分区起一个独立协程来poll数据

再回到消费代码,在主协程里只需要调用`r.FetchMessage()`就可以把所有消息读出来,如下:

...for {   m, err := r.FetchMessage(ctx)   ...}...

4个子协程如何把数据传出来让`r.FetchMessage()`获取?相信你也能想到,协程间的通信可以使用channel,`kafka-go`就是这么做的:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L1224

图片


从上面代码可以看到,在创建4个poll协程的时候,每个协程都初始化一个`kafka.reader{}`(注意是小写)实例,这些`kafka.reader{}`共享一个`r.msgs  channel`。poll出来的数据都推进去,最后在` r.FetchMessage()`把channel数据读出来即可:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L824


图片


现在可以对消费组和消费者做个简单小结:

消费组内单消费者:图片


消费组内多消费者:

图片



消费者重平衡

重平衡的作用和触发原因google一下就能搜到,这里不赘述。

在`消费组与消费者`一节,我们把消费者从1个变成2个,消费者分配到的分区就会做调整,这就是重平衡所做的工作。

重平衡是kafka一个重要概念,但同时也饱受诟病,因为在重平衡期间会`stop the world`,所有消费都暂停,直至重平衡完成。那么重平衡过程`kafka-go`里是如何实现?

重平衡是通过`Heartbeat`机制告知消费者暂停消费,那么先看下`kafka-go`的`Heartbeat`实现代码。同样是先打开IDE协程分析器:

图片

可见每个消费者都有一个独立的协程维持心跳,协程执行的代码如下:

https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L464

图片

代码并不复杂,就是定时发一个`heartbeat`请求给`coordinator`(协调者),正常情况下,`g.conn.heartbeat()`函数返回的err是`nil`,那么循环继续。一旦`coordinator`告知消费者要进行重平衡,`g.conn.heartbeat()`函数返回的err就不为`nil`了。我们可以断点看看:

图片


看到err的内容是`RebalanceInProgress (27)`,意思是告诉消费者,你停一下消费,要准备重平衡。

` Heartbeat`协程收到重平衡err后就会退出,然后把`done channel`关闭,所有`pollWait`协程都退出,接下来就进行新一轮的` generatation`, 消费者重新加入消费组(`join`),重新分配分区(`sync`),最后重新创建`pollWait`协程 ... ...代码有点长,耐心看,核心代码如下:

https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L723

图片


在重平衡期间,消费是停止的(阻塞),具体是这里实现:

https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L709

图片

`cg.next channel`只有在重平衡完成后才有数据写入,因此在重平完成之前,只能阻塞等待。





位移提交

为什么需要位移提交?

再次回到消费程序,注意我是故意没有提交位移:

...{   ctx := context.Background()   for {      m, err := r.FetchMessage(ctx)      if err != nil {         log.Fatal(err)      }      ...   }}...

虽然没有提交位移,但还是能顺序消费下去。因为在内存里已经维护了一个位移计数器,如下:

https://github.com/segmentio/kafka-go/blob/v0.4.32/reader.go#L1511图片

poll完一条消息就+1,再poll下一条。

试想一下,如果进程重启或者重平衡,那这个在内存里保存的位移计数器就会清零,即要从头再消费,这显然是不靠谱的。

因此我们需要把这个位移保存在远端——位移提交。当前版本kafka的消费者组位移保存在`__consumer_offsets`主题,使用`kowl`可以看到数据格式如下:

Key:

图片

Value:图片`kafka-go`提供的位移提交方式又分`自动提交`和`手动提交`。



1). 自动提交

指定`kafka.ReaderConfig.CommitInterval`就可以开启`自动提交`。

`自动提交`的写法有两种:


//方式1: FetchMessage()+CommitMessage()func main() {   r := kafka.NewReader(kafka.ReaderConfig{      GroupTopics:    []string{"test-topic"},      Brokers:        []string{"localhost:9092"},      GroupID:        "test-consumer-group",      SessionTimeout: time.Second * 6,      CommitInterval: time.Second,                        //1.指定自动提间隔时间   })   ctx := context.Background()   for {      m, err := r.FetchMessage(ctx)                       //2.调用r.FetchMessage()      if err != nil {         log.Fatal(err)      }      log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))      if err := r.CommitMessages(ctx, m); err != nil {   //3.注意,必须手动调用r.CommitMessages()         log.Fatal(err)      }   }} //方式2: ReadMessage()func main() {   r := kafka.NewReader(kafka.ReaderConfig{      GroupTopics:    []string{"test-topic"},      Brokers:        []string{"localhost:9092"},      GroupID:        "test-consumer-group",      SessionTimeout: time.Second * 6,      CommitInterval: time.Second,                   //1.指定自动提间隔时间   })   ctx := context.Background()   for {      m, err := r.ReadMessage(ctx)                  //2.调用r.ReadMessage()里已经包含了r.CommitMessages()      if err != nil {         log.Fatal(err)      }      log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))   }}


`Reader.FetchMessage()`和`Reader.ReadMessage()`最大的区别是: `Reader.ReadMessage()`里面包含了`Reader.CommitMessages()`。代码如下:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L781图片

也就是说,上面的两种写法其实都有调用`Reader.CommitMessages()`。

`kafka-go`的`自动提交`是如何实现?还是先看IDE的协程分析: 图片

为了实现自动提交,`kafka-go`开启了一个独立的`commitLoopInterval`协程,具体代码如下:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L334

图片


这个协程做的事情很简单,时间到点了就把`Reader.CommitMessages()`提交过来的`offset`合并,取最大值,然后提交到`coordinator`,代码如下:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L263

图片

所以我们可以看到,`Reader.CommitMessages()`负责把`offset`推到`r.commits channel`,然后在`commitLoopInterval`协程把它取出来,合并提交。

因此开启了`自动提交`,调用`Reader.CommitMessage()`是非阻塞的,除非`r.commits channel`满了,`r.commits channel`的容量可以通过配置`kafka.ReaderConfig.QueueCapacity`调整。

`自动提交`的流程图如下:图片



2). 手动提交

不指定`kafka.ReaderConfig.CommitInterval`就是`手动提交`。

同样,`手动提交`的写法也有两种,代码如下:

//方式1: FetchMessage()+CommitMessage()func main() {   r := kafka.NewReader(kafka.ReaderConfig{      GroupTopics:    []string{"test-topic"},      Brokers:        []string{"localhost:9092"},      GroupID:        "test-consumer-group",      SessionTimeout: time.Second * 6,   })   ctx := context.Background()   for {      m, err := r.FetchMessage(ctx)                     //1.调用r.FetchMessage()      if err != nil {         log.Fatal(err)      }      log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))      if err := r.CommitMessages(ctx, m); err != nil {   //2.注意,必须手动调用r.CommitMessages()         log.Fatal(err)      }   }}  //方式2: ReadMessage()func main() {   r := kafka.NewReader(kafka.ReaderConfig{      GroupTopics:    []string{"test-topic"},      Brokers:        []string{"localhost:9092"},      GroupID:        "test-consumer-group",      SessionTimeout: time.Second * 6,   })   ctx := context.Background()   for {      m, err := r.ReadMessage(ctx)            //2.调用r.ReadMessage()里已经包含了r.CommitMessages()      if err != nil {         log.Fatal(err)      }      log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))   }}


你没看错,代码几乎是跟`自动提交`一样的,除了少了一行`CommitInterval: time.Second`。

 `kafka-go`如何实现手动提交?还是先从协程分析看起:

图片


跟`自动提交`类似,实现`手动提交`也是起了一个独立协程,`commitLoopImmediate`,执行代码如下:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L221

图片


还是从`r.commits channel`里取数据,但跟`自动提交`不一样的是,这里是取出数据,合并,马上提交到`coordinator`,而`自动提交`是到点了再提交。

`手动提交`的返回结果放到了另一个`req.errch channel`,在`Reader.CommitMessages()`就可以马上取出来,实现`同步`提交效果,代码如下:

https://github.com/segmentio/kafka-go/blob/main/reader.go#L899

图片


即在`自动提交`下,调用`Reader.CommitMessages()`是阻塞的,需要等待返回提交结果。

流程图如下:

图片



采用哪种提交方式

使用消息队列中间件,在数据消费层面需要关注的无非两个问题:

1).消费速度,能否充分利用计算性能,多进程/多线程,甚至是分布式并行消费;

2).数据安全,能否不丢失数据和避免重复消费;

采用哪种提交方式直接影响消费速度和数据安全,也是使用kafka比较有技巧的一部分,下一篇wiki重点讨论 : )



继续滑动看下一个
三七互娱技术团队
向上滑动看下一个