首页 > 其他分享 >rocketmq--消息顺序消费demo

rocketmq--消息顺序消费demo

时间:2024-01-23 19:46:36浏览次数:23  
标签:顺序 ordered -- demo springframework org import rocketmq

在RocketMQ中,要实现消息的顺序消费,你需要确保以下几点:

  1. 发送消息时,相同业务顺序的消息应该发送到同一个队列(MessageQueue)。
  2. 消费者在消费时,应该使用顺序消费的方式。

下面是一个使用Spring Boot和RocketMQ实现消息顺序消费的例子。

  1. 添加依赖 (pom.xml):
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>
  1. 配置RocketMQ (application.yml):
rocketmq:
  name-server: 127.0.0.1:9876  # 替换为你的RocketMQ NameServer地址
  producer:
    group: ordered-producer-group
  consumer:
    group: ordered-consumer-group
    subscribe:
      - topic: ordered-topic
        selectorExpression: "*"
  1. 创建消息生产者 (OrderedProducerService.java):
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderedProducerService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult send(String topic, String message, int orderId) {
        // 使用orderId作为队列选择器的key
        return rocketMQTemplate.syncSendOrderly(topic, MessageBuilder.withPayload(message).build(), String.valueOf(orderId));
    }
}
  1. 创建消息消费者 (OrderedConsumerService.java):
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = "ordered-topic",
        consumerGroup = "ordered-consumer-group",
        selectorType = RocketMQMessageListener.SelectorType.TAG,
        selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY // 设置为顺序消费
)
public class OrderedConsumerService implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.printf("Received ordered message: %s%n", message);
    }
}
  1. 创建Spring Boot应用程序 (OrderedDemoApplication.java):
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class OrderedDemoApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(OrderedDemoApplication.class, args);
        OrderedProducerService producerService = context.getBean(OrderedProducerService.class);

        // 发送几条顺序消息,orderId相同的消息会被顺序消费
        for (int i = 0; i < 5; i++) {
            SendResult sendResult = producerService.send("ordered-topic", "Hello, RocketMQ! " + i, 100); // orderId为100
            System.out.printf("Send ordered message: %s%n", sendResult);
        }
    }
}

在这个例子中,我们使用了syncSendOrderly方法来发送顺序消息,并且为了保证消息顺序,我们传入了一个orderId作为队列选择器的key。在消费端,我们通过@RocketMQMessageListener注解设置consumeModeORDERLY来实现顺序消费。

运行上述Spring Boot应用程序,它将发送几条消息到ordered-topic,并由OrderedConsumerService以顺序方式接收并打印出来。由于所有消息都使用相同的orderId,所以它们会被发送到同一个MessageQueue,从而保证了消费的顺序性。

请注意,为了确保消息的顺序性,同一业务的消息应该

标签:顺序,ordered,--,demo,springframework,org,import,rocketmq
From: https://www.cnblogs.com/xylfjk/p/17983267

相关文章

  • 设计模式之模板方法
    1.定义定义了一个算法的框架,并允许子类重写其中的某些步骤,而不改变算法的结构2.口语化表述模板方法其实在日常生活中已经很常见,所谓模板方法,就是事先约定好一些事情,后续做时再慢慢实现或者修改,比如组装电脑假设现在需要组装一台台式电脑,一开始计划使用3090显卡,后来根据实际......
  • asp使用ItextSharp生成Pdf
    1.使用ItextSharp生成Pdf应用场景:将用户所填写的数据根据业务场景填入到pdf模板中并生成新的pdf。操作步骤如下:1.1.使用word制作模板制作word模板然后转成pdf,使用福昕或者其他pdf编辑器在需要填充数据的地方添加文本域(我这里使用的是破解版的福昕)。1.2.设置变量将需要填......
  • 假期学习记录11
    本次学习学习了常用键值对rdd的操作常用的键值对RDD转换操作reduceByKey(func)reduceByKey(func)的功能是,使用func函数合并具有相同键的值(Hadoop,1)(Spark,1)(Hive,1)(Spark,1)scala>pairRDD.reduceByKey((a,b)=>a+b).foreach(println)(Spark,2)(Hive,1)(Hadoop,1)......
  • 审计日志
    审计日志按照时间顺序记录了每个用户操作记录,并可以将日志存储在文件或webhook。【注意】会增加api-server内存消耗为k8s管理员提供了:发生了什么?什么时候发生的?谁触发的?活动发生在哪个(些)对象上?在哪观察到的?它从哪触发的?活动的后续处理行为是什么?每个请求都可被记录......
  • 003*:React 父子通信
    目录 正文1:父子通信/*目标:父传子传递属性,子传父-传递回调函数功能:1:两个组件一个导航,一个侧边栏2:点击导航里的按钮控制侧边栏的显示和隐藏*/importReact,{Component}from'react'//导航组件classNavBarextendsComponent{render(){return(......
  • set用法详解
    ES6中的Set是一种新的数据结构,类似于数组,用于存储有序的数据。Set没有随机访问的能力,不能通过索引来获取具体的某个元素Set中的元素具有唯一性,不允许存储相同的元素。Set本身是一个构造函数,可以用来实例化Set对象。通过add()方法可以向Set中添加元素,如果添加的元......
  • 0123今日收获
    今日代码1今日代码2今日代码3今日代码4今日代码5今日代码6今日代码7今日代码8今日代码9!今日收获满满......
  • ERROR:Only one ConfirmCallback is supported by each RabbitTemplate] with root cau
     错误:OnlyoneConfirmCallbackissupportedbyeachRabbitTemplate]withrootcause 原因:因为Spring的Bean默认都是单例;而RabbitTemplate对象同样支持一个回调。 解决:使用@Scope("prototype")可通知Spring将被注解的Bean变为多例。代码: //改Ra......
  • 金句记录
      Ournewtrailsavingtechniqueoffersdifferenttradeoffsincomparisonwithchronologicalbacktrackingandoftenyieldssuperiorperformance.与按时间顺序回溯相比,我们新的跟踪保存技术提供了不同的权衡,并且通常会产生卓越的性能。   UsingC-......
  • 基于信号功率谱特征和GRNN广义回归神经网络的信号调制类型识别算法matlab仿真
    1.算法运行效果图预览 2.算法运行软件版本MATLAB2022a 3.算法理论概述       本课题,我们主要对MPSK和MFSK调制类型进行识别。在进行信号调制方式区分之前,首先需要对PSK和FSK进行区分,提出了一种基于信号功率谱的PSK和FSK调制方式的识别方法。信号的功率谱计算过程......