让你看懂的RocketMQ事务消息源码分析(干货)

前言 得益于MQ削峰填谷,系统解耦,操作异步等功能特性,在互联网行业,可以说有分布式服务的地方,MQ都往往不会缺席。由阿里自研的RocketMQ更是经历了多年的双十一高并发挑战,其中4.3.0版本推出了事务消息的新特性,本文对RocketMQ 4.5.0版本事务消息相关的源码跟踪介绍,通过阅读读者可以知道:

  • 事务消息解决什么样的问题
  • 事务消息的实现原理及其设计亮点
解决什么问题 假设我所在的系统现在有这样一个场景:
本地开启数据库事务进行扣款操作,成功后发送MQ消息给库存中心进行发货。
有人会想到开启mybatis事务实现,把本地事务和MQ消息放在一起不就行了吗?如果MQ发送成功,就提交事务,发送失败就回滚事务,整套操作一气呵成。
transaction{ 扣款(); boolean success = 发送MQ(); if(success){ commit(); }else{ rollBack(); } }

看似没什么问题,但是网络是不可靠的。
假设MQ返回过来的响应因为网络原因迟迟没有收到,所以在面对不确定的MQ返回结果只好进行回滚。但是MQ 服务器又确实是收到了这条消息的,只是给客户端的响应丢失了,所以导致的结果就是扣款失败,成功发货。
让你看懂的RocketMQ事务消息源码分析(干货)
文章图片

既然MQ消息的发送不能和本地事务写在一起,那如何来保证其整体具有原子性的需求呢?答案就是今天我们介绍的主角:事务消息。
概览 让你看懂的RocketMQ事务消息源码分析(干货)
文章图片

总体而言RocketMQ事务消息分为两条主线
  1. 定时任务发送流程:发送half message(半消息),执行本地事务,发送事务执行结果
  2. 定时任务回查流程:MQ服务器回查本地事务,发送事务执行结果
因此本文也通过这两条主线对源码进行分析
源码分析 半消息发送流程 本地应用(client)
在本地应用发送事务消息的核心类是TransactionMQProducer,该类通过继承DefaultMQProducer来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction方法
@Override public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); }return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }

这个方法做了两件事,
  1. 检查transactionListener是否存在
  2. 调用父类执行事务消息发送
TransactionListener在事务消息流程中起到至关重要的作用,一起看看这个接口
public interface TransactionListener { /** * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. * * @param msg Half(prepare) message * @param arg Custom business parameter * @return Transaction state */ LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); /** * When no response to prepare(half) message. broker will send check message to check the transaction status, and this * method will be invoked to get local transaction status. * * @param msg Check message * @return Transaction state */ LocalTransactionState checkLocalTransaction(final MessageExt msg); }

接口注释说的很明白,配合上面的概览图来看就是,executeLocalTransaction方法对应的就是执行本地事务操作,checkLocalTransaction对应的就是回查本地事务操作。
下面是DefaultMQProducer类的sendMessageInTransaction方法源码
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { ... SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); ... sendResult = this.send(msg); ... switch (sendResult.getSendStatus()) { case SEND_OK: { ... localTransactionState = transactionListener.executeLocalTransaction(msg, arg); ... break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }... this.endTransaction(sendResult, localTransactionState, localException); ... }

为了使源码的逻辑更加直观,笔者精简了核心代码。sendMessageInTransaction方法主要做了以下事情
  1. 给消息打上事务消息相关的标记,用于MQ服务端区分普通消息和事务消息
  2. 发送半消息(half message)
  3. 发送成功则由transactionListener执行本地事务
  4. 执行endTransaction方法,如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息。
发送半消息流程,Client端代码到这里差不多就结束了,接下来看看RocketMQ Server端是如何处理的
RocketMQ Server
Server在接收到消息过后会进行一些领域对象的转化和是否支持事务消息的权限校验,对理解事务消息用处不大,此处就省略对旁枝末节的介绍了。下面是TransactionalMessageBridge类处理half message的源码
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) { return store.putMessage(parseHalfMessageInner(messageInner)); }private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }

这两个方法主要做了以下事情:
public class Message implements Serializable { private static final long serialVersionUID = 8445773977080406428L; private String topic; private int flag; private Map properties; private byte[] body; private String transactionId; }

  1. 将消息的topic,queueId放进消息体自身的map里进行缓存
  2. 将消息的topic 设置为“RMQ_SYS_TRANS_OP_HALF_TOPIC”,queueId设置为0
  3. 将消息写入磁盘持久化
可以看到所有的事务半消息都会被放进同一个topic的同一个queue里面,通过对topic的区分,从而避免了半消息被consumer给消费到
Server将半消息持久化后然后会发送结果给我们本地的应用程序。到了这里Server端对半消息的处理就结束了,紧接着的是定时任务的登场。
定时任务回查流程 RocketMQ Server
定时任务是一个叫TransactionalMessageService类的线程,下面是该类的check方法
@Override public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { ... if (!putBackHalfMsgQueue(msgExt, i)) { continue; } listener.resolveHalfMsg(msgExt); } ... }

check方法非常长,省略的代码大致都是对半消息进行过滤(如超过72小时的事务消息,就被算作过期),只保留符合条件的半消息对其进行回查。
其中很有意思的是putBackHalfMsgQueue方法,因为每次把半消息从磁盘拉到内存里进行处理都会对其属性进行改变(例如TRANSACTION_CHECK_TIMES,这是是否丢弃事务消息的关键信息),所以在发送回查消息之前需要对半消息再次放进磁盘。RocketMQ采取的方法是基于最新的物理偏移量重新写入,而不是对原有的半消息进行修改,其中的目的就是RocketMQ的存储设计采用顺序写,如果去修改消息 ,无法做到高性能。
下面是resolveHalfMsg方法,主要就是开启一个线程然后发送check消息。
public void resolveHalfMsg(final MessageExt msgExt) { executorService.execute(new Runnable() { @Override public void run() { try { sendCheckMessage(msgExt); } catch (Exception e) { LOGGER.error("Send check message error!", e); } } }); }

本地应用(client)
下面是DefaultMQProducerImpl的checkTransactionState方法,是本地应用对回查消息的处理逻辑
@Override public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { ... @Override public void run() { ... TransactionListener transactionListener = getCheckListener(); ... localTransactionState = transactionListener.checkLocalTransaction(message); ...this.processTransactionState( localTransactionState, group, exception); }private void processTransactionState( ... DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000); ... } }; this.checkExecutor.submit(request); }

精简代码逻辑后可以清晰的看到
  • 开启一个线程来执行回查的逻辑
  • 执行transactionListener的checkLocalTransaction方法来获取本地事务执行的结果
RocketMQ Server
RocketMQ 服务器在收到Client发过来的Commit消息后会
读出半消息——>恢复topic等原消息体的信息——>和普通消息一样再次写入磁盘——>删除之前的半消息
如果是Rollback消息则直接删除之前的半消息
到此,整条RocketMQ 事务消息的调用链就结束了
思考 1. 分布式事务等于事务消息吗?
两者并没有关系,事务消息仅仅保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,消费者是否能一定消费成功是无法保证的。
2. 源码设计上有什么亮点吗?
通过对整条链路源码的学习理解发现还是有不少亮点的
  • server端回查消息的发送,client端回查消息逻辑的处理,client端commit/rollback消息的提交都是用了异步进行,可以说能异步的地方都用了异步,通过异步+重试的方式保证了在分布式环境中即使短暂的网络状况不良好,也不会影响整体逻辑。
  • 引入TransactionListener,真正做到了开闭原则以及依赖倒置原则,面向接口编程。整体扩展性做得非常好,使用者只需要编写自己的Listener就可以做到事务消息的发送,非常方便
  • TransactionMQProducer通过继承DefaultMQProducer极大地复用了关于发送消息相关的逻辑
3. 源码设计上有什么不足吗?
【让你看懂的RocketMQ事务消息源码分析(干货)】RocketMQ作为一款极其成功的消息中间件,要发现不足不是那么容易了,笔者谈几点看法
  • sendMessageIntransaction等事务相关的方法被划分在了DefaultMQProducer里面,从内聚的角度来说这是跟事务相关的发送消息方法应该被划分在TransactionMQProducer。
  • 所有topic的半消息都会写在topic为RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息队列里,并且每条半消息,在整个链路里会被写多次,如果并发很大且大部分消息都是事务消息的话,可靠性会存在问题。

    推荐阅读