首页 > 其他分享 >消息队列正确使用手册

消息队列正确使用手册

时间:2022-08-24 17:39:10浏览次数:63  
标签:消费 正确 队列 版本号 version 使用手册 消息 ID

1. 如何保证消息不丢失?

消息确认机制

2. 如何保证消息只被消费一次?

为了保证消息丢失,需要付出两方面的代价:一方面是性能的损耗,一方面可能造成消息重复消费。为了保证消息只被消费一次,我们需要保证消费多条消息时所得到的结果就是相同的,即幂等的。消息在生产和消费的过程中都可能会产生重复,所以你要做的是在生产过程和消费过程中增加消息幂等性的保证。

  1. 生产者保证消息唯一性

    给每一个生产者和消息赋予唯一的ID,消息存储时以<生产者ID,最后一条消息ID>存储,当某一个生产者产生新的消息时,消息队列服务端会比对存储的最后一条,如果一致就认为是重复的消息,服务端会自动丢弃。

  2. 消费者保证消费的唯一性

    • 设置全局唯一的消息ID。在消息被生产的时候给它生成一个全局唯一的消息ID,消息被处理之后把这个ID存储在数据库/缓存中,在处理下一条消息之前先从数据库里面检查这个全局ID是否被消费过,如果被消费过就放弃消费。

      问题:如果消息在处理之后,还没有来得及写入数据库,消费者宕机了重启之后发现数据库中并没有这条消息,还是会重复执行两次消费逻辑。

      解决:这时你就需要引入事务机制,保证消息处理和写入数据库必须同时成功或者同时失败。

      适用:全局唯一的消息ID+事务的范式使得消息处理的成本就更高了,所以如果对于消息重复没有特别严格的要求,可以直接使用全局唯一的消息ID,而不考虑引入事务。

    • 适用乐观锁。给数据中增加一个版本号的字段,在生产消息时先查询版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额SQL的时候带上版本号

      update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
      

      SQL可以执行成功,version+1;在执行第二条相同的消息时,由于version值不再是1,所以这条SQL不能执行成功,也就保证了消息的幂等性。

3. 减少消息延迟

消息队列已经堆积了大量的消息,消费者由于性能问题不能及时消费消息,就容易造成消息堆积。

如何增加增加消费者的消息处理能力?

  1. 消息队列: 为Topic(话题)配置多个Partition(分区),过增加分区来提高消费者的处理能力。

  1. 消费者:

    • 优化消费代码提升性能;
    • 增加消费者的数量。消费者使用线程池,在接收到消息之后把消息丢到线程池中来异步地处理,这样,原本串行的消费消息的流程就变成了并行的消费,可以提高消息消费的吞吐量

标签:消费,正确,队列,版本号,version,使用手册,消息,ID
From: https://www.cnblogs.com/greengages/p/16620952.html

相关文章

  • SpringBoot使用RabbitMq实现队列和延时队列
    闲来无事看了看RabbitMq的队列,总结了一些队列的实现方法,当然,免不了各种看别人的博客哈哈哈其中延时队列有两种方式,一种是使用TTl+死信队列实现,一种是直接用RabbitMq的官方......
  • 消息队列 kafka
    Kafka概念解释topic:队列producer:生产者,指发送消息端consumer:消费者,消息消费端consumergroup:消费者组,消息会发给每个消费者组,每个消费者对应一个消费者组border:ka......
  • 正确实例(采用readonly)
    父组件:<template><div>父组件:{{data.name}}-{{data.age}}-{{data.hight}}<hr><Hello:data="data":readonlyData="readonlyData"@......
  • Java工具篇之Disruptor高性能队列
    简介: disruptor适用于多个线程之间的消息队列,`作用与ArrayBlockingQueue有相似之处`,但是disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或......
  • python 二次封装logging,打印日志文件名正确,且正确写入/结合pytest执行,日志不输出的问
    基于之前日志问题,二次封装日志后,导致日志输出的文件名不对,取到的文件一直都是当前二次封装的log的文件名,基于这个问题,做了优化,详细看https://www.cnblogs.com/cuitang/p/1......
  • PerfView专题 (第十篇):洞察 C# 终结队列引发的内存泄漏
    一:背景C#程序内存泄漏的诱发因素有很多,但从顶层原理上来说,就是该销毁的用户根对象没有被销毁,从而导致内存中意料之外的对象无限堆积,导致内存暴涨,最终崩溃,这其中的一个......
  • 栈(Stack)和队列(Queue)
    一 栈(Stack):一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶,另一端称为栈底。栈中的数据元素遵守先进后出。  ......
  • 2022-8-23 剑指offer-优先队列(堆)-每日一题-太难不写了
    剑指OfferII061.和最小的k个数对难度中等44收藏分享切换为英文接收动态反馈给定两个以升序排列的整数数组 nums1 和 nums2 , 以及一个整数 k 。定义......
  • 队列结构
     什么是队列?队列(Queue),它是一种运算受限的线性表,先进先出(FIFOFirstInFirstOut)队列是一种受限的线性结构受限之处在于它只允许在表的前端(front)进行删除操......
  • 622. 设计循环队列
    622.设计循环队列设计你的循环队列实现。循环队列是一种线性数据结构,其操作表现基于FIFO(先进先出)原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓......