首页 > 其他分享 >博学谷学习记录 自我总结 用心分享 | RocketMQ刨析

博学谷学习记录 自我总结 用心分享 | RocketMQ刨析

时间:2023-10-13 10:56:01浏览次数:42  
标签:String 刨析 Broker private 发送 消息 Consumer 博学 RocketMQ

  RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。

主要功能

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
  • 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
  • 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)

各个模块的作用

  • Namesrv: 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
  • Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
  • Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
  • Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。

功能架构部署图:

 

MQ集群工作流程

  1. 启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。

  2. Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。

  3. 收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。

  4. Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。

  5. Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

Producer

示例代码:

这里用InitializingBeanDisposableBean来管理mq的生命周期,InitializingBean用来初始化mq配置信息,DisposableBean 在mq执行完成后用来销毁bean。

@Componentpublic class CancelDisplayProducer implements InitializingBean, DisposableBean {    private static final Logger logger= LoggerFactory.getLogger(CancelDisplayProducer.class);    private DefaultMQProducer defaultMQProducer;    @Value("${crk.topic}")    private String topicName;    @Value("${crk.nameServer}")    private String nameServer;    @Value(("${crk.groupName}"))    private String groupName;    public SendResult sendCancelDisplayMq(String tag, String msg, Object primaryKey, Object hashVal){        logger.info("发送取消延时队列消息内容{}",msg);        Message rocketMsg = null;        com.alibaba.rocketmq.client.producer.SendResult sendResult = null;        try {            rocketMsg =  new Message(topicName, tag, primaryKey + "", msg.getBytes("UTF-8"));            //设置该消息延迟1s发送            rocketMsg.setDelayTimeLevel(1);            sendResult = defaultMQProducer.send(rocketMsg, new MessageQueueSelector() {            //发送顺序消息                @Override                public MessageQueue select(List list, Message message, Object obj) {                    int hashCode = obj.hashCode();if(hashCode                         hashCode = Math.abs(hashCode);                    }                    int index = hashCode % list.size();return list.get(index);                }            }, hashVal);if(sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {                logger.info("发送取消延时队列消息成功,发送内容:{},keys:{}", msg, primaryKey);            }        } catch (Exception e) {            logger.error("发送取消延时队列消息异常【{}】", e);        }return sendResult;    }    @Override    public void destroy() throws Exception {        defaultMQProducer.shutdown();    }    @Override    public void afterPropertiesSet() throws Exception {        logger.info("groupName=" + groupName);        logger.info("nameServer=" + nameServer);        //初始化        defaultMQProducer = new DefaultMQProducer();        defaultMQProducer.setNamesrvAddr(nameServer);        defaultMQProducer.setProducerGroup(groupName);        defaultMQProducer.setRetryTimesWhenSendFailed(5);        defaultMQProducer.setInstanceName("openCarCancelDisplayInstance");        //设置超时时间为5s        defaultMQProducer.setSendMsgTimeout(5000);        defaultMQProducer.start();        logger.info("DefaultMQProudcer start success!");    }}//***调用生产者发送消息***cancelDisplayProducer.sendCancelDisplayMq("cancleDisplay",JSONObject.toJSONString(bodyJson),orderNo,orderNo);
Producer顺序发送

Rocketmq能够保证消息严格顺序,但是Rocketmq需要producer保证顺序消息按顺序发送到同一个queue中,比如购买流程(1)下单(2)支付(3)支付成功,

这三个消息需要根据特定规则将这个三个消息按顺序发送到一个queue Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区(这里的分区可以理解为不同的队列),在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

如何实现把顺序消息发送到同一个queue:

8853645a5fbe23fff1fa5774d9e7c4e6.png

一般消息是通过轮询所有队列发送的,顺序消息可以根据业务比如说订单号orderId相同的消息发送到同一个队列, 或者同一用户userId发送到同一队列等等

messageQueueList [orderId%messageQueueList.size()]messageQueueList [userId%messageQueueList.size()]

Consumer

示例代码:

@Componentpublic class CancelDisplayConsumer implements InitializingBean, DisposableBean {    private static  final String CANCEL_DISPLAY_GROUP_NAME="cancle_display_consumer_group";    private static  final  String CANCLE_DISPLAY_INSTANCE_NAME="cancle_display_consumer_instance";    private static  final Logger logger= LoggerFactory.getLogger(CancelDisplayConsumer.class);    private DefaultMQPushConsumer consumer;    @Autowired    private CancelDisplayProducer cancelDisplayProducer;    @Autowired    private IComTransChannelConfigService comTransChannelConfigService;    @Value("${crk.nameServer}")    private String nameServer;    @Value("${crk.topic}")    private String topicName;    @Autowired    private IHongqiOrderMappingService hongqiOrderMappingService;    @Override    public void destroy() throws Exception {        consumer.shutdown();        logger.info("订单取消延时队列消费消息关闭");    }    @Override    public void afterPropertiesSet() throws Exception {        try {            consumer = new DefaultMQPushConsumer(CANCEL_DISPLAY_GROUP_NAME);            consumer.setNamesrvAddr(nameServer);            consumer.setInstanceName(CANCLE_DISPLAY_INSTANCE_NAME);            consumer.subscribe(topicName, "*");            consumer.registerMessageListener(new MessageListenerOrderly() {                @Override                public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt messageExt : list) {                            logger.info("消费取消延迟消息start:{}", list);                            String body = new String(messageExt.getBody());                            JSONObject bodyJson = JSONObject.parseObject(body);                            String orderNo = bodyJson.getString("orderNo");                            String channel=bodyJson.getString("channel");                            MDC.put("traceId", messageExt.getMsgId());                       //逻辑代码忽略.........return ConsumeOrderlyStatus.SUCCESS;                }            });            consumer.start();        } catch (MQClientException e) {            logger.error("消费延迟取消消息consume启动异常:{}",e);        }    }}

如何保证消息不丢失

分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失

一、producer重试发送消息
  1. 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功

  2. 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失

  3. RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功

二、broker的持久化机制
  1. 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的

2.Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中

  1. Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
三、消费端的重试机制

消费者可以根据自身的策略批量Pull消息

  1. Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标

  2. 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset

  3. 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作

如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地

关于offset:

RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理。

Offset主要分为本地文件类型和 Broker代存的类型两种。

Rocketmq集群有两种消费模式

默认是 CLUSTERING 模式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。

BROADCASTING模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使用 LocalfileOffsetStore,把 Offset存到本地。

 

标签:String,刨析,Broker,private,发送,消息,Consumer,博学,RocketMQ
From: https://www.cnblogs.com/LiuLance/p/17759791.html

相关文章

  • 博学谷学习记录 自我总结 用心分享 | Kafka刨析
    基本概念Kafka 体系架构Kafka体系架构包括若干Producer、若干Broker、若干Consumer,以及一个 ZooKeeper 集群。在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群......
  • 【RocketMQ】RocketMQ 5.0新特性(二)- Pop消费模式
    Pop模式消费和消息粒度负载均衡在RocketMQ5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下消费需要不断主......
  • 博学谷学习记录 自我总结 用心分享 | OpenResty中间件
    1.什么是OpenRestyOpenResty是一个基于Nginx与Lua的高性能Web平台,其内部集成了大量精良的Lua库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态Web应用、Web服务和动态网关。OpenResty通过汇聚各种设计精良的Nginx模块,从而将Nginx有效地......
  • 【RocketMQ】RocketMQ存储结构设计
    CommitLog生产者向Broker发送的消息,会以顺序写的方式,写入CommitLog文件,CommitLog文件的根目录由配置参数storePathRootDir决定,默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文件名,小于20位用0补齐:比如第一个文件中第一......
  • 【RocketMQ】Dledger模式下的日志复制
    RocketMQ在开启Dledger时,使用DLedgerCommitLog,其他情况使用的是CommitLog来管理消息的存储。在Dledger模式下,消息写入时Leader节点还需要将消息转发给Follower节点,有过半的节点响应成功,消息才算写入成功。Leader消息写入Dledger下有DLedgerMemoryStore(基于内存存储)和DLedgerMmap......
  • RocketMQ简单入门
    服务端安装及配置docker安装dockerpullrocketmqinc/rocketmq:4.4.0指定版本号是为了后面确定配置文件的路径启动namesrvdockerrun-d-p9876:9876--namerocketmq-nameservice-eMAX_POSSIBLE_HEAP=100000000rocketmqinc/rocketmq:4.4.0shmqnamesrv运行成功执行m......
  • 【RocketMQ】DLedger模式下的选主流程分析
    RocketMQ4.5版本之前,可以采用主从架构进行集群部署,但是如果master节点挂掉,不能自动在集群中选举出新的Master节点,需要人工介入,在4.5版本之后提供了DLedger模式,使用Raft算法,如果Master节点出现故障,可以自动选举出新的Master进行切换。Raft协议Raft是分布式系统中的一种共识算法,......
  • 【RocketMQ】主从同步实现原理
    RocketMQ支持集群部署来保证高可用。它基于主从模式,将节点分为Master、Slave两个角色,集群中可以有多个Master节点,一个Master节点可以有多个Slave节点。Master节点负责接收生产者发送的写入请求,将消息写入CommitLog文件,Slave节点会与Master节点建立连接,从Master节点同步消息数据。......
  • 【RocketMQ】Rebalance负载均衡总结
    消费者负载均衡,是指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息,这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不涉及负载均衡,而集群模式一个消息队列同一时间只能分配给组内的一个......
  • 【RocketMQ】事务实现原理总结
    RocketMQ事务的使用场景单体架构下的事务在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。以创建订单为例,假设下单后需要做两个操作:在订单表生成订单在积分表增加本次订......