首页 > 其他分享 >RocketMQ笔记(九):延时/定时消息

RocketMQ笔记(九):延时/定时消息

时间:2023-05-05 09:13:31浏览次数:39  
标签:时间 消息 延时 import 定时 RocketMQ

一、什么是延时/定时消息

  定时/延时消息为 RocketMQ 中提供的一种消息类型。定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

  Producer将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。

二、延时/定时消息处理规则

  RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。

  定时时间的格式为毫秒级的Unix时间戳,需要将要设置的时刻转换成时间戳形式。

  定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。

  定时时长最大值默认为24小时,不支持自定义修改。

  定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。

三、定时/延时消息示例

  定时消息:例如,当前系统时间为2023-01-18 16:47:46,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1674036000000。

  延时消息:例如,当前系统时间为2023-01-18 18:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2023-01-18 19:30:00,转换为时间戳格式为1674041400000。

四、定时消息生命周期  

  0

1、初始化

  生产者构建消息,待发送到RocketMQ服务端的状态。

2、定时中

  消息发送到RocketMQ服务端,优先将定时消息存储在定时存储系统中,此时消费对消费者不可见,不会进行消息投递。等待定时时刻到达,才会对消费者可见。

3、待消费

  定时时刻到达后,RocketMQ将消息重新写入普通存储引擎,此时消息对下游消费者可见,等待消费者消费。

4、消费中

  消息被消费者消费,执行本地业务逻辑处理。此时RocketMQ服务端会等待消费者完成消费并提交消费结果,若一定时间后未收到消费者的响应,RocketMQ会对消息进行重试处理。

5、消费提交

  消费者完成消费处理,并向RocketMQ提交消费结果,服务端标记当前消息已经被处理(消费成功/失败)。RocketMQ默认支持保留所有消息,此时消息不会被立即删除,逻辑标记已消费。

  消息在保存时间到期或存储空间不足被删除前,消费者可以回溯消息重新消费。

6、消息删除

  RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

五、定时/延时消息示例demo

  工具类详见:RocketMQ笔记(六):示例代码工具类

5.1、生产者

 1 import com.snails.rmq.common.RMQConstant;
 2 import com.snails.rmq.utils.ClientUtils;
 3 import com.snails.rmq.utils.DateUtils;
 4 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 5 import org.apache.rocketmq.client.producer.SendResult;
 6 import org.apache.rocketmq.common.message.Message;
 7 
 8 import java.nio.charset.StandardCharsets;
 9 
10 /**
11  *  当前 RocketMQ 不支持任意时间的延迟。
12  *  生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,
13  *  消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。
14  *  messageDelayLevel -> 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
15  */
16 public class DelayProducer {
17 
18     public static void main(String[] args) throws Exception {
19         // 创建producer实例,并启动
20         DefaultMQProducer producer =
21                 ClientUtils.gainProducerInstance(RMQConstant.DELAY_GROUP, RMQConstant.NAEMSRV_ADDR);
22         // 设置消息内容
23         Message message = new Message();
24         message.setTopic(RMQConstant.DELAY_GROUP);
25         message.setBody(("我是延时消息,当前发送时间: " + DateUtils.now()).getBytes(StandardCharsets.UTF_8));
26         // 设置消息延迟级别,当前设置为5s
27         message.setDelayTimeLevel(2);
28         // 发送消息
29         SendResult result = producer.send(message);
30         System.out.println("result = "+ result);
31         // 关闭producer实例
32         ClientUtils.shutdownProducer(producer);
33     }
34 }

5.2、消费者

 1 import com.snails.rmq.common.RMQConstant;
 2 import com.snails.rmq.utils.ClientUtils;
 3 import com.snails.rmq.utils.DateUtils;
 4 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 6 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 7 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 8 import org.apache.rocketmq.common.message.MessageExt;
 9 
10 import java.nio.charset.StandardCharsets;
11 import java.util.List;
12 
13 /**
14  *
15  * @Description: 延时消息消费者
16  */
17 public class DelayConsumer {
18 
19     public static void main(String[] args) throws Exception {
20         // 创建消费者实例 consumer 并订阅topic
21         DefaultMQPushConsumer consumer = ClientUtils.gainConsumerInstance(RMQConstant.DELAY_GROUP,
22                 RMQConstant.NAEMSRV_ADDR, RMQConstant.DELAY_GROUP, RMQConstant.SUB_EXPRESSION_ALL);
23         // 消费消息
24         consumer.registerMessageListener(new MessageListenerConcurrently() {
25             @Override
26             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
27                 for (MessageExt messageExt : list) {
28                     System.out.println("接收消息:msgId = " + messageExt.getMsgId()
29                             + ",接收时间receiveTime = " + DateUtils.now()
30                             + ",详情:" + new String(messageExt.getBody(), StandardCharsets.UTF_8));
31                 }
32                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
33             }
34         });
35         // 启动消费者
36         ClientUtils.startupConsumer(consumer);
37     }
38 }

 

标签:时间,消息,延时,import,定时,RocketMQ
From: https://www.cnblogs.com/RunningSnails/p/17373098.html

相关文章

  • RocketMQ之消息存储
    一、概述消息持久化存储是MQ消息队列中最为复杂和最为重要的一部分,本文先从目前几种比较常用的MQ消息队列存储方式出发,为大家介绍RocketMQ选择磁盘文件存储的原因。然后,本文分别从RocketMQ的消息存储整体架构和RocketMQ文件存储模型层次结构两方面进行深入分析介绍。使得大家读完......
  • RocketMQ(四):RocketMQ概览
    一、普通消息普通消息为RocketMQ中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。1、普通消息......
  • moment指定时间
    1.startOf( )设置开始时间moment().startOf('day').subtract(15,'d)//指定前15天的日期moment().startOf('day')2.endOf( )设置结束时间moment().endOf('day').add(1,'d')//指定后一天的日期moment().endOf('hour')......
  • 【java】定时器
     定时器的实现方式:线程等待实现:最原始最简单的方式,先创建一个thread,然后让它在while循环里一直运行着,通过sleep方法来达到定时任务的效果。 publicclassTask{publicstaticvoidmain(String[]args){//runinasecondfinallongtimeInterv......
  • PHP获取时间戳、获取天周月的起始时间、指定时间所在周、指定时间的各个周等相关函数
    一、时间戳和日期互相转换//获取时间戳$date=time();//获取当前时间戳$date=mktime(0,0,0,10,10,2020);//获取指定时间的时间戳2020年10月10日0时0分0秒//日期转换为时间戳$date="2019-08-0808:08:08";$timestamp=strtotime($date);//将时间戳......
  • c定时执行任务
    由一个C定时执行任务的程序引发的思考程序这里使用C写了个定时执行的程序,见a.c//a.c#include<stdio.h>#include<pthread.h>#include<signal.h>#include<stdlib.h>#include<unistd.h>void*send_signal_every_second(void*args){while(1){kill(getpid()......
  • 七、使用调度框架quartz,为12306系统增加定时调度功能
    为什么要有定时调度定时调度在企业级系统中非常重要(统计报表、功能补偿、不紧急的大批量任务)12306每天都需要生成15天后的车次数据本章内容集成quartz,比较SpringBoot自带定时任务喝quartz的区别使用控台来操作定时任务:新增、暂停、重启、删除项目中增加batch定时调度......
  • tornado服务端+tornado.ioloop.PeriodicCallback定时任务踩坑记录及解决方案
    背景:用tornado部署一个AI模型的服务端,由于AI模型较慢,收到请求肯定没办法同步返回结果,所以最后定的方案是批处理并异步回调。异步回调下,我这边的处理方式是:实时接收所有请求并多线程落库(使用数据库连接池),再启动一个定时任务取出库中(未处理过的)数据进行批处理。因为web框架用的是tor......
  • RocketMQ的简单使用
    大家好,我是Leo!今天来和大家分享RocketMQ的一些用法。领域模型介绍Producer:用于生产消息的运行实体。Topic:主题,用于消息传输和存储的分组容器。MessageQueue:消息传输和存储的实际单元容器。Message:消息传输的最小单元。ConsumerGroup:消费者组。Consumer:消费......
  • 一文讲透 RocketMQ 消费者是如何负载均衡的
    RocketMQ支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。集群消费:同一Topic下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者......