
AliMQ (RocketMQ) Source Code (Part 2): producer.start()

Posted: 2022-12-21 12:02:04


Once the Producer has been created, its start() method is called. That call ends up in the start() method of DefaultMQProducerImpl.

this.serviceState = ServiceState.START_FAILED;

this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    this.defaultMQProducer.changeInstanceNameToPID();
}

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
    mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
    this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;

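The code above first sets the state to START_FAILED and switches it to RUNNING only after startup has succeeded.
this.checkConfig() verifies that the producer group has been set.
Most of the preceding code just registers and caches objects; the actual startup work happens in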

mQClientFactory.start();

this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
    this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
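
Both start() excerpts so far follow the same shape: mark the state as START_FAILED, do the work, and mark it RUNNING only at the very end, so an exception thrown halfway leaves a visible failure state. The following is a minimal sketch of that pattern combined with the duplicate-group check. The names LifecycleSketch, SketchProducer, and PRODUCERS are hypothetical, and using putIfAbsent for the registration is an assumption made for illustration, not a quote from the RocketMQ source.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LifecycleSketch {
    enum ServiceState { CREATE_JUST, RUNNING, START_FAILED }

    // Hypothetical stand-in for the table of registered producers, keyed by group name.
    static final ConcurrentMap<String, SketchProducer> PRODUCERS = new ConcurrentHashMap<>();

    static class SketchProducer {
        final String group;
        volatile ServiceState state = ServiceState.CREATE_JUST;

        SketchProducer(String group) {
            this.group = group;
        }

        void start() {
            // Assume failure until every step has completed.
            state = ServiceState.START_FAILED;
            // putIfAbsent returns the previous mapping, so non-null means the group is taken.
            if (PRODUCERS.putIfAbsent(group, this) != null) {
                // Reset the state so the caller may retry with another group name.
                state = ServiceState.CREATE_JUST;
                throw new IllegalStateException("producer group [" + group + "] already registered");
            }
            state = ServiceState.RUNNING;
        }
    }

    public static void main(String[] args) {
        SketchProducer first = new SketchProducer("demo-group");
        first.start();
        System.out.println("first: " + first.state);

        SketchProducer second = new SketchProducer("demo-group");
        try {
            second.start();
        } catch (IllegalStateException e) {
            System.out.println("second: " + second.state + " (" + e.getMessage() + ")");
        }
    }
}
```

In this sketch the second producer ends up back in CREATE_JUST, mirroring the reset that the excerpt performs before throwing MQClientException.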

1. Fetching the NameServer address

if (null == this.clientConfig.getNamesrvAddr()) {
    this.mQClientAPIImpl.fetchNameServerAddr();
}

The snippet above fetches the NameServer address when none has been configured.

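2. Starting the channel

RocketMQ communicates over Netty, and Netty communication goes through a channel.

this.mQClientAPIImpl.start();

This call initializes the channel.
It leads into the start() method of NettyRemotingClient. The first part of that method creates the Netty client: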

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
// ... (the rest of the builder chain is elided in the original excerpt)

this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingClient.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);

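This timer task runs once per second, after an initial 3-second delay, to check whether requests that have been sent have timed out, and handles the ones that have.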
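
A minimal sketch of such a timeout scan is shown here, assuming a table that maps a request id to the time the request was sent and its timeout. TimeoutScanSketch, Pending, and scan are hypothetical names; the real scanResponseTable() is not reproduced in this article. The current time is passed in as a parameter so the behaviour is easy to check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TimeoutScanSketch {
    // Hypothetical record of one in-flight request.
    static final class Pending {
        final long beginMillis;
        final long timeoutMillis;

        Pending(long beginMillis, long timeoutMillis) {
            this.beginMillis = beginMillis;
            this.timeoutMillis = timeoutMillis;
        }
    }

    final Map<Integer, Pending> table = new ConcurrentHashMap<>();

    // Removes every entry whose deadline has passed and returns the removed request ids.
    List<Integer> scan(long nowMillis) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> entry = it.next();
            Integer id = entry.getKey();
            Pending p = entry.getValue();
            if (p.beginMillis + p.timeoutMillis <= nowMillis) {
                it.remove();
                // A real client would also notify the waiting caller here.
                expired.add(id);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        TimeoutScanSketch sketch = new TimeoutScanSketch();
        sketch.table.put(1, new Pending(0, 3000));
        sketch.table.put(2, new Pending(0, 10000));
        System.out.println(sketch.scan(5000)); // prints [1]
    }
}
```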

this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingClient.this.scanChannelTablesOfNameServer();
        } catch (Exception e) {
            log.error("scanChannelTablesOfNameServer exception", e);
        }
    }
}, 1000 * 3, 10 * 1000);

The code above runs every 10 seconds as a heartbeat-style check of whether the connection to the NameServer is still healthy.

3. Starting the scheduled tasks

this.startScheduledTask();

The body of this method is analyzed piece by piece below:

if (null == this.clientConfig.getNamesrvAddr()) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            } catch (Exception e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

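Every 2 minutes, after an initial 10-second delay, this task refreshes the NameServer address. It is only scheduled when no NameServer address has been configured.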

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

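This task refreshes the topic route information from the NameServer, starting after 10 ms and repeating at the interval returned by getPollNameServerInterval().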

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

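The code above removes Brokers that have gone offline and sends heartbeats to the remaining Brokers. When messages are sent later on, the actual communication takes place directly with the Broker.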
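
One way to picture the clean-up step: treat a Broker as offline when its address no longer appears in any topic's route data, and drop it from the local address table. The sketch below works under that assumption. OfflineBrokerSketch, cleanOffline, and both map layouts are hypothetical and simplified; they are not the data structures used by MQClientInstance.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class OfflineBrokerSketch {
    // Drops brokers whose address is missing from every topic route.
    // `brokers` maps broker name -> address and is modified in place;
    // `routes` maps topic -> addresses currently advertised for that topic.
    // Returns the names of the brokers that were removed, in sorted order.
    static Set<String> cleanOffline(Map<String, String> brokers, Map<String, Set<String>> routes) {
        Set<String> live = new HashSet<>();
        for (Set<String> addrs : routes.values()) {
            live.addAll(addrs);
        }
        Set<String> removed = new TreeSet<>();
        Iterator<Map.Entry<String, String>> it = brokers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (!live.contains(entry.getValue())) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, String> brokers = new HashMap<>();
        brokers.put("broker-a", "10.0.0.1:10911");
        brokers.put("broker-b", "10.0.0.2:10911");

        Map<String, Set<String>> routes = new HashMap<>();
        Set<String> addrs = new HashSet<>();
        addrs.add("10.0.0.1:10911");
        routes.put("TopicTest", addrs);

        System.out.println(cleanOffline(brokers, routes)); // prints [broker-b]
        System.out.println(brokers.keySet());              // prints [broker-a]
    }
}
```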

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

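The code above periodically persists the consumers' offsets: to a local file in broadcasting mode, or to the Broker in clustering mode.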

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }
}, 1, 1, TimeUnit.MINUTES);

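The snippet above is meant to adjust the size of the thread pool once a minute. In practice the underlying method is empty, so it has no effect.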
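
Every task in startScheduledTask() wraps its body in try/catch. That matters because ScheduledExecutorService.scheduleAtFixedRate suppresses all later executions once a run throws. The sketch below illustrates this with a task that fails on its first run yet keeps being scheduled because the exception is caught inside run(). ScheduledTaskSketch and survivesFailure are hypothetical names, and the 10 ms / 20 ms timings are chosen only to keep the example fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScheduledTaskSketch {
    // Schedules a task whose first run fails, then reports whether it still
    // reached `runs` executions within five seconds.
    static boolean survivesFailure(int runs) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        AtomicInteger calls = new AtomicInteger();

        ses.scheduleAtFixedRate(() -> {
            try {
                if (calls.incrementAndGet() == 1) {
                    throw new IllegalStateException("simulated failure");
                }
            } catch (Exception e) {
                // Catching here keeps the periodic schedule alive; a real task would log the error.
            } finally {
                latch.countDown();
            }
        }, 10, 20, TimeUnit.MILLISECONDS);

        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ses.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesFailure(3)); // prints true
    }
}
```

Without the catch block, the first exception would cancel the schedule and the latch would never reach zero.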

4. Starting the message-pulling thread

this.pullMessageService.start();

This call starts the thread that pulls messages.

5. Starting the rebalance thread

this.rebalanceService.start();

This call starts a thread that rebalances the load again whenever topics or service instances change.
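
The article does not say how the load is redistributed. As an illustration only, the sketch below shows one simple strategy: split a topic's queues into contiguous, near-equal ranges, one per consumer. AverageAllocationSketch and allocate are hypothetical names, and the code is not taken from the RocketMQ source.

```java
import java.util.ArrayList;
import java.util.List;

class AverageAllocationSketch {
    // Returns the queue indices assigned to the consumer at `consumerIndex`
    // when `queueCount` queues are shared among `consumerCount` consumers.
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        if (queueCount < 0 || consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid arguments");
        }
        int base = queueCount / consumerCount;   // every consumer gets at least this many
        int extra = queueCount % consumerCount;  // the first `extra` consumers get one more
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        int count = base + (consumerIndex < extra ? 1 : 0);

        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(start + i);
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 queues shared by 3 consumers.
        System.out.println(allocate(8, 3, 0)); // prints [0, 1, 2]
        System.out.println(allocate(8, 3, 1)); // prints [3, 4, 5]
        System.out.println(allocate(8, 3, 2)); // prints [6, 7]
    }
}
```

When a consumer joins or leaves, rerunning the allocation with the new consumer count yields the new assignment, which is the essence of a rebalance.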

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

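The call above runs DefaultMQProducerImpl.start() once more, this time for the client instance's internal producer. Because startFactory is false, mQClientFactory.start() is not invoked a second time, so this pass only repeats the checks and registration described earlier.

At this point, the producer's start() method is complete.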

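II. super.start()

The super.start() method called from ProducerImpl does just one thing: it starts a background thread that continuously persists message traces to the console.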

public void start() throws MQClientException {
    TraceProducerFactory.registerTraceDispatcher(dispatcherId);
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}
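
The start() method above follows a common pattern: a daemon worker thread that drains a queue in the background. Here is a minimal sketch of that pattern, assuming trace records are plain strings and that "persisting" simply means appending to an in-memory list. TraceDispatcherSketch and all of its members are hypothetical; the real AsyncRunnable is not shown in this article.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TraceDispatcherSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Stand-in for the persistence target.
    final List<String> flushed = new CopyOnWriteArrayList<>();
    private final CountDownLatch done;

    TraceDispatcherSketch(int expectedRecords) {
        this.done = new CountDownLatch(expectedRecords);
    }

    void start() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // Blocks until a record is available.
                    String record = queue.take();
                    flushed.add(record);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-trace-dispatcher");
        // A daemon thread does not keep the JVM alive on shutdown.
        worker.setDaemon(true);
        worker.start();
    }

    void append(String record) {
        queue.offer(record);
    }

    // Waits until the expected number of records has been flushed.
    boolean awaitFlushed(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        TraceDispatcherSketch dispatcher = new TraceDispatcherSketch(2);
        dispatcher.start();
        dispatcher.append("trace-1");
        dispatcher.append("trace-2");
        System.out.println(dispatcher.awaitFlushed(5000) + " " + dispatcher.flushed);
    }
}
```

Because the worker is a daemon, records still in the queue can be lost when the JVM exits, which is presumably why the original method also registers a shutdown hook.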


From: https://blog.51cto.com/u_11970680/5959708
