首页 > 编程语言 >阿里云openservices rocketmq消息队列消费消息底层源码分析

阿里云openservices rocketmq消息队列消费消息底层源码分析

时间:2022-10-11 18:35:54浏览次数:61  
标签:消费 源码 openservices 消息 put new Consumer rocketmq tempProperties


mq消费源码
依赖

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
</dependency>

阿里云rocketmq消息队列

阿里云openservices rocketmq消息队列消费消息底层源码分析_构造方法


进来createConsumer方法

阿里云openservices rocketmq消息队列消费消息底层源码分析_构造方法_02


return new ConsumerImpl(ONSUtil.extractProperties(properties));

阿里云openservices rocketmq消息队列消费消息底层源码分析_rocketmq_03

可以看到阿里云 opensevices提供许多mq的功能,包括创建生产者、创建消费者、批量创建消费者、批量创建生产者、
创建顺序Consumer(全家顺序和局部顺序,和上面的并发消费区分开来),顺序消费的使用场景是某些业务必须同步完成的时候可以使用,就是这一个业务线的数据都发到同一个queue中并且使用同一个消费者来消费,这样相对于每个queue就是有序的
最后一个是创建事务生产者,所以做rocketmq包括阿里云那套对常见的方法封装的比较好,这也是这个mq的优势之一。

接着回来ConsumerImpl 消费的源码

阿里云openservices rocketmq消息队列消费消息底层源码分析_构造方法_04


这个方法中先调用了父类ONSConsumerAbstract中的构造方法,在ONSConsumerAbstract的构造方法中又调用了ONSClientAbstract的构造方法。

阿里云openservices rocketmq消息队列消费消息底层源码分析_rocketmq_05


ONSClientAbstract的构造方法检查参数,包括连接地址、访问密钥等

阿里云openservices rocketmq消息队列消费消息底层源码分析_rocketmq_06

private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, “ONSClient-UpdateNameServerThread”);
}
});


然后有个线程池,每90秒检测一次nameServer地址是否变更
这块逻辑其实consumer与producer公用的。ProducerImpl也会去调用父类方法super(properties);

阿里云openservices rocketmq消息队列消费消息底层源码分析_.net_07


DefaultMQPushConsumer,核心代码,ONSConsumerAbstract的构造方法中创建消费者,使用的是push的方式。

回顾一下push pull区别

阿里云openservices rocketmq消息队列消费消息底层源码分析_rocketmq_08

push方式,consumer把轮询过程封装了,并注册messagelistener监听器,取到消息后,唤醒messagelister的consumerMessage消费,感觉队列被push过来进行订阅。
pull方式,消息自己去取,首先通过打算消费topic拿到messagequeue集合,遍历messagequeue集合,然后针对每个messgequeue批量取消息,一次取完后,记录该队列下一次要取的offset,直到取完了,再换另一个messagequeue.

到回DefaultMQPushConsumer之后,配置项设置consumerGroup,实例名称,nameServer地址,消费者线程的最小和最大线程数,包括一些在PropertyKeyConst类中的配置(客户端缓存消息数量,客户端缓存最大内容)。

阿里云openservices rocketmq消息队列消费消息底层源码分析_构造方法_09


阿里云openservices rocketmq消息队列消费消息底层源码分析_java-rocketmq_10

最后setPostSubscriptionWhenPull设置为false,
consumer.setMessageModel(MessageModel.CLUSTERING)集群模式(消费模式分集群、广播)

总结:消费方式mq

阿里云openservices rocketmq消息队列消费消息底层源码分析_rocketmq_11


;设置Properties

阿里云openservices rocketmq消息队列消费消息底层源码分析_.net_12


获取yml里面的访问密钥、密钥、mq服务器地址、设置发送超时时间,单位毫秒 1000=1s;

阿里云openservices rocketmq消息队列消费消息底层源码分析_.net_13


然后获取消费组进行遍历,以及groupid,然后就可以创建消费者,创建消费者上面说了,

创建方式构造方法,

阿里云openservices rocketmq消息队列消费消息底层源码分析_阿里云_14


把这些参数初始化,然后做一些校验和在赋值。

拉取消息的前置和后置处理类的创建

// 为Consumer增加消息轨迹回发模块
String msgTraceSwitch = properties.getProperty(PropertyKeyConst.MsgTraceSwitch);
if (!UtilAll.isBlank(msgTraceSwitch) && (!Boolean.parseBoolean(msgTraceSwitch))) {
log.info(“MQ Client Disable the Trace Hook!”);
} else {
try {
Properties tempProperties = new Properties();
tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, “128000”);
tempProperties.put(OnsTraceConstants.AsyncBufferSize, “2048”);
tempProperties.put(OnsTraceConstants.MaxBatchNum, “100”);
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, “PID_CLIENT_INNER_TRACE_PRODUCER”);
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new OnsConsumeMessageHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error(“system mqtrace hook init failed ,maybe can’t send msg trace data”, e);
}
}
然后回来就是设置pull flase,集群模式。
然后把创建的消费者去订阅消息
consumerBean.subscribe(consumer.getTopic(), consumer.getTag(),
SpringUtil.getBean(consumer.getBeanName()));

阿里云openservices rocketmq消息队列消费消息底层源码分析_构造方法_15


subscribeTable放入订阅map,

阿里云openservices rocketmq消息队列消费消息底层源码分析_.net_16


topic和tag过来消费者订阅消息了,

阿里云openservices rocketmq消息队列消费消息底层源码分析_.net_17


,发送的时候用了new ReentrantLock()锁
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (Exception var5) {
this.log.error(“sendHeartbeatToAllBroker exception”, var5);
} finally {
this.lockHeartbeat.unlock();
}
} else {
this.log.warn(“lock heartBeat, but failed.”);
}
}

push:
理下流程:

首先 new DefaultMQPushConsumer 对象,并指定一个消费组名。
然后设置相关参数,例如 nameSrvAdd、消费失败重试次数、线程数等
通过调用 setConsumeFromWhere 方法指定初次启动时从什么地方消费,默认是最新的消息开始消费。
通过调用 setAllocateMessageQueueStrategy 指定队列负载机制,默认平均分配。
通过调用 registerMessageListener 设置消息监听器,即消息处理逻辑,最终返回 CONSUME_SUCCESS(成功消费)或 RECONSUME_LATER(需要重试)。

pull
首先根据 MQConsumer 的 fetchSubscribeMessageQueues 的方法获取 Topic 的所有队列信息
然后遍历所有队列,依次通过 MQConsuemr 的 PULL 方法从 Broker 端拉取消息。
对拉取的消息进行消费处理
通过调用 MQConsumer 的 updateConsumeOffset 方法更新位点,但需要注意的是这个方法并不是实时向 Broker 提交,而是客户端会启用以线程,默认每隔 5s 向 Broker 集中上报一次。

1.集群消费方式
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息。例如某个Topic有九条消息,其中一个Consumer Group有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
2.广播消费方式
一条消息被多个Consumer消费,几十这些Consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer消费一次,广播消费中的ConsumerGroup概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer中的MessageModel来设置消费方式为广播消费

//广播
BROADCASTING(“BROADCASTING”),
//集群
CLUSTERING(“CLUSTERING”);
源码
String messageModel = properties.getProperty(“MessageModel”, “CLUSTERING”);
this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));



标签:消费,源码,openservices,消息,put,new,Consumer,rocketmq,tempProperties
From: https://blog.51cto.com/u_14604401/5747617

相关文章

  • HashMap实现原理及源码分析
    哈希表(hashtable)也叫散列表,是一种非常重要的数据结构,应用场景及其丰富,许多缓存技术(比如memcached)的核心其实就是在内存中维护一张大的哈希表,而HashMap的实现原理也常常出现......
  • 源码角度了解Skywalking之服务端OAP对Trace的处理
    源码角度了解Skywalking之服务端OAP对Trace的处理从前几篇的文章我们知道Skywalking对Trace信息进行生成收集后,将TraceSegment对象转换为UpstreamSegment对象,通过GRPC发送......
  • 基于SSM+Vue汽车租赁管理系统Java车辆出租系统(源码调试+讲解+文档)
    ......
  • golang-set包的用法及源码解析
    Set是一种基本的数据结构,它具备确定性、互异性、无序性三个特点。因此,在开发过程中我们通常用它来判断一些数据的集合与另一个数据集合或者元素的包含关系。在大部分开发......
  • 3.0 Spring生命周期源码解析
    Spring最核心的功能之一就是创建对象(IOC)Bean的生命周期指:在spring中,一个Bean的生成和销毁的过程1.生成BeanDefinitionSpring启动先进行扫描,调用org.springframework.c......
  • Lua5.3源码解析
    2022-10-11,16点52 大概看了2个月不到的时间,坚持每天看lua设计与实现.pdf还有csdn上面lua的博客.然后自己debug研究.最后把细节加到注释里面.建议看这个项目时候......
  • 工厂方法在Spring源码中的运用
    我们都知道Spring中IOC是使用的工厂模式,但是对于实现细节就一知半解了,今天这篇文章就带大家解读Spring中是如何使用工厂模式的。在上篇文章中我们懂了什么是工厂模式,这篇文......
  • drf三大认证之频率类源码解析
    主要从SimpleRateThrottle的allow_request方法开始分析第一步1.查看SimpleRateThrottle的allow_requestifself.rateisNone:returnTrue#表示没......
  • 云转码源码|m3u8切片程序全开源
     什么是云转码? 云转码是完全在云中将视频文件转换为其他格式的过程。更具体地说,转码意味着从单个编码视频文件创建不同大小、分辨率和比特率的新文件。这种方法使广播......
  • 工厂方法在Spring源码中的运用
    我们都知道Spring中IOC是使用的工厂模式,但是对于实现细节就一知半解了,今天这篇文章就带大家解读Spring中是如何使用工厂模式的。在上篇文章中我们懂了什么是工厂模式,这篇......