导读
第一部分: 基本介绍
1. 领域模型概述
2. 消息传输模型介绍
3. 普通消息的可靠性
小节
第二部分: 顺序消息
1. 我们遇到的线上问题
2. RocketMQ消息队列为什么会有顺序问题?
3. 顺序消息的使用场景
4. 实际开发过程中如何保证消息的顺序性?
5. 使用顺序消息需要注意的点
小节
第三部分: 事务消息
1. 我们遇到的线上问题
2. 为什么使用了消息队列反而不可控呢?
3. 事务消息的使用场景
4. 为什么需要引入分布式事务消息
5. 开源版本事务消息
6. 转转版本事务消息
小节
作者
团队
最近团队内部在RocketMQ的业务实践上有一些心得,想给大家分享一下,首先转转这边是有架构团队自研的ZZMQ的,所以我们自然而然的用的ZZMQ,考虑到受众人群,开篇会先讲开源版本的一些基础知识,然后从顺序消息和事务消息2个炽手可热的话题上逐渐转入到与ZZMQ的比较,希望可以帮助到大家绕过"坑"。
生产者(Producer):
Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。
主流的消息中间件的传输模型主要为点对点模型和发布订阅模型。
点对点模型
点对点模型也叫队列模型,具有如下特点:
消费匿名:消息上下游沟通的唯一的身份就是队列,下游消费者从队列获取消息无法申明独立身份。
一对一通信:基于消费匿名特点,下游消费者即使有多个,但都没有自己独立的身份,因此共享队列中的消息,每一条消息都只会被唯一一个消费者处理。因此点对点模型只能实现一对一通信。
发布订阅模型
发布订阅模型具有如下特点:
消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。
传输模型对比
点对点模型和发布订阅模型各有优势,点对点模型更为简单,而发布订阅模型的扩展性更高。Apache RocketMQ 使用的传输模型为发布订阅模型,因此也具有发布订阅模型的特点。
注:以上信息来源于官网
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
RocketMQ存储端也即Broker端在存储消息的时候会面临以下的存储可靠性挑战:
1正常关闭,Broker可以正常启动并恢复所有数据。2、3、4同步刷盘可以保证数据不丢失,异步刷盘可能导致少量数据丢失。5、6属于单点故障,且无法恢复。解决单点故障可以采用增加Slave节点,主从异步复制仍然可能有极少量数据丢失,同步复制可以完全避免单点问题。
这里一般来说就需要在性能和可靠性之间做出取舍,对于RocketMQ来说,Broker的可靠性主要由两个方面保障:
页缓存:操作系统中用于存储文件系统缓存的内存区域。RocketMQ通过将消息首先写入页缓存,实现了消息在内存中的持久化。
CommitLog:是RocketMQ中消息的物理存储结构,包含了所有已发送的消息。CommitLog的持久化保证了即使在异常情况下,如Broker宕机,消息也能够被恢复。同步刷盘:是指将内存中的数据同步刷写到磁盘。RocketMQ确保消息在被发送后,首先在内存中得到持久化,然后再刷写到磁盘,从而防止数据的丢失。
异步刷盘:消息写入到页缓存中,就立刻给客户端返回写操作成功,当页缓存中的消息积累到一定的量时,触发一次写操作,或者定时等策略将页缓存中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是页缓存中的数据可能丢失,不能保证数据绝对的安全。
实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作,会明显降低性能。
主 Broker:负责消息的读写和写入 CommitLog。从 Broker:用于备份主 Broker 的消息,确保在主 Broker 故障时可以顺利切换。同步复制:主节点将消息同步复制到所有从节点,确保从节点具有相同的消息副本。切换:在主节点发生故障时,从节点可以快速切换为新的主节点,确保消息服务的持续性。
Rocket Mq是通过offset来标记一个消费者组在队列上的消费进度,消费成功之后都会返回一个ACK消息告诉broker去更新offset,但是RocketMQ并不是每消费一条消息就做一次ACK,而是消费完批量消息后只做一次ACK。
所以ACK机制是为了准确的告知Broker批量消费成功的信息并且更新消费进度。
那批量消费时具体是如何更新消费进度?
优点:防止消息丢失。
缺点:会造成消息重复消费(使用方需要做幂等。
消费者从RocketMQ拉取到消息之后,需要返回消费成功来表示业务方正常消费完成。因此只有返回CONSUME_SUCCESS才算消费完成,如果返回 CONSUME_LATER 则会按照【重试次数】进行再次消费,【重试间隔为阶梯时间】。如果消费满16次之后还是未能消费成功,则不再重试,会将消息发送到死信队列,从而保证【消息消费】的可靠性。
默认最多重试16次,总时长4小时46分钟。
未能成功消费的消息,消息队列并不会立刻将消息丢弃,而是将消息发送到死信队列,其名称是在【消费组】前加 %DLQ% 的【特殊 topic】,如果消息最终进入了死信队列,则可以通过RocketMQ提供的相关接口从死信队列获取到相应的消息,进行报警人工干预或其他手段,保证了消费组的至少一次消费。
至此应该清晰的知道RocketMq为了保证可靠性做了哪些工作。接下来我们再把视角切换到今天的第一个核心问题顺序消息。
客服同学: @XXX 我们判责了,为啥客户还收不到退款? 售后单号xxxxxx。
研发同学: 让我看看。
…..
30分钟后
研发同学: 修复了,再看一下。
测试同学: 提个online, 研发填一下问题原因,责任归属。
研发同学:问题原因: 历史代码,我们没有顺序消费消息,正常的流程是 先判责完成,再打款. 这一单 先消费了打款消息, 还没有消费判责完成消息,状态不对导致打款失败。
测试同学:那后续如何修改啊。
研发同学: 为了保证消息的有序性,我等会把消息修改为顺序消费。
……
以上故事纯属于虚构
从上面的消息队列模型我们知道,1个topic有N个queue,将数据均匀分配到各个queue上,这样可以提升消费端总体的消费性能。比如一个topic发送10条消息,这10条消息会自动分散在topic下的所有queue中,所以消费的时候不一定是先消费哪个queue,后消费哪个queue,这就导致了无序消费。
消费消息时需要严格按照接收—处理—应答的顺序处理消息,避免使用异步回调或多线程处理,这样可以防止消息处理过程中的并发问题。对于每条消息,只有当它完全处理完毕并发送应答后,才继续处理下一条消息。
消费消息时异常如何处理
消息组尽可能打散,避免集中导致热点
顺序消息是一个老生常谈的问题,但是简单粗暴的硬搬网上的解决方案往往效果不尽如意,实际想要解决的彻底的确不是那么容易,希望可以带给各位看官一些思考和帮助。我们再把视角切换到事务消息身上去。
客服同学: @XXX 用户在我们的app上一直获取不到报价,比较急,麻烦看一下怎么回事。
产品同学: 好的,收到,我这边马上找研发同学看一下,@XXX 需要帮忙看一下。
研发同学:好的,我看一下。
…..
30分钟后
研发同学: 修复了,再看一下。
产品同学: 把问题原因同步一下吧。
研发同学:询价过程中,风控命中了特殊报价池,我们这边把特殊报价池的数据存到了数据库,同时发送了MQ消息,结果MQ消息发送成功了,但是数据库存储失败,导致再次询价的时候,查不到数据导致的。
测试同学:那后续如何修改啊。
研发同学: 后续我们会把普通消息改成事务消息,这样就能保证消息发送和数据库存储的一致性了。
……
以上故事纯属于虚构
MQ消息本身就具有解耦性,消息本身并不关注接收方的状态是否符合预期,只要消息成功发送并且被成功接收,在MQ本身看来就是成功,如果想要保证发送方和接受方的状态变更符合预期,就要保证本次事务操作和消息发送的一致性,这里我们就必须要提到事务消息。
所谓事务消息,其实是为了解决上下游写一致性,也即是完成当前操作的同时给下游发送指令,并且保证上下游要么同时成功或者同时失败。
基于MQ的事务消息方案主要依靠MQ的Half消息机制来实现投递消息和参与者自身本地事务的一致性保障。
Half消息:在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知MQ发送。Half消息机制实现原理其实借鉴的2PC的思路,是二阶段提交的广义拓展。
事务消息共有三种状态,提交状态、回滚状态、中间状态:
CommitTransaction: 提交事务,它允许消费者消费此消息。RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。Unknown: 中间状态,它代表需要检查消息队列来确定状态。事务消息的核心类为TransactionListenerImpl,里面提供了两个核心方法,具体的代码如下图:executeLocalTransaction方法:用来执行本地事务,返回本地事务给到broker,同时,将事务状态进行记录:
checkLocalTransaction 方法:用来查询本地事务的执行结果提供给broker
事务消息是由两个消息来实现的,一个是RMQ_SYS_TRANS_HALF_TOPIC消息,作用是用来存储第一阶段的parpare消息,事务消息首先先进入到该主题消息,消息具体是提交还是回滚要根据第二阶段的消息来判断。另一个是RMQ_SYS_TRANS_OP_HALF_TOPIC消息,用来接收第二阶段的Commit或Rollback消息。
特别需要注意的一点,RMQ_SYS_TRANS_HALF_TOPIC消息是用来存储不能被消费者发现的消息,通过RMQ_SYS_TRANS_OP_HALF_TOPIC消息,来对RMQ_SYS_TRANS_HALF_TOPIC消息对应的事务状态来进行确认的,确认commit之后,需要将一阶段中设置的特殊Topic和Queue替换成真正的目标的Topic和Queue,后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息。
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。
Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。
需要注意的是,RocketMQ并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,RocketMQ默认回滚该消息。
事务消息无论是开源版本还是转转版本,都是绕不过去的点,因为我们作为业务侧团队在如今遍地都是分布式系统的情况下太需要这样的能力来帮我们兜底了。而作为各位看官,为了能够正确得使用事务消息以及方便排查这里的问题,也是很有必要了解清楚这里的技术实现细节。
黄培祖 转转采货侠后端工程师
朱洪旭 转转采货侠后端工程师
我们是隶属于转转C2B2C循环经济体中的B2B技术团队,我们叫"采货侠-根",致力于打造高标准化高专业度的B2B二手电商SaaS平台
想了解更多转转公司的业务实践,欢迎点击关注下方公众号: