首页 > 其他分享 >RocketMQ 与 Spring Cloud Stream之事务消息配置

RocketMQ 与 Spring Cloud Stream之事务消息配置

时间:2024-08-30 17:21:48浏览次数:20  
标签:stream Stream spring rocketmq testchannel RocketMQ Spring cloud 消息

1 引言

RocketMQ的事务消息设计是为了解决分布式系统中数据一致性的问题。在分布式系统中,由于数据可能分布在不同的服务或节点上,因此需要一种机制来确保数据的最终一致性。事务消息通过引入本地事务和消息状态的关联,确保了消息的发送与本地事务的执行结果紧密相关,从而避免了数据不一致的问题。

2 事务消息步骤

  1. 生产者将半事务消息发送至 RocketMQ Broker。
  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    4.1 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    4.2 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
    5.1 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    5.2 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
  6. 注意:服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置。

3 项目结构

本章内容以Spring Cloud Alibaba 快速学习之 RocketMQ中的项目为基础稍作修改。下面列出修改的文件。

3.1 项目rocketmq-producer

在这里插入图片描述

  • application.properties
    这里添加了事务相关的配置
 #spring应用程序监听的端口号
server.port=8080
#spring应用程序的名称
spring.application.name=rocketmq-producer
#spring当前激活的配置文件
spring.profiles.active=dev

#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-out-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-out-0.content-type=application/json

#rocketmq 事务消息配置
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.producerType=Trans
#rocketmq 事务消息分组
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.group=test-group
#rocketmq 事务消息监听
spring.cloud.stream.rocketmq.bindings.testchannel-out-0.producer.transactionListener=RocketMQTransactionListener
  • rocketmq-producer/src/main/java/org/example/controller/TestController.java
 package org.example.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;


@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/send")
    public String send() {
        for (int i = 0; i < 5; i++) {
            Map<String, String> map = new HashMap<>();
            String id = i + "";
            map.put("id", id);
            map.put("msg", "测试消息");
            MessageBuilder<Map<String, String>> builder = MessageBuilder.withPayload(map);
            streamBridge.send("testchannel-out-0", builder.build());
        }
        return "消息发送成功!";
    }


}
  • rocketmq-producer/src/main/java/org/example/conf/RocketMQTransactionListener.java
    这里添加了事务监听器,注意@Component名称与配置文件中对应
package org.example.conf;

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

@Component("RocketMQTransactionListener")
public class RocketMQTransactionListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String msg = new String(message.getBody());
        System.out.println("execute:" + msg);
        JSONObject jsonObject = JSONObject.parseObject(msg);
        if (jsonObject.getIntValue("id") == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (jsonObject.getIntValue("id") == 1) {
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("check:" + new String(messageExt.getBody()));
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3.2 项目rocketmq-consumer-b

在这里插入图片描述

  • application.properties
    这里注释了接收广播消息
#spring应用程序监听的端口号
server.port=8082
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev

#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING

#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json

3.3 项目rocketmq-consumer-a

在这里插入图片描述

  • application.properties
    这里也是注释了接收广播消息
#spring应用程序监听的端口号
server.port=8081
#spring应用程序的名称
spring.application.name=rocketmq-consumer-b
#spring当前激活的配置文件
spring.profiles.active=dev

#rocketmq 服务地址
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
#rocketmq 接受消息的方法名,须保持与通道名一致
spring.cloud.stream.function.definition=testchannel
# 通道接收广播消息
#spring.cloud.stream.rocketmq.bindings.testchannel-in-0.consumer.messageModel=BROADCASTING

#rocketmq 通道分组
spring.cloud.stream.bindings.testchannel-in-0.group=test-group
#rocketmq 通道目标
spring.cloud.stream.bindings.testchannel-in-0.destination=test-destination
#rocketmq 通道消息类型
spring.cloud.stream.bindings.testchannel-in-0.content-type=application/json

4 测试

4.1 同时启动三个子项目

在这里插入图片描述

4.2 发送消息

  • 打开浏览器访问:http://localhost:8080/test/send,可以看到5条消息都进入了executeLocalTransaction方法。

在这里插入图片描述 - 其中消息id为0二次确认结果为Commit,被consumerA正常接收,这与监听器中代码功能一致。
在这里插入图片描述- 其中消息id为1二次确认结果为Unknown,触发回查,在回查中正常Commit,被consumerB正常接收,这与监听器中代码功能一致。
在这里插入图片描述在这里插入图片描述- 其他消息二次确认结果为Rollback,服务端将回滚事务,不会将半事务消息投递给消费者,这与监听器中代码功能一致。

5 完整代码

Gitee代码链接

标签:stream,Stream,spring,rocketmq,testchannel,RocketMQ,Spring,cloud,消息
From: https://blog.csdn.net/qq_40718345/article/details/141531718

相关文章

  • Spring Security基于token的极简示例
    1引言SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,但是用起来有点复杂,为了便于学习理解,下面将用最简洁的配置和示例,展示整个流程。2代码创建一个spring-security-demo的项目,总共包含5个文件2.1pom.xml引入spri......
  • 二. Spring Boot 中的 “依赖管理和自动配置” 详解透彻到底(附+详细代码流程)sh
    二.SpringBoot中的“依赖管理和自动配置”详解透彻到底(附+详细代码流程)@目录*二.SpringBoot中的“依赖管理和自动配置”详解透彻到底(附+详细代码流程)1.如何理解“约定优于配置”2.SpringBoot依赖管理和自动配置2.1SpringBoot的依赖管理2.1.1什么......
  • Spring整合mybatis源码剖析
    Spring整合mybatis源码剖析整合原理图:@MapperScan底层原理剖析主要作用:会将MapperScannerConfigurer注册到spring容器中。@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)@Documented@Import(MapperScannerRegistrar.class)//点进去@Repeatable(Mapper......
  • Spring Task
    使用方式启动类importorg.springframework.boot.SpringApplication;importorg.springframework.boot.autoconfigure.SpringBootApplication;importorg.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication@EnableSchedulingpublicclas......
  • springboot 接口接收参数的注解介绍(@RequestParam,@PathVariable,@RequestBody 等)
    springboot接收参数的注解介绍(使用方法)在SpringBoot中,接收参数的方式主要依赖于SpringMVC提供的注解。这些注解帮助你将HTTP请求中的参数绑定到控制器(Controller)方法的参数上。以下是一些常用的接收参数的注解:1.@RequestParam用法:用于将HTTP请求参数绑定到控制器的方......
  • springboot+vue安心养老一站通服务系统的设计与实现【程序+论文+开题】计算机毕业设计
    系统程序文件列表开题报告内容研究背景随着社会老龄化的加速,养老问题已成为全球性的社会挑战。传统的养老模式面临着资源分配不均、服务效率低下、信息孤岛等诸多问题,难以满足老年人日益增长的多元化、个性化需求。在此背景下,安心养老一站通服务系统的设计与实现显得尤为重......
  • springboot+vue爱心捐赠系统【程序+论文+开题】计算机毕业设计
    系统程序文件列表开题报告内容研究背景在当今社会,随着经济的快速发展与信息技术的日益普及,公益慈善事业逐渐成为连接社会爱心与需要帮助群体的重要桥梁。然而,传统的捐赠方式往往存在信息不对称、流程繁琐、透明度不足等问题,限制了公益资源的有效配置与利用。因此,构建一个高......
  • springboot+vue爱心慈善公益系统【程序+论文+开题】计算机毕业设计
    系统程序文件列表开题报告内容研究背景在当今社会,随着经济的快速发展与人民生活水平的日益提高,社会各界对于慈善公益事业的关注度与参与度也显著提升。然而,传统的慈善捐赠方式往往存在信息不对称、流程繁琐、透明度不足等问题,限制了公益资源的高效配置与利用。特别是在互联......
  • 深入解析 Spring Boot 中 MyBatis 自动配置的流程
    在SpringBoot项目中,自动配置是一个非常强大的功能,可以极大简化配置工作。本文将通过MyBatis的自动配置为例,详细解析SpringBoot自动配置的整个流程,包括从META-INF/spring.factories文件到mysql-connector-java依赖的引入,帮助大家更好地理解SpringBoot的自动配......
  • SpringBoot原理
    目录一、配置优先级二、Bean管理1.获取Bean(1)三种方式(2)获取示例 2.Bean作用域 3.第三方Bean 三、SpringBoot原理1.起步依赖2.自动配置(1)概述(2)准备工作 (3)@Import (4)模拟过程3.自动配置原理(1)源码跟踪 (2)@Conditional (3)自定义start业务场景需求 步骤具......