首页 > 其他分享 >ActiveMQ消息模式Queue和Topic机制讲解

ActiveMQ消息模式Queue和Topic机制讲解

时间:2024-10-22 11:48:23浏览次数:10  
标签:Queue 消费者 订阅 Topic 消息 ActiveMQ

Docker安装ActiveMQ镜像以及通过Java生产消费activemq示例_docker activemq-CSDN博客

背景

周末由于服务器异常宕机,导致业务系统重启后出现ActiveMQ中的数据没有被正常消费,运维认为是消息积压,便联系博主排查。

最终发现并不存在消息积压,是因为采用ActiveMQ Topic模式生产消费消息,服务器宕机期间没有消费者正常消费topic数据,导致数据丢失,最后通过上游补发消息修复。

ActiveMQ中的Queue(队列)和Topic(主题)是两种不同的消息传递模式,它们各自具有独特的特点和适用场景。

消息传递模式

    Queue(队列):

        队列是点对点(Point-to-Point)的消息传递模式。

        一条消息只能被一个消费者接收。

        如果当前没有消费者,消息会被存储在队列中,等待消费者消费。

        消费者可以随时从队列中读取消息,并且消息的消费顺序与发送顺序一致。

    Topic(主题):

        主题是发布/订阅(Publish/Subscribe)的消息传递模式。

        一条消息可以被多个订阅者接收。

        如果当前没有订阅者,消息会被丢弃(除非配置了持久订阅)。

        订阅者只能接收到订阅之后发布的消息(除非配置了持久订阅,允许消费未激活状态时发送的消息)。

消息存储与持久化

    Queue:

        消息默认会存储在ActiveMQ服务器上,直到被消费者消费。

        支持消息持久化,即使ActiveMQ服务器重启,已持久化的消息也不会丢失。

    Topic:

        默认情况下,Topic不保存消息,消息是无状态且不落地的。

        支持持久订阅,但即使配置了持久订阅,消息的存储也与Queue有所不同,它主要依赖于订阅者的持久化机制。

生产者与消费者关系

    Queue:

        生产者与消费者之间没有时间上的相关性。生产者发送消息时,无需考虑消费者是否在线或已订阅。

        消费者可以在之后的任意时间消费队列中的消息。

    Topic:

        生产者与消费者之间有一定的时间相关性。订阅者只能接收到订阅之后发布的消息。

        如果生产者发送消息时,没有订阅者在线,那么这些消息会被丢弃(除非配置了持久订阅)。

使用场景

    Queue:

        适用于需要确保消息被唯一消费者处理的场景,如订单处理、任务调度等。

        适用于消费者需要按顺序处理消息的场景。

    Topic:

        适用于需要将消息广播给多个消费者的场景,如实时通知、事件驱动架构等。

        适用于消费者需要实时接收并处理消息的场景。

代码测试

Queue 示例,这里是先启动的生产者,后启动的消费者

生产者日志:

消费者日志:

可以看到是可以从头开始消费的

Topic示例,同样先启动的生产者,后启动的消费者

生产者日志:

消费者日志

可以看到启动前的数据没有被消费到

以上就是ActiveMQ的生产消费机制的内容了,有问题的地方也欢迎大家一起交流沟通~

标签:Queue,消费者,订阅,Topic,消息,ActiveMQ
From: https://blog.csdn.net/u010479989/article/details/143145734

相关文章

  • 优先级队列(priority_queue)
     priority_queue简介   优先级队列的底层结构其实就是堆,就是我们在学习二叉树的时候学习的堆。它是一种逻辑结构,底层还是vector,我们把它想象成一个堆结构。    我们在学习堆的时候,知道了它的父亲和左右孩子的关系。它如果存在孩子,则一定存在这一种关系,leftchi......
  • RabbitMQ 通配符(Topic)模式示例
    总结自:BV15k4y1k7Ep模式说明Topic类型与Direct相比,都是可以根据Routingkey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routingkey的时候使用通配符!Topic类型的Routingkey一般都是由一个或多个单词组成,多个单词之间以.分隔,例如:item.insert通配符规......
  • flume传输数据报错“Space for commit to queue couldn‘t be acquired. Sinks are li
        最近在写一个数据量比较大的项目时候,需要使用flume将kafka中的数据传输到HDFS上进行存储,方便后续的数仓搭建,但是flume在传输数据中却报错如下日志org.apache.flume.ChannelFullException:Spaceforcommittoqueuecouldn'tbeacquired.Sinksarelikelynot......
  • 故障5:2022年1月1日所有邮件不能发送接收(Email Stuck in Exchange On-premises Transpo
    凯中故障5:2022年1月1日所有邮件不能发送接收(EmailStuckinExchangeOn-premisesTransportQueues)https://techcommunity.microsoft.com/t5/exchange-team-blog/email-stuck-in-exchange-on-premises-transport-queues/ba-p/3049447https://social.microsoft.com/Forums/zh-C......
  • C++——stack和queue
    1.简介栈和队列的定义和之前的容器有所差别2.简单地使用voidtest_stack1(){ stack<int>st; st.push(1); st.push(2); st.push(3); st.push(4); while(!st.empty()) { cout<<st.top()<<""; st.pop(); } cout<<endl;}voidtest_queu......
  • ROS通信方式之Topic话题与Message消息的关系与C++实现
    ros由于其分布式模块化的设计理念,会将一个完整任务分解成多个节点去实现,这些节点之间的协作通过topic话题和message消息.相关概念有节点(Nodes):节点是一个可执行文件,它可以通过ROS来与其他节点进行通信。消息(Messages):订阅或发布话题时所使用的ROS数据类型。话题(Topics):节点可以将......
  • C#使用线程安全队列ConcurrentQueue处理数据
    usingSystem;usingSystem.Collections.Concurrent;usingSystem.Collections.Generic;usingSystem.Globalization;usingSystem.Linq;usingSystem.Text;usingSystem.Threading;usingSystem.Threading.Tasks;namespaceConsoleApp10{internalclassProg......
  • 【C++】priority_queue的介绍和模拟实现
    【C++】priority_queue的介绍和模拟实现一.priority_queue的介绍1.priority_queue的基本介绍2.priority_queue的使用介绍二.priority_queue的模拟实现一.priority_queue的介绍1.priority_queue的基本介绍优先队列是一种容器适配器,根据严格的弱排序标准,它的......
  • 优先级队列 ( PriorityQueue )
    文章目录前言一、优先级队列1.1、概念二、优先级队列的模拟实现2.1、堆的概念2.2、堆的存储方式 2.3、堆的创建2.4、堆的插入与删除三、常用接口介绍3.1、Top-k问题3.2、使用PriorityQueue创建大小堆,解决TOPK问题前言        前几篇我们讲解过队列,其是一种先......
  • STL-queue&deque&stack
    STLqueue&deque&stackqueue主要包括循环队列queue和优先队列priority_queue两个容器stack包含栈容器include头文件声明#include<queue>#include<deque>#include<stack>声明queue<int>q;deque<int>p;structabc{…};queue<abc>q; //结构体r......