首页 > 其他分享 >RocketMq

RocketMq

时间:2024-08-01 16:39:40浏览次数:9  
标签:Topic producer 队列 重试 消息 RocketMq 刷盘

RocketMQ的使用场景

应用解耦

系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、 物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异 常,影响用户使用体验。

流量削峰

应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求 缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。 举例:业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能 的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

数据分发

通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需 要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

RocketMQ的角色介绍

Producer:消息的发送者;举例:发信者 Consumer:消息接收者;举例:收信者 Broker:暂存和传输消息;举例:邮局 NameServer:管理Broker;举例:各个邮局的管理机构 Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者 可以订阅一个或者多个Topic消息 Message Queue:相当于是Topic的分区;用于并行发送和接收消息

 Rocket如何保证消息不丢失

要想保证消息不丢失,需要从以下几个方面考虑:

 普通对于顺序消息,异常默认不重试,可以用户自己重试,并发送到其他队列

严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试

Producer 发送消息

  同步发送

  

public void send() throws Exception { 
  String message = "test producer";
  Message sendMessage = new Message("topic1", "tag1", message.getBytes());
  sendMessage.putUserProperty("name1","value1");
  SendResult sendResult = null;
  DefaultMQProducer producer = new DefaultMQProducer("testGroup");
  producer.setNamesrvAddr("localhost:9876");
  producer.setRetryTimesWhenSendFailed(3);

try {
sendResult = producer.send(sendMessage);
} catch (Exception e) {
e.printStackTrace();
}

if (sendResult != null) {
System.out.println(sendResult.getSendStatus());
}
}

  异步发送

  重写回调方法,代码如下

  

public void sendAsync() throws Exception { 
String message = "test producer";
Message sendMessage = new Message("topic1", "tag1", message.getBytes()); s
endMessage.putUserProperty("name1","value1");
DefaultMQProducer producer = new DefaultMQProducer("testGroup");
producer.setNamesrvAddr("localhost:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.send(sendMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable e) {
TODO 可以在这里加入重试逻辑
}
});
}

 

Broker 保存消息

  同步刷盘

消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:

flushDiskType=SYNC_FLUSH

 

  异步刷盘

默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

 

Broker集群主从之间复制过程出现异常 

在默认方式下,当消息成功写入主节点时,就会返回确认响应给生产者,并异步将消息复制到从节点。然而,如果主节点突然宕机且无法恢复,尚未复制到从节点的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步复制方式。主节点将会同步等待从节点完成复制,然后才返回确认响应。这样可以确保消息的可靠性。可以通过brokerRole=SYNC_MASTER参数进行控制。

Consumer 消费消息

  

先想想什么情况下,消息存储会丢失呢?

因为各种原因消费失败,但是还是提交了消费位点,这条消息从业务角度来说就“丢失”了。

那怎么解决这个问题?

跟消息生产一样,其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」

消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。

如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。

重试消息会被存入名为 "%RETRY%+消费组名称" 的Topic中,原始主题Topic会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。

另外,RocketMQ跟kafka不同的是,天然支持了 「死信队列机制」

如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。

 

极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

标签:Topic,producer,队列,重试,消息,RocketMq,刷盘
From: https://www.cnblogs.com/darkb4dawn/p/18336809

相关文章

  • rocketmq普通消息-消息类型
    引用普通消息发送|RocketMQ异步调用的经典业务场景_使用异步的业务场景-CSDN博客同步消息类型Send消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。用途1.重要通知......
  • Python rocketMq 客户端的同步和异步模式
    同步模式fromrocketmq.clientimportPushConsumer,ConsumeStatusimporttimedefcallback(msg):print(msg.id,msg.body,msg.get_property('property'))returnConsumeStatus.CONSUME_SUCCESSdefstart_consume_message():consumer=PushCon......
  • RocketMQ知识总结(基本原理)
    文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/文章收录在网站:http://hardyfish.top/基本原理总体架构图零拷贝零拷贝技术是一个思想,指的是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另......
  • 基于Docker安装RocketMQ
    1、拉取RocketMQ镜像代码:dockerpullapache/rocketmq成功示例:Usingdefaulttag:latestlatest:Pullingfromapache/rocketmq2d473b07cdd5:Pullcomplete23700610a3bb:Pullcompleteb58ea7c35511:Pullcompletea5224a1908d3:Pullcompleteb4d9fb37957c:Pull......
  • RocketMQ命令介绍及RocketMQ的可视化工具部署
    前言上篇文章我们介绍了RocketMQ集群的搭建,但是我们只能通过命令来查看集群情况。所以,这篇文章我们主要介绍RocketMQ的可视化平台。RocketMQ的可视化工具主要用于监控和管理RocketMQ集群,帮助我们更加直观地了解RocketMQ的运行状态和性能指标。mqadmin工具RocketMQ官方......
  • Raft协议深度解析:RocketMQ中基于DLedger的日志主从复制
    本文所涉及的注释源码:bigcoder84/dledgerRaft协议主要包含两个部分:Leader选举和日志复制。前面我们在Raft协议深度解析:RocketMQ中的自动Leader选举与故障转移一文中已经详细介绍了DLedger如何实现Leader选举的,而本文主要聚焦于Leader选举完成后的日志复制的过程。一.Rock......
  • 公司技术栈用到了RocketMQ,我对此块知识进行了回顾(初始RocketMQ)
    前言作为24届的校招生,不知道大伙儿们是否都已经到了工作岗位上。为了以后更方便的接触到公司的业务,我司为我们安排了将近一个月的实操。虽然不用敲代码,但是…了解到我司使用到的技术栈,在空闲时间正好对RocketMQ这块技术做个回顾,希望能回想起那死去的记忆,同时希望能够帮助......
  • SpringBoot集成RocketMQ
    RocketMQ的延迟消息概述举例说明延迟:PDD的拼单,大家在pdd拼单购买商品时,下单后,会有一个拼单倒计时,如果在倒计时结束的时候,还未拼单成功,那么系统就会取消到这个订单。技术实现:只需在消息生产者代码中添加一句:message.setDelayTimeLevel(3);//设置延迟的等级,即消费者接收到......
  • RocketMQ 面试题及答案整理,最新面试题
    RocketMQ的消息存储机制是如何设计的?RocketMQ消息存储机制的设计原理:1、CommitLog文件:所有的消息都存储在一个连续的CommitLog文件中,保证了消息的顺序写入,提高写入性能。2、消费队列:为每个主题的每个队列创建消费队列文件,存储指向CommitLog中消息的索引,加快消费速度......
  • springboot 使用 rocketMQ
    POM依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>配置文件rocketmq:name-server:192.168.20......