首页 > 其他分享 >Kafka 延时队列,重试队列,死信队列

Kafka 延时队列,重试队列,死信队列

时间:2022-11-24 15:57:48浏览次数:71  
标签:delay 队列 主题 重试 死信 消息 延时

延时队列

在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,

然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。

如果采用这种方案,那么一般是按照不同的延时等级来划分的,比如设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不同等级的主题中,投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间,这样延时误差控制在两个延时等级的时间差范围之内(比如延时时间为17s的消息投递到30s的延时主题中,之后按照延时时间为30s进行计算,延时误差为13s)。虽然有一定的延时误差,但是误差可控,并且这样只需增加少许的主题就能实现延时队列的功能。

img

发送到内部主题(delay*topic**)中的消息会被一个独立的 DelayService 进程消费,这个 DelayService 进程和 Kafka broker 进程以一对一的配比进行同机部署(参考下图),以保证服务的可用性。

img

针对不同延时级别的主题,在 DelayService 的内部都会有单独的线程来进行消息的拉取,以及单独的 DelayQueue(这里用的是 JUC 中 DelayQueue)进行消息的暂存。与此同时,在 DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。如下图所示,DelayService 的设计应当尽量保持简单,避免锁机制产生的隐患。

img

为了保障内部 DelayQueue 不会因为未处理的消息过多而导致内存的占用过大,DelayService 会对主题中的每个分区进行计数,当达到一定的阈值之后,就会暂停拉取该分区中的消息。

因为一个主题中一般不止一个分区,分区之间的消息并不会按照投递时间进行排序,DelayQueue的作用是将消息按照再次投递时间进行有序排序,这样下游的消息发送线程就能够按照先后顺序获取最先满足投递条件的消息。

实现delay service 里需要注意:

  1. 因为consumer是单线程的,所以理论上有多少个延迟主题,就需要创建多少个线程。像上面topic-delay-1s,topic-delay-5s,topic-delay-5m,topic-delay-30m 这4种主题,就需要4个线程来处理。延迟主题少问题不大,但延迟主题如果比较多的话,还是比较难受的。
  2. 需要手动提交偏移量,因为delay service 可能会因为升级或者故障,导致重启,这个时候消息是不能漏的,所以一定要消息已经转发到业务主题后,再提交偏移量,防止漏消息。
  3. 重复消息的处理,因为delay service 需要先把消息发送到业务主题,再提交偏移量,就有可能出现 消息发送到业务主题后,还没来的及发送偏移量,delay service 就因电力故障无法正常服务了,下次重启后,就可能继续发送已经发送过的消息。所以业务上建议是要做到幂等,以实现容错。

重试队列 与 死信队列

我们在 延时队列的基础上实现重试队列就比较简单了,当某个消息处理失败时,就把这个消息发送到 延时队列。等时间到了就会重试处理。如果处理成功,则结束。如果处理失败则重试次数加1,再一次进入延时队列。而如果超过了重试次数,则写入死信队列,作为记录。

这里说的重试队列,死信队列都是概念上的东西,kafka本身并不提供。我们是可以在使用层实现这一类概念。下面的时序图,是一个示例。我们假设有一个订单支付成功了,有积分逻辑需要处理,但重试三次后依然失败了。时序过程描述如下:

image.png

参考:

kafka延迟队列、重试队列、死信队列

kafka进阶:延时队列

标签:delay,队列,主题,重试,死信,消息,延时
From: https://www.cnblogs.com/hongdada/p/16922111.html

相关文章

  • Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析
    上篇文章谈到BlockingQueue的使用场景,并重点分析了ArrayBlockingQueue的实现原理,了解到ArrayBlockingQueue底层是基于数组实现的阻塞队列。但是BlockingQueue的实现类中,有......
  • 嵌入式操作系统内核原理和开发(消息队列)
         消息队列是线程交互的一种方法,任务可以通过消息队列来实现数据的沟通和交换。在嵌入式系统上,这可以说这是用的最多的一种方法。通过消息队列,无论是发送者,还是接......
  • 随想录(为什么循环队列具有先天的并行性)
       循环队列是很多人喜欢用的一种数据结构。本着先来先服务的特性,循环队列是一种十分简单、健壮的数据结构。不像链表、二叉树,如果使用不慎,就会造成很大的麻烦,但是在循......
  • golang算法—— 使用两个栈实现一个队列
    前言阅读本文,假定已经了解了基本数据结构概念。队列:先入先出。栈:先进后出。分析使用两个栈串联,可以实现先进先出。但是,得注意以下两点:队列在入列时,stack2必须为空,stac......
  • 消息队列中间件nsq安装与使用
    安装与运行nsq的镜像开启容器时并不是默认开启三个服务的,而是需要手动开启。dockerpullnsqio/nsqdockerrun-itd--restart=on-failure:20-p4150:4150-p4151:4151-p......
  • freertos消息队列的值传递和指针传递
    消息队列的使用方法总结:1、消息队列初始化(定义一个消息队列的结构体),一般在main.c中完成。2、消息队列的发送:  aextern消息队列   b定义一个结构体的指针指向消......
  • 4.队列、栈、链表
    目录一、队列1.什么是队列2.抽象数据类型Queue3.python实现ADTQueue4.举例热土豆问题(约瑟夫问题)5.举例:打印队列二、双端队列1.什么是双端队列?2.抽象数据类型Deque3.pytho......
  • Java双向链表实现队列
    将双向链表做简单的改造,即可实现一个FIFO(FirstInputFirstOut)队列,该队列只在头节点出队,尾节点入队。一般来说定义节点类只需一个后驱节点next即可。这里保留pre节......
  • 【Azure 服务总线】Azure.Messaging.ServiceBus 多次发送消息报超时错误,是否可以配置
    问题描述使用AzureServiceBus,提供应用程序之间松耦合的消息交换,但是有时候发送消息多次出现超时错误。Aconnectionattemptfailedbecausetheconnectedpartydidno......
  • 优先队列(std_priority_queue)
    title:优先队列(std::priority_queue)date:2022-11-1715:50:12tags:算法本文章遵守知识共享协议CC-BY-NC-SA,转载时需要署名,推荐在我的个人博客阅读。优先队列是......