首页 > 其他分享 >RocketMQ笔记(十):事务消息

RocketMQ笔记(十):事务消息

时间:2023-05-05 09:22:57浏览次数:43  
标签:事务 RMQConstant tags 笔记 消息 本地 import RocketMQ

  事务消息官网:RocketMQ官网 - 事务消息

一、什么是事务消息

  事务消息是 RocketMQ 提供的一种消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

二、事务消息的原理

2.1、事务消息的生命周期

2.1.1、初始化

  半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

2.1.2、事务待提交

  半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

2.1.3、消息回滚

  第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

2.1.4、提交待消费

  第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

2.1.5、消费中

  消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。

2.1.6、消费提交

  消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。

  RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

2.1.7、消息删除

  RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

2.2、事务消息的处理流程

  事务消息的处理涉及二阶段,一阶段prepare包含步骤1、步骤2,二阶段commit包含步骤3、步骤4。

  检查本地事务操作是对获取本地事务最终状态的补充。

摘自RocketMQ官网的流程图:

  

1、生产者投递消息

  生产者发送half消息至RocketMQ的服务端。

2、RocketMQ持久化消息并通知生产者

  RocketMQ服务端将消息持久化后,向生产者返回ACK确认消息表示消息已经发送成功,此时消息会持久化在RocketMQ的RMQ_SYS_TRANS_HALF_TOPIC中,消费者暂不能消费,这种状态下的消息称为 半事务消息。

3、生产者段执行本地事务

  生产者段执行本地的业务逻辑处理。

4、半事务消息的处理

  根据本地事务执行结果完成第二阶段的提交。

4.1、本地事务执行成功,确认结果为Commit

  RocketMQ服务端将办消息事务标记为可消费,将RMQ_SYS_TRANS_HALF_TOPIC中的半事务消息存储到消费者订阅的Topic中,并投递给消费者。

4.2、本地事务执行失败,确认结果为Rollback

  RocketMQ回滚事务,将RMQ_SYS_TRANS_HALF_TOPIC中的半事务消息删除,不会投递给消费者消费。

5、RocketMQ服务端消息回查

  若服务端未收到生产者提交的二次确认结果或者收到的确认结果为未知状态(Unknow),服务端会定期对消息生产者发起回查。

6、生产者接收回查,检查本地事务

  生产者接收到消息回查,检查本地事务执行的最终结果。

7、提交回查事务状态

  生产者根据检查到的本地事务的最终状态完成第二阶段的提交,执行结果同步骤4半事务消息的处理。

四、事务消息的demo

1、事务消息常量

  常量类中定义事务消息的生产者组、NAMESRV地址、事务消息Topic、事务消息标签内容

 1 package com.snails.rmq.common;
 2 
 3 // RMQ常量类
 4 public class RMQConstant {
 5 
 6     // 生产者组
 7     public static final String TRX_GROUP = "trx-group";
 8 
 9     // NameSrv地址,修改成自己的rocket服务
10     public static final String NAEMSRV_ADDR = "192.168.33.55:9876";
11 
12     // 测试Topic
13     public static final String TRX_TOPIC = "TrxTopic";
14 
15     // 消息标签
16     public static final String MSG_TAG_A = "TagA";
17     public static final String MSG_TAG_B = "TagB";
18     public static final String MSG_TAG_C = "TagC";
19     public static final String MSG_TAG_D = "TagD";
20     public static final String MSG_TAG_E = "TagE";
21 
22 }

2、本地事务执行器、回查器实现

  本地事务执行逻辑及本地事务回查逻辑:用于获取本地事务执行结果并完成两阶段提交;若本地事务结果状态未知,RocketMQ服务端回查获取最终的本地事务结果。

 1 package com.snails.rmq.msgtype.trx.sdk;
 2 
 3 import com.snails.rmq.common.RMQConstant;
 4 import org.apache.commons.lang3.StringUtils;
 5 import org.apache.rocketmq.client.producer.LocalTransactionState;
 6 import org.apache.rocketmq.client.producer.TransactionListener;
 7 import org.apache.rocketmq.common.message.Message;
 8 import org.apache.rocketmq.common.message.MessageExt;
 9 
10 /**
11  * 事务消息共有三种状态,提交状态、回滚状态、中间状态:
12  *
13  * TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
14  * TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
15  * TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
16  *
17  * @Description: 生产者端,本地事务执行器、回查器
18  */
19 public class TrxListenerImpl implements TransactionListener {
20 
21     /**
22      * 执行本地事务
23      * @param message
24      * @param o
25      * @return
26      */
27     @Override
28     public LocalTransactionState executeLocalTransaction(Message message, Object o) {
29         System.out.printf("executeLocalTransaction -> 开始执行本地事务");
30 
31         String tags = message.getTags();
32         String keys = message.getKeys();
33         String currentThread = Thread.currentThread().getName();
34         // TAGA标签消息  提交,允许消费者消费此消息
35         if (RMQConstant.MSG_TAG_A.equals(tags)) {
36             System.out.printf("%s tags = %s,keys = %s,提交本地事务 %n", currentThread, tags, keys);
37             return LocalTransactionState.COMMIT_MESSAGE;
38 
39         // TAGB标签消息  回滚,消息删除,不允许被消费
40         }else if(RMQConstant.MSG_TAG_B.equals(tags)) {
41             System.out.printf("%s tags = %s,keys = %s,回滚本地事务 %n", currentThread, tags, keys);
42             return LocalTransactionState.ROLLBACK_MESSAGE;
43 
44         // TAGC、TAGD、TAGE 标签消息
45         }else {
46             // 等待MQ服务端定时发起消息状态回查,超过一定重试次数或者超时,消息会被丢弃
47             System.out.printf("%s tags = %s,keys = %s,回查本地事务 %n", currentThread, tags, keys);
48             return LocalTransactionState.UNKNOW;
49         }
50     }
51 
52     /**
53      * 检查本地事务状态,并回应消息队列的检查请求
54      *
55      * @param messageExt
56      * @return
57      */
58     @Override
59     public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
60         System.out.println("checkLocalTransaction -> 开始回查本地事务");
61 
62         // 消息tag
63         String tags = messageExt.getTags();
64         // 消息key
65         String keys = messageExt.getKeys();
66         String currentThread = Thread.currentThread().getName();
67         // TagC commit操作
68         if (StringUtils.equals(tags, "TagC")) {
69             System.out.printf("checkLocalTransaction ->%s tags = %s,keys = %s,提交本地事务 %n", currentThread, tags, keys);
70             return LocalTransactionState.COMMIT_MESSAGE;
71         // TagD rollback操作
72         } else if (StringUtils.equals(tags, "TagD")) {
73             System.out.printf("checkLocalTransaction -> tags = %s,keys = %s,回滚 %n", currentThread, tags, keys);
74             return LocalTransactionState.ROLLBACK_MESSAGE;
75         // TagE 回查操作
76         } else {
77             System.out.printf("checkLocalTransaction -> tags = %s,keys = %s,回查 %n", currentThread, tags, keys);
78             return LocalTransactionState.UNKNOW;
79         }
80     }
81 
82 }

3、事务消息生产者

  事务消息生产者,用于半事务消息的投递及本地事务执行器、回查器的监听。

 1 import com.snails.rmq.common.RMQConstant;
 2 import org.apache.rocketmq.client.producer.SendResult;
 3 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 4 import org.apache.rocketmq.common.message.Message;
 5 import java.nio.charset.StandardCharsets;
 6 import java.util.concurrent.*;
 7 
 8 /**
 9  *
10  * 使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup
11  * 执行本地事务后、根据执行结果对消息队列进行回复
12  *
13  * @Description: 事务消息生产者
14  */
15 public class TrxProducer {
16 
17     public static void main(String[] args) throws Exception {
18         // 创建
19         TrxListenerImpl trxListener = new TrxListenerImpl();
20         // 创建事务消息生产者prodcer,并指定生产者组
21         TransactionMQProducer producer = new TransactionMQProducer(RMQConstant.TRX_GROUP);
22         producer.setNamesrvAddr(RMQConstant.NAEMSRV_ADDR);
23         // 创建线程池对事务消息做检查
24         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
25             @Override
26             public Thread newThread(Runnable r) {
27                 Thread thread = new Thread(r);
28                 thread.setName("client-transaction-msg-check-thread");
29                 return thread;
30             }
31         });
32         // 设置线程池
33         producer.setExecutorService(executorService);
34         // 设置事务状态监听器
35         producer.setTransactionListener(trxListener);
36         // 启动生产者实例
37         producer.start();
38 
39         // 发送消息,指定消息标签
40         String[] tags = new String[] {RMQConstant.MSG_TAG_A, RMQConstant.MSG_TAG_B, RMQConstant.MSG_TAG_C, RMQConstant.MSG_TAG_D, RMQConstant.MSG_TAG_E};
41         for (int i = 0; i < 15; i++) {
42             Message msg = new Message(RMQConstant.TRX_TOPIC, tags[i % tags.length], "KEY" + i,
43                     ("我是事务消息 " + i).getBytes(StandardCharsets.UTF_8));
44             SendResult sendResult = producer.sendMessageInTransaction(msg, null);
45             System.out.printf("%s%n", sendResult);
46             Thread.sleep(10);
47         }
48         for (int i = 0; i < 100000; i++) {
49             Thread.sleep(1000);
50         }
51         // 关闭事务生产者实例
52         producer.shutdown();
53     }
54 }

4、事务消息消费者

  事务消息消费者,只消费在生产者端本地事务执行成功,第二阶段的确认结果为Commit的消息。

 1 import com.snails.rmq.common.RMQConstant;
 2 import com.snails.rmq.utils.ClientUtils;
 3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 6 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 7 import org.apache.rocketmq.common.message.MessageExt;
 8 import java.nio.charset.StandardCharsets;
 9 import java.util.List;
10 
11 /**
12  * @Description: 事务消息消费者
13  */
14 public class TrxConsumer {
15 
16     public static void main(String[] args) {
17         // 创建消费者实例 consumer 并订阅topic、执行消息标签
18         String subExpression =  RMQConstant.MSG_TAG_A.concat(" || ").concat(RMQConstant.MSG_TAG_B).concat(" || ").concat(RMQConstant.MSG_TAG_C)
19                 .concat(" || ").concat(RMQConstant.MSG_TAG_D).concat(" || ").concat(RMQConstant.MSG_TAG_E);
20         DefaultMQPushConsumer consumer = ClientUtils.gainConsumerInstance(RMQConstant.TRX_GROUP,
21                 RMQConstant.NAEMSRV_ADDR, RMQConstant.TRX_TOPIC, subExpression);
22         // 消费事务消息
23         consumer.registerMessageListener(new MessageListenerConcurrently() {
24             @Override
25             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
26                 for (MessageExt messageExt : list) {
27                     System.out.println("接收消息:msgId = " + messageExt.getMsgId()
28                             + ",详情:" + new String(messageExt.getBody(), StandardCharsets.UTF_8));
29                 }
30                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
31             }
32         });
33         // 启动消费者
34         ClientUtils.startupConsumer(consumer);
35     }
36 }

五、总结

  RocketMQ的事务消息,保证了本地事务与事务消息发送的一致性,本地事务执行成功,事务消息方可投递到消费者端处理。

可采用 定时任务 + MQ 实现:

  在本地事务执行成功之后,插入消息表中,异步任务定时扫描消息表,发送消息到RocketMQ。

其他分布式事务:

  分布式事务、分布式事务解决方案Seata。

 

标签:事务,RMQConstant,tags,笔记,消息,本地,import,RocketMQ
From: https://www.cnblogs.com/RunningSnails/p/17373107.html

相关文章

  • RocketMQ笔记(八):顺序消息
    一、什么是顺序消息消息有序指的是可以按照消息的发送顺序来消费(FIFO)。顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消息的顺序关系通过消......
  • RocketMQ笔记(九):延时/定时消息
    一、什么是延时/定时消息定时/延时消息为RocketMQ中提供的一种消息类型。定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递(被消费者消费),......
  • 工厂模式笔记
    参考教程主要参考了抽象工厂模式和工厂模式-简单工厂、工厂方法、抽象工厂解析代码部分要生产的产品packagefun.seolas.factory.simple;publicclassProduct{}/***形状产品*/interfaceShape{voiddraw();}classCircleimplementsShape{@Ov......
  • Django笔记三十五之admin后台界面介绍
    本文首发于公众号:Hunter后端原文链接:Django笔记三十五之admin后台界面介绍这一篇介绍一下Django的后台界面使用。Django自带了一套后台管理界面,可用于我们直接操作数据库数据,本篇笔记目录如下:创建后台账号以及登录操作注册后台显示的数据表列表字段的显示操作字段值......
  • 【动手学深度学习】第十二章笔记:异步计算、数据并行
    为了更好的阅读体验,请点击这里12.1编译器和解释器原书主要关注的是命令式编程(imperativeprogramming)。Python是一种解释性语言,因此没有编译器给代码优化,代码会跑得很慢。12.1.1符号式编程考虑另一种选择符号式编程(symbolicprogramming),即代码通常只在完全定义了过程之后才......
  • 【C++学习笔记】类的长度
    //空类长度是1由于可以初始化,所以必须有一个长度1class空类{}//一个函数长度是1其实函数不占长度,多个函数,长度还是为1,为了初始化,必须有一个长度。class一个函数{voidTest();}//一个虚函数类由于有一个虚函数表,所以必须长度为4,多个虚函数,也是4class一个虚函数类......
  • NTT笔记
    NTT笔记前言:这个算法是与FFT类似的,本片不会再从头讲起,建议先去补补课《FFT笔记》。本文只会讲一下互相关联的地方与一些不同的地方。建议:在电脑前放好演算纸和笔。注:本篇文章是我这个小蒟弱写的,真正的dalao请看个玩笑便好,不必争论对错(但是欢迎指出文章存在的小错误)。NT......
  • 「学习笔记」可持久化线段树
    可持久化数据结构(Persistentdatastructure)总是可以保留每一个历史版本,并且支持操作的不可变特性(immutable)。主席树全称是可持久化权值线段树,给定\(n\)个整数构成的序列\(a\),将对于指定的闭区间\(\left[l,r\right]\)查询其区间内的第\(k\)小值。可持久化线段......
  • STL源码分析读书笔记
    主要是关于标准库容器的整理空间配置器主要看SGI的实现,有两个空间配置器_malloc_alloc_template<0>__default_alloc_template<...>用户可以选择单独使用第一个分配器,或者一起使用两个分配器。当用户选择使用两个分配器时,编译器会分别将上述两个分配器typedef成malloc_a......
  • 23.4.24前学习笔记
    可通过document.documentElement.scrollTop=0控制返回页面顶部 scrollTo方法 window.scrollTo(x,y)//控制页面移动到哪  页面尺寸事件 window.addEventListener('resize',function(){    //改变屏幕尺寸时发生变化,可代替媒体查询    letw=documen......