首页 > 其他分享 >RabbitMQ 延时队列

RabbitMQ 延时队列

时间:2023-06-08 23:31:47浏览次数:41  
标签:exchange 队列 RabbitMQ 死信 消息 延时 TTL order

分布式事务-最终一致性库存解锁逻辑

一、Seata的不足

Seata的AT模式是二阶段提交协议(2PC),第一阶段将本地事务直接提交,第二阶段想要回滚的时候,是通过回滚日志(日志表)做的反向补偿,数据库原来是多少又改了回来。

Seata应用场景:后台管理系统,比如添加商品,优惠、库存、积分、会员要成功都成功,要失败都失败,对于并发性能不高的可以使用Seata来处理分布式事务。

如果并发性能要求很高的,比如下单,则需要使用最终一致性,RMQ发消息,保证消息的可靠性(发送端和接收端确认),不能达到强一致性,但能达到软柔性事务的最终一致性。

下单属于高并发场景,为了保证高并发,不推荐使用seata,Seata用了很多锁机制,因为是加锁,相当于把并发变为串行了,如果多个订单下来,就得进行排队,等待上一个人处理完了,释放了锁,才能继续下单,这样系统可能就没法用了,提升不了效率,可以发消息给库存服务。

二、高并发场景

在高并发场景下,不考虑 2PC 和 TCC 模式(这两种属于刚性事务),可以使用 最大努力通知型方案和可靠消息+最终一致性方案,这两种是通过消息来实现的,并且都是柔性事务。

为了保证高并发,库存服务自己回滚,可以发消息给库存服务。库存服务本身也可以使用自动解锁模式,要参与消息队列。 image.png 在锁库存的时候,需要给数据库增加记录,锁的数量和SKU,以及仓库,如果锁失败,库存锁表里边没有这条记录,可以使用定时任务来处理,不过使用定时任务来处理,是非常麻烦的事情,可以使用延时队列来处理,延时队列做一个定时工作。 image.png

一、RabbitMQ延时队列

RabbitMQ延时队列实现定时任务。

场景: 比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。

常用解决方案: spring的schedule定时任务轮训数据库

缺点: 消耗系统内存,增加了数据库的压力、存在较大的时间误差

解决: Rabbit的消息 TTL 和私信Exchange结合。

消息的TTL

消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。 RabbitMQ 可以对队列和消息分别设置TTL。

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信。
  • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果。

注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。

那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。

另一种方式便是针对每条消息设置TTL,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

这样这条消息的过期时间也被设置成了6s。

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

死信

死信:Dead Letter Exchange(DLX) 一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

一个消息被Consumer拒收了,并且reject方法的参数里 requeue 是false。也就是说不会被放在队列里,被其他消费者使用。(basic.reject/basic.nack) requeue=false 上面的消息的TTL到了,消息过期了。 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。 Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。

我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。 image.png

延时关单

场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。 image.png

规范设计#

设计建议规范(基于事件模型的交换机设计): 1、交换机命名:业务+exchange;交换机为Topic 2、路由键:事件.需要感知的业务(可以不写) 3、队列命名:事件+想要监听服务名+queue 4、绑定关系:事件.感知的业务(#)

整体业务设计: image.png 按照上边的规范设计,对关单业务进行升级设计: image.png 上图说明:交换机 order-event-exchange 绑定了一个延时队列order.delay.queue,路由key是 order.create.order, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order,然后exchange根据路由key order.release.order转发消息到 order.release.order.queue队列,客户端监听该队列获取消息。

根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。

  • MyMQConfig.java
@Configuration
public class MyMQConfig {

  /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */

  /**
   * 客户端监听队列(测试)
   * @param orderEntity
   * @param channel
   * @param message
   * @throws IOException
   */
  @RabbitListener(queues = "order.release.order.queue")
  public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {

    System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

  }

  /**
   * 死信队列
   *
   * @return
   */
  @Bean
  public Queue orderDelayQueue(){

     /*
            Queue(String name,  队列名字
            boolean durable,  是否持久化
            boolean exclusive,  是否排他
            boolean autoDelete, 是否自动删除
            Map<String, Object> arguments) 属性
         */
    HashMap<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", "order-event-exchange");
    arguments.put("x-dead-letter-routing-key", "order.release.order");
    arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟

    Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
    return queue;
  }

  /**
   * 普通队列
   *
   * @return
   */
  @Bean
  public Queue orderReleaseQueue(){

    Queue queue = new Queue("order.release.order.queue", true, false, false);
    return queue;
  }

  /**
   * TopicExchange
   *
   * @return
   */
  @Bean
  public Exchange orderEventExchange(){
    /*
     *   String name,
     *   boolean durable,
     *   boolean autoDelete,
     *   Map<String, Object> arguments
     * */

    return new TopicExchange("order-event-exchange", true, false);
  }

  @Bean
  public Binding orderCreateBinding() {
    /*
     * String destination, 目的地(队列名或者交换机名字)
     * DestinationType destinationType, 目的地类型(Queue、Exhcange)
     * String exchange,
     * String routingKey,
     * Map<String, Object> arguments
     * */
    return new Binding("order.delay.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.create.order",  // 路由key一般为事件名
        null);
  }

  @Bean
  public Binding orderReleaseBinding() {

    return new Binding("order.release.order.queue",
        Binding.DestinationType.QUEUE,
        "order-event-exchange",
        "order.release.order",
        null);
  }

}
  • HelloController.java
@Controller
public class HelloController {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @ResponseBody
  @GetMapping(value = "/test/createOrder")
  public String createOrderTest() {

    //订单下单成功
    OrderEntity orderEntity = new OrderEntity();
    orderEntity.setOrderSn(UUID.randomUUID().toString());
    orderEntity.setModifyTime(new Date());

    //给MQ发送消息
    rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);

    return "ok";
  }
}

交换机: image.png 交换机绑定的队列(路由key): image.png 队列: image.png 可以看到第一个队列是死信队列,第二个事普通队列 收到的消息为实体对象json: image.png 控制器输出的监控信息: 收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-334066d4105a 收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6260362920b

标签:exchange,队列,RabbitMQ,死信,消息,延时,TTL,order
From: https://blog.51cto.com/u_15993308/6444314

相关文章

  • 数据结构之栈与队列
         栈和队列是两种重要的数据结构。从栈与队列的逻辑结构上来说,它们也是线性结构,与线性表不同的是它们所支持的基本操作是受到限制的,它们是操作受限的线性表,是一种限定性的数据结构。     栈(stack)又称堆栈,它是运算受限的线性表,其限制是仅允许在表的一端进行插入和......
  • .Net全网最简RabbitMQ操作【强烈推荐】
    【前言】本文自1年前的1.0版本推出以来,已被业界大量科技公司采用。同时也得到了.Net圈内多位大佬的关注+推荐,文章也被多家顶级.Net/C#公众号转载。现在更新到了7.0版本,更好的服务各位.Neter。 【正文】支持.Net/.NetCore/.NetFramework,可以部署在Docker,Windows,Linux,......
  • Java语言实现生产者与消费者的消息队列模型(附源码)
    Java构建生产者与消费者之间的生产关系模型,可以理解生产者生产message,存放缓存的容器,同时消费者进行消费需求的消耗,如果做一个合理的比喻的话:生产者消费者问题是线程模型中的经典问题。解决生产者/消费者问题有两种方法:一是采用某种机制保护生产者和消费者之间的同步;二是在生产者和......
  • 消息队列
    消息队列解耦、异步、削峰应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉......
  • Redis系列15:使用Stream实现消息队列(精讲)
    Redis系列1:深刻理解高性能Redis的本质Redis系列2:数据持久化提高可用性Redis系列3:高可用之主从架构Redis系列4:高可用之Sentinel(哨兵模式)Redis系列5:深入分析Cluster集群模式追求性能极致:Redis6.0的多线程模型追求性能极致:客户端缓存带来的革命Redis系列8:Bitmap实现亿万级......
  • 栈&队列:剑指 Offer 09. 用两个栈实现队列
    题目描述:用两个栈实现一个队列。队列的声明如下,请实现它的两个函数appendTail和deleteHead,分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素,deleteHead 操作返回-1)   classCQueue{LinkedList<Integer>A,B;publicCQu......
  • JS 模拟 队列 结构
    Code:/***队列(基于动态数组)*@class*/varAQueue=(function(){/***栈容器*@type{DArray}*/letarr;/***@class*/class_AQueue{/***构造器*@constructor*@param{number}[capacity]*/con......
  • 【python】一个同步的队列类queue
    queuequeue 模块实现了多生产者、多消费者队列。这特别适用于消息必须安全地在多线程间交换的线程编程。模块中的 Queue 类实现了所有所需的锁定语义。 函数作用Queue.qsize()返回队列的大致大小。注意,qsize()>0不保证后续的get()不被阻塞,qsize()<maxsize......
  • 数据结构与算法-队列
    队列FIFO先进先出队列的实现classQueue(object):def__init__(self):self.__list=[]defenqueue(self,item):self.__list.append(item)defdequeue(self):returnself.__list.pop(0)defis_empty():returnse......
  • 队列
    1、定义:先进先出的线性表,就像排队,它只允许在队列一端插入元素,在另一端删除元素(插入一端队尾,删除一端队头)2、典型例子:作业排队3、基本功能1、宏定义结构体定义#include<stdio.h>#include<stdlib.h>#defineERROR0;#defineOK1;typedefstructNode{intdata......