为方便测试,创建一个名为`test-topic`的主题,4个分区。
主题已经保存了7条消息,随机分布在4个分区里,如下:
注: 我使用的是kafka gui工具`kowl`,你也可以使用`kafka/bin`里的命令行工具。
接下来编写消费代码,如下:
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"}, //消费topic
Brokers: []string{"localhost:9092"}, //连接bootstrap server
GroupID: "test-consumer-group", //指定消费组`test-consumer-group`
SessionTimeout: time.Second * 6, //测试期间为降低重平衡耗时,把session过期时间调低
})
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
log.Fatal(err)
}
log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
}
执行代码输出以下结果:
2022/06/07 15:13:19 message at topic/partition/offset test-topic/2/0: Key-C = Value-C
2022/06/07 15:13:19 message at topic/partition/offset test-topic/2/1: Key-G = Value-G
2022/06/07 15:13:19 message at topic/partition/offset test-topic/0/0: Key-A = Value-A
2022/06/07 15:13:19 message at topic/partition/offset test-topic/0/1: Key-E = Value-E
2022/06/07 15:13:19 message at topic/partition/offset test-topic/1/0: Key-B = Value-B
2022/06/07 15:13:19 message at topic/partition/offset test-topic/1/1: Key-F = Value-F
2022/06/07 15:13:19 message at topic/partition/offset test-topic/3/0: Key-D = Value-D
从上面测试代码可以看到,已经把`test-topic`的7条消息都读出来了。
消费组的名称是`test-consumer-group`,那么这个消费组里有多少个消费者?4个 or 1个 or N个?
打开`kowl`可以清晰看到只有1个消费者,如下:
此消费者标识为`2e0a5cf5-1a52-4d31-aa1c-9de2d31688fa`, topic的4个分区全部交由它负责。
在消费代码我们创建了一个`kafka.Reader{}`实例,一个`kafka.Reader{}`就代表一个消费者。
稍微调整程序,创建多个`kafka.Reader{}`实例,那么就可以看到多个消费者,代码如下:
...
var wg sync.WaitGroup
wg.Add(2)
r1 := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"},
Brokers: []string{"localhost:9092"},
GroupID: "test-consumer-group",
SessionTimeout: time.Second * 6,
})
r2 := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"},
Brokers: []string{"localhost:9092"},
GroupID: "test-consumer-group",
SessionTimeout: time.Second * 6,
})
ctx := context.Background()
go func() {
defer wg.Done()
for {
m, err := r1.FetchMessage(ctx)
...
}
}()
go func() {
defer wg.Done()
for {
m, err := r2.FetchMessage(ctx)
...
}
}()
wg.Wait()
...
`kowl`展示的消费者如下:
经过重平衡,组内的两个消费者均匀分配了两个分区。
在`kafka-go`里是如何创建消费者?从源码可以看到,其实就是一个重平衡里的`join`过程:
https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L798
后面还会重点说重平衡,现在只需知道创建了`kafka.Reader{}`实例后,会请求`join`到消费组,然后就成了一个消费者。
消费者从Kafka获取数据的过程叫`poll`,即`拉取`。
创建的`test-topic`有4个分区,但消费组`test-consumer-group`里只有一个消费者, 那么消费者是怎么poll数据?轮询每个分区poll or 并行poll ?
这时候就要借助Goland IDE的协程分析工具,它可以在debug模式下打印当前时刻的所有协程和对应的调用栈。
消费程序的协程信息如下:
程序一共开了14个协程,其中有4个`pollWait`协程,看名字是跟poll数据有关,数量也跟分区数一致。因此初步猜想是开了4个协程,对应4个分区,各协程并行poll数据。
为了证实想法,还是要看代码:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L1179
果然,`kafka-go`循环遍历该消费者(`kafka.Reader{}`)分配到的所有分区,每个分区起一个独立协程来poll数据。
再回到消费代码,在主协程里只需要调用`r.FetchMessage()`就可以把所有消息读出来,如下:
...
for {
m, err := r.FetchMessage(ctx)
...
}
...
4个子协程如何把数据传出来让`r.FetchMessage()`获取?相信你也能想到,协程间的通信可以使用channel,`kafka-go`就是这么做的:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L1224
从上面代码可以看到,在创建4个poll协程的时候,每个协程都初始化一个`kafka.reader{}`(注意是小写)实例,这些`kafka.reader{}`共享一个`r.msgs channel`。poll出来的数据都推进去,最后在` r.FetchMessage()`把channel数据读出来即可:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L824
现在可以对消费组和消费者做个简单小结:
消费组内单消费者:
消费组内多消费者:
重平衡的作用和触发原因google一下就能搜到,这里不赘述。
在`消费组与消费者`一节,我们把消费者从1个变成2个,消费者分配到的分区就会做调整,这就是重平衡所做的工作。
重平衡是kafka一个重要概念,但同时也饱受诟病,因为在重平衡期间会`stop the world`,所有消费都暂停,直至重平衡完成。那么重平衡过程`kafka-go`里是如何实现?
重平衡是通过`Heartbeat`机制告知消费者暂停消费,那么先看下`kafka-go`的`Heartbeat`实现代码。同样是先打开IDE协程分析器:
可见每个消费者都有一个独立的协程维持心跳,协程执行的代码如下:
https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L464
代码并不复杂,就是定时发一个`heartbeat`请求给`coordinator`(协调者),正常情况下,`g.conn.heartbeat()`函数返回的err是`nil`,那么循环继续。一旦`coordinator`告知消费者要进行重平衡,`g.conn.heartbeat()`函数返回的err就不为`nil`了。我们可以断点看看:
看到err的内容是`RebalanceInProgress (27)`,意思是告诉消费者,你停一下消费,要准备重平衡。
` Heartbeat`协程收到重平衡err后就会退出,然后把`done channel`关闭,所有`pollWait`协程都退出,接下来就进行新一轮的` generatation`, 消费者重新加入消费组(`join`),重新分配分区(`sync`),最后重新创建`pollWait`协程 ... ...代码有点长,耐心看,核心代码如下:
https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L723
在重平衡期间,消费是停止的(阻塞),具体是这里实现:
https://github.com/segmentio/kafka-go/blob/main/consumergroup.go#L709
`cg.next channel`只有在重平衡完成后才有数据写入,因此在重平完成之前,只能阻塞等待。
为什么需要位移提交?
再次回到消费程序,注意我是故意没有提交位移:
...
{
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
log.Fatal(err)
}
...
}
}
...
虽然没有提交位移,但还是能顺序消费下去。因为在内存里已经维护了一个位移计数器,如下:
https://github.com/segmentio/kafka-go/blob/v0.4.32/reader.go#L1511
poll完一条消息就+1,再poll下一条。
试想一下,如果进程重启或者重平衡,那这个在内存里保存的位移计数器就会清零,即要从头再消费,这显然是不靠谱的。
因此我们需要把这个位移保存在远端——位移提交。当前版本kafka的消费者组位移保存在`__consumer_offsets`主题,使用`kowl`可以看到数据格式如下:
Key:
Value:`kafka-go`提供的位移提交方式又分`自动提交`和`手动提交`。
指定`kafka.ReaderConfig.CommitInterval`就可以开启`自动提交`。
`自动提交`的写法有两种:
//方式1: FetchMessage()+CommitMessage()
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"},
Brokers: []string{"localhost:9092"},
GroupID: "test-consumer-group",
SessionTimeout: time.Second * 6,
CommitInterval: time.Second, //1.指定自动提间隔时间
})
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx) //2.调用r.FetchMessage()
if err != nil {
log.Fatal(err)
}
log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if err := r.CommitMessages(ctx, m); err != nil { //3.注意,必须手动调用r.CommitMessages()
log.Fatal(err)
}
}
}
//方式2: ReadMessage()
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"},
Brokers: []string{"localhost:9092"},
GroupID: "test-consumer-group",
SessionTimeout: time.Second * 6,
CommitInterval: time.Second, //1.指定自动提间隔时间
})
ctx := context.Background()
for {
m, err := r.ReadMessage(ctx) //2.调用r.ReadMessage()里已经包含了r.CommitMessages()
if err != nil {
log.Fatal(err)
}
log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
}
`Reader.FetchMessage()`和`Reader.ReadMessage()`最大的区别是: `Reader.ReadMessage()`里面包含了`Reader.CommitMessages()`。代码如下:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L781
也就是说,上面的两种写法其实都有调用`Reader.CommitMessages()`。
`kafka-go`的`自动提交`是如何实现?还是先看IDE的协程分析:
为了实现自动提交,`kafka-go`开启了一个独立的`commitLoopInterval`协程,具体代码如下:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L334
这个协程做的事情很简单,时间到点了就把`Reader.CommitMessages()`提交过来的`offset`合并,取最大值,然后提交到`coordinator`,代码如下:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L263
所以我们可以看到,`Reader.CommitMessages()`负责把`offset`推到`r.commits channel`,然后在`commitLoopInterval`协程把它取出来,合并提交。
因此开启了`自动提交`,调用`Reader.CommitMessage()`是非阻塞的,除非`r.commits channel`满了,`r.commits channel`的容量可以通过配置`kafka.ReaderConfig.QueueCapacity`调整。
`自动提交`的流程图如下:
不指定`kafka.ReaderConfig.CommitInterval`就是`手动提交`。
同样,`手动提交`的写法也有两种,代码如下:
//方式1: FetchMessage()+CommitMessage()
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"},
Brokers: []string{"localhost:9092"},
GroupID: "test-consumer-group",
SessionTimeout: time.Second * 6,
})
ctx := context.Background()
for {
m, err := r.FetchMessage(ctx) //1.调用r.FetchMessage()
if err != nil {
log.Fatal(err)
}
log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
if err := r.CommitMessages(ctx, m); err != nil { //2.注意,必须手动调用r.CommitMessages()
log.Fatal(err)
}
}
}
//方式2: ReadMessage()
func main() {
r := kafka.NewReader(kafka.ReaderConfig{
GroupTopics: []string{"test-topic"},
Brokers: []string{"localhost:9092"},
GroupID: "test-consumer-group",
SessionTimeout: time.Second * 6,
})
ctx := context.Background()
for {
m, err := r.ReadMessage(ctx) //2.调用r.ReadMessage()里已经包含了r.CommitMessages()
if err != nil {
log.Fatal(err)
}
log.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
}
你没看错,代码几乎是跟`自动提交`一样的,除了少了一行`CommitInterval: time.Second`。
`kafka-go`如何实现手动提交?还是先从协程分析看起:
跟`自动提交`类似,实现`手动提交`也是起了一个独立协程,`commitLoopImmediate`,执行代码如下:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L221
还是从`r.commits channel`里取数据,但跟`自动提交`不一样的是,这里是取出数据,合并,马上提交到`coordinator`,而`自动提交`是到点了再提交。
`手动提交`的返回结果放到了另一个`req.errch channel`,在`Reader.CommitMessages()`就可以马上取出来,实现`同步`提交效果,代码如下:
https://github.com/segmentio/kafka-go/blob/main/reader.go#L899
即在`自动提交`下,调用`Reader.CommitMessages()`是阻塞的,需要等待返回提交结果。
流程图如下:
采用哪种提交方式
使用消息队列中间件,在数据消费层面需要关注的无非两个问题:
1).消费速度,能否充分利用计算性能,多进程/多线程,甚至是分布式并行消费;
2).数据安全,能否不丢失数据和避免重复消费;
采用哪种提交方式直接影响消费速度和数据安全,也是使用kafka比较有技巧的一部分,下一篇wiki重点讨论 : )