首页 > 其他分享 >rocketmq--同步、异步、批量、事务消息demo

rocketmq--同步、异步、批量、事务消息demo

时间:2024-01-23 20:12:28浏览次数:18  
标签:-- demo springframework import apache org public rocketmq

在Spring Boot中使用RocketMQ进行同步和异步消息传输的关键是使用RocketMQTemplate类。下面是两个例子,分别演示了如何实现同步和异步消息传输。

首先,确保你已经添加了RocketMQ的依赖到你的pom.xml中,如下所示:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

然后在application.yml中配置RocketMQ的相关属性:

rocketmq:
  name-server: 127.0.0.1:9876  # 替换为你的RocketMQ NameServer地址
  producer:
    group: my-producer-group

同步消息传输例子

在这个例子中,我们将创建一个服务来发送同步消息。

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SyncProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult send(String topic, String message) {
        // 同步发送消息
        return rocketMQTemplate.syncSend(topic, message);
    }
}

异步消息传输例子

在这个例子中,我们将创建一个服务来发送异步消息。

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AsyncProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void send(String topic, String message) {
        // 异步发送消息
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 消息发送成功的回调
                System.out.printf("Async send message success. MsgId: %s%n", sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                // 消息发送失败的回调
                System.out.printf("Async send message failed. Exception: %s%n", e);
            }
        });
    }
}

批量消息传输例子

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

@Service
public class BatchProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult sendBatch(String topic, List<String> messages) {
        // 创建批量消息
        List<org.springframework.messaging.Message> messageList = messages.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build())
                .collect(Collectors.toList());

        // 批量发送消息
        return rocketMQTemplate.syncSend(topic, messageList);
    }
}

事务消息传输例子

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;

@RocketMQTransactionListener(txProducerGroup = "my-tx-producer-group")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        try {
            // 这里写本地事务的业务逻辑
            System.out.println("执行本地事务");
            // 如果本地事务执行成功,返回COMMIT
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // 如果本地事务执行失败,返回ROLLBACK
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务状态
        System.out.println("检查本地事务状态");
        // 根据本地事务状态返回相应的状态
        // RocketMQLocalTransactionState.COMMIT
        // RocketMQLocalTransactionState.ROLLBACK
        // RocketMQLocalTransactionState.UNKNOWN
        return RocketMQLocalTransactionState.COMMIT;
    }
}

然后创建一个服务来发送事务消息:

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class TransactionProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void sendMessageInTransaction(String topic, String message) {
        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction("my-tx-producer-group", topic,
                MessageBuilder.withPayload(message).build(), null);
    }
}

在主类Application.java中,你可以创建一个命令行运行器来测试这两个服务。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class RocketMQDemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RocketMQDemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(RocketMQDemoApplication.class, args);
        SyncProducerService syncProducerService = context.getBean(SyncProducerService.class);
        AsyncProducerService asyncProducerService = context.getBean(AsyncProducerService.class);

        // 发送同步消息
        SendResult sendResult = syncProducerService.send("test-topic-sync", "Hello, RocketMQ Sync!");
        System.out.println("Sync message sent. MsgId: " + sendResult.getMsgId());

        // 发送异步消息
        asyncProducerService.send("test-topic-async", "Hello, RocketMQ Async!");
    }
}

在这个例子中,我们创建了两个服务,分别用于同步和异步发送消息。在同步发送的例子中,syncSend方法会等待消息发送成功或失败,并返回一个SendResult对象。在异步发送的例子中,asyncSend方法会立即返回,并提供一个SendCallback回调来处理消息发送成功或失败的情况。

请注意,为了运行这些例子,你需要确保RocketMQ服务器正在运行,并且application.yml中配置的name-server地址是正确的。此外,你可能需要根据实际情况调整Topic和消费者组名。

标签:--,demo,springframework,import,apache,org,public,rocketmq
From: https://www.cnblogs.com/xylfjk/p/17983319

相关文章

  • 结构体对齐:
    结构体对齐:公式1:前面的地址必须是后面的地址正数倍,不是就补齐公式2:整个Struct的地址必须是最大字节的整数倍练习:StructE1{inta;charb;charc}e1;第一地址肯定存放a是4Byte地址,第二地址,b要1Byte的地址,来欢迎公式一登场:4==1*N(N等于正整数)答"是"!地址现在为......
  • 主席树(可持久化线段树)
    主席树前言主席树也是一种数据结构,是线段树的进阶,作用是可以保留历史版本,支持回退,就是回到之前某个未修改的状态。就像我们在写博客时如果误删了重要的内容,就可以用Ctrl+z撤回一次操作,也可以用Ctrl+Shift+z还原我们撤回的操作,这就需要存下每次编辑的操作。基本原理可......
  • SpringSecurity-手机号+短信验证码登陆
    与验证码登录逻辑是不一样的,所以不能使用SpringSecurity默认提供的那套逻辑;需要自个去写一个自定义身份认证逻辑短信验证码生成生成验证码短信验证码类ValidateCode是父类,ImageCode子类publicclassValidateCode{privateStringcode;/***过期时间......
  • 2024.01 总结
    1.模拟赛总的来说状态较好,只有一次较大的挂分。1-1.优点:思维方面:能够推出DP式子,通过打表找到一些规律。码力方面:基础的数据结构实现很少出错。策略方面:先把自己能拿的分拿满。1-2.缺点:思维方面:推出式子不会优化。码力方面:难以实现复杂的数据结构和代码。......
  • matplotlib数据可视化
    Matplotlib数据可视化基础一、绘制基本图形1、绘制折线图importnumpyasnpimportmatplotlib.pyplotasplt#创建数组xx=np.arange(10)np.random.seed(500)#创建数组yy=np.random.randint(20,size=(10,))plt.plot(x,y,'b-')plt.xticks(fontsize=15)plt.yticks......
  • 寒假学习(13)
    今天我继续学习python知识。今天主要学习的是文件操作,这个知识我从学习C语言开始就一直不熟悉,不仅因为它在教学的最后一章,知识讲的较少,更引文平时学习中基本用不到文件操作,所以这部分我用了很长时间。打开和关闭文件:open() 函数:用于打开一个文件,并返回一个文件对象。有两个......
  • 参数的校验
    参数校验框架:springvalidation(使用预定义的注解完成参数校验)引入SpringValidation起步依赖<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter-validation</artifactId></dependency>在参数前面加Patte......
  • 寒假生活指导15
    #相关模块导入fromseleniumimportwebdriver#selenium4版本必须要设置浏览器选项,否则会闪退option=webdriver.EdgeOptions()option.add_experimental_option("detach",True)#实例化浏览器驱动对象,并将配置浏览器选项driver=webdriver.Edge(options=option)#......
  • JeecgBoot集成TiDB,打造高效可靠的数据存储解决方案
    TiDB简介TiDB是PingCAP公司自主设计、研发的开源分布式关系型数据库,同时支持在线事务处理与在线分析处理(HybridTransactionalandAnalyticalProcessing,HTAP)的融合型分布式数据库产品,具备水平扩容或者缩容、金融级高可用、实时HTAP、云原生的分布式数据库、兼容MySQL5......
  • 即时语录
    “曾经以为林深处见鹿。海蓝时见鲸。梦醒时可见你。可后来才发现,林深时雾起,不知归处。海蓝时浪涌,望而却步。梦醒时夜续,惊慌失措。不见鹿,不见鲸,亦不见你。年少时,春风得意马蹄疾。不信人间有别离。多年后才明白。风月无情人暗换。旧游如梦,空肠断呀。生命本是一场又一场的离别和相遇......