事务消息

原理

分布式事务chevron-right

example

使用rocketmq发送事务消息的例子。

例子
public static void main(String[] args) throws MQClientException, InterruptedException {
    TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    producer.setCheckThreadPoolMinSize(2);
    producer.setCheckThreadPoolMaxSize(2);
    producer.setCheckRequestHoldMax(2000);
    //消息回查,检查本地事务执行状态
    producer.setTransactionCheckListener(transactionCheckListener);
    producer.start();

    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    //本地事务执行器
    TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
    for (int i = 0; i < 100; i++) {
        try {
            Message msg =
                new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //发送
            SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
            System.out.printf("%s%n", sendResult);

            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    for (int i = 0; i < 100000; i++) {
        Thread.sleep(1000);
    }
    producer.shutdown();
}

源码

Producer

Broker

接收事务完结消息

在Broker启动时,会注册EndTransactionProcessor处理器,该处理器是专门用来接收事务完结消息的。

事务消息会查

4.2.0版本未开源此部分代码。

Last updated