首页 > 其他分享 >消息队列

消息队列

时间:2024-04-01 23:13:53浏览次数:10  
标签:消费者 队列 发送 MQ 丢失 消息

消息队列的作用

  1. 解耦
  2. 异步
  3. 消峰

1. 解耦

在没有使用消息队列前

档案归档, 在没有使用消息队列前, 上游系统将数据推送到CDAS后, CDAS将数据先存入DB中, 然后启用定时任务定时对数据进行处理

  1. 定时任务间隔、单次任务处理任务数据数量都不好控制
  1. 业务处理过程中需要调用其他系统的接口,需要在业务接口中处理调用中存在的异常情况, 比如服务不可用、服务超时
  2. 新增服务或服务下线都需要进行维护

在使用消息队列后

服务调用方只须要生产消息,然后将消息发送到MQ

消息调用方自己到对应的消息队列中获取消息消费, 消费这的上线,下线 对生产者都没有影响, 生产者中的业务逻辑也很清晰

2. 异步

image

image

异步, 生产者将消息发送到消息队列后, 就不关心下游接口的处理了,此时可以直接将结果返回给用户

注意: 只适合对不需要关心下游处理结果的接口, 如果需要下游接口处理结果才可以继续执行业务,则不适合。

3. 消峰

在流量高峰的时候,将消息放到消息队列, 然后消息消费者进行持续消费,避免流量高峰的时候将系统冲垮,无法提供服务。

  1. 提高消费者的数量,提高消费速度
  2. 在流量低谷时期消费消息

消息队列的优缺点

  1. 降低系统的可用性

  2. 提高系统的复杂度

  3. 产生一致性问题

引入MQ后, 如果消息队列服务不可用, 那么下游系统就无法获取到消息,从而导致服务不可用

  1. 保证消息队列的高可用

消费者可能会重复消费、消息丢失、消息顺序、消息积压等问题, 在引入MQ后这些都需要考虑, 这些会导致系统的复杂性提高。

生产者发送消息后, 如果有多个消费者消费此消息, 但是只有部分消费者成功消费而部分消费者因为异常将数据进行了回滚, 此时消费成功的消费者如何处理呢? 这些都需要引入其他额外的技术方案来解决

  • 分布式事务

MQ技术选型

特性 RabbitMQ RocketMQ Kafka
单机吞吐量 万级
比RocketMQ、Kafka低一个数量级
10万级, 支撑高吞吐 10万级, 高吞吐
一般配合大数据类的系统来进行实时数据计算、日志采集等场景
Topic数量对吞吐量的影响 topic可以达到几百、几千的级别
吞吐量会有较小幅度的下降
这是RocketMQ的一大优势, 在同等机器下,可以支撑大量的topic
topic 从几十到几百个的时候,吞吐量会有大幅下降
在同等机器下,Kafka尽量保证Topic数量不要过多
如果要支撑大规模的topic, 需要增加更多的机器资源
时效性 微妙级
延迟最低
毫秒级 延迟在毫秒级以内
可用性 高, 基于主从架构实现高可用 非常高,分布式架构 非常高,分布式
一个数据多个副本, 少数机器宕机不会丢失数据,不会导致不可用
消息可靠性 基本不丢失 经过参数优化,可以0丢失 经过参数优化,可以0丢失
总结 基于erlang开发, 并非能力强,性能极好,延时很低 基于Java语言,基于MQ功能较为完善, 分布式, 良好的可伸缩性和高可用性 功能较为简单, 主要支持简单的MQ功能, 在大数据领域的实时计算以及日志采集被大规模使用
适用场景 需要可靠消息传递的业务场景, 例如金融支付、订单处理等
需要高度灵活性的消息模型,例如消息路由,动态队列等。
需要与其他应用集成的场景, RabbitMQ提供了丰富的客户端库和协议支持
高性能,高可用性的消息传递场景, 例如实时数据分析、电商秒杀等
需要强大的消息过滤和消息追踪功能的场景, 例如广告投放、用户推送等。
需要分布式事务支持的场景, RocketMQ提供了分布式事务消息特性
需要高吞吐量和低延迟的实时数据处理场景, 例如用户行为日志分析、实时监控等
需要保留大量历史数据并支持数据回溯的场景, 例如大数据分析、数据仓库等。
需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线。

image

RabbitMQ架构设计

image

  1. 生产者, 消息发送方, 负责产生消息并发送到RabbitMQ, 生产者通常将消息发送到交换机 Exchange
  2. 交换机,交换机是消息的分发中心, 浮加将接收到的消息路由到一个或多个队列。 它定义了消息的传递规则, 可以根据规则将消息啊发送到一个或多个队列1
    1. 直连交换机 Direct Exchange, 将消息路由到与消息中的路由键(routingKey)完全匹配的队列
    2. 主题交换机 Topic Exchange, 根据通配符匹配路由键, 将消息路由到一个或多个队列
    3. 扇出交换机 Fanout Exchange, 将消息广播到所有与交换机绑定的队列, 忽略路由键
    4. 头部交换机 Headers Exchange, 根据消息头中的属性进行匹配,将消息路由到与消息头匹配的队列
  3. 队列 Queue, 队列是消息的存储区,是用于存储生产者发送的消息,消息最终会被消费者从队列中取出并处理, 每个队列都有一个名称, 并且可以绑定到一个或多个交换机。
  4. 消费者 Consumer, 消费者是消息的接收方, 负责从队列中获取消息并处理, 消费者通过订阅队列来接收消息。
  5. 绑定 binding, 绑定是交换机和队列之间的关联关系, 生产者将消息发送到交换机, 而队列通过绑定与交换机关联, 从而接收消息。
  6. 虚拟主机 Virtual Host, 是RabbitMQ的基本工作单元, 每个虚拟主机拥有自己独立的用户,权限,交换机,队列等资源, 完全隔离于其他虚拟主机
  7. 连接 Connection, 连接是指生产者、消费者与RabbitMQ之间的网络连接,每个连接可以包含多个信道(channel)每个信道是一个独立的会话通道, 可以进行独立的消息传递。
  8. 消息: 是生产者与消费者之间传递的数据单元, 消息通常保安消息体和可选的属性, 如路由键等。

消息队列如何保证消息丢失

image

消息丢失的环节:

  1. 生产者发送消息时丢失

  2. MQ接收消息后丢失

  3. 消费者消费消息时丢失

1. RabbitMQ如何保证消息丢失

1.1生产者发送消息时丢失

生产者发送消息到MQ的过程中丢失消息, 可能是网络波动或MQ服务宕机等。

消息确认机制

Confirm是RabbitMQ提供的一种消息可靠性保障机制, 当生产者通过Confirm模式发送消息时会等待MQ的确认,确保消息发送到对应的Exchange中

如果发送成功, 返回ACK, 否则返回NACK

如果没有对应的exchange、exchange没有绑定对应的队列也会丢失消息。

  1. 生产者通过confirm.select 方法将Channel 设置为 confirm 模式
  2. 发送消息后,通过添加 add_confirm_listener 方法监听消息的确认状态

1.2 MQ接收到消息后丢失

消息被MQ接收,但是消息没有持久化而MQ服务宕机导致消息丢失

启用持久化机制, 将消息存储到磁盘, 保证MQ宕机或重启时不会丢失消息。

  1. 生产者通过将消息的 delivery_mode 设置为2, 将消息标记为持久化
  2. 队列也需要进行持久化设置, 确保队列在MQ重启后仍然存在。经典队列需要将 durable设置为 true。 仲裁队列和流式独立额默认必须持久化保存。

持久化机制会影响性能, 因此需要确保消息不丢失的场景才使用。

1.3 消费者消费消息时丢失

ACK事务机制, 确保消息被正确消费。 当消息消费成功后,消费者发送ACK给MQ, 告知消息可以移除。 这个过程是自动处理的, 也可以关闭手动处理。

  1. RabbitMQ中, ACK是默认开启的。 当消息被消费者接收后,会立即从队列中删除,除非消费者发送异常
  2. 可以手动开启ACK机制, 通过设置 auto_ack=false 手动控制消息ACK

会导致消息重复消费的问题。

2. RocketMQ 如何保证消息丢失

2.1 生产者发送消息环节

生产者发送消息的方式:同步发送、异步发送、OneWay发送

1、同步消息(sync message )

producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结果 。

2、异步消息(async message)

producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行 。

3、单向消息(oneway message)

producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果 。

这种方式会存在消息丢失的情况, 在需要保证消息不丢失的场景下应避免适用。

解决办法

使用自带的事务机制来发送消息

  1. 生产者发送half消息到MQ, 此时消费者无法消费half消息。 若half消息发送失败则执行回滚
  2. half发送成功 且 MQ返回成功响应, 则执行生产者核心链路
  3. 如果生产者核心链路执行失败,则回滚并通知 MQ 删除half消息
  4. 如果生产者核心链路执行成功,则通知MQ commit half消息, 此时消费者可以消费此消息

2.2.消息队列接收到消息后丢失

  1. MQ接收到消息后,在没有写入磁盘的时候宕机
  2. MQ将消息写入磁盘后,磁盘损坏导致消息丢失

MQ接收到消息后并不是直接将消息存入磁盘而是OSCache,为了提高性能, 默认是异步刷盘。

  1. 修改flushDiskType从ASYNC_FLUSH 修改为 SYNC_FLUSH
  2. 采用主从架构, 集群部署, Leader中的数据在多个Follower中都有备份, 避免单点故障

2.3. 消费者消费时丢失

消费者在接收到消息后,在消费过程中异常或宕机,但是提前发送确认消息。 MQ会任务此消息已经被消费, 但实际没有消费。

  1. 保证消息消费完成后发送确认, 不要异步消费

使用上面一整套的方案就可以在使用RocketMQ时保证消息零丢失,但是性能和吞吐量也将大幅下降

  • 使用事务机制传输消息,会比普通的消息传输多出很多步骤,耗费性能
  • 同步刷盘相比异步刷盘,一个是存储在磁盘中,一个存储在内存中,速度完全不是一个数量级
  • 主从机构的话,需要Leader将数据同步给Follower
  • 消费时无法异步消费,只能等待消费完成再通知RocketMQ消费完成

RabbitMQ如何保证消息不重复消费

会导致重复消费的场景

  1. 生产者:重复发送消息, 比如发送消息的接口被重复调用, 没有做接口幂等性
  2. 消息队列:消费者在消费后发送ACK时,MQ宕机, MQ恢复后再次推送该消息, 导致重复消费
  3. 消费者:消费者消费消息后在发送ACK前宕机, MQ认为消费者没有消费该消息而重新推送该消息

1. 使用数据库唯一键约束

局限性大,仅能在数据新增场景中使用, 且性能低

2. 乐观锁

假设是更新订单状态, 在发送消息的时候带上修改该字段的版本号, 当DB中的版本号有消息中的版本号一致时才可以修改

  1. 更新字段较多,更新场景较多时,可能导致数据库字段增加
  2. 多条消息同时在队列中, 此时它们修改的版本号一致, 排在后续的消息无法被消费

3. 简单的消息去重, 插入消费记录, 增加数据库判断

在消费成功后记录一条记录, 在消息重复消费前检查此消息的消费记录, 如果存在就不消费。

RabbitMQ如何解决消息积压问题

image

  1. 消费者处理消息的速度太慢
    1. 增加消费者数量来提高处理能力, 水平扩展
    2. 优化消费者性能, 提高消费者处理消息的效率, 优化代码,增加资源
    3. 消息预取限制, 调整消费者预取数量以避免一次性处理过多消息而导致处理缓慢
  2. 队列的容量太小
    1. 增加队列的容量, 调整队列设置以允许更多消息存储
  3. 网络故障
    1. 监控&告警 通过监控网络状况并设置告警, 确保网络故障的时候及时发现并解决问题
    2. 持久化和高可用性, 确保消息和队列的持久化以避免消息丢失,并使用镜像队列提高可用性
  4. 消费者故障
    1. 使用死信队列, 将无法处理的消息转移到死信队列, 防止堵塞主队列
    2. 容错机制, 实现消费者的自动重启和错误处理逻辑
  5. 队列配置不当
    1. 优化队列配置, 检查并优化消息确认模式, 队列长度限制和其他相关配置
    2. 有些消息不需要确认就不要开启
  6. 消息大小
    1. 消息分片, 将大型消息分割成小的消息片段, 加快处理速度
  7. 业务逻辑复杂或耗时
    1. 优化业务逻辑, 简化消费者中的业务逻辑,减少处理每个消息所需的时间
  8. 消息产生速度快于消费速度
    1. 使用消息限流 控制消息的生产速度, 确保不超过消费速度
    2. 负载均衡 确保消息在消费者之间公平分配, 避免部分消费者过载, 部分消费者空闲的情况
  9. 其他优化配置
    1. 消息优先级 使用消息优先级确保高优先级先处理
    2. 调整RabbitMQ配置,优化RabbitMQ服务的配置, 如文件描述符限制, 内存使用限制等

如何保证消息消费顺序

  1. 消费者是集群部署, 各个消费者处理速度不一致

RabbitMQ

RabbitMQ 交换机类型

直连 Direct Exchange

路由键与队列名完全匹配, 通过RoutingKey路由键将交换机与队列进行绑定, 消息被发送到交换机时, 需要根据routingKey进行匹配, 发送到完全匹配路由键的队列中。

  1. 完全匹配
  2. 单播

主题 Topic Exchange

与Direct交换机类似, 需要通过routingKey进行匹配分发, 区别是Topic交换机可以进行模糊匹配。

  1. Topic中, RoutingKey通过 . 来分为多个部分
  2. * 代表一个部分
  3. # 代表0个或多个部分(如果绑定的路由键为 # 时,则接收所有消息, 因为路由键所有都匹配)
flowchart LR Topic_Exchange --key1.key2.key3.*--> queue1 Topic_Exchange --key1.#--> queue2 Topic_Exchange --*.key2.*.key4--> queue3 Topic_Exchange --#.key3.key4--> queue4

路由键为"key1.key2.key3.key4"的消息会分发给所有4个队列。

. 将路由键分为4个部分,* 表示一个部分,# 表示0到多个部分

key1 分发到 query2

扇出 Fanout Exchange

将消息分发给所有绑定了此交换机的队列, 日志routingKey参数无效。

  • 广播模式

头部 Headers Exchange

匹配AMQP消息的header而不是路由键,headers与direct交换机完全一致, 但性能差很多, 目前几乎不用。

消费方指定的headers中必须包含一个x-match的键

  1. x-match=all 表示所有的键值都匹配才接收到消息
  2. x-match=any 表示只要有键值队匹配就能接收到消息

Kafka 的架构设计与工作流程

标签:消费者,队列,发送,MQ,丢失,消息
From: https://www.cnblogs.com/dreamsrj/p/18109601

相关文章

  • Offer必备算法20_队列_宽搜bfs_四道力扣题详解(由易到难)
    目录①力扣429.N叉树的层序遍历解析代码②力扣103.二叉树的锯齿形层序遍历解析代码③力扣662.二叉树最大宽度解析代码④力扣515.在每个树行中找最大值解析代码本篇完。①力扣429.N叉树的层序遍历429.N叉树的层序遍历难度中等给定一个N叉树,返回其节......
  • 评测机队列(牛吃草问题)
    1.问题2.解决2.1分析关键思路是利用好支点,这里具体评测机的评测速度是未知的,但是我们写出方程组则可以发现,该速度是可以约去的,这时我们不妨设置为最简单的1个程序/min2.2代码#include<bits/stdc++.h>usingnamespacestd;intmain(){ intn1=8,t1=30,n2=10,......
  • https安全性 带给im 消息加密的启发
    大家好,我是蓝胖子,在之前#MYSQL是如何保证binlog和redolog同时提交的?这篇文章里,我们可以从mysql的设计中学会如何让两个服务的调用逻辑达到最终一致性,这也是分布式事务实现方式之一。今天来看看我们能够从httpsd设计中得到哪些启发可以用于业务系统开发中。https原理分析首先......
  • C#中的消息中间件(RabbitMQ 和 Redis)
    消息中间件是一种用于在分布式系统中进行异步通信的技术,常用于解耦应用程序的不同组件、实现消息传递、提高系统的可伸缩性和可靠性等。以下是关于消息中间件的知识点以及可能会在面试中被问到的一些问题和答案:消息中间件的知识点:消息队列(MessageQueue):消息中间件通常基于消......
  • 通过企业微信API接口发送消息(通过postman或者企业微信开发者中心《服务端API调试工具
    如何创建一个与企业后台互动的自建应用添加自建应用登录企业微信管理后台->应用管理->自建下创建应用,填写必要的logo,应用名称,在可见范围中选择部门/成员获取应用的相关信息agentid和secret;应用里创建完毕可出现在选择了可见范围的成员的企业微信终端上。使用Postman调试api......
  • 13天【代码随想录算法训练营34期】 第五章 栈与队列part03(● 239. 滑动窗口最大值 ●
    239.滑动窗口最大值单调队列:单调递减,一个queue,最大值在queue口,队列中只维护有可能为最大值的数字比如说1,3,2,4;当slidingwindow已经到3时,就可以把1pop出去了,因为有了3,1不可能为最大值,同理到4的时候,3、2都可以pop出去classMyQueue:def__init__(self):self.queue......
  • 如何使用PHP和Redis实现消息队列功能?
    前言今天,我们继续讲消息队列,如何使用Redis实现消息队列的功能。前期准备,需要安装好docker、docker-compose的运行环境。PHP的项目运行环境可以参考下面的文章内容。如何使用docker部署php服务-CSDN博客前面我们也讲了PHP和RabbitMQ实现消息队列的功能,感兴趣的可以查看下面......
  • 链式队列实现
    typedefstructlist//创建队列中的链式结构{ datatypedata;//数据域 structlist*next;//指针域}list;typedefstructqueue//创建队列{ list*front;//队头 list*rear;//队尾}queue;voidinitqueue(queue*q)//初始化队列{ q->front=q->rear=(list*)mal......
  • Python数据结构与算法——数据结构(栈、队列)
    目录数据结构介绍列表栈栈的基本操作:栈的实现(使用一般列表结构即可实现):栈的应用——括号匹配问题队列队列的实现方式——环形队列 队列的实现方式——双向队列 队列内置模块栈和队列应用——迷宫问题栈——深度优先搜索 队列——广度优先搜索数据结构介绍......
  • 11天【代码随想录算法训练营34期】 第五章 栈与队列part02(● 20. 有效的括号 ● 1047
    20.有效的括号classSolution:defisValid(self,s:str)->bool:stk=[]upper=["(","{","["]lower=[")","}","]"]dictionary={")":"(&qu......