说到RocketMq的事务消息,那么涉及到的一定是分布式的事务,本地的事务也用不到这些中间件。但是说到分布式事务,了解的人一般会想到2PC、3PC、TCC、Seata这些,但是这些分布式事务都是强一致性的事务。其实还有一种解决方案叫做最终一致性也称为消息事务。而RabbitMq事务消息就是消息事务的一种解决方案。

强一致性的事务,就是要求要么都成功,要么都失败。而消息事务则是要求可以先有一个分布式事务成功,而其他的事务则通过消费第一个事务成功后发送到消息队列里面的消息,而达到最终所有的事务都执行成功的目标,由于这些事务是异步分开的,最终结果是一直的,所以也称为最终一致性事务

举个实际的例子来解释一下,咱们就以rocketmq官网的例子来举例

在普通的分布式事务中,在用户发起支付后,由事务管理器发起下面的修改订单事务、物流分配事务、积分变动事务、清空购物车事务这四个分支事务,这四个事务要么都返回了事务执行成功,这个事务最终才会显示执行成功。若是其中有一个事务执行失败,其他的事务都会回滚,并返回这个分布式事务执行失败。这就是强一致性事务。

而在RocketMq事务消息的介入下,事务的关系就会变成下面的这种状态

当用户发起支付订单后,修改订单事务会发送一个事务消息给RocketMq,但是这个消息是一个半事务消息(虽然这个消息已经发送到了,RocketMq的服务端,但是控制面板里面是看不到这条消息,消费者也不会收到这条消息的)。当我们的本地的修改订单事务执行成功后,我们会给RocketMq发送一个本地事务执行成功的通知,然后RocketMq就会把之前的半事务消息,投递给消费者,然后物流事务、积分变动事务、清空购物车事务就会根据RocketMq获取的消息来去执行各自本地的事务,如果本地事务执行失败,只要我们不确认消息成功消费,那么这条消息就一直在队列里面或者进入死信队列,直到我们把消息最终消费掉。这个事务才算完整执行成功。当我们的本地事务执行失败,会给RocketMq通知失败,RocketMq就不会把那条半事务消息通知给消费者。

以上就是rocketmq事务消息的大白话解释。

使用rocketmq事务消息的第一步,当然是创建事务消息主题了

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TransactionTopic -c DefaultCluster -a +message.type=TRANSACTION
  • -c 集群名称

  • -t Topic名称

  • -n nameserver地址

  • -a 额外属性,本例给主题添加了message.typeTRANSACTION的属性用来支持事务消息

rocketmq-client版本

普通java项目

第一步创建消息接收者。同一主题下的不同消费者分组收到消息是相同的,也就是广播模式。同一消费者分组下的消费者的消息消费模式可以分为集群模式(即一条消息只能又消费者分组下的一个消费者消费)、广播模式(一条消息同一消费者分组下的每个消费者都会收到这条消息)两种

清空购物车消息事务

@Test
public void shoppingCarConsumer() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
  //客户端配置
  ClientConfig clientConfig = new ClientConfig();
  clientConfig.setNamesrvAddr('IP+端口');
  clientConfig.setAccessChannel(AccessChannel.LOCAL);
  clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
  clientConfig.setInstanceName('客户端实例名称');
  clientConfig.setMqClientApiTimeout(30000);
//创建消息消费者
  DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
  defaultMQPushConsumer.resetClientConfig(clientConfig );
  defaultMQPushConsumer.setConsumerGroup("ShoppingCarConsumer");
  defaultMQPushConsumer.subscribe("TransactionTopic", "Java");
  //设置消费批量拉取消息的数量
  defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
  //设置消费者的最大线程数
  defaultMQPushConsumer.setConsumeThreadMax(1);
  //设置消费者的最小线程数
  defaultMQPushConsumer.setConsumeThreadMin(1);
  // 对于需要慢速拉取的情况(如流控制场景),暂停拉取时间 或者又称消费失败再次消费的间隔时间
  defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(10000);
  //设置消费模式 集群消费还是广播消费
  defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);  
  //消息监听者
  //顺序消费
  MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
  @Override
  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    //本地事务再此执行
    System.out.println("msgs:"+JSON.toJSONString(msgs));
    System.out.println("messageContent:"+JSON.toJSONString(new String(msgs.get(0).getBody())));
    System.out.println("context:"+JSON.toJSONString(context));
    //本地事务执行成功则返回SUCCESS,不成功则返回SUSPEND_CURRENT_QUEUE_A_MOMENT
    return ConsumeOrderlyStatus.SUCCESS;
   }
  };
  defaultMQPushConsumer.start();
  Thread.sleep(Long.MAX_VALUE);
  normal.shutdown();
}

积分变动事务消费者

@Test
public void integralChangeConsumer() throws InterruptedException, MQClientException, MQBrokerException, RemotingException, UnknownHostException {
        //客户端配置
  ClientConfig clientConfig = new ClientConfig();
  clientConfig.setNamesrvAddr('IP+端口');
  clientConfig.setAccessChannel(AccessChannel.LOCAL);
  clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
  clientConfig.setInstanceName('客户端实例名称');
  clientConfig.setMqClientApiTimeout(30000);
//创建消息消费者
  DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
  defaultMQPushConsumer.resetClientConfig(clientConfig );
  defaultMQPushConsumer.setConsumerGroup("IntegralChangeConsumer");
  defaultMQPushConsumer.subscribe("TransactionTopic", "Java");
  //设置消费批量拉取消息的数量
  defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
  //设置消费者的最大线程数
  defaultMQPushConsumer.setConsumeThreadMax(1);
  //设置消费者的最小线程数
  defaultMQPushConsumer.setConsumeThreadMin(1);
  // 对于需要慢速拉取的情况(如流控制场景),暂停拉取时间 或者又称消费失败再次消费的间隔时间
  defaultMQPushConsumer.setSuspendCurrentQueueTimeMillis(10000);
  //设置消费模式 集群消费还是广播消费
  defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);  
  //消息监听者
  //顺序消费
  MessageListenerOrderly messageListenerOrderly = new MessageListenerOrderly() {
  @Override
  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    //本地事务再此执行
    System.out.println("msgs:"+JSON.toJSONString(msgs));
    System.out.println("messageContent:"+JSON.toJSONString(new String(msgs.get(0).getBody())));
    System.out.println("context:"+JSON.toJSONString(context));
    //本地事务执行成功则返回SUCCESS,不成功则返回SUSPEND_CURRENT_QUEUE_A_MOMENT
    return ConsumeOrderlyStatus.SUCCESS;
   }
  };
  defaultMQPushConsumer.start();
  Thread.sleep(Long.MAX_VALUE);
  normal.shutdown();
 }  

消息生产者

@Test
    public void sendTransactionMsg() throws UnknownHostException, InterruptedException, MQClientException, MQBrokerException, RemotingException {
      //客户端配置
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setNamesrvAddr('IP+端口');
        clientConfig.setAccessChannel(AccessChannel.LOCAL);
        clientConfig.setClientIP(InetAddress.getLocalHost().getHostAddress());
        clientConfig.setInstanceName('客户端实例名称');
        clientConfig.setMqClientApiTimeout(30000);
         //创建消息生产者
        TransactionMQProducer transactionMQProducer = new TransactionMQProducer("TransactionProducerJava");
        transactionMQProducer.setSendMsgTimeout(30000);
        transactionMQProducer.resetClientConfig(clientConfig);
        transactionProducer.setTransactionListener(new TransactionListener() {

            /**
             * 执行本地事务代码逻辑编写处
             * @param msg Half(prepare) message
             * @param arg Custom business parameter
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.out.println("本地事务执行方法执行");
                System.out.println("消息参数:"+JSON.toJSONString(msg));
                System.out.println("arg:"+JSON.toJSONString(arg));
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 服务端长时间未接收事务执行成功或失败时,校验事务是否执行成功的方法
             * @param msg Check message
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("事务消息校验方法执行");
                System.out.println(JSON.toJSONString(msg));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        transactionMQProducer.start();
        String topic = "TransactionTopic";
        String tag = "Java";
        String messageContent = "Transaction Message Java Test";
        String key = "msg-14";
        Message message = new Message(topic, tag, key, messageContent.getBytes(StandardCharsets.UTF_8));
        transactionProducer.sendMessageInTransaction(message,null);
        Thread.sleep(Long.MAX_VALUE);
    }

事务消息在执行到最后时,有三种结果,

一、成功(COMMIT_MESSAGE):消息会立即发送给消费者

二、失败(ROLLBACK_MESSAGE):消息不会发送给消费者

三、未知(UNKNOW):本地事务未知异常,则不处理,等待事务消息回查。 checkLocalTransaction方法中校验

Springboot项目示例

和java项目类似先创建两个消费者,不过用@RocketMQMessageListener 注解创建的监听者不能手动进行ack消息确认,不过我们的事务一般没有特殊情况都是遇到异常回滚,这样消息也不会被消费掉所以这样也够用。由于这种方式的消费者创建很简单,我这里就不在进行演示。

我们这里演示一下怎么在Springboot中手动进行消息ACK的相关代码

//积分变动消费者事务
@Slf4j
@Component
public class IntegralChangeConsumerService  {

    /**
            * @date 2024/10/16 17:19
            * @description 同步消息消费成功 手动提交
     */
    @PostConstruct
    public void onSyncMessage() throws MQClientException {
        //要自己创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("IntegralChangeConsumer");
        consumer.setMqClientApiTimeout(300000);
        consumer.setNamesrvAddr("192.168.183.129:30885");
        consumer.subscribe("TransactionTopic", "SpringBoot");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.warn(JSON.toJSONString(msgs));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
//清空购物车消息事务
@Slf4j
@Component
public class ShoppingCarConsumerService {
    /**
     * @date 2024/10/16 17:19
     * @description 同步消息消费成功 手动提交
     */
    @PostConstruct
    public void onSyncMessage() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ShoppingCarConsumer");
        consumer.setMqClientApiTimeout(300000);
        consumer.setNamesrvAddr("192.168.183.129:30885");
        consumer.subscribe("TransactionTopic", "SpringBoot");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                log.warn(JSON.toJSONString(msgs));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

下面是生产者的相关代码

配置yaml相关配置

rocketmq:
  name-server: '192.168.183.129:30885'
  access-channel: LOCAL
  api-timeout: 30000
  producer:
    group: "TransactionProducer"
    send-message-timeout: 30000

创建消息事务执行的相关代码

@Slf4j
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {


    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        //执行事务的相关逻辑开始
        // .....
        //执行事务的相关逻辑结束
        log.warn("begin executeLocalTransaction");
        log.info("msg:{}", JSON.toJSONString(message));
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.warn("check executeLocalTransaction");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

创建生产发送消息相关代码

@Slf4j
@SpringBootTest
class RocketMqTestApplicationTests {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    void sendTransactionMsg(){
        Message<String> message = new Message<String>() {

            @Override
            public String getPayload() {
                return "Springboot Send TransactionMsg";
            }

            @Override
            public MessageHeaders getHeaders() {
                return null;
            }
        };
        rocketMQTemplate.sendMessageInTransaction("TransactionTopic:SpringBoot",message,null);

    }
}

由于事务执行的listener是自动注入的,我也没有找到怎么消息生产者中指定事务的执行器,且一个项目只能配置一个RocketMQLocalTransactionListener 配置多个会抛出异常。

rocketmq-client-java版本

普通java项目

Springboot项目示例