RocketMQ的客户端编程模型相对⽐较固定,基本都有⼀个固定的步骤。掌握这个固定步骤,对于学习其他复杂的消息模型也是很有帮助的。 消息⽣产者的固定步骤 1.创建消息⽣产者producer,并指定⽣产者组名 2.指定Nameserver地址,可以在代码中固定写IP,也可以通过配置项来写,最好是配置项,这样更灵活。 3.启动producer。 这个步骤⽐较容易忘记。可以认为这是消息⽣产者与服务端建⽴连接的过程。和broker多次请求交互,要进行多次通讯,而不是就进行一次通讯拿到broker地址就断开了连接。这是长连接。保存启动状态,可以和服务端进行多次通信的。 4.创建消息对象,指定主题Topic、Tag和消息体。tag可以进行分类。 5.发送消息。三种方式发送。sendonway(msg),无返回值,单向发送一次,效率最高;但是可能没有发送成功,但是生产者不知道,性能高,但是不安全。 6.关闭⽣产者producer,释放资源。 消费者组,默认每个消费者组的逻辑都是一样的,如果一个消费者组有多个消费者,那么从broker的messageQueue中推过来的消息只会被一个消费者来消费。 推模式中,消费者必须是要一直启动的,否则broker就无法感知到消费者,就无法保证消息的实时性。只有一直启动,才能进行推消息。broker也会记录消费者的消费进度。 以消费者组为单位进行分配记录。
代理者位点:表示消息存储到多少号了,消费者位点:表示消费者消费到哪条消息了,消费到哪个点了。差值就是剩余还没有消费的。如果差值越来越大,就可能会有丢消息的可能。
注意:上面三个值都是broker来维护的。
消费者一定要给broker返回一个值,让broker去决定这条消息是否还要继续进行发送。
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;如果是successes,那么就是发送成功,无需再次发送。RECONSUME_LATER,证明是消费失败,需要再次重新发送。如果不返回,或者抛出异常,也会认为是失败,有重试机制。通过创建队列进行重试,重试次数超过了限度,那么就会进入死信队列。 Consumer的进度是以消费者组为概念进行记录的,与具体的每个消费者无关。不关注是哪个终端节点消费的。与topic也无关。订阅了多个topic,可能都进入这一个重试队列。 消费者组,是处理业务逻辑相同的生产者组成的。死信消息默认的权限是2,禁读的,需要手动处理或者是修改其权限为4或者6,然后才能进行消费。 集群模式下,broker的messageQueue的消息只会被消费者组中的一个消费者消费,不会每个消费者都消费一次。这里面offset的进度是由broker来维护的。服务端的offset是不能丢失的,要知道下一条消息推送的位置,从哪里开始继续推送。 广播模式下,每个消费者都能消费到。这里面也是有offset的,这个offset的值是由消费者本身来维护的。记录我消费到哪一条,如果没有消费成功,就重试,接收到消息后进行重试。 这种就是广播模式下的消费者位点,值都是0,offset由消费者本身去控制。没有了消费者组的概念了。本地的offset可能会出现丢失,broker是不知道这些信息的。广播模式下的offset是可以丢失的。 所以广播消息是不能保证消息不丢失的,有很大丢失消息的可能。这个一定要多注意。
顺序消息机制
RocketMQ的消费者接收消息的顺序可能会和生产者发送消息的顺序不一致,但是其保证的是局部的一组消息的一致性;保证的是局部的有序性,大部分情况下,我们说的保证有序性是保证的局部有序性。
有序,就是生产者要保证发送到同一个队列messageQueue中,可以通过取余等机制,保证相同的一组信息发送到同一个queue中;
又因为queue就能自己保证有序性,就能保证消息的顺序传递,先进先出。
然后还有一点就是消费者方也要进行处理,保证顺序消费;使用不同的监听器,使用MessageListenerOrderly这个监听器。又因为为了保证吞吐量,拉取消息的时候是多个线程去并发拉取?消费者拉取消息的时候是每次拉32条,从哪里拉的不确定,随意,这样就可能会导致无序。多线程情况下就无法保证有序性了。
为了保证有序性,先锁定一个队列,将这个队列的消息消费完毕之后,才会去切换到别的队列进行处理。这个是不同的模型。
如果失败了,就阻塞当前队列一会儿,为了保证顺序性,就会阻塞整个队列,等到处理成功了才会解除这个阻塞。顺序消息对吞吐量有影响的。
如果是并发处理模型,中间一个处理失败了,可能会去后面继续消费,只要保证本次拿到32条消息能够消费就可以了。
一般都是保证局部有序就行了。要全局有序的话,所有的消息都要放到同一个messageQueue,这样就没有分布式的优势了。
顺序消费,消费者拉取的时候锁定一个队列,也是可以使用多线程线程池去并发拉取这个队列中的消息,但是需要消费者本身进行控制,保证有序性。
延迟消息:
msg.setDelayTimeLevel(3);// 第3等级。 类似一个定时任务。需要重点设计的东西。延迟的时间,RocketMQ设置了一些默认级别,没有必要记录,一共18个级别,messageDelayLevel。可以在broker.conf中进行修改,但是不建议修改。 4.x版本,只有这些延迟机制。没有办法指定时间点去发送。七天确认收货就不行。 5.x版本可以指定时间戳,到毫秒级进行消息推送。这个版本,开源版和商业版没有太大区别了。 是系统中的SCHEDULE_TOPIC_XXXX这个topic下的18个不同messageQueue。
批量消息
可以减少IO的消耗,放到一个list中;减少了IO次数,如果网络异常,那么消息就丢失了;有限制,大小不超过1M,这个限制不是强制性的。消息进行分割,分成几个批次,进行请求。
批量发,减少IO,然后要考虑安全性,每次消息大小不能太大,可以对消息进行压缩成二进制,消费者再解压等等。RocketMQ已经进行了一些压缩。
批量发送就是尽量减少请求次数,可以使用spilt进行拆分一下。
针对边缘的优化。这才是RocketMQ的精髓。
过滤消息
消费者组中多个消费者进行协作,一般情况下是不可控的,为了保证可控;就可以用到消息过滤机制。发送消息的时候生产者可以指定tag,消费者同样也订阅这个tag。subscribe这个方法订阅指定的tag,多个tag使用 || 隔开。订阅的时候订阅指定的tag,过滤是在哪里?一个是broker端,这样发送到消费者的就少了;还有就是在消费者端进行过滤,全部接收,但是进行过滤的消费。
broker端处理:网络压力小了,但是其本身的压力就大了。这里面是要有取舍的。
RocketMQ优先保证的是网络的性能,也就是在broker端实现的。不建议定义很复杂的订阅条件。
如果是更复杂的过滤条件,生产者发送的时候还是正常发送带tag的消息;然后在消费者端定义MessageSelector.bySql。
默认情况下,服务端broker不支持bySql的方式。在集群中有一个配置可以进行配置,enablePropertyFilter,通过这个进行配置。默认值是false。修改为true。修改之后broker才支持bySql。为了防止broker端更繁忙。
典型的流式处理方式:先拿一个应用来过滤,然后通过topic来发送,保证每个messageQueue使用同一个方式去消费;这样的方式要明显高于broker端进行sql过滤。
事务消息:
重点,非常重点。
写本地事务,和发消息如果不控制,就会出现不一致;提供一个机制,保证这两个是同步的。完整的业务场景下,上游客户下单,下游商家发货,这才是完整的事务。但是在分布式情况下,这个链路太长了。RocketMQ进行了折中处理,只保证前面一半保证原子性。后面从broker到消费者的这个过程,事务可以由消费者来控制,自己确认,同步进行返回消息消费状态。
这并不是标准的分布式解决方案。是半事务。后面的一半由消费者自己去实现。
RocketMQ的事务消息和消费者无关,只是和生产者和broker相关。producer.setTransactionListener().
所谓半消息half消息,是生产者发送这个半消息,然后broker将其挪到内部的一个topic中,该topic是RMQ_SYS_TRANS_HALF_TOPIC。如果确认成功,broker将这个消息从这个topic中挪出来,发到消费者订阅的topic中。
返回unknown状态,broker就要进行回查;多久回查一次,回查多少次这些是要配置的。本地事务回查次数通过参数transactionCheckMax设定,默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定,默认60秒。超过回查次数后,消息将会被丢弃。
定时任务,例如买票订单确认,15分钟未支付就取消订单,支付了就分配座位。如果使用定时任务,15分钟后去执行,那么如果是15分钟内支付了,但是不能马上去分配位置,必须要等定时任务启动才行,这样当然是不行的。也不符合业务逻辑。这个场景是broker级别配置的,配置完成之后这个broker中的所有message Queue都会生效。
messageQueue一般建议配置是broker数量的两倍。broker中一个topic的messageQueue数量默认是8个,也可以自动进行确认。
RocketMQ如果启动了长时间,那么从控制台可能会看到消息的最小位点就不是0了,因为日志文件默认是1G,超过之后就写一个新的文件;历史文件默认保存120个小时,凌晨4点会删除过期的文件,删除之后历史的消息就找不到了。清空了。
消费模型:消费者组中的消费者可以消费一个broker中的一个messageQueue,也可以消费多个messageQueue,也可以跨broker消费messageQueue;但是broker中的messageQueue只会被消费者组中的一个消费者去消费,不会出现一个messageQueue被一个消费者组中的多个消费者消费。
权限控制
非核心功能,一般来说MQ是内部使用的,根本就无序进行权限控制,但是RocketMQ提供了权限控制的功能。perm,6可读可写;4是可读不可写;2禁写禁读。太粗粒度了。
幂等
对消费者来说,不保证只推送一次,保证最少一次;可以向同一个消费者推送多次,或者可以向不同的消费者推送。
RocketMQ提供messageId,这个不靠谱,事务消息,或者多次转发的过程中,这个可能会变;建议自己去实现这个消息id。
标签:生产实践,消费者,messageQueue,模型,broker,保证,消息,RocketMQ From: https://www.cnblogs.com/0630sun/p/18176958