首页 > 其他分享 >RabbitMQ、RocketMQ、Kafka延迟队列实现

RabbitMQ、RocketMQ、Kafka延迟队列实现

时间:2022-12-22 14:47:14浏览次数:43  
标签:队列 RabbitMQ Kafka topic 延迟时间 消息 RocketMQ 延迟

延迟队列在实际项目中有非常多的应用场景,最常见的比如订单未支付,超时取消订单,在创建订单的时候发送一条延迟消息,达到延迟时间之后消费者收到消息,如果订单没有支付的话,那么就取消订单。

那么,今天我们需要来谈的问题就是RabbitMQ、RocketMQ、Kafka中分别是怎么实现延时队列的,以及他们对应的实现原理是什么?

RabbitMQ

RabbitMQ本身并不存在延迟队列的概念,在 RabbitMQ 中是通过 DLX 死信交换机和 TTL 消息过期来实现延迟队列的。

TTL(Time to Live)过期时间

有两种方式可以设置 TTL。

  1. 通过队列属性设置,这样的话队列中的所有消息都会拥有相同的过期时间
  2. 对消息单独设置过期时间,这样每条消息的过期时间都可以不同

那么如果同时设置呢?这样将会以两个时间中较小的值为准。

针对队列的方式通过参数x-message-ttl来设置。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

针对消息的方式通过setExpiration来设置。

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

DLX(Dead Letter Exchange)死信交换机

一个消息要成为死信消息有 3 种情况:

  1. 消息被拒绝,比如调用reject方法,并且需要设置requeuefalse
  2. 消息过期
  3. 队列达到最大长度

可以通过参数dead-letter-exchange设置死信交换机,也可以通过参数dead-letter- exchange指定 RoutingKey(未指定则使用原队列的 RoutingKey)。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

原理

当我们对消息设置了 TTL 和 DLX 之后,当消息正常发送,通过 Exchange 到达 Queue 之后,由于设置了 TTL 过期时间,并且消息没有被消费(订阅的是死信队列),达到过期时间之后,消息就转移到与之绑定的 DLX 死信队列之中。

这样的话,就相当于通过 DLX 和 TTL 间接实现了延迟消息的功能,实际使用中我们可以根据不同的延迟级别绑定设置不同延迟时间的队列来达到实现不同延迟时间的效果。

RocketMQ

RocketMQ 和 RabbitMQ 不同,它本身就有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本的可以)。

他的默认时间间隔分为 18 个级别,基本上也能满足大部分场景的需要了。

默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

使用起来也非常的简单,直接通过setDelayTimeLevel设置延迟级别即可。

setDelayTimeLevel(level)

原理

实现原理说起来比较简单,Broker 会根据不同的延迟级别创建出多个不同级别的队列,当我们发送延迟消息的时候,根据不同的延迟级别发送到不同的队列中,同时在 Broker 内部通过一个定时器去轮询这些队列(RocketMQ 会为每个延迟级别分别创建一个定时任务),如果消息达到发送时间,那么就直接把消息发送到指 topic 队列中。

RocketMQ 这种实现方式是放在服务端去做的,同时有个好处就是相同延迟时间的消息是可以保证有序性的。

谈到这里就顺便提一下关于消息消费重试的原理,这个本质上来说其实是一样的,对于消费失败需要重试的消息实际上都会被丢到延迟队列的 topic 里,到期后再转发到真正的 topic 中。

Kafka

对于 Kafka 来说,原生并不支持延迟队列的功能,需要我们手动去实现,这里我根据 RocketMQ 的设计提供一个实现思路。

这个设计,我们也不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大部分延迟消息的场景来说足够使用了。

只创建一个 topic,但是针对该 topic 创建 18 个 partition,每个 partition 对应不同的延迟级别,这样做和 RocketMQ 一样有个好处就是能达到相同延迟时间的消息达到有序性。

原理

  • 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别

  • 发送消息的时候根据延迟参数发送到延迟 topic 对应的 partition,对应的key为延迟时间,同时把原 topic 保存到 header 中

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
  • 内嵌的consumer单独设置一个ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行pause,然后seek到当前ConsumerRecordoffset位置,同时使用定时器去轮询延迟的TopicPartition,达到延迟时间之后进行resume

  • 如果达到了延迟时间,那么就获取到header中的真实 topic ,直接转发

这里为什么要进行pauseresume呢?因为如果不这样的话,如果超时未消费达到max.poll.interval.ms 最大时间(默认300s),那么将会触发 Rebalance。

标签:队列,RabbitMQ,Kafka,topic,延迟时间,消息,RocketMQ,延迟
From: https://www.cnblogs.com/ilovejaney/p/16998654.html

相关文章

  • RocketMq学习记录
    RocketMQ的部署模型   在RocketMq中有四个部分组成,分别是Producer,Consumer,Broker,以及NameServer。生产者Producer发布消息的角色。Producer通过MQ的负载均衡模......
  • 设置发布和订阅消息的 RabbitMQ AMQP 服务器
    本指南将引导您完成设置发布和订阅消息的RabbitMQAMQP服务器的过程,并创建一个Spring引导应用程序以与该RabbitMQ服务器进行交互。您将构建什么您将构建一个应用程序,......
  • RocketMQ消费者没有成功消费消息的问题排查
    背景今天下游同事反馈,有一些以取消的订单库存还原异常了,导致部分商品库存没有还原。查日志发现没有收到还原消息,但是查看发送方是可以确认消息是已经发了的,那么是什么原因......
  • 微服务异步通讯——RabbitMQ消息队列复习笔记
    服务异步通讯——RabbitMQ复习随笔微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。两种方式各有优劣,打电话......
  • rocketmq搭建和测试
    准备工作搭建rocketmq:1.命令启动mqbroker:mqbroker.cmd-nlocalhost:9876autoCreateTopicEnable=true(保证topic自动创建)2.启动mqnamesrv.cmd生产者-客户端初始化:packa......
  • kafka集群安装教程
    kafka安装(我是用docker搭建了三个centos装的)先安装zookeeperzookeeper安装1.三台机器分别安装jdk(jdk1.8.0_351)cd/opt/softwaretar-zxvfjdk-8u351-linux-x6......
  • .NET Core如何通过认证机制访问Kafka?
    大家好,我是Edison。最近有一个ASP.NETCore使用认证机制访问Kafka的需求,加之我们又使用了CAP这个开源项目使用的Kafka,于是网上寻找了一番发现对应资料太少,于是调查了一番,......
  • kafka集群配置文件修改
    broker.id=2listeners=PLAINTEXT://192.168.49.222:9092#每个节点的host不同,默认是注释的log.dirs=/opt/module/kafka/datas#自定义路径zookeeper.connect=192.168......
  • AliMQ(RocketMQ)源码(一)创建producer
    公司现在在使用阿里云的AliMQ也就是RocketMQ,趁着工作之余,将RocketMQ的使用心得分析一下,关于RocketMQ的Producer、Consumer、Broker、NameServer等架构问题,在此处先不做分析......
  • AliMQ(RocketMQ)源码(二)producer.start()
    在创建完成Producer后,就进入了Producer的start()方法。start()方法DefaultMQProducerImpl的start()方法。this.serviceState=ServiceState.START_FAILED;......