首页 > 其他分享 >rocketMQ中事务发送消息

rocketMQ中事务发送消息

时间:2024-10-14 17:43:42浏览次数:1  
标签:事务 String tags 发送 message public rocketMQ RocketMQLocalTransactionState

rocketMQ中有关事务的发送消息方式,写的一个demo

1、在MyProducer类中的方法,即先定义调用

@Component
public class MyProducer {

    @Autowired
    private RocketMQTemplate template;
public void sendTractionMessage(String topic, String msg) throws InterruptedException {
        String[] tags = new String[]{"TagA","TagB","TagC","TagD","TagE"};   //这里的tag是标签,具体使用的时候,可以自行根据规则赋值,可以是订单编号
        for (int i = 0; i < 10; i++) {     //这个10也是demo时候用的,实际使用可以根据具体需求
            Message<String> message = MessageBuilder.withPayload(msg).build();
            String destination = topic + ":" + tags[i % tags.length];  //这里是取模
            TransactionSendResult transactionSendResult = template.sendMessageInTransaction(destination, message, destination);
            System.out.println("transactionSendResult = " + transactionSendResult);
           
        }
    }

}

2、监听MyTransactionListener这个类作用是对不同tag的处理方式

@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")  //使用rocketMQTemplate
public class MyTransactionListener implements RocketMQLocalTransactionListener {  //需要实现该类


    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o) {
        String destination = (String) o;
        org.apache.rocketmq.common.message.Message message = RocketMQUtil.convertToRocketMessage(
                new StringMessageConverter(),
                String.valueOf(StandardCharsets.UTF_8),
                destination,
                msg
        );
        String tags = message.getTags();
        if (StringUtils.contains(tags,"TagA")){   //对于不同的tag标签,返回不同的状态,rocketMQ会根据状态来决定两阶段提交的第二阶段决定消息是提交还是放弃
            return RocketMQLocalTransactionState.COMMIT;
        }else if (StringUtils.contains(tags,"TagB")){
            return RocketMQLocalTransactionState.ROLLBACK;
        }else {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        return null;
    }
}

3、使用测试类测试

    @Test
    void testSendTransactionMessage() throws InterruptedException {
        String topic = "my-boot-topic";
        String message = "hello rocket mq transaction springboot message";
        myProducer.sendTractionMessage(topic,message);
        System.out.println("事务消息发送成功~");
    }

4、消费端消费情况,demo中循环是10,但只有2条被消费者接收处理,就是那TagA的两条数据,因为他们的状态是commit

 5、rocketMQ的事务分两阶段提交,第一阶段是发送broker一个half消息,而只有等本地事务处理完成,才能再告诉broker是commit还是rockback,最终保证原子性

 

以上内容纯属学习使用!

 

标签:事务,String,tags,发送,message,public,rocketMQ,RocketMQLocalTransactionState
From: https://www.cnblogs.com/qwg-/p/18464662

相关文章

  • rocketMQ发送消息demo
    在发送之前,需要先搭建好rocketMQ之后便是创建两个springboot工程,一个是生产中producer生产者、另外一个是consumer消费者一、生产者创建步骤:1、通过idea创建一个springboot工程,在创建工程的时候,添加spring-boot-starter-web依赖即可2、在pom.xml文件中添加rocketMQ相关依赖,注......
  • 分布式事务的原理(经典面试题)
    大家好,我是ZHF,一个工作了5 年的Java程序员在互联网企业的面试中,经常会问到分布式、高并发下的技术问题,其中分布式事务问题就是其中之一。下面我们来看一下,关于分布式事务的解决方案,一般人和高手是如何回答这个问题的!一般人的回答:分布式事务,就是多个事......
  • 如何用ajax发送post请求
    和get类似,稍作改动:<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>AJAXPOST请......
  • 分布式事务之Seata的AT模型
    在Seata的事务管理中有三个重要的角色:TC(TransactionCoordinator)-事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。TM(TransactionManager)-事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。RM(ResourceManager)-资源管理器:管理分......
  • 《深入理解分布式事务与Seata解决方案》
    分布式事务-重要1.数据库的事务1、事务概念事务是一个完整的,不可分割操作单元。整个事务要么全部执行成功,要么全部执行失败。事务具备4个特性:ACIDA:原子性【】C:一致性【数据的一致性:事务开始前和事务结束后,数据总量不变】I:隔离性【事务相互隔离,互不影响】D:持久性【事务一......
  • 什么是分布式事务?使用Seata有哪些解决方案?Seata的AT模式的执行流程
    什么是分布式事务?简单来说:在分布式系统中出现的事务问题,称为分布式事务。为什么会出现分布式事务:一个分布式事务流程图在这个业务中有3个数据库连接,就没有办法做到全局的事务控制。这就是分布式事务问题分布式事务产生的情况有两种:​ 1.业务跨多个服务实现​ 2.业务跨多......
  • MySQL 事务隔离
    事务保证一组操作要么全部成功,要么全部失败。在MySQL中事务是在引擎层实现的。隔离性和隔离级别事务的四大特性(ACID):原子性、一致性、隔离性、持久性事务隔离级别:读未提交、读已提交、可重复读、串行化当数据库有多个事务同时执行的时候,可能会出现丢失更新、脏读、不可重复......
  • Entity Framework Core 中使用仓库和工作单元事务,服务层和控制器
    定义实体首先定义一个实体,例如Product:publicclassProduct{publicintId{get;set;}publicstringName{get;set;}publicdecimalPrice{get;set;}}CopyInsert2.创建DbContext创建一个DbContext类:publicclassAppDbContext:DbContext{public......
  • MySQL 的索引和事务
    MySQL的索引事务1.索引首先知道两个事情索引是一种特殊的文件,包含对数据表所有记录的指针像目录一个表是可以产生多个索引的关键字是index索引的作用:用于快速定位和检索我们都知道mySQL每次的查询都会遍历一遍这个列利用好索引可以快速的定位检索数据......
  • STM32与ESP32串口数据发送以及网页端数据实时显示和远程遥控
    目标:实现网页端速度实时显示以及可以通过点击页面按键达到对小车的位移方位控制。一、ESP32代码首先,需要让ESP32连接到WiFi,这样才能为后续的操作做准备。ssid="xxxxxx"password="xxxxxx"#WIFI连接defwifi_connect():wlan=network.WLAN(network.STA_IF)#STA模式......