一、背景
二、转转版
2.1 基本原理
2.2 如何发送消息
2.3 如何处理未发送消息
三、社区版
3.1 基本原理
3.2 如何发送消息
3.3 如何处理未知状态消息
四、总结
在公司使用RocketMQ的时候发现事务消息不是社区版的,而是自研版本。
这就引发了强烈的好奇,为什么要自己研发一套呢? 和社区版的又有什么不同呢?
带着问题,咨询了公司的架构部苑同学后,得到了答案:
转转的
RocketMQ
版本是3.4.6
并且事务消息第一版提交于2017年12月
,但是社区版直到2018年8月
的4.3.0
版本才开始支持事务消息。
这里查找了一下社区版对于事务消息支持的一些重要版本信息:
RocketMQ
3.0.8 以及之前的版本是 支持分布式事务消息
;
RocketMQ
3.0.8 之后 ,分布式事务
阉割了,不支持分布式事务消息;
RocketMQ
4.0.0 开始 apache
孵化,但是也不支持分布式事务消息;
直到上面说的4.3.0
版本才支持事务消息。
在这种情况下,如果要用到事务消息该怎么处理呢? 相信你也猜到了, 那就是咱们做自己的事务消息。
那么各自主要的实现方式是怎样的呢?
转转版
是将事务消息处理的压力放在了发送方
这一端,需要在发送方
的数据库(MySQL
)中建一张事务消息表。
RocketMQ
由于本身是开源项目无法强制要求使用方来配合,所以将事务消息处理放在了自己的Broker
中,但是Broker
自身无法得知事务的结果,只能设计一个回查的流程
来确认消息的结果
,这就导致了在使用社区版事务消息时无法避免的需要提供回查的能力
。
转转的事务消息利用了数据库(
MySQL
)本地事务的特性,需要在发送端的数据库(MySQL
)中创建一个事务消息表
来和本地事务一起提交或者回滚。
在使用上非常简单,这里列一个简单的使用例子: 仓储商品入库需要向上游同步入库消息
@Transactional(rollbackFor = Exception.class) // 事务中
public void testTransaction() throws Exception {
SingleInboundReq inboundReq= new SingleInboundReq(); // 入库实体
inboundService.singleInbound(inboundReq); // 本地事务操作 提交入库
InventoryOrderMsg inventoryOrderMsg = new InventoryOrderMsg(); // 消息内容
mqProduceComponent.sendCisAddMq(inventoryOrderMsg); //发送事务消息 同步入库信息
}
可以看到,只要你的消息放在本地事务中即可,不用做其他处理。
那么它是如何发送事务消息的呢?
发送中和本地事务一起提交数据,随后交给内存中的一个队列,再由其他线程异步处理。
当然这里的处理线程是可以配置的,到底需要多少个线程由客户端根据自己的业务情况决定。
那么取到msg后怎么处理呢?
msg中会存放数据库主键以及相应的数据库信息, 取到信息后会去数据库查询一次信息内容,经过一些
基础的判断
后发送MQ消息给到Broker
,随后删除本地事务表中的数据。
解释下一下基础判断
的内容:
主要是对消息的内容,重试的次数,重试的时间等做验证,后面的图中会详细说明
在发送消息时可能会出现1个很重要的问题: 发送失败
原因有很多,但结果都是同样的,针对这种问题,又要怎么处理呢?
在发送失败或者未查询到数据后,会将这个msg丢入到另一个队列
timeWheelQueue
,由另一个定时任务去处理。
具体流程如图:
这里采用scheduleAtFixedRate
定时任务每5毫秒
从另一个队列(timeWheelQueue)
来获取重试消息,并且逻辑只是判断过期时间,没过期则重新丢入处理队列(msgQueue)
进行处理。
为什么是5毫秒
一次呢?
同一条消息的
重试次数
和时间间隔
是有限制的,经过这几年的迭代后,目前定的时间间隔(单位毫秒
)是0,5,10,25,50,100,200,300,500,800,1000,1000,1000
,大部分的消息是在前面5
个时间点就能发出,可以看到首次的重试时间就是5毫秒
,所以重试任务的执行时间就定为了5毫秒
一次。
如果因为上线或者其他情况导致服务重启,未发送完的消息该如何处理呢?
启动一个线程定时拉取
10秒
之前的数据,组成消息发送给broker
,随后删除该条数据。
为什么是10秒
之前的数据?
大部分事务
10秒
内都会提交完成,这部分数据会在内存中直接发送掉,为了避免和扫表的任务并发,综合考虑是只扫10秒
之前的数据,当然10秒
是经过几次迭代考量得出来的一个值。
具体流程如图:
当然转转版的事务消息是特定场景下的产物,并不适用于外部。
一些参数上的配置都是在场景里迭代出来的一个值。
那么社区版后面提供的事务消息是怎么做的呢?
社区版事物消息则是利用
2阶段
提交的想法来实现的
第一阶段: 先发送一个half消息,随后执行本地事务
第二阶段: 得到本地事务结果后发送一个op消息
说明一下2个概念:
half消息
本质是指写入
RMQ_SYS_TRANS_HALF_TOPIC
的消息 用来保存第一阶段的事务消息 意思是说我这有一些不知道事务最后是提交还是回滚的消息
op消息
本质上是指写入
RMQ_SYS_TRANS_OP_HALF_TOPIC
的消息 用来保存第二阶段本地事务的状态 意思是说我这知道一些half消息的事务结果
来看下社区版的事务消息是怎么使用的:
社区版的事务消息重点在于setTransactionListener
中,这里面需要实现2个方法。
executeLocalTransaction
用来执行本地事务,返回本地事务结果给到Broker
。
checkLocalTransaction
用来给Broker
查询本地事务结果。
那么是怎么发送事务消息的呢?
在发送一个事务消息时,第一次先把真实消息的topic
和queueId
,tags
等信息放入自己的properties
中,随后设置该消息的topic为RMQ_SYS_TRANS_HALF_TOPIC
发送给Broker
。
Broker
返回成功后执行本地事务也就是executeLocalTransaction
此方法的内容,得到本地事务的结果。
最后执行endTransaction
方法确认该消息的事务结果,当然这次写入的就是op消息
。
为了方便理解,画了一幅简图:第一阶段发的事务消息其实就可以看作是发送一个普通的消息给Broker
去保存下来,只不过是保存到事务消息专用的half消息
中。
第二阶段是我们要了解的重点,得到事务结果后Broker
会怎么处理呢?
源码是在EndTransactionProcessor#processRequest
方法中,这里不展开说源码细节。
说白了就2个点:
commitLog
中,随后组装一个tag
为d
的op消息
写入commitLog
中。commitLog
中写入一个tag
为d
的op消息
。这里的commitLog
是RocketMQ
用来保存消息的文件,这里不展开讨论commitLog
的细节。如图:那么为什么要写op消息
呢?
我们知道
RocketMQ
消息写入后是不会修改和删除的,如果要指明一个事务消息的结果只能通过另外一个topic
来指明,RocketMQ
就是使用了op消息
来指明某条事务消息的最终结果,该op消息
的body
内容会保存对应half消息的位置
。
op消息
是如何表示一条事务消息的状态呢?
op消息
通过一个tag
为d
的标签来表示对应的half消息
是完结状态从而无需再去请求生产者获取该消息的结果,直接忽略不处理即可。
有的half消息
由于各种原因,一直没有得到本地事务的结果,那么RocketMQ
是怎么处理的呢?
在Broker
启动时会开启一个线程每60秒
去核对一次事务消息的结果,源码在BrokerController#start
中。
最后执行会调用TransactionalMessageServiceImpl#check
方法,具体源码这里不细说。
总的来说分以下几个步骤:
half消息op消息
的消费位置;op消息
的32
条消息,判断是否有已经处理过的消息 组成一个removeMap
,键为half消息
的位置,值为op消息
的位置;while(true)
处理,从half
消息的当前位置依次获取消息,如果在removeMap
中则跳过;removeMap
则判断是否已经处理超过5
次BrokerConfig.transactionCheckMax
由此配置控制,或者距今是否超过72
小时由MessageStoreConfig.fileReservedTime
配置控制,如果是则跳过不处理;half消息
没有对应的op
结果 ,则重新发送half消息并且异步去询问生产者结果。整体流程如图:
(小小的吐槽一下,看完源码才明白为什么说阿里规范都是对外的)
通过上面的分析,可以看到在不同场景下会出现不同的技术方案。
各种方案都有其优劣,好与坏不是思考的重点,适用与否或许更加重要。
最后附上整理后的源码流程图:
社区版整体流程图: https://www.processon.com/view/link/6283410c5653bb3b4934e348
转转版整体流程图: https://www.processon.com/view/link/6274d9af5653bb45ea4ab5d6
作者简介
涂晓伟,终身学习践行者, 转转履约中台研发工程师
想了解更多转转公司的业务实践,欢迎点击关注下方公众号: