首页 > 其他分享 >rocket mq实现分布式事务

rocket mq实现分布式事务

时间:2023-05-16 18:47:57浏览次数:28  
标签:事务 String rocket RocketMQLocalTransactionState mq test message transactionId 分布式

使用rocket mq实现分布式事务

发送半消息 -> 执行本地事务 -> 回查本地事务执行状态 -> 第二个服务消费事务消息

 

1.参照下面链接去安装rocketmq

  https://blog.csdn.net/weixin_43464076/article/details/127766159

  rocket mq 启动命令:   start mqnamesrv.cmd   start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/broker.conf   2.java代码部分   1.添加依赖给maven
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.tomcat</groupId>
                    <artifactId>annotations-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.persistence</groupId>
                    <artifactId>javax.persistence-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

 

  2.添加rocketmq config 到application yml文件

rocketmq:
  name-server: 127.0.0.1:9876
  # 默认的消息组
  producer:
    group: test

 

  3.发送半消息

    @GetMapping("/testTransaction")
    public String testTransaction() {
        String transactionId = UUID.randomUUID().toString();

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("test", "test-value");
        MessageBuilder<String> stringMessageBuilder = MessageBuilder.withPayload(jsonObject.toJSONString());
        stringMessageBuilder.setHeader("order_id", "test-transaction-001");
        stringMessageBuilder.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId);

        Message message = stringMessageBuilder.build();

        this.rocketMQTemplate.sendMessageInTransaction("test-topic", message,"testObj");

        return "success;";

    }

  4. executeLocalTransaction方法执行本地事务,当本地事务执行成功时可以给table  t_transaction_log 添加一条记录, 用于检查本地事务是否成功。

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        logger.info("执行本地事务");
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        String testOdr = (String) headers.get("order_id");
        logger.info("transactionId is {}, orderId is {}", transactionId, testOdr);

        try {
            //执行本地事务,并记录日志
            this.logService.doTest(transactionId);
            //执行成功,可以提交事务
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

  

  5. 实现 checkLocalTransaction 完成本地事务是否提交成功的检查。

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        //获取事务ID
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        logger.info("检查本地事务,事务ID:{}", transactionId);
        //根据事务id从日志表检索
        Optional<TransactionLog> optionalTransactionLog = this.transactionRepository.findTransactionLogByTransactionId(transactionId);
        if (optionalTransactionLog.isPresent()) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

  

  6. 第二个服务需要实现 RocketMQListener 接口, 完成服务的提交。

@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TestListener implements RocketMQListener<Message> {

    @Autowired
    private MessageService messageService;

    /**
     * 收到消息的业务逻辑
     */
    @Override
    public void onMessage(Message message) {
        log.info("received message: {}", message);
        this.messageService.testTransaction();
        log.info("add money success");
    }
}

 

标签:事务,String,rocket,RocketMQLocalTransactionState,mq,test,message,transactionId,分布式
From: https://www.cnblogs.com/goblinn/p/17406483.html

相关文章

  • 消息中间件-RabbitMQ
    网络协议。基于TCP上面架构更高层次的功能框架。这里主要是异步,中间服务器,多个客户端角色。多对多的情形。发布--订阅模式Mqtt—messagequeueingtelemetrytransport发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)就是消息分了类型,然后指定某个类型接收队列模......
  • RocketMQ 在小米的多场景灾备实践案例
    本文作者:邓志文、王帆01为什么要容灾?在小米内部,我们使用RocketMQ来为各种在线业务提供消息队列服务,比如商城订单、短信通知甚至用来收集IoT设备的上报数据,可以说RocketMQ的可用性就是这些在线服务的生命线。作为软件开发者,我们通常希望服务可以按照理想状态去运行:在没有B......
  • 分布式session的解决方案
    一、背景一个服务要部署在多台服务器上时,用户信息记录在session中。如果用户小白的登录请求打到服务器A上,这时服务器A会记录session。之后用户小白的下单请求打到服务器B上,由于服务器B没有保存用户A的session,所以用户小白需要再次登录后,才能下单。为了解决用户重复登录的问题,提......
  • hdu:这是真正的水题(RMQ)
    ProblemDescription在缺水的地方,水是非常有限的资源,所以人们常常为争夺最大的水源而战。给定一系列水源,用a1,a2,a3,…,an代表水源的大小。给定一组查询,每个查询包含2整数L和R,请找出L和R之间最大的水源。Input输入数据首先给定一个整数T(T≤10)表示测试用例的数量。......
  • ORA-02049:超时:分布式事务处理等待锁 问题解决
    数据库添加DBLink后,很容易出现一个问题:ORA-02049:超时:分布式事务处理等待锁ORA-02063:紧接着line(起自ODS_LINK) 问题原因分析:第一次执行操作后出错,数据库没有提交或回退,未关闭原有数据库窗口,重新打开新窗口执行数据插入操作,报ORA-02049错误。解决办法:数据库登陆管理员账号查看1、......
  • 4、zookeeper的java三种客户端介绍-Curator(crud、事务操作、监听、分布式计数器、分布
    目录Zookeeper系列文章目录一、zookeeper原生JavaAPI二、ZkClient三、Apachecurator1、pom.xml2、定义常量类3、连接实例化4、事务操作示例5、CRUD示例6、监听示例7、计数器示例1)、单机原子自增性实现1、Synchronized示例2、Lock示例3、AtomicInteger示例2)、分布式线程安全原子......
  • 分布式锁
    setnxlua脚本  可重入锁 ......
  • Windows 安装 RabbitMQ
    引用:http://www.ppmy.cn/news/6570.html 下载地址寻找:https://www.rabbitmq.com/download.html  ......
  • 【jmeter】分布式如何设置唯一变量值(二)
    如果您在JMeter分布式测试中需要产生唯一值,可以使用以下两种方法:1、Redis数据存储器:Redis是一个开源、基于内存的、支持多种数据结构的NoSQL数据库。可通过使用Redis数据存储器来生成唯一值。详情请参见Redis数据存储器配置示例。 2、-Counter功能:可以使用-JMeter的计数器功......
  • 【jmeter】分布式如何设置唯一变量值(一)
    在JMeter分布式测试中,每个被测试的目标机器都会启动一个JMeter客户端进行测试。这会导致变量可能不唯一的问题。为了确保变量的唯一性,在执行分布式测试时可以使用以下方法:1、使用__UUID函数在测试计划中使用__UUID函数可以生成唯一的值。例如,在需要使用时间戳作为变量的值时,......