首页 > 其他分享 >延迟消息的五种实现方案

延迟消息的五种实现方案

时间:2023-06-12 13:36:50浏览次数:41  
标签:方案 队列 五种 消息 msg message 级别 延迟

生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。延迟消息的应用场景其实是非常的广泛,比如以下的场景:

  • 网上直播授课时,在课程开始前15分钟通知所有学生准备上课。
  • 订单提交成功后1个小时内未支付,订单需要及时关闭并且释放对应商品的库存。
  • 用户超过15天未登录时,给该用户发送召回推送。
  • 工单提交后超过24小时未处理,向相关责任人发送催促处理的提醒。

针对延迟消息,本文向大家分享五种实现方案,下面我们就来逐一讨论各种方案的大致实现和优缺点。 

一、Redis

在Redis中,有一种有序集合(Sorted Set)的数据结构,在有序集合中,所有元素是按照其 Score 进行排序的。我们可以把消息被消费的预期时间戳作为Score,定时任务不断读取Score大于当前时间的元素即可。基本流程如下:

  1. 调用API,传入执行时间、消息体等数据。
  2. 生成唯一key,把消息体数据序列化后存入Redis的String结构中。
  3. 把key和执行时间的时间戳存入Redis的有序集合结构中,有序集合中不存储具体的消息体数据,而是存储唯一的key。
  4. 定时任务不断读取时间戳最小的消息。
  5. 如果时间戳小于当前时间,将key放入作为队列的Redis的List结构中。
  6. 另外一个定时任务不断从队列中读取需要消费的消息的key。
  7. 根据key获取消息体数据,对消息进行消费。
  8. 如果消费消息成功,删除key对应的消息体数据。
  9. 如果消费消息失败,重新存入key和时间戳(加60秒)。

具体方案如下图:

延迟消息的五种实现方案_定时任务

为了避免一个有序集合中存储过多的延时消息,存入操作以及查询操作速度变慢的问题,可以建立多个有序集合,通过哈希算法把消息路由到不同的有序集合中去。 

优点

简单实用,快速落地。

缺点

  • 单个有序集合无法支持太大的数据量。
  • 定时任务不断读取可能造成不必要的请求。

所以,Redis方案并不是一个十分成熟的方案,只是一个支持小消息量可以快速落地的方案。 

二、RabbitMQ

RabbitMQ本身是不支持延迟消息功能的,一般的做法,是通过最大生存时间(Time-To-Live)和死信交换机(Dead Letter Exchanges)两个特性模拟出延迟消息的功能。消息超过最大生存时间没有被消费就变成一条死信,便会被重新投递到死信交换机,然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。

不过,在RabbitMQ的3.5.8版本以后,我们就可以使用官方推荐的rabbitmq delayed message exchange插件很方便地实现延迟消息的功能。

 

安装插件

首先去官方插件列表页面下载rabbitmq_delayed_message_exchang插件,然后复制到RabbitMQ每个节点的plugins目录中。使用命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

一旦插件被启用,我们就可以开始使用它了。

 

使用示例

安装该插件后会生成支持延迟投递机制的Exchange类型:x-delayed-message。接收到该类型的消息后不会立即将消息投递至目标队列中,而是存储在mnesia表中,检测消息达到可投递时间时再投递到目标队列。

使用延迟消息时,需要先声明一个x-delayed-message类型的交换器机:

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);

发送延迟消息,其中在header中添加x-delay,表示延迟的毫秒数:

byte[] messageBodyBytes = "This is a delayed message".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);

优点

大品牌中间件,可靠稳定。

缺点

由于master单节点,导致性能瓶颈,吞吐量受限。 

三、ActiveMQ

ActiveMQ在5.4及以上版本开始支持持久化的延迟消息功能,甚至支持Cron表达式。默认是该功能是不开启的,如果开启需要修改配置文件activemq.xml,在broker节点上把schedulerSupport属性设置为true,如:

<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">
</broker>

当服务端开启延迟消息功能以后,客户端就可以利用下面的属性发送延迟消息:

  • AMQ_SCHEDULED_DELAY:该消息延迟发送的时间,单位为毫秒。
  • AMQ_SCHEDULED_PERIOD:每次重新发送该消息的时间间隔,单位为毫秒。
  • AMQ_SCHEDULED_REPEAT:重新发送该消息的次数。
  • AMQ_SCHEDULED_CRON:使用Cron表达式设置发送该消息的时机。

 

使用示例
  1. 消息延迟60秒发送:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
producer.send(message);
  1. 消息延迟60秒发送,并且重复发送5次,每次间隔10秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);
  1. 利用Cron表达式,每天的凌晨3点发送一次消息:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");
producer.send(message);

需要注意的是,Cron表达式是由5位组成的,分别表示分钟(059)、小时(023)、日(131)、月(112)、星期(0~6,表示星期日到星期六)。

  1. Cron表达式的优先级高于其他参数,如果在设置了Cron表达式的同时,也设置了其他参数,那么会在每次CRON执行时,再应用其他参数。比如,消息延迟60秒发送,并且重复发送5次,每次间隔10秒,并且每个小时都发送这一系列消息:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);

 

优点

大品牌中间件,可靠稳定,甚至支持Cron表达式。

缺点

由于master单节点,导致性能瓶颈,吞吐量受限。 

四、RocketMQ

在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。

消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:

  1. 设置消息延迟级别等于0时,则该消息为非延迟消息。
  2. 设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
  3. 设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。 
使用示例

首先,写一个消费者,用于消费延迟消息:

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("OneMoreTopic", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s %s Receive New Messages:%n"
                    , sdf.format(new Date())
                    , Thread.currentThread().getName());
            for (MessageExt msg : msgs) {
                System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
                System.out.printf("\tBody: %s%n", new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 启动消费者实例
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 

再写一个延迟消息的生产者,用于发送延迟消息:

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        Message msg = new Message("OneMoreTopic"
                , "DelayMessage", "This is a delay message.".getBytes());

        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        //设置消息延迟级别为3,也就是延迟10s。
        msg.setDelayTimeLevel(3);

        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 通过sendResult返回消息是否成功送达
        System.out.printf("%s Send Status: %s, Msg Id: %s %n"
                , sdf.format(new Date())
                , sendResult.getSendStatus()
                , sendResult.getMsgId());

        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

运行生产者以后,就会发送一条延迟消息:

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000

10秒钟后,消费者收到的这条延迟消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
	Msg Id: C0A8006D5AB018B4AAC216E0DB690000
	Body: This is a delay message.

优点

分布式、高吞吐量、高性能、高可靠。

缺点

仅支持18个特定级别的延时,无法自定义延时时间。 

五、定制RocketMQ

上面所说的不支持自定义延时时间的RocketMQ是开源版的,但是在阿里云中商业版的RocketMQ是支持的,可能因为是业务需求不强或商业因素考虑,究竟什么原因不得而知。有条件的可以直接上阿里云;没有条件的可以修改开源版RocketMQ的源码,实现自己的需求。知己知彼,百战不殆,先看看RocketMQ开源版本中,支持18个时间级别是怎么实现的:

原理分析

以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。

CommitLog

CommitLog中,针对延迟消息做了一些处理:

// 延迟级别大于0,就是延时消息
if (msg.getDelayTimeLevel() > 0) {
    // 判断当前延迟级别,如果大于最大延迟级别,
    // 就设置当前延迟级别为最大延迟级别。
    if (msg.getDelayTimeLevel() > this.defaultMessageStore
            .getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore
                .getScheduleMessageService().getMaxDelayLevel());
    }

    // 获取延迟消息的主题,
    // 其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXX
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    // 根据延迟级别获取延迟消息的队列Id,
    // 队列Id其实就是延迟级别减1
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // 备份真正的主题和队列Id
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    // 设置延时消息的主题和队列Id
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是ScheduleMessageService

 

ScheduleMessageService

ScheduleMessageService是由DefaultMessageStore进行初始化的,初始化包括构造对象和调用load方法。最后,再执行ScheduleMessageService的start方法:

public void start() {
    // 使用AtomicBoolean确保start方法仅有效执行一次
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        // 遍历所有延迟级别
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // key为延迟级别
            Integer level = entry.getKey();
            // value为延迟级别对应的毫秒数
            Long timeDelay = entry.getValue();
            // 根据延迟级别获得对应队列的偏移量
            Long offset = this.offsetTable.get(level);
            // 如果偏移量为null,则设置为0
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 为每个延迟级别创建定时任务,
                // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒
                this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // 延迟10秒后每隔flushDelayOffsetInterval执行一次任务,
        // 其中,flushDelayOffsetInterval默认配置也为10秒
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    // 持久化每个队列消费的偏移量
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore
        	.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。

然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。

 

定时任务

ScheduleMessageService的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:

// 根据主题和队列Id获取消息队列
ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
                , delayLevel2QueueId(delayLevel));

如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 根据消费偏移量从消息队列中获取所有有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 遍历所有消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    // 获取消息的物理偏移量
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    // 获取消息的物理长度
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();
    
    // 省略部分代码...

    long now = System.currentTimeMillis();
    // 计算消息应该被消费的时间
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
	// 计算下一条消息的偏移量
    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)

	long countdown = deliverTimestamp - now;
    // 省略部分代码...
}

如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:

// 根据消息的物理偏移量和大小获取消息
MessageExt msgExt =
    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
            offsetPy, sizePy);

如果获取到消息,则继续执行下面操作:

// 重新构建新的消息,包括:
// 1.清除消息的延迟级别
// 2.恢复真正的消息主题和队列Id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 重新把消息发送到真正的消息队列上
PutMessageResult putMessageResult =
        ScheduleMessageService.this.writeMessageStore
                .putMessage(msgInner);

清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。

由于篇幅限制,其中源码的细节不做过多展开,有兴趣的小伙伴可以去GitHub上下载源码仔细阅读。

 

定制化方案

经过以上对源码的分析,可以总结出延迟消息的实现步骤:

  1. 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
  2. 消息进入SCHEDULE_TOPIC_XXXX的队列中。
  3. 定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
  4. 根据消息的物理偏移量和大小再次获取消息。
  5. 根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
  6. 重新发送消息到原主题的队列中,供消费者进行消费。

概括起来如下图:

延迟消息的五种实现方案_有序集合_02

在CommitLog中,我们可以根据自定义的延迟时间选择一个最大的延迟级别,比如:延迟15分钟消费的消息,那么最大的延迟级别就是10分钟。在ScheduleMessageService中,判断消息是否真的到了消费的时间,如果已到了消费的时间,则恢复原主题和队列Id;如果未到消费的时间,则选择最大延迟级别重新修改主题和队列ID。如下图:

延迟消息的五种实现方案_有序集合_03

消息主体以及元数据都存储在CommitLog中,队列中只存放了在CommitLog中的起始物理偏移量、消息大小和消息Tag的哈希值,虽然需要重新把消息放入队列中,但空间浪费还是比较有限的。

 

优点

分布式、高吞吐量、高性能、高可靠,支持自定义延时时间。

缺点

定制RocketMQ不易维护,无法升级新版本。

总结

从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案,分别是:

  • 使用Redis的Sorted Set结构。
  • 使用RabbitMQ的rabbitmq delayed message exchange插件。
  • 使用ActiveMQ的5.4及以上版本的延迟消息功能。
  • 使用RocketMQ仅支持特定级别的延迟消息。
  • 定制RocketMQ,以重新计算延迟级别的方式实现自定义延时。

以上每个方案都是各自的优点和缺点,所以说延迟消息没有一个放之四海而皆准的方案,需要根据数据规模和业务需求的实际情况才能确定最适合的方案。




  • 网上直播授课时,在课程开始前15分钟通知所有学生准备上课。
  • 订单提交成功后1个小时内未支付,订单需要及时关闭并且释放对应商品的库存。
  • 用户超过15天未登录时,给该用户发送召回推送。
  • 工单提交后超过24小时未处理,向相关责任人发送催促处理的提醒。

针对延迟消息,本文向大家分享五种实现方案,下面我们就来逐一讨论各种方案的大致实现和优缺点。 

一、Redis

在Redis中,有一种有序集合(Sorted Set)的数据结构,在有序集合中,所有元素是按照其 Score 进行排序的。我们可以把消息被消费的预期时间戳作为Score,定时任务不断读取Score大于当前时间的元素即可。基本流程如下:

  1. 调用API,传入执行时间、消息体等数据。
  2. 生成唯一key,把消息体数据序列化后存入Redis的String结构中。
  3. 把key和执行时间的时间戳存入Redis的有序集合结构中,有序集合中不存储具体的消息体数据,而是存储唯一的key。
  4. 定时任务不断读取时间戳最小的消息。
  5. 如果时间戳小于当前时间,将key放入作为队列的Redis的List结构中。
  6. 另外一个定时任务不断从队列中读取需要消费的消息的key。
  7. 根据key获取消息体数据,对消息进行消费。
  8. 如果消费消息成功,删除key对应的消息体数据。
  9. 如果消费消息失败,重新存入key和时间戳(加60秒)。

具体方案如下图:

延迟消息的五种实现方案_定时任务

为了避免一个有序集合中存储过多的延时消息,存入操作以及查询操作速度变慢的问题,可以建立多个有序集合,通过哈希算法把消息路由到不同的有序集合中去。 

优点

简单实用,快速落地。

缺点

  • 单个有序集合无法支持太大的数据量。
  • 定时任务不断读取可能造成不必要的请求。

所以,Redis方案并不是一个十分成熟的方案,只是一个支持小消息量可以快速落地的方案。 

二、RabbitMQ

RabbitMQ本身是不支持延迟消息功能的,一般的做法,是通过最大生存时间(Time-To-Live)和死信交换机(Dead Letter Exchanges)两个特性模拟出延迟消息的功能。消息超过最大生存时间没有被消费就变成一条死信,便会被重新投递到死信交换机,然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。

不过,在RabbitMQ的3.5.8版本以后,我们就可以使用官方推荐的rabbitmq delayed message exchange插件很方便地实现延迟消息的功能。

 

安装插件

首先去官方插件列表页面下载rabbitmq_delayed_message_exchang插件,然后复制到RabbitMQ每个节点的plugins目录中。使用命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

一旦插件被启用,我们就可以开始使用它了。

 

使用示例

安装该插件后会生成支持延迟投递机制的Exchange类型:x-delayed-message。接收到该类型的消息后不会立即将消息投递至目标队列中,而是存储在mnesia表中,检测消息达到可投递时间时再投递到目标队列。

使用延迟消息时,需要先声明一个x-delayed-message类型的交换器机:

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("one-more-exchange", "x-delayed-message", true, false, args);

发送延迟消息,其中在header中添加x-delay,表示延迟的毫秒数:

byte[] messageBodyBytes = "This is a delayed message".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("one-more-exchange", "", props.build(), messageBodyBytes);

优点

大品牌中间件,可靠稳定。

缺点

由于master单节点,导致性能瓶颈,吞吐量受限。 

三、ActiveMQ

ActiveMQ在5.4及以上版本开始支持持久化的延迟消息功能,甚至支持Cron表达式。默认是该功能是不开启的,如果开启需要修改配置文件activemq.xml,在broker节点上把schedulerSupport属性设置为true,如:

<broker xmlns="http://activemq.apache.org/schema/core" schedulerSupport="true">
</broker>

当服务端开启延迟消息功能以后,客户端就可以利用下面的属性发送延迟消息:

  • AMQ_SCHEDULED_DELAY:该消息延迟发送的时间,单位为毫秒。
  • AMQ_SCHEDULED_PERIOD:每次重新发送该消息的时间间隔,单位为毫秒。
  • AMQ_SCHEDULED_REPEAT:重新发送该消息的次数。
  • AMQ_SCHEDULED_CRON:使用Cron表达式设置发送该消息的时机。

 

使用示例
  1. 消息延迟60秒发送:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
producer.send(message);
  1. 消息延迟60秒发送,并且重复发送5次,每次间隔10秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);
  1. 利用Cron表达式,每天的凌晨3点发送一次消息:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a delayed message");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, " 0 3 * * *");
producer.send(message);

需要注意的是,Cron表达式是由5位组成的,分别表示分钟(059)、小时(023)、日(131)、月(112)、星期(0~6,表示星期日到星期六)。

  1. Cron表达式的优先级高于其他参数,如果在设置了Cron表达式的同时,也设置了其他参数,那么会在每次CRON执行时,再应用其他参数。比如,消息延迟60秒发送,并且重复发送5次,每次间隔10秒,并且每个小时都发送这一系列消息:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 60 * 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 10 * 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 4);
producer.send(message);

 

优点

大品牌中间件,可靠稳定,甚至支持Cron表达式。

缺点

由于master单节点,导致性能瓶颈,吞吐量受限。 

四、RocketMQ

在RocketMQ中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。如果要支持任意时间精度,不能避免在Broker层面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免产生巨大的性能开销。

消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。在发送消息时,设置消息延迟级别即可,设置消息延迟级别时有以下3种情况:

  1. 设置消息延迟级别等于0时,则该消息为非延迟消息。
  2. 设置消息延迟级别大于等于1并且小于等于18时,消息延迟特定时间,如:设置消息延迟级别等于1,则延迟1s;设置消息延迟级别等于2,则延迟5s,以此类推。
  3. 设置消息延迟级别大于18时,则该消息延迟级别为18,如:设置消息延迟级别等于20,则延迟2h。 
使用示例

首先,写一个消费者,用于消费延迟消息:

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("OneMoreTopic", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s %s Receive New Messages:%n"
                    , sdf.format(new Date())
                    , Thread.currentThread().getName());
            for (MessageExt msg : msgs) {
                System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
                System.out.printf("\tBody: %s%n", new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 启动消费者实例
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 

再写一个延迟消息的生产者,用于发送延迟消息:

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");

        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        Message msg = new Message("OneMoreTopic"
                , "DelayMessage", "This is a delay message.".getBytes());

        //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
        //设置消息延迟级别为3,也就是延迟10s。
        msg.setDelayTimeLevel(3);

        // 发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 通过sendResult返回消息是否成功送达
        System.out.printf("%s Send Status: %s, Msg Id: %s %n"
                , sdf.format(new Date())
                , sendResult.getSendStatus()
                , sendResult.getMsgId());

        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

运行生产者以后,就会发送一条延迟消息:

10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000

10秒钟后,消费者收到的这条延迟消息:

10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
	Msg Id: C0A8006D5AB018B4AAC216E0DB690000
	Body: This is a delay message.

优点

分布式、高吞吐量、高性能、高可靠。

缺点

仅支持18个特定级别的延时,无法自定义延时时间。 

五、定制RocketMQ

上面所说的不支持自定义延时时间的RocketMQ是开源版的,但是在阿里云中商业版的RocketMQ是支持的,可能因为是业务需求不强或商业因素考虑,究竟什么原因不得而知。有条件的可以直接上阿里云;没有条件的可以修改开源版RocketMQ的源码,实现自己的需求。知己知彼,百战不殆,先看看RocketMQ开源版本中,支持18个时间级别是怎么实现的:

原理分析

以下分析的RocketMQ源码的版本号是4.7.1,版本不同源码略有差别。

CommitLog

CommitLog中,针对延迟消息做了一些处理:

// 延迟级别大于0,就是延时消息
if (msg.getDelayTimeLevel() > 0) {
    // 判断当前延迟级别,如果大于最大延迟级别,
    // 就设置当前延迟级别为最大延迟级别。
    if (msg.getDelayTimeLevel() > this.defaultMessageStore
            .getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore
                .getScheduleMessageService().getMaxDelayLevel());
    }

    // 获取延迟消息的主题,
    // 其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXX
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    // 根据延迟级别获取延迟消息的队列Id,
    // 队列Id其实就是延迟级别减1
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // 备份真正的主题和队列Id
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg
            , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    // 设置延时消息的主题和队列Id
    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

可以看到,每一个延迟消息的主题都被暂时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别延迟消息变更了新的队列Id。接下来,处理延迟消息的就是ScheduleMessageService

 

ScheduleMessageService

ScheduleMessageService是由DefaultMessageStore进行初始化的,初始化包括构造对象和调用load方法。最后,再执行ScheduleMessageService的start方法:

public void start() {
    // 使用AtomicBoolean确保start方法仅有效执行一次
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        // 遍历所有延迟级别
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            // key为延迟级别
            Integer level = entry.getKey();
            // value为延迟级别对应的毫秒数
            Long timeDelay = entry.getValue();
            // 根据延迟级别获得对应队列的偏移量
            Long offset = this.offsetTable.get(level);
            // 如果偏移量为null,则设置为0
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                // 为每个延迟级别创建定时任务,
                // 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒
                this.timer.schedule(
                        new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // 延迟10秒后每隔flushDelayOffsetInterval执行一次任务,
        // 其中,flushDelayOffsetInterval默认配置也为10秒
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    // 持久化每个队列消费的偏移量
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore
        	.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}

遍历所有延迟级别,根据延迟级别获得对应队列的偏移量,如果偏移量不存在,则设置为0。然后为每个延迟级别创建定时任务,第一次启动任务延迟为1秒,第二次及以后的启动任务延迟才是延迟级别相应的延迟时间。

然后,又创建了一个定时任务,用于持久化每个队列消费的偏移量。持久化的频率由flushDelayOffsetInterval属性进行配置,默认为10秒。

 

定时任务

ScheduleMessageService的start方法执行之后,每个延迟级别都创建自己的定时任务,这里的定时任务的具体实现就在DeliverDelayedMessageTimerTask类之中,它核心代码是executeOnTimeup方法之中,我们来看一下主要部分:

// 根据主题和队列Id获取消息队列
ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
                TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
                , delayLevel2QueueId(delayLevel));

如果没有获取到对应的消息队列,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 根据消费偏移量从消息队列中获取所有有效消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

如果没有获取到有效消息,则在DELAY_FOR_A_WHILE(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 遍历所有消息
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
    // 获取消息的物理偏移量
    long offsetPy = bufferCQ.getByteBuffer().getLong();
    // 获取消息的物理长度
    int sizePy = bufferCQ.getByteBuffer().getInt();
    long tagsCode = bufferCQ.getByteBuffer().getLong();
    
    // 省略部分代码...

    long now = System.currentTimeMillis();
    // 计算消息应该被消费的时间
    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
	// 计算下一条消息的偏移量
    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)

	long countdown = deliverTimestamp - now;
    // 省略部分代码...
}

如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。如果到消费的时间,就继续执行下面操作:

// 根据消息的物理偏移量和大小获取消息
MessageExt msgExt =
    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
            offsetPy, sizePy);

如果获取到消息,则继续执行下面操作:

// 重新构建新的消息,包括:
// 1.清除消息的延迟级别
// 2.恢复真正的消息主题和队列Id
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 重新把消息发送到真正的消息队列上
PutMessageResult putMessageResult =
        ScheduleMessageService.this.writeMessageStore
                .putMessage(msgInner);

清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。

由于篇幅限制,其中源码的细节不做过多展开,有兴趣的小伙伴可以去GitHub上下载源码仔细阅读。

 

定制化方案

经过以上对源码的分析,可以总结出延迟消息的实现步骤:

  1. 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。
  2. 消息进入SCHEDULE_TOPIC_XXXX的队列中。
  3. 定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
  4. 根据消息的物理偏移量和大小再次获取消息。
  5. 根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
  6. 重新发送消息到原主题的队列中,供消费者进行消费。

概括起来如下图:

延迟消息的五种实现方案_有序集合_02

在CommitLog中,我们可以根据自定义的延迟时间选择一个最大的延迟级别,比如:延迟15分钟消费的消息,那么最大的延迟级别就是10分钟。在ScheduleMessageService中,判断消息是否真的到了消费的时间,如果已到了消费的时间,则恢复原主题和队列Id;如果未到消费的时间,则选择最大延迟级别重新修改主题和队列ID。如下图:

延迟消息的五种实现方案_有序集合_03

消息主体以及元数据都存储在CommitLog中,队列中只存放了在CommitLog中的起始物理偏移量、消息大小和消息Tag的哈希值,虽然需要重新把消息放入队列中,但空间浪费还是比较有限的。

 

优点

分布式、高吞吐量、高性能、高可靠,支持自定义延时时间。

缺点

定制RocketMQ不易维护,无法升级新版本。

总结

从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案,分别是:

  • 使用Redis的Sorted Set结构。
  • 使用RabbitMQ的rabbitmq delayed message exchange插件。
  • 使用ActiveMQ的5.4及以上版本的延迟消息功能。
  • 使用RocketMQ仅支持特定级别的延迟消息。
  • 定制RocketMQ,以重新计算延迟级别的方式实现自定义延时。

以上每个方案都是各自的优点和缺点,所以说延迟消息没有一个放之四海而皆准的方案,需要根据数据规模和业务需求的实际情况才能确定最适合的方案。

标签:方案,队列,五种,消息,msg,message,级别,延迟
From: https://blog.51cto.com/u_14347868/6461639

相关文章

  • 园区智慧管理技术方案
    园区综合解决方案是以”一个共享数据平台为基础,四综合应用”为主线,面向园区内不同需求,提供统一化管理、差异服务的整体综合解决方案。创建安全、高效便捷的数字化智慧园区,提升内外部需求个体适用程度。                       ......
  • 从SOA和微服务到云原生解决方案实践
     今天重新整理和分享下我在今年华南CIO大会关于SOA,微服务和云原生解决方案的一个分享材料。在前面我分享过这个材料的一个老版本,今天分享大会演讲的黑的版本。大家也可以比下这两个PPT版本风格的差异,我从原来的白底版本修改为当前的黑底风格也差不多用了2天左右的时间才完成。主要......
  • 你不知道的 CSS 居中方案
    水平居中内联元素要使内联元素(如链接,span或img)居中,使用text-align:center足够了。<divclass="desk"><spanclass="plate"></span></div>.desk{text-align:center;} 对于多个内联元素,也可以使用text-align:center:<divclass="desk">......
  • 智慧园区3D可视化解决方案
                          ......
  • 智慧街道智慧小脑解决方案
                                ......
  • 数字乡村方案建设
    数字乡村方案建设主要内容1、建设背景及需求分析乡村振兴战略国家大数据战略2、乡村振兴顶层设计数字乡村建设蓝图乡村振兴建设蓝图3、三农大数据建设方案农业大数据顶层设计农业物联网平台农产品质量追溯平台电子商务平台休闲农业平台综合门户信息发布平台三资信息平台智慧党建服......
  • 基于三维地图的智慧园区可视化解决方案
    建设目标目标:建设以园区地图数据为基础,汇聚园区各类信息的可视化应用。通过园区空间信息实施、业务信息汇聚,为园区可视化管理提供数据支撑。同时基于信息的统一管理、横向关联与分析统计,建设园区可视化应用,逐步打造智慧化的园区管理模式,更能够为园区企业和客户提供智慧化的园区服务......
  • 智慧文旅云平台建设方案
                                ......
  • 智慧农业解决方案
      【资料名称】智慧农业_促进产业结构转型,突破传统业态【资料简介】全文从三个部分描述智慧农业解决方案:1、关于“智慧农业”的思考农业信息化背景、需求分析、建设目标2、“智慧农业”总体规划“智慧农业”建设内容、技术架构、关键技术3、“智慧农业”按需实施综合运营支撑平台......
  • ABP入门教程3 - 解决方案
    点这里进入ABP入门教程目录 创建项目点这里进入ABP启动模板 如图操作,我们先生成一个基于.NETCore的MPA(多页面应用).点击"Createmyproject!"即可创建项目. 解读项目展示层(JD.CRS.Web.Mvc)提供一个用户界面,实现用户交互操作。ASP.NETCoreMVC(模型-视图-控制器)可以视为展示层......