公司现在在使用阿里云的AliMQ也就是RocketMQ,趁着工作之余,将RocketMQ的使用心得分析一下,关于RocketMQ的Producer、Consumer、Broker、NameServer等架构问题,在此处先不做分析了,想了解的可以自行查找,在这里从Java端分析一下RocketMQ源码,本篇从发送消息的Producer开始,先了解一下producer的创建。
一、创建Producer
一般我们创建Producer,会使用如下方法:
Producer producers = ONSFactory.createProducer(properties);
进入方法介绍看下:
/**
* 创建Producer
*
* <p>
* <code>properties</code>
* 应该至少包含以下几项必选配置内容:
* <ol>
* <li>{@link PropertyKeyConst#ProducerId}</li>
* <li>{@link PropertyKeyConst#AccessKey}</li>
* <li>{@link PropertyKeyConst#SecretKey}</li>
* <li>{@link PropertyKeyConst#ONSAddr}</li>
* </ol>
* 以下为可选内容
* <ol>
* <li>{@link PropertyKeyConst#OnsChannel}</li>
* <li>{@link PropertyKeyConst#SendMsgTimeoutMillis}</li>
* <li>{@link PropertyKeyConst#NAMESRV_ADDR} 该属性会覆盖{@link PropertyKeyConst#ONSAddr}</li>
* </ol>
* </p>
*
* <p>
* 返回创建的{@link Producer}实例是线程安全, 可复用, 发送各个主题. 一般情况下, 一个进程中构建一个实例足够满足发送消息的需求.
* </p>
*
* <p>
* 示例代码:
* <pre>
* Properties props = ...;
* // 设置必要的属性
* Producer producer = ONSFactory.createProducer(props);
* producer.start();
*
* //producer之后可以当成单例使用
*
* // 发送消息
* Message msg = ...;
* SendResult result = produer.send(msg);
*
* // 应用程序关闭退出时
* producer.shutdown();
* </pre>
* </p>
*
* @param properties Producer的配置参数
* @return {@link Producer} 实例
*/
public static Producer createProducer(final Properties properties) {
return onsFactory.createProducer(properties);
}
上面列出了方法的注释,因为是阿里的中间件团队开发的,所以注释也是中文,上面说明Properties中有4个必选参数,3个可选参数,
因为公司是用的阿里云企业版的RocketMQ(AliMQ),其中四个必选参数都是企业版验证的参数,ONSAddr是公网的地址。
有一个可以重点关注的注释:
返回创建的{@link Producer}实例是线程安全, 可复用, 发送各个主题. 一般情况下, 一个进程中构建一个实例足够满足发送消息的需求.
由上所知,可以将创建的producer存储在服务的内存中,以方便后续复用。
二、Producer的构造方法
后面创建直接进入的ProducerImpl的构造方法,
1、进入其父类ONSClientAbstract的构造方法中
public ONSClientAbstract(Properties properties) {
this.properties = properties;
this.sessionCredentials.updateContent(properties);
// 检测必须的参数
if (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey())) {
throw new ONSClientException("please set access key");
}
if (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey())) {
throw new ONSClientException("please set secret key");
}
if (null == this.sessionCredentials.getOnsChannel()) {
throw new ONSClientException("please set ons channel");
}
// 用户指定了Name Server
// 私有云模式有可能需要
this.nameServerAddr = this.properties.getProperty(PropertyKeyConst.NAMESRV_ADDR);
if (nameServerAddr != null) {
return;
}
this.nameServerAddr = fetchNameServerAddr();
if (null == nameServerAddr) {
throw new ONSClientException(FAQ.errorMessage("Can not find name server, May be your network problem.", FAQ.FIND_NS_FAILED));
}
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
String nsAddrs = fetchNameServerAddr();
if (nsAddrs != null && !ONSClientAbstract.this.nameServerAddr.equals(nsAddrs)) {
ONSClientAbstract.this.nameServerAddr = nsAddrs;
if (isStarted()) {
updateNameServerAddr(nsAddrs);
}
}
} catch (Exception e) {
log.error("update name server periodically failed.", e);
}
}
}, 10 * 1000L, 30 * 1000L, TimeUnit.MILLISECONDS);
}
在其构造方法中,
- 先是把4个必选参数放入了属性sessionCredentials中
- 接着通过我们之前传入的ONSAddr的公网地址获取nameServer的地址
- 最后通过一个定时线程每隔30秒轮询一个NameServer的地址是否有变更,如果变更及时进行更新。
2、创建DefaultMQProducer
this.defaultMQProducer = new DefaultMQProducer(new OnsClientRPCHook(sessionCredentials));
创建DefaultMQProducer的时候主要是传入了OnsClientRPCHook参数,这是一个钩子对象,其构造方法的参数就是我们上面传的sessionCredentials对象。
在每一个请求前,都会调用OnsClientRPCHook的doBeforeRequest方法,
3、设置Group
第三四行是将我们传入的必穿参数中的ProducerId作为producerGroup
String producerGroup = properties.getProperty(PropertyKeyConst.ProducerId, "__ONS_PRODUCER_DEFAULT_GROUP");
this.defaultMQProducer.setProducerGroup(producerGroup);
不过producerGroup不像consumerGroup,只有在事务消息中采用使用到,对于其他消息来说就是鸡肋
这个再下面的两行是判断是否是走VIP通道,这个如果启动的话,阿里云会多收取你的费用,不过性能也更好,如果业务量非常大的话并且必须的话,可以考虑走VIP通道。
4、设置超时时间
7、8、9、10是设置超时时间,可以在Properties中的SendMsgTimeoutMillis参数手动设置,默认producer端是5秒超时时间
if (properties.containsKey(PropertyKeyConst.SendMsgTimeoutMillis)) {
this.defaultMQProducer.setSendMsgTimeout(Integer.valueOf(properties.get(PropertyKeyConst.SendMsgTimeoutMillis).toString()));
} else {
this.defaultMQProducer.setSendMsgTimeout(5000);
}
5、设置Producer的实例名称
String instanceName = properties.getProperty(PropertyKeyConst.InstanceName, this.buildIntanceName());
this.defaultMQProducer.setInstanceName(instanceName);
11、12行是设置实例名称,可以理解为一个随机的唯一数。
6、设置NameServer地址
this.defaultMQProducer.setNamesrvAddr(this.getNameServerAddr());
13行设置NameServer的地址,多个地址用分号分隔。
7、设置消息体最大值
// 消息最大大小4M
this.defaultMQProducer.setMaxMessageSize(1024 * 1024 * 4);
8、设置消息轨迹
最后面的一段是设置消息轨迹,用过RocketMQ的都知道我们可以通过msgId从控制到查询到数据的情况,就是根据这个来的。
try {
Properties tempProperties = new Properties();
tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, sessionCredentials);
dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
new OnsClientSendMessageHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data.", e);
}
在上面的方法中,最后一行可以看到所有的参数都成为了ProducerImpl的一个钩子的OnsClientSendMessageHookImpl,这个类的父类是SendMessageHook,其有两个方法sendMessageBefore,sendMessageAfter。分别是在发送消息前设置和发送消息后调用,将一系列消息相关的数据都放入了AsyncArrayDispatcher的traceContextQueue属性中。并且这些数据在一个任务中一直在往磁盘上面写数据,所以就是我们在RocketMQ的控制台上面看到的数据。
在Producer的start()方法中:
public void start() throws MQClientException {
TraceProducerFactory.registerTraceDispatcher(dispatcherId);
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
this.registerShutDownHook();
}
AsyncRunnable的run()方法有个while(!stop)就是一直在往控制台写数据。
至此,Producer的创建就说完了,下面一篇博文我们一起看看Producer的启动,即producer.start()方法。
aliyun的jar包中为我们提供了ProducerBean,在spring-boot中直接将ProducerBean作为Producer的初始化使用是推荐的方式。如下
@Bean
public ProducerBean producerBean(){
ProducerBean producer = new ProducerBean();
Properties properties =new Properties();
properties.setProperty("AccessKey", accessKey);
properties.setProperty("SecretKey", secretKey);
properties.setProperty("ONSAddr", addr);
properties.setProperty("ProducerId", producerId);
producer.setProperties(properties);
producer.start();
return producer;
}