首页 > 其他分享 >RocketMq发送消息之延迟消息

RocketMq发送消息之延迟消息

时间:2023-09-23 22:11:28浏览次数:38  
标签:producer 消息 msg consumer public RocketMq 延迟

延迟消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

使用限制

对比于rabbitmq中的延迟消息来说,rockermq并不支持任意时间的延迟,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18级

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

生产者代码

通过 msg.setDelayTimeLevel(2); 设置延迟等级,2 表示5s的延迟

public class producer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer,指定一个生产者组
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer的地址
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
            Message msg = new Message("DelayTopic",
                    "Tag1",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置 延迟时间 , 目前只支持"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";对应18个等级
            msg.setDelayTimeLevel(2);// 表示5s
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            TimeUnit.SECONDS.sleep(1);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费者代码

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
//        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("DelayTopic","Tag1");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费 DelayTopic 下的所有消息
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息消息ID:"+msg.getMsgId()+"延迟时间:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

运行结果

image

标签:producer,消息,msg,consumer,public,RocketMq,延迟
From: https://www.cnblogs.com/zgf123/p/17725168.html

相关文章

  • MQ - 04 基础篇_存储_消息数据和元数据的存储设计
    @[toc]![在这里插入图片描述](https://img-blog.csdnimg.cn/855d3c6d2ef74a1893f352a4545b479c.png)---------------------#导图![在这里插入图片描述](https://img-blog.csdnimg.cn/d0569e3871dd433784f74d76ebee9a9d.png)----------#概述消息数据和元数据的存......
  • RockerMq发送消息之顺序消息
    顺序消息        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。        顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(分区队列);而消费消......
  • RocketMQ发送消息之同步异步单向
    官网教程:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart基于双主双从异步方式开启的前提下,在maven项目中引入下列依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1&l......
  • 记一次缓存一致性中延迟双删的使用场景
    1、背景: 前边写了个这样的业务需求:从算法服务那边会不断的发送过来一些预警的数据和预警恢复的数据,当有新预警数据过来时,会进行数据库记录和redis缓存,当有该预警的恢复过来时会将数据库状态修改并清除缓存,我的做法是使用了缓存双删的策略,即先删缓存,再更新数据库,再删缓存。但是......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeastOnceD......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步的系统,压......
  • 消息队列中,如何保证消息的顺序性?
    本文选自:advanced-java作者:yanglbme问:如何保证消息的顺序性?面试官心理分析其实这个也是用MQ的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题。面试题剖析我举个例子,我们以前做过一个mysqlbinlog同步......
  • Kafka的消息传递保证和一致性
    前言通过前面的文章,相信大家对Kafka有了一定的了解了,那接下来问题就来了,Kafka既然作为一个分布式的消息队列系统,那它会不会出现消息丢失或者重复消费的情况呢?今天咱们就来一探。实现机制Kafka采用了一系列机制来实现消息传递的保证和一致性,关键点:至少一次的消息传递(AtLeas......
  • redis消息队列——发布订阅
    一、相关依赖<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><grou......
  • Kafka消息压缩算法性能调优与选择
    前言Kafka作为一款高性能的分布式消息队列,其消息压缩算法的选择和调优对于系统性能的提升至关重要。本文将深入探讨Kafka消息压缩算法的性能调优和选择。压缩算法的选择Kafka支持多种压缩算法,包括gzip、snappy和lz4。这些算法各有优缺点,需要根据实际情况进行选择。gzipgzip是......