核心代码
初始化Default生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
设置NameAddr地址
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
producer.start();
分析new DefaultMQProducer(PRODUCER_GROUP)
public DefaultMQProducer(final String producerGroup) {
this.producerGroup = producerGroup;
//初始化
defaultMQProducerImpl = new DefaultMQProducerImpl(this, null);
//
produceAccumulator = MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
DefaultMQProducerImpl
- 初始化发送线程池:队列LinkedBlockingQueue<>(50000)、核心(最大)线程数据为cpu核数
- semaphoreAsyncSendNum:生产者并发数,默认10
- semaphoreAsyncSendSize:生产者“处理中”的消息最大数量,默认1024*1024
- serviceDetector:获取topic信息,获取topic对应的queue的最大位移
- MQFaultStrategy:MQ客户端故障策略,生产者出现故障后,根据brokerName重新获取DefaultMQProducerImpl对象
核心变量
- DefaultMQProducer defaultMQProducer;
- ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =new ConcurrentHashMap<>();
- ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<>();
- ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<>();
- RPCHook rpcHook;
- BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
- ExecutorService defaultAsyncSenderExecutor:
- BlockingQueue<Runnable> checkRequestQueue;
- ExecutorService checkExecutor;
- ServiceState serviceState = ServiceState.CREATE_JUST;
- MQClientInstance mQClientFactory;
- MQFaultStrategy mqFaultStrategy;
MQClientManager.getInstance().getOrCreateProduceAccumulator
单例获取MQClientManager.getInstance
获取clientId
根据clientId获取ProduceAccumulator
ProduceAccumulator作用:为每个client提供一个生产者具体类,内部封装了批量消息、发送消息线程等
AggregateKey作用:包括topic、queue、tag、waitStoreMsgOK(等待存储确认boolean变量)
MessageAccumulation:批次消息存储实例,用于存储对应topic、tag对应的生产者堆积实例,支持add添加消息
GuardForSyncSendService作用:同步发送消息服务线程,继承Thread,AggregateKey对MessageAccumulation:1对1,Map
GuardForAsyncSendService作用:异步发送消息服务,继承Thread,AggregateKey对MessageAccumulation:1对1,Map
分析producer.start()
对下面两个核心实例进行start
defaultMQProducerImpl.start()
produceAccumulator.start()
defaultMQProducerImpl.start()
根据ServiceState状态枚举处理,实际是对CREATE_JUST枚举(服务仅创建场景未启动)的处理
配置检测
创建MQClientInstance实例(netty处理):
- 为每个clientId分配一个MQClientInstance实例
- 底层使用netty的channel监听各种事件,核心事件为onChannelActive,在onChannelActive会记性与broken之间的心跳检测,进行负载处理
- 最终创建MQClientAPIImpl:把各种检测处理注册为NettyRequestProcessor
把defaultMQProducer注册到MQClientInstance中,这样就可以使用netty处理消息的具体内容
mQClientFactory.start():
// Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
initTopicRoute():从defaultMQProducer读取所有topic,如果没有还未加载topic路由信息,则nameServer中加载topic相关信息(TopicPublishInfo)
mqFaultStrategy.startDetector():启动broker检测服务
mQClientFactory.sendHeartbeatToAllBrokerWithLock():没什么说的,遍历brokerAddrTable,向每个broker发送心跳
RequestFutureHolder.getInstance().startScheduledTask(this):启动request过期扫描定时任务,把过期请求从requestFutureTable中移除
produceAccumulator.start()
guardThreadForSyncSend.start();
guardThreadForAsyncSend.start();
标签:Start,生产者,DefaultMQProducer,topic,start,源码,new,RocketMQ From: https://www.cnblogs.com/use-D/p/18163625