首页 > 其他分享 >MQ消息队列

MQ消息队列

时间:2023-07-10 17:57:27浏览次数:25  
标签:Topic 队列 Broker Queue MQ 消息 Consumer

1、消息队列应用场景

消息队列,指保存消息的一个容器,本质是个队列。

  • 异步处理,主要目的是减少请求响应时间;
  • 应用解耦,使用消息队列后,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系;
  • 流量削峰,秒杀活动中,系统峰值流量往往集中于一小段时间,消息队列作为缓冲,可以削弱峰值流量;
  • 日志处理,解决大量日志创数问题;

2、MQ整体架构

 

消息生产者Producer:负责产生和发送消息到Broker;
消息消费者Consumer:负责从Broker中获取消息,并进行相应处理;
消息处理中心Broker:负责消息存储、确认、重试等,一般包含多个queue;

3、设计Broker主要考虑

1)消息的转储:在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。

2)规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。

3)其实简单理解就是一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到broker,broker再将消息转发一手到接收端。

总结起来就是两次RPC加一次转储,如果要做消费确认,则是三次RPC。

4、两种模型

1)点对点模型

包含三个角色:

  • 消息队列(Queue)
  • 发送者(Sender)
  • 接收者(Receiver)

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,可以放在 内存 中,也可以 持久化,直到他们被消费或超时。

特点:

  • 每个消息只有一个消费者(即一旦被消费,消息就不再在消息队列中);
  • 发送者和接收者之间在时间上没有依赖性;
  • 接收者在成功接收消息之后需向队列应答成功;

2)发布者订阅消息模型

模型包含三个角色:

  • 主题(Topic)
  • 发布者(Publisher)
  • 订阅者(Subscriber)

多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

特点:

  • 每个消息可以有多个消费者:和点对点方式不同,发布消息可以被所有订阅者消费;
  • 发布者和订阅者之间有时间上的依赖性;
  • 针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息;
  • 为了消费消息,订阅者必须保持运行的状态;

5、RocketMQ原理

1)基础概念

  • Producer: 消息生产者,负责产生消息,一般由业务系统负责产生消息;
  • Producer Group:消息生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者;
  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费;
  • Consumer Group:消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组;
  • Topic:主题,用于将消息按主题做划分,Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息;
  • Message:消息,每个message必须指定一个topic,Message 还有一个可选的 Tag 设置,以便消费端可以基于 Tag 进行过滤消息;
  • Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息;
  • Broker:Broker是RocketMQ的核心模块,负责接收并存储消息,同时提供Push/Pull接口来将消息发送给Consumer。Broker同时提供消息查询的功能,可以通过MessageID和MessageKey来查询消息。Borker会将自己的Topic配置信息实时同步到NameServer;
  • Queue:Topic和Queue是1对多的关系,一个Topic下可以包含多个Queue,主要用于负载均衡,Queue数量设置建议不要比消费者数少。发送消息时,用户只指定Topic,Producer会根据Topic的路由信息选择具体发到哪个Queue上。Consumer订阅消息时,会根据负载均衡策略决定订阅哪些Queue的消息;
  • Offset:RocketMQ在存储消息时会为每个Topic下的每个Queue生成一个消息的索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数;
  • NameServer:NameServer可以看作是RocketMQ的注册中心,它管理两部分数据:集群的Topic-Queue的路由配置和Broker的实时配置信息。其它模块通过NameServer提供的接口获取最新的Topic配置和路由信息;各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态;
    •   Producer/Consumer :通过查询接口获取Topic对应的Broker的地址信息和Topic-Queue的路由配置;
    •   Broker : 注册配置信息到NameServer, 实时更新Topic信息到NameServer;

2)Client的使用

 1 from rocketmq.client import Message, Producer, PushConsumer, PullConsumer
 2 
 3 # Message
 4 msg = Message(self.topic)
 5 msg.set_tags(tag)
 6 msg.set_keys(key)
 7 msg.set_body(json.dumps(self.msg))
 8 
 9 # Producer
10 producer = Producer(self.group_id)
11 producer.set_namesrv_addr(self.namesrv_addr)
12 producer.set_max_message_size(1310720)
13 producer.start()
14 ret = producer.send_sync(msg=msg)                    # 同步发送消息
15 producer.shutdown()
16 
17 # PushConsumer
18 consumer = PushConsumer(listener_name)
19 consumer.subscribe(self.topic, listener_class)    # 订阅topic
20 consumer.set_namesrv_addr(self.namesrv_addr)
21 logger.info('[ListenRocketMq]',
22       u'{} {} {}'.format(self.namesrv_addr, self.topic, 'Monitor MQ heartbeat.'))
23 consumer.start()
24 while True:
25     time.sleep(random.randint(2000, 3000))
26 consumer.shutdown()

PushConsumer VS PullConsumer

PushConsumer,推,Broker主动向Consumer推消息,应用通常向对象注册一个Listener接口,一旦接收到消息,Consumer对象立刻回调Listener接口方法。Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

PullConsumer,拉,Consumer主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

3)消费模式

  • 广播模式

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每一个Consumer都消费一次;

1 # 设置广播模式       
2 consumer.setMessageModel(MessageModel.BROADCASTING);
  • 集群模式

一个Consumer Group中的所有Consumer平均分摊消费消息(组内负载均衡);

1 # 设置集群模式,也就是负载均衡模式
2 consumer.setMessageModel(MessageModel.CLUSTERING);

4)Broker的存储结构

 

Commit log:消息存储文件,记录所有的topic信息,RocketMQ 会对commit log文件进行分割(默认大小1GB),新文件以消息最后一条消息的偏移量命名;

Consumer queue:消息消费队列(也是个文件),记录每个消费者组消费topic最后的偏移量,消费者是先从 Consume queue 得到消息真实的物理地址,然后再去 Commit log 获取消息;

IndexFile:索引文件,是额外提供查找消息的手段,通过 Key 或者时间区间来查询对应的消息;

整体流程:

Producer 使用轮询的方式分别向每个 Queue 中发送消息。

Consumer 启动的时候会在 Topic,Consumer group 维度发生负载均衡,为每个客户端分配需要处理的 Queue。负载均衡过程中每个客户端都获取到全部的的 ConsumerID 和所有 Queue 并进行排序,每个客户端使用相同负责均衡算法,例如平均分配的算法,这样每个客户端都会计算出自己需要消费那些 Queue,每当 Consumer 增加或减少就会触发负载均衡,所以我们可以通过 RocketMQ 负载均衡机制实现动态扩容,提升客户端收发消息能力。客户端负责均衡为客户端分配好 Queue 后,客户端会不断向 Broker 拉取消息,在客户端进行消费。

 

标签:Topic,队列,Broker,Queue,MQ,消息,Consumer
From: https://www.cnblogs.com/cxq1126/p/17540430.html

相关文章

  • RabbitMQ基础及实践
    一、RabbitMQ的基本概念,以及6种工作模式,消息确认机制RabbitMQ简介:RabbitMQ基于AMQP标准,采用Erlang语言开发的消息中间件。 基本概念:●Producer:作为消息的生成者。●Consumer:作为消息的消费者。●Connection:消息的发布方或者消息的消费方和broker之间的TCP连接。●......
  • rabbitMQ二(rabbitMQ图形化界面)
    RabbitMQManagement:IP+15672(可视化界面端口号加上前缀1:5672->15672) username和password默认都是guest rabbitMQ架构图 ......
  • 【技术积累】数据结构中栈与队列及其相关算法【一】
    什么是栈栈是一种特殊的数据结构,它的各个元素按照一定的次序排列,且只能在表的一端(称为栈顶)进行添加和删除数据,这种数据结构遵循后进先出(LIFO)的原则。栈可以简单地理解为一种容器,它在使用时非常方便,因为只需在顶部压入(push)或弹出(pop)元素即可。栈可以直接使用数组或链表等数据结构......
  • rabbitmq
    一.死信队列1.Config配置类packagecom.yufou.studyrabbitmq.config;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.a......
  • 七、获取消息的方式
    RocketMQ获取消息的方式有两种:PULL(消费者主动去Broker拉取):拉取消息需要编写代码去Broker获取。通过DefaultMQPullConsumer,关联namesrv后,通过topic获取到关联的所有MessageQueue。遍历所有的MessageQueue,批量获取消息。并消费。直到处理完所有的MessageQueue。用户需要自己保......
  • 利用RabbitMQ 的死信队列来做定时任务
    常用的应用场景死信队列常常用作延时关闭订单(如订单的超时后的取消订单等),虽然小项目中可以用定时轮询的方法进行检查,但是数据量一旦比较大时,定时轮询将给数据库带来不小的压力,而且定时间隔无法进行动态调整,特别是一个系统中,同时存在好几个定时器的时候,就显得非常的麻烦,同时给数据......
  • ds:队列的基本实现
     一.顺序队1.入队判断队满,出队判断队空;2.顺序队定义时,要注意front、rear是下标,不是指针。typedefstruct{intdata[maxsize];intrear,front;//front:队头元素的下标。rear:队尾元素的后一个位置的下标(下一个待插入的位置),}sqListQueue;3,如果判断队......
  • 五、普通消息
    生产者能发送的消息类型有:Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。FIFO:顺序消息,ApacheRocketMQ通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。Delay:定时/延时消息,通过指定延时时间控制消息生......
  • centos7 安装 rabbitmq
    1、下载RabbitMQ安装包(请自行下载erlang和对应版本的rabbitmq)2、上传安装包到Linux中将上面三个软件上传到/usr/local/software目录下(如果没有software需要自己创建)3、安装文件(分别按照以下顺序安装)进入software文件夹,依次使用如下命令 rpm-ivherlang-21.3-1.el7.......
  • springcloud - 通过消息总线bus进行刷新
    修改3344服务pom文件 <!--添加消息总线RabbitMQ支持--> <dependency>   <groupId>org.springframework.cloud</groupId>   <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>yaml文件 #rabbitmq相关配置 spring: ......