首页 > 其他分享 >RocketMQ模型和生产实践

RocketMQ模型和生产实践

时间:2024-05-08 18:56:43浏览次数:26  
标签:生产实践 消费者 messageQueue 模型 broker 保证 消息 RocketMQ

RocketMQ的客户端编程模型相对⽐较固定,基本都有⼀个固定的步骤。掌握这个固定步骤,对于学习其他复杂的消息模型也是很有帮助的。 消息⽣产者的固定步骤 1.创建消息⽣产者producer,并指定⽣产者组名 2.指定Nameserver地址,可以在代码中固定写IP,也可以通过配置项来写,最好是配置项,这样更灵活。 3.启动producer。 这个步骤⽐较容易忘记。可以认为这是消息⽣产者与服务端建⽴连接的过程。和broker多次请求交互,要进行多次通讯,而不是就进行一次通讯拿到broker地址就断开了连接。这是长连接。保存启动状态,可以和服务端进行多次通信的。 4.创建消息对象,指定主题Topic、Tag和消息体。tag可以进行分类。 5.发送消息。三种方式发送。sendonway(msg),无返回值,单向发送一次,效率最高;但是可能没有发送成功,但是生产者不知道,性能高,但是不安全。 6.关闭⽣产者producer,释放资源。     消费者组,默认每个消费者组的逻辑都是一样的,如果一个消费者组有多个消费者,那么从broker的messageQueue中推过来的消息只会被一个消费者来消费。 推模式中,消费者必须是要一直启动的,否则broker就无法感知到消费者,就无法保证消息的实时性。只有一直启动,才能进行推消息。broker也会记录消费者的消费进度。 以消费者组为单位进行分配记录。

代理者位点:表示消息存储到多少号了,消费者位点:表示消费者消费到哪条消息了,消费到哪个点了。差值就是剩余还没有消费的。如果差值越来越大,就可能会有丢消息的可能。

注意:上面三个值都是broker来维护的。

消费者一定要给broker返回一个值,让broker去决定这条消息是否还要继续进行发送。

ConsumeConcurrentlyStatus.CONSUME_SUCCESS;如果是successes,那么就是发送成功,无需再次发送。RECONSUME_LATER,证明是消费失败,需要再次重新发送。如果不返回,或者抛出异常,也会认为是失败,有重试机制。通过创建队列进行重试,重试次数超过了限度,那么就会进入死信队列。 Consumer的进度是以消费者组为概念进行记录的,与具体的每个消费者无关。不关注是哪个终端节点消费的。与topic也无关。订阅了多个topic,可能都进入这一个重试队列。 消费者组,是处理业务逻辑相同的生产者组成的。死信消息默认的权限是2,禁读的,需要手动处理或者是修改其权限为4或者6,然后才能进行消费。   集群模式下,broker的messageQueue的消息只会被消费者组中的一个消费者消费,不会每个消费者都消费一次。这里面offset的进度是由broker来维护的。服务端的offset是不能丢失的,要知道下一条消息推送的位置,从哪里开始继续推送。 广播模式下,每个消费者都能消费到。这里面也是有offset的,这个offset的值是由消费者本身来维护的。记录我消费到哪一条,如果没有消费成功,就重试,接收到消息后进行重试。   这种就是广播模式下的消费者位点,值都是0,offset由消费者本身去控制。没有了消费者组的概念了。本地的offset可能会出现丢失,broker是不知道这些信息的。广播模式下的offset是可以丢失的。 所以广播消息是不能保证消息不丢失的,有很大丢失消息的可能。这个一定要多注意。

 

顺序消息机制

RocketMQ的消费者接收消息的顺序可能会和生产者发送消息的顺序不一致,但是其保证的是局部的一组消息的一致性;保证的是局部的有序性,大部分情况下,我们说的保证有序性是保证的局部有序性。

有序,就是生产者要保证发送到同一个队列messageQueue中,可以通过取余等机制,保证相同的一组信息发送到同一个queue中;

又因为queue就能自己保证有序性,就能保证消息的顺序传递,先进先出。

然后还有一点就是消费者方也要进行处理,保证顺序消费;使用不同的监听器,使用MessageListenerOrderly这个监听器。又因为为了保证吞吐量,拉取消息的时候是多个线程去并发拉取?消费者拉取消息的时候是每次拉32条,从哪里拉的不确定,随意,这样就可能会导致无序。多线程情况下就无法保证有序性了。

为了保证有序性,先锁定一个队列,将这个队列的消息消费完毕之后,才会去切换到别的队列进行处理。这个是不同的模型。

如果失败了,就阻塞当前队列一会儿,为了保证顺序性,就会阻塞整个队列,等到处理成功了才会解除这个阻塞。顺序消息对吞吐量有影响的。

如果是并发处理模型,中间一个处理失败了,可能会去后面继续消费,只要保证本次拿到32条消息能够消费就可以了。

一般都是保证局部有序就行了。要全局有序的话,所有的消息都要放到同一个messageQueue,这样就没有分布式的优势了。

顺序消费,消费者拉取的时候锁定一个队列,也是可以使用多线程线程池去并发拉取这个队列中的消息,但是需要消费者本身进行控制,保证有序性。

 

延迟消息:

msg.setDelayTimeLevel(3);// 第3等级。 类似一个定时任务。需要重点设计的东西。延迟的时间,RocketMQ设置了一些默认级别,没有必要记录,一共18个级别,messageDelayLevel。可以在broker.conf中进行修改,但是不建议修改。 4.x版本,只有这些延迟机制。没有办法指定时间点去发送。七天确认收货就不行。 5.x版本可以指定时间戳,到毫秒级进行消息推送。这个版本,开源版和商业版没有太大区别了。 是系统中的SCHEDULE_TOPIC_XXXX这个topic下的18个不同messageQueue。

 

批量消息

可以减少IO的消耗,放到一个list中;减少了IO次数,如果网络异常,那么消息就丢失了;有限制,大小不超过1M,这个限制不是强制性的。消息进行分割,分成几个批次,进行请求。

批量发,减少IO,然后要考虑安全性,每次消息大小不能太大,可以对消息进行压缩成二进制,消费者再解压等等。RocketMQ已经进行了一些压缩。

批量发送就是尽量减少请求次数,可以使用spilt进行拆分一下。

针对边缘的优化。这才是RocketMQ的精髓。

 

过滤消息

消费者组中多个消费者进行协作,一般情况下是不可控的,为了保证可控;就可以用到消息过滤机制。发送消息的时候生产者可以指定tag,消费者同样也订阅这个tag。subscribe这个方法订阅指定的tag,多个tag使用 || 隔开。订阅的时候订阅指定的tag,过滤是在哪里?一个是broker端,这样发送到消费者的就少了;还有就是在消费者端进行过滤,全部接收,但是进行过滤的消费。

broker端处理:网络压力小了,但是其本身的压力就大了。这里面是要有取舍的。

RocketMQ优先保证的是网络的性能,也就是在broker端实现的。不建议定义很复杂的订阅条件。

如果是更复杂的过滤条件,生产者发送的时候还是正常发送带tag的消息;然后在消费者端定义MessageSelector.bySql。

默认情况下,服务端broker不支持bySql的方式。在集群中有一个配置可以进行配置,enablePropertyFilter,通过这个进行配置。默认值是false。修改为true。修改之后broker才支持bySql。为了防止broker端更繁忙。

典型的流式处理方式:先拿一个应用来过滤,然后通过topic来发送,保证每个messageQueue使用同一个方式去消费;这样的方式要明显高于broker端进行sql过滤。

 

事务消息:

重点,非常重点。

写本地事务,和发消息如果不控制,就会出现不一致;提供一个机制,保证这两个是同步的。完整的业务场景下,上游客户下单,下游商家发货,这才是完整的事务。但是在分布式情况下,这个链路太长了。RocketMQ进行了折中处理,只保证前面一半保证原子性。后面从broker到消费者的这个过程,事务可以由消费者来控制,自己确认,同步进行返回消息消费状态。

这并不是标准的分布式解决方案。是半事务。后面的一半由消费者自己去实现。

RocketMQ的事务消息和消费者无关,只是和生产者和broker相关。producer.setTransactionListener().

所谓半消息half消息,是生产者发送这个半消息,然后broker将其挪到内部的一个topic中,该topic是RMQ_SYS_TRANS_HALF_TOPIC。如果确认成功,broker将这个消息从这个topic中挪出来,发到消费者订阅的topic中。

返回unknown状态,broker就要进行回查;多久回查一次,回查多少次这些是要配置的。本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。

定时任务,例如买票订单确认,15分钟未支付就取消订单,支付了就分配座位。如果使用定时任务,15分钟后去执行,那么如果是15分钟内支付了,但是不能马上去分配位置,必须要等定时任务启动才行,这样当然是不行的。也不符合业务逻辑。这个场景是broker级别配置的,配置完成之后这个broker中的所有message Queue都会生效。

messageQueue一般建议配置是broker数量的两倍。broker中一个topic的messageQueue数量默认是8个,也可以自动进行确认。

RocketMQ如果启动了长时间,那么从控制台可能会看到消息的最小位点就不是0了,因为日志文件默认是1G,超过之后就写一个新的文件;历史文件默认保存120个小时,凌晨4点会删除过期的文件,删除之后历史的消息就找不到了。清空了。

消费模型:消费者组中的消费者可以消费一个broker中的一个messageQueue,也可以消费多个messageQueue,也可以跨broker消费messageQueue;但是broker中的messageQueue只会被消费者组中的一个消费者去消费,不会出现一个messageQueue被一个消费者组中的多个消费者消费。

 

权限控制

非核心功能,一般来说MQ是内部使用的,根本就无序进行权限控制,但是RocketMQ提供了权限控制的功能。perm,6可读可写;4是可读不可写;2禁写禁读。太粗粒度了。

 

幂等

对消费者来说,不保证只推送一次,保证最少一次;可以向同一个消费者推送多次,或者可以向不同的消费者推送。

RocketMQ提供messageId,这个不靠谱,事务消息,或者多次转发的过程中,这个可能会变;建议自己去实现这个消息id。

 

 

 

 

 

标签:生产实践,消费者,messageQueue,模型,broker,保证,消息,RocketMQ
From: https://www.cnblogs.com/0630sun/p/18176958

相关文章

  • 谈谈我对大模型的想法
    【2024年5月8日思考】整一个软件整了多少天了,没有进展,指望GPT给我启发,有些收获,不过问题离真正解决差得远。这么搞,路子不对!现在市面上说大模型很火,就像多年前大家热衷说深度学习一样。现在在科研院所,做研究也一样,被动卷着,也要提提深度学习、大模型的字眼。说实话,你高校,其实大多数......
  • (一)文本分类经典模型之CNN篇
    CNN源于计算机视觉研究,后来诸多学者将其应用于短文本分类,其基本结构如下图所示:由上图可知,基于CNN的短文本分类模型,通常包括输入层、卷积层、池化层、全连接层和输出层五部分,其中卷积层和池化层是最为关键的特征提取环节。卷积层通过构造二维卷积核,并将其上下移动,在卷积窗口内与......
  • YOLOv8 模型训练后验证
    验证代码:fromultralyticsimportYOLOpath="E:/resource/yolo8_all/ultralytics-main/"#训练后进行验证model=YOLO(path+"runs/detect/train11/weights/best.pt")metrics=model.val(data=path+"data_NEUDET.yaml")#自动评估训练的数据 参考链接......
  • RocketMQ 事件驱动:云时代的事件驱动有啥不同?
    前言:从初代开源消息队列崛起,到PC互联网、移动互联网爆发式发展,再到如今IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了30多个年头。目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往......
  • 当「软件研发」遇上 AI 大模型
    作者:陈鑫(神秀)大家好,我是通义灵码的产品技术负责人陈鑫。过去有八年时间,我都是在阿里集团做研发效能,即研发工具相关的工作。我们从2015年开始做一站式DevOps平台,然后打造了云效,也就是将DevOps平台实现云化。到了2023年,我们明显感觉到大模型时代来了以后,软件工具将面临着......
  • 当「软件研发」遇上 AI 大模型
    作者:陈鑫(神秀)大家好,我是通义灵码的产品技术负责人陈鑫。过去有八年时间,我都是在阿里集团做研发效能,即研发工具相关的工作。我们从2015年开始做一站式DevOps平台,然后打造了云效,也就是将DevOps平台实现云化。到了2023年,我们明显感觉到大模型时代来了以后,软件工具将面临着......
  • Apache RocketMQ ACL 2.0 全新升级
    作者:徒钟引言RocketMQ作为一款流行的分布式消息中间件,被广泛应用于各种大型分布式系统和微服务中,承担着异步通信、系统解耦、削峰填谷和消息通知等重要的角色。随着技术的演进和业务规模的扩大,安全相关的挑战日益突出,消息系统的访问控制也变得尤为重要。然而,RocketMQ现有的AC......
  • 基于总线设备驱动模型的按键读取驱动程序
    本次实验基于总线设备驱动模型实现按键驱动程序的编写,给上层应用程序提供检测按键是否按下的操作接口,上层应用根据按键是否按下控制led的亮灭。所以上层应用程序会同时使用led和按键的驱动接口,但是对于下层驱动而言,这二者是分离的,因此只需要专注于编写按键驱动程序就可以了。在正......
  • 一种光电容积波PPG 转换到心电图ECG进行房颤检测的神经网络模型
    具体的软硬件实现点击http://mcu-ai.com/MCU-AI技术网页_MCU-AI人工智能光电体积描记法(PPG)是一种经济有效的非侵入性技术,利用光学方法测量心脏生理学。PPG在健康监测领域越来越受欢迎,并用于各种商业和临床可穿戴设备。与心电图(ECG)相比,PPG并没有提供实质性的临床诊断价值,尽管......
  • 创建个人博客网站记录-2.3 建立模型以及对应的CRUD操作
    2.3、建立模型以及对应的CRUD操作在本节中,创建了USER用户类和BLOG博文类两个对象类,并实现了其基本的增删改查的操作。#flaskr/models.pyfromflaskimportgfromflask_sqlalchemyimportSQLAlchemyfromsqlalchemyimportColumn,Integer,String,TIMESTAMP,ForeignKey,T......