首页 > 编程语言 >AliMQ(RocketMQ)源码(一)创建producer

AliMQ(RocketMQ)源码(一)创建producer

时间:2022-12-21 12:02:12浏览次数:40  
标签:producer Producer sessionCredentials PropertyKeyConst 源码 new properties RocketMQ


公司现在在使用阿里云的AliMQ也就是RocketMQ,趁着工作之余,将RocketMQ的使用心得分析一下,关于RocketMQ的Producer、Consumer、Broker、NameServer等架构问题,在此处先不做分析了,想了解的可以自行查找,在这里从Java端分析一下RocketMQ源码,本篇从发送消息的Producer开始,先了解一下producer的创建。

一、创建Producer

一般我们创建Producer,会使用如下方法:

Producer producers = ONSFactory.createProducer(properties);

进入方法介绍看下:

/**
* 创建Producer
*
* <p>
* <code>properties</code>
* 应该至少包含以下几项必选配置内容:
* <ol>
* <li>{@link PropertyKeyConst#ProducerId}</li>
* <li>{@link PropertyKeyConst#AccessKey}</li>
* <li>{@link PropertyKeyConst#SecretKey}</li>
* <li>{@link PropertyKeyConst#ONSAddr}</li>
* </ol>
* 以下为可选内容
* <ol>
* <li>{@link PropertyKeyConst#OnsChannel}</li>
* <li>{@link PropertyKeyConst#SendMsgTimeoutMillis}</li>
* <li>{@link PropertyKeyConst#NAMESRV_ADDR} 该属性会覆盖{@link PropertyKeyConst#ONSAddr}</li>
* </ol>
* </p>
*
* <p>
* 返回创建的{@link Producer}实例是线程安全, 可复用, 发送各个主题. 一般情况下, 一个进程中构建一个实例足够满足发送消息的需求.
* </p>
*
* <p>
* 示例代码:
* <pre>
* Properties props = ...;
* // 设置必要的属性
* Producer producer = ONSFactory.createProducer(props);
* producer.start();
*
* //producer之后可以当成单例使用
*
* // 发送消息
* Message msg = ...;
* SendResult result = produer.send(msg);
*
* // 应用程序关闭退出时
* producer.shutdown();
* </pre>
* </p>
*
* @param properties Producer的配置参数
* @return {@link Producer} 实例
*/
public static Producer createProducer(final Properties properties) {
return onsFactory.createProducer(properties);
}

上面列出了方法的注释,因为是阿里的中间件团队开发的,所以注释也是中文,上面说明Properties中有4个必选参数,3个可选参数,
因为公司是用的阿里云企业版的RocketMQ(AliMQ),其中四个必选参数都是企业版验证的参数,ONSAddr是公网的地址。

有一个可以重点关注的注释:
返回创建的{@link Producer}实例是线程安全, 可复用, 发送各个主题. 一般情况下, 一个进程中构建一个实例足够满足发送消息的需求.
由上所知,可以将创建的producer存储在服务的内存中,以方便后续复用。

二、Producer的构造方法

后面创建直接进入的ProducerImpl的构造方法,

1、进入其父类ONSClientAbstract的构造方法中

public ONSClientAbstract(Properties properties) {
this.properties = properties;
this.sessionCredentials.updateContent(properties);
// 检测必须的参数
if (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey())) {
throw new ONSClientException("please set access key");
}

if (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey())) {
throw new ONSClientException("please set secret key");
}

if (null == this.sessionCredentials.getOnsChannel()) {
throw new ONSClientException("please set ons channel");
}

// 用户指定了Name Server
// 私有云模式有可能需要
this.nameServerAddr = this.properties.getProperty(PropertyKeyConst.NAMESRV_ADDR);
if (nameServerAddr != null) {
return;
}
this.nameServerAddr = fetchNameServerAddr();
if (null == nameServerAddr) {
throw new ONSClientException(FAQ.errorMessage("Can not find name server, May be your network problem.", FAQ.FIND_NS_FAILED));
}

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
String nsAddrs = fetchNameServerAddr();
if (nsAddrs != null && !ONSClientAbstract.this.nameServerAddr.equals(nsAddrs)) {
ONSClientAbstract.this.nameServerAddr = nsAddrs;
if (isStarted()) {
updateNameServerAddr(nsAddrs);
}
}
} catch (Exception e) {
log.error("update name server periodically failed.", e);
}
}
}, 10 * 1000L, 30 * 1000L, TimeUnit.MILLISECONDS);

}

在其构造方法中,

  • 先是把4个必选参数放入了属性sessionCredentials中
  • 接着通过我们之前传入的ONSAddr的公网地址获取nameServer的地址
  • 最后通过一个定时线程每隔30秒轮询一个NameServer的地址是否有变更,如果变更及时进行更新。

2、创建DefaultMQProducer

this.defaultMQProducer = new DefaultMQProducer(new OnsClientRPCHook(sessionCredentials));

创建DefaultMQProducer的时候主要是传入了OnsClientRPCHook参数,这是一个钩子对象,其构造方法的参数就是我们上面传的sessionCredentials对象。
在每一个请求前,都会调用OnsClientRPCHook的doBeforeRequest方法,

3、设置Group

第三四行是将我们传入的必穿参数中的ProducerId作为producerGroup

String producerGroup = properties.getProperty(PropertyKeyConst.ProducerId, "__ONS_PRODUCER_DEFAULT_GROUP");
this.defaultMQProducer.setProducerGroup(producerGroup);

不过producerGroup不像consumerGroup,只有在事务消息中采用使用到,对于其他消息来说就是鸡肋
这个再下面的两行是判断是否是走VIP通道,这个如果启动的话,阿里云会多收取你的费用,不过性能也更好,如果业务量非常大的话并且必须的话,可以考虑走VIP通道。

4、设置超时时间

7、8、9、10是设置超时时间,可以在Properties中的SendMsgTimeoutMillis参数手动设置,默认producer端是5秒超时时间

if (properties.containsKey(PropertyKeyConst.SendMsgTimeoutMillis)) {
this.defaultMQProducer.setSendMsgTimeout(Integer.valueOf(properties.get(PropertyKeyConst.SendMsgTimeoutMillis).toString()));
} else {
this.defaultMQProducer.setSendMsgTimeout(5000);
}

5、设置Producer的实例名称

String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName());
this.defaultMQProducer.setInstanceName(instanceName);

11、12行是设置实例名称,可以理解为一个随机的唯一数。

6、设置NameServer地址

this.defaultMQProducer.setNamesrvAddr(this.getNameServerAddr());

13行设置NameServer的地址,多个地址用分号分隔。

7、设置消息体最大值

// 消息最大大小4M
this.defaultMQProducer.setMaxMessageSize(1024 * 1024 * 4);

8、设置消息轨迹

最后面的一段是设置消息轨迹,用过RocketMQ的都知道我们可以通过msgId从控制到查询到数据的情况,就是根据这个来的。

try {
Properties tempProperties = new Properties();
tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
new OnsClientSendMessageHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data.", e);
}

在上面的方法中,最后一行可以看到所有的参数都成为了ProducerImpl的一个钩子的OnsClientSendMessageHookImpl,这个类的父类是SendMessageHook,其有两个方法sendMessageBefore,sendMessageAfter。分别是在发送消息前设置和发送消息后调用,将一系列消息相关的数据都放入了AsyncArrayDispatcher的traceContextQueue属性中。并且这些数据在一个任务中一直在往磁盘上面写数据,所以就是我们在RocketMQ的控制台上面看到的数据。
在Producer的start()方法中:

public void start() throws MQClientException {
TraceProducerFactory.registerTraceDispatcher(dispatcherId);
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}

AsyncRunnable的run()方法有个while(!stop)就是一直在往控制台写数据。
至此,Producer的创建就说完了,下面一篇博文我们一起看看Producer的启动,即producer.start()方法。

aliyun的jar包中为我们提供了ProducerBean,在spring-boot中直接将ProducerBean作为Producer的初始化使用是推荐的方式。如下

@Bean
public ProducerBean producerBean(){
ProducerBean producer = new ProducerBean();
Properties properties =new Properties();
properties.setProperty("AccessKey", accessKey);
properties.setProperty("SecretKey", secretKey);
properties.setProperty("ONSAddr", addr);
properties.setProperty("ProducerId", producerId);
producer.setProperties(properties);
producer.start();
return producer;
}


标签:producer,Producer,sessionCredentials,PropertyKeyConst,源码,new,properties,RocketMQ
From: https://blog.51cto.com/u_11970680/5959707

相关文章

  • AliMQ(RocketMQ)源码(二)producer.start()
    在创建完成Producer后,就进入了Producer的start()方法。start()方法DefaultMQProducerImpl的start()方法。this.serviceState=ServiceState.START_FAILED;......
  • Dubbo架构设计与源码解析(二) 服务注册
    一、Dubbo简介Dubbo是一款典型的高扩展、高性能、高可用的RPC微服务框架,用于解决微服务架构下的服务治理与通信问题。其核心模块包含【RPC通信】和【服务治理】,其中......
  • 万字长文 | Spring Cloud Alibaba组件之Nacos实战及Nacos客户端服务注册源码解析
    滴滴滴,上车了!本次旅途,你将获取到如下知识:Nacos在微服务架构中的作用Nacos在Linux下的安装与使用搭建真实项目环境,实现服务注册与发现真实项目环境下实现Nacos的配置......
  • 仿剪映播放器、剪辑视频、预览条、快速精准抽帧(附源码)
    给大家分享一下里面小伙伴的项目实践,高仿剪映快速抽帧、精准显示功能,而且还有源码给出!关于实现思路,之前也在公众号里面给大家分享过:​​干货|快速抽取缩略图是怎么练成的?......
  • 企业电子投票系统(论文+PPT+源码)
    设计题目 企业电子投票系统摘要目录​​第​​1部分概述1​第​​2部分 分析部分2​​2​​.1.功能需求2​第​​3部分 系统设计3​​3​​.1.功能模块设计3​​3​​.2.数......
  • 固定资产管理系统(论文+PPT+源码)
    固定资产管理系统摘要随着计算机信息技术的发展以及对资产、设备的管理科学化、合理化的高要求,利用计算机实现设备及资产的信息化管理已经显得非常重要。固定资产管理系统......
  • 企业车辆管理系统(论文+源码)
    毕业设计(论文)资料设计(论文)题目:企业车辆管理系统摘要随着经济的日益增长,车辆作为最重要的交通工具,在企事业单位中得以普及,单位的车辆数目已经远远不止简单的几辆,与此同......
  • 用Java EE技术实现产品售后服务系统(论文+PPT+源码)
    目录摘要1Abstract11引言52.1系统需求分析72.2可行性分析82.3本系统采用的关键技术93系统概要设计124系统详细设计144.1后台数据库设计144.2系统E-R图184.3.2数......
  • Spring源码编译
    资料参考地址1:Spring源码编译准备环境配置JDK8(与Spring5的兼容性最好)spring:5.2.0release下载Spring源码直接去官方的github库下载,https://github.com/spring......
  • Qt下MQTT模块的导入(源码直接导入)适用Windows和Linux系统
    Qt下MQTT模块的导入(源码直接导入)适用Windows和Linux系统​​0.环境​​​​1.MQTT源码下载(也可以去官网下载)​​​​2.MQTT源码解压成功复制src/mqtt文件夹到工程中​......