前言
相信kafka大家也许都不会陌生,作为Apache的顶级项目,在平时的项目工作中或多或少都和kafka打过交道,大家使用方式无非是接入kafka客户端来进行生产或者消费消息。但是直连kafka消费的时候会存在一些问题:
开发人员水平参差不齐: 对于不同的程序员,对于kafka的熟悉程度是不同的,使用kafka客户端的时候,参数配置,使用方式均会有所差异,这样子就会造成生产环境上面运行的不同质量,不同参数的kafka代码,很容易出现问题。
千差万别的kafka客户端:不同的开发语言对应不同的kafka客户端sdk,同一语言也对应多种kafka客户端sdk库,同一kafka客户端sdk库同样对应着各种不同的版本。这对于多语言团队选择kafka的客户端版本来说简直是场灾难
Kafka版本升级困难重重:如果kafka服务端的版本已经不能满足现有的业务,或者有重大漏洞需要升级kafka版本的时候,需要找所有接入kafka的业务方一起评估影响,随之影响的是所有接入kafka的业务方也得统一升级客户端sdk的版本,升级过程极其复杂和漫长,并且伴随着很高的风险。
服务滚动重启rebalance风暴:对于有着成百上千消费kafka服务实例的应用来说,上线重启简直是一场灾难,因为滚动重启成百上千台服务实例的应用耗时会很长,每次重启一次都会伴随着消费者的下线上线,kafka会一直进行rebalance,意味着重启过程中下游服务将会停止消费,对于实时性要求很高的应用来说,这是不可接受的。
消费者实例数量和partition的数量强绑定:partition只能被一个消费者消费,意味着如果想要提升消费能力,水平扩服务实例的同时,partition的数量也得扩大,但是partition的数量不可能是无限增长的,随着partition数量增大会对kafka的性能产生很大的影响。
千变万化的业务需求:业务使用kafka除了发布订阅的模型外,还会有很多业务需求,例如,业务需要使用延时消息,事务消息,死信队列,等等需求的时候,kafka是没有提供的,需要业务方自己实现。
毒丸消息和消息处理延时:对于同一个partition中的消息,当某一条消息下游无法消费的时候,后面的所有消息也将无法消费,或者当第一条消息处理耗时长,第二条消息处理耗时短,那么对于第二条消息来说,处理耗时仍然是很长的。
本文将围绕上述的一些问题来分享顺丰同城内部的相关探索与实践,以及延时消息和事务消息的原理,全篇摒弃了枯燥的知识点罗列的方式,采用风趣诙谐的讲故事的方式来带大家了解SFMQ,希望对大家有所帮助和启发。
楔子
周一早上刚到了公司,小Bo接到了产品经理的消息,“阿Bo,我们想根据用户的消费额度和频率给用户积分,根据这个来给用户做画像进行产品推送。。。”。blabla聊完一通后,小Bo在以前的代码中添加了几个跟用户积分平台的接口调用。
周三顺利上线后,小Bo美美地在家中睡去。
周四早上到了公司后,产品经理又发来一个消息,“阿Bo,用户反馈下单比原来慢了很多,并且还总失败,体验极差,能优化下么?”。blabla聊完一通后,小Bo思考如何解决这个问题,找到了公司的资深架构师李哥。。。
小Bo的下单系统是公司的流量的主要入口,当用户下完单时,需要同仓库系统、优惠券系统交互,需要跟邮件系统、推送系统交互给用户发邮件,还有新写的和积分系统的交互。。。李哥简单看了下小Bo业务的耗时链路和业务架构分析,给小Bo提了几个建议:跟邮件系统、推送系统、积分系统之间的交互可以同时进行。
同时进行,小Bo灵机一动,我整一个线程池,并发去进行以上的业务逻辑就可以了吧。
突然伴随着某一天业务的流量高峰的到来,服务挂了,有一些订单处在了中间状态,怎么办,需要写脚本去数据库中捞数据,根据订单状态走流程。小Bo又找到了李哥。。。
李哥说可以优化业务架构,用消息队列解决这个问题,当用户下完单后,可以将下单这一条信息写到消息队列中,异步系统消费消息队列,根据不同的业务逻辑去消费下单消息去给用户发邮件、发短信、计算积分等等。
小Bo根据李哥的建议将业务系统进行了优化,觉得消息队列这个中间件很神奇,开始对消息队列进行了进一步了解,小Bo熟练的打开了Chrome,依次输入了以下几个问题:
“什么是消息队列?”
“消息队列解决哪些问题?”
“业界有哪些消息队列?”
...
经过了一天的学习,小Bo打开了公司的Confluence在自己的个人空间下写下来如下的笔记:
消息队列是与业务无关,高效可靠地进行消息传递的中间件;消息队列主要由三部分组成,分别是生产者、消息队列服务端、消费者。生产者发送消息给消息队列服务端,无需等待;消费者去消息队列客户端获取消息进行消费,无需对生产者做出同步相应;生产者无需知道是哪些消费者,只需要知道自己把消息生产到哪里,同理,消费者也无需知道消息是来自于哪些生产者,只需要知道自己想要消费哪些消息;消息队列服务端负责将生产者的消息持久化,并且负责记录消费组对消息的消费进度,以便消费者出现异常重启后,避免大量的重复消费的情况。
消息队列主要带给业务的三个功能是:削峰、异步、解耦。
削峰:可以通过消息队列长度控制请求量,可以缓解短时间内的高并发请求。消费侧可以通过控制消费速度,从而稳定系统的负载。
异步:主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。这样A系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库 要 3ms,BCD 三个系统分别写库要300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用消息队列,那么 A 系统连续发送 3 条消息到消息队列中,假如耗时 200ms,A系统从接受一个请求到返回响应给用户,总时长是203ms。
解耦:同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦。A系统发送数据到 BCD 三个系统,通过接口调用发送。如果E系统也要这个数据呢?那如果C系统现在不需要了呢?A系统负责人几乎崩溃…A系统跟其它各种乱七八糟的系统严重耦合,A系统产生一条比较关键的数据,很多系统都需要A系统将这个数据发送过来。如果使用消息队列,A系统产生一条数据,发送到消息队列里面去,哪个系统需要数据自己去消息队列里面消费。如果新系统需要数据,直接从消息队列里消费即可;如果某个系统不需要这条数据了,就取消对消息的消费即可。这样下来,A系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
业界常用的消息队列主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ、Pulsar等等。
了解完消息队列基本的一些问题后,小Bo通过跟同事了解,知道公司有一套自己的消息队列的接入叫SFMQ,于是又找到李哥,向李哥请教了如下几个问题:
“我了解了Kafka这些业界使用的消息队列,我们公司用的是SFMQ,SFMQ是什么?”
“为什么要使用SFMQ,我直接用开源Kafka给的客户端的sdk接入可以么?”
通过跟李哥进行了深入沟通,小Bo了解到了,为什么公司会有个SFMQ;
SFMQ是基于Kafka包了一层Http代理,这样做有哪些好处:
统一业务接入:通过代理屏蔽多技术栈接入Kafka的细节。公司的技术栈丰富,同时存在Java、PHP、Go技术栈,不同语言技术栈接入Kafka有着不同的实现,并且有些语言由于语言特性的原因并没有稳定的Kafka客户端,在业务服务和Kafka之间添加http代理可以屏蔽不同的语言之间的差异;
Kafka及SDK升级透明:当公司对kafka升级和对不同版本的Kafka接入时,仅需要代理层进行测试升级即可,无需业务系统关注;
Rebalance透明:Kafka的rebalance仅仅影响代理层的服务,业务系统无需关心,只需要关注Http接口的结果即可;
资源节约:生产者和消费者同Kafka集群之间都是通过tcp长连接来保持通信;添加代理层后,仅需代理层同Kafka保持长连接,降低了Kafka服务端的压力;
代理层可以封装通用能力:代理层可以统一对消费进行公共能力的封装,比如在生产代理层封装延迟消息、事务消息,在消费者代理层封装了重试、死信、延迟、流控、过滤。
权限控制负载均衡:这些功能可以很方便的在代理层控制。
最后,小Bo贴上了李哥给的公司的SFMQ目前的系统图;
didadidadida。。。伴随着悦耳的微信语音铃声,小Bo缓缓接听起了产品经理的来电,“阿Bo,忙么,根据目前的业务需求,客户再下单后有15分钟的冷静时间,在这15分钟内,客户可以直接取消订单,所以,系统给客户发送短信需要15分钟后判断是否用户是否取消来判断是否发送短信。”
接到需求后,小Bo想到,等待十五分钟,那就在消费到消息时,直接Sleep(15min),然后在进行判断,看是否发送短信不就行了么;当提出了这个想法后,小Bo迅速写好了代码,结果可想而知,小Bo想起了和李哥深入交流的那天,李哥提到的在SFMQ中的延迟队列功能。那天,李哥是这么介绍SFMQ的延迟队列的:
李哥先将SFMQ延迟消息的架构图发给了小Bo:
然后婉婉到来,阿Bo你这种延迟消息的场景其实在咱们公司比较普遍,我来给你举两个例子;
你大振哥负责的SDS路径规划的业务场景需要在判断未来一段时间内,判断小哥有没有对收派件做操作,就将消息先写入延迟队列,在消费到这条消息时进行业务判断是否进行下一步业务逻辑;
还有你远哥负责的新O的业务中,存在部分订单在下单后一段时间内,可能存在取消的操作,延迟一段时间后判断该订单是否应该下发;
远哥这场景不是跟我的基本一样么,小Bo心想。心里这么想着,嘴上却问出了其他的问题,“那SFMQ的延迟队列功能是怎么实现的呢?”。
李哥说,你看看图。业界实现延迟队列功能有很多方式,有些消息队列天然有延迟消息的功能,比如RocketMQ;有些消息队列通过插件的方式也能实现延迟消息的功能,比如RabbitMQ。SFMQ是基于Redis来实现的延迟队列,Redis的数据结构zset是一个有序集合,会储存一个score和一个value,可以将value按照score进行排序,我们消息想要发送的时间作为score放到zset中,用一个线程定时查询zset中score小于等于当前时间的消息将其推送给一个消息队列,消费队列中的都是满足延迟条件的消息。
小Bo又问道,“如果当前我的消息量很大,一个zset满足不了我当前的业务诉求怎么办呢。”
李哥嘴角微微上扬,心想“看来随着我们公司业务的增长,我们确实面临了很多量上面的设计考量。”同时回到,“根据我们的调研,一个zset的容量大概达到10万的时候会成为redis的慢查询,所以针对量大的场景,SFMQ的延迟消息可以平行扩容,通过生产者代理层进行负载均衡将消息写入不同的zset中,同时每一个zset都有独立的线程进行消费,这样就能根据业务的实际场景进行扩缩容。”
“啊???那线上的zset是根据什么规则分配给不同的线程进行消费的呢?也是Kafka的rebalance么?”
李哥说,看图。说着,随手将一张图发了过来,随着图的同时还有一段话。
“我们有个分片服务,只要将生产者代理层的配置中注册上你的任务,并将你的任务注册到分片服务中,分片服务会通过接口将代理层负责处理的zset发送过来,这时就可以对该zset进行消费,同时分片服务会和生产者代理层保持心跳,在单个生产者代理层中某一台机器宕机或者发生异常后,会将其负责的zset分配给生产者代理层的其他机器上。”
“听上去和Kafka的rebalance很像,”小Bo说到。
“是的,但是有很多优化。”
“优化了哪些内容呢?”
“改天再讲吧,我打的车到了。”
* * *
又有一天,小Bo发现,业务系统出现了如下的问题。系统收到一个订单后,然后将该订单存到本地数据库,再发送一个消息到库存系统,库存系统再将库存减一,正常的情况下该流程是没有问题的,小Bo梳理了一下其中的流程:
1.先保存订单(本地事务)
2.再发送消息到库存系统
一、按照上述的流程,可能会出现两种情况:
1.如果保存订单出错,那么直接返回错误,对一致性没有影响
2.保存订单正常,发送消息失败,那么保存订单已经完成,无法回滚,一致性出现问题
二、那么再看第二种情况:
1. 先发送消息到系统库存
2. 再保存订单
小Bo对现有的情况一顿分析,发现系统发送消息成功,但是本地订单保存失败,但是消息无法回滚,造成数据不一致的问题;将发送消息的过程放到本地订单的事务中,都成功了再提交,但是还是不可避免的问题就是,失败的话,消息是无法回滚的。
小Bo又找到了李哥,说明了来意。
李哥说,“可以了解下SFMQ事务消息,主要解决的就是发送的消息无法回滚的问题。给你看个图。”
“为了实现消息无法回滚的问题,我们将发送消息的步骤从一步增加到了三步。第一步:和发送正常消息一样,发送一条消息到生产者代理层,此时的消息叫做半消息,该消息是不会投递到kafka的,而是先将消息存储到redis中。第二步:根据本地事务的执行结果,再次发送一条消息到生产者代理层,此时的消息叫ack消息,如果本地事务成功,那么就发送commit,如果本地事务失败,那么就发送rollback。当发送ack消息不成功时生产者代理层是不知道该条半消息到底是提交还是回滚,所以我们还需要第三步来确保半消息会被处理。第三步:回查,生产者代理层发现收到半消息后,迟迟没有收到第二步的ack消息,就会去向半消息的发送方发送请求,询问该条消息到底是提交还是回滚;通过回查来保证生产者代理层只要能收到半消息,就能确保该消息被处理。”
“好重啊。。。”,阿Bo小声嘀咕。
“确实很重,事务消息的可靠性依赖于业务方的消息回查接口,增加了业务的复杂度;生产者代理层复杂度增加,需要额外的存储开销来对事务消息进行存储。。
但其实也解决了业务方遇到的问题,复杂的业务场景确实需要复杂的交互流程来保证。架构在某种意义上讲就是取舍。”
“架构在某种意义上讲就是取舍。。。”小Bo既没看懂,也没敢继续问了,因为已经到了21点30分。
用上了SFMQ后,小Bo根据李哥给的Confluence指导,根据自己的业务场景配置好了相应的配置。面对缤纷复杂的配置,小Bo不禁陷入了沉思,每个配置的作用分别是什么,如何更好的配置,使用SFMQ提供的通用能力,减少业务开发成本。
小Bo根据学习,知道消费者模型分为两种普遍的模型,推模型和拉模型;Kafka原本的消费者是拉模型,为什么SFMQ的消费者代理层是推模型呢?小Bo经过多次和李哥的交流学习过后,进行了分析消费代理层去Kafka拉取相应的消息,将消息通过http接口推送给业务的方式。如果是拉模式的话,需要设计一套完善的协议,同时可能需要消费代理层来存储一些消息,供业务方来拉取;推模型就简单,消费代理层去Kafka拉取消息,并将消息调用下游接口,成功则提交offset,不成功则不提交offset。
小Bo将自己的想法跟李哥聊了聊,李哥讲到,“你说的对,并且在推模型中,SFMQ还做了很多一些业务通用能力,比如:delay、过滤、流量识别(灰度/压测)、调用下游限流、推送重试规则。”
“能详细地介绍下这些能力分别面对什么场景么?”
“delay这个是用来解决主从延迟的,比如当你的消息在消费到的时候,可能刚写入主库,查从库查不到,这个时候delay一定的时间就能解决这个问题;可以根据消息体中的一些消息内容进行过滤,比如你只想消费消息体CityName: "ShenZhen"的消息,就可以通过过滤配置将其他不符合的消息过滤掉,这时消费代理层会直接提交offset,不会将其推送给下游配置;可以根据消息体中的信息识别出来消息的类型,比如识别灰度,会将消息推送给下游的灰度机器,识别压测,会在推送下游http接口的时候带上压测Header;同时可以通过QPS配置来控制对下游的压力,避免业务系统被压垮;推送下游http接口可以通过配置来决定消息是否丢弃和下一次重试的间隔。”
说完上一番话,李哥顿了顿,“kafka的原生消费方式存在一些问题。”
“别卖关子了,李哥,给我讲讲存在什么问题,SFMQ又是如何避免这些问题的吧,我请你喝奶茶。”
“看在你请我喝喜茶的份上,我给你讲讲。老规矩,先看图。
Kafka原生消费主要存在三个问题;partition扩展,消费组基于kafka进行消费能力的扩容只能对原始kafka的partition数量进行扩容,但是partition的数量是无法无限增加的;消息延迟,相同p的消息的消费顺序是根据该p内的消息的生产顺序而定的,前面的消息消费速度慢会导致后面的消息阻塞;消息阻塞,前面的消费消费失败会导致后面的消息直接阻塞。”
“那SFMQ是如何解决的呢,还看看图。
“消费单partition并发,SFMQ在消费单个partition时通过SFMQ的Window组件并发推送下游,但是相同key的消息有序推送下游。SFMQ保证了单P中的相同Key的有序性。乱序提交,推送成功后,SFMQ的Window会将该条消息的offset提交到OffsetManager,由OffsetManager来判断能提交的最大offset,解决同一个partition中消息延迟问题,不同消息的消费速度的消息不会互相影响。SFMQ还有死信队列,推送下游失败后,SFMQ将消息保存在死信队列中,将该条消息的offset提交到OffsetManager,并根据配置进行重试,避免消息阻塞问题,消息阻塞不影响提交offset从而影响该partition中的其他消息。”
“原来如此。多谢李哥。”
“不客气!”
第二天,李哥的桌上多了一杯蜜雪冰城。“你爱我啊我爱你啊,蜜雪冰城甜蜜蜜”,李
哥不禁唱了出来。
伴随着多次和李哥深入的交流,小Bo对如何在业务中使用SFMQ已经十分熟练了,并且多次通过修改配置来解决业务方向的难题。为了更好的了解消息队列和SFMQ中的通用能力实现,小Bo找李哥要来了SFMQ的gitlab库。
通过上述了解相信大家对SFMQ也有了不少了解,SFMQ就是kafka的代理层,通过加了一层代理,屏蔽了内部实现,展示给使用者最简单的使用方式,可以在代理层实现各种和消息队列相关的业务功能,业务接入kafka只需要像调用别人提供的接口一样,便得无比的简单和无脑,让业务更加关注与业务,所有和kafka的交互升级全部交给SFMQ来实现。在前言中所罗列的一些问题,也都迎刃而解。