A quote to start with
Any sufficiently advanced technology is indistinguishable from magic.
Tracing back to the source
[After going missing for six months, I'm finally back with solid material! — RocketMQ from Beginner to Expert]
A review of the RocketMQ consumption mechanism
In most MQ systems, the general flow of a message is: a producer sends the message to a topic, and the consumers subscribed to that topic consume the messages in it. Before introducing how to use consumers, let's first review RocketMQ's consumption mechanism and the related consumption concepts (consumer groups, consume offsets, and so on).
RocketMQ consumers and consumer groups
Where RocketMQ consumer groups come from
One of the important roles of a message queue is peak shaving. In a scenario such as a big e-commerce promotion, if the downstream consumers lack consumption capacity, a large burst of traffic piles up on the server side. The end-to-end latency of messages (the time from being sent to being consumed) then grows, and for the server, continuously reading historical data also causes cold reads. The answer is to add consumption capacity: besides shortening the time it takes to consume a single message, the simplest approach is to scale out the consumers.
First we need to understand the concept of a consumer group. Consumer groups play a very important role on the consumer side: if several consumers are configured with the same Consumer Group, we consider them to belong to the same consumer group.
Apache RocketMQ offers two consumption modes:
- Clustering mode: with clustering consumption, RocketMQ considers a message handled as soon as any one consumer in the consumer group has processed it.
- Broadcasting mode: with broadcasting consumption, RocketMQ pushes every message to every consumer in the consumer group, guaranteeing that each consumer receives each message at least once.
Clustering mode fits scenarios where each message only needs to be processed once: the consumer group as a whole receives the complete stream of messages on the topic, and the consumers within the group share the work of consuming them, so consumption capacity can be raised or lowered simply by scaling the number of consumers. This is the most common consumption mode, as shown in the figure below; a minimal code sketch follows as well.
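To make cluster-mode scaling concrete, here is a minimal sketch (the group name, topic and NameServer address are illustrative; imports and exception handling are omitted for brevity). Starting a second process running exactly the same code with the same group name is enough for the two instances to split the topic's queues between them.
DefaultMQPushConsumer worker = new DefaultMQPushConsumer("order-consumer-group");
worker.setNamesrvAddr("127.0.0.1:9876");
worker.setMessageModel(MessageModel.CLUSTERING); // the default: messages are shared across the group
worker.subscribe("order-topic", "*");
worker.registerMessageListener((MessageListenerConcurrently) (msgs, ctx) -> {
    // each instance only sees the messages of the queues currently assigned to it
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
worker.start();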
The three key elements of RocketMQ consumers
Push, pull, and long polling
Apache RocketMQ provides both a Push mode and a Pull mode.
- Push: the server actively pushes messages to the client. The advantage is low latency, but if the client has no flow control, a flood of pushed messages can cause a backlog on the client or even crash it.
- Pull: the client actively fetches data from the server. The advantage is that the client can consume at its own pace, but the pull frequency has to be managed by the user: pulling too often stresses both server and client, pulling too rarely makes consumption lag behind.
Push and pull are both essentially pull
In RocketMQ, both consumption modes on the Consumer side (Push and Pull) obtain messages by pulling; the Push mode is just a wrapper around pull. Under the hood, the message-pulling thread pulls a batch of messages from the server, submits it to the message-consuming thread pool, and then immediately tries to pull from the server again. If nothing is pulled, it waits briefly and pulls again.
In both pull-based consumption modes (Push and Pull), the Consumer needs to know which message queue on the Broker to fetch messages from. Load balancing is therefore needed on the Consumer side, i.e. deciding which Consumers of a ConsumerGroup are assigned which of the Broker's MessageQueues (a small configuration sketch follows).
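Which queues each consumer gets is decided by a pluggable allocation strategy. A minimal sketch, assuming the built-in strategies of the 4.x Java client (consumer is a DefaultMQPushConsumer like the one in the example further below):
// Average allocation (the default): the topic's queues are split evenly across the group's consumers
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
// Alternative built-in strategy: deal queues out one by one, round-robin style
// consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());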
Consume offsets
Apache RocketMQ manages consumption progress through consume offsets. When producers and consumers send and receive messages, one of two situations inevitably occurs: messages are produced before anyone subscribes, or the subscription exists before messages are produced. In both cases, where does a consumer client start consuming after it starts up, and how are already-consumed messages marked? All of this is defined by Apache RocketMQ's consume-progress management. The scenarios boil down to the following question:
Where does a consumer start consuming after it starts up?
A consumer can choose among several consume-from strategies:
- CONSUME_FROM_LAST_OFFSET: on the very first start, consume from the tail of the queue; on subsequent starts, continue from the previously recorded progress.
- CONSUME_FROM_FIRST_OFFSET: on the very first start, consume from the head of the queue; on subsequent starts, continue from the previously recorded progress.
- CONSUME_FROM_TIMESTAMP: on the very first start, consume from the position corresponding to a specified timestamp; on subsequent starts, continue from the previously recorded progress.
Newer versions add further strategies, but they are all variants of how the offset is computed and are built on these three basic modes.
Note: "the very first start" above means a consumer that has never consumed before. Once it has consumed, the broker records its consumption position, and if the consumer crashes and restarts it automatically resumes from that progress. So CONSUME_FROM_FIRST_OFFSET will not re-consume from the beginning unless the consumer group is newly added (a small configuration sketch follows).
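As a small illustration of the third strategy, the sketch below sets a start time for a consumer group that has never consumed before (the timestamp value is purely illustrative; consumer is a DefaultMQPushConsumer like the one in the example further below):
// Only takes effect the first time this consumer group's offset is created on the broker
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// Format yyyyMMddHHmmss; here 2023-10-01 00:00:00 (illustrative value)
consumer.setConsumeTimestamp("20231001000000");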
Analyzing how RocketMQ's push mode works
The core, facade-style class used by RocketMQ consumers is DefaultMQPushConsumer. Let's start from code and work our way down to the underlying mechanics. Below is basic consumer code that uses the concurrent consumption mode.
String groupName = "rocketMqGroup1";
// The consumer group organizes multiple Consumers together to increase parallel processing capacity
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
// Set the NameServer address; separate multiple addresses with ';'
consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
// Subscribe to a topic and optionally filter messages, e.g. "TopicTest", "tag1||tag2||tag3"; "*" or null means all messages on the topic
consumer.subscribe("order-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        System.out.println(Thread.currentThread().getName() + " Receive New Messages:" + msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
- CLUSTERING (cluster mode): the default. Within one ConsumerGroup (the same groupName), each consumer consumes only a part of the subscribed messages (a subset of the consume queues); only all consumers of the ConsumerGroup together cover the full data of the subscribed topic. Load is balanced by increasing the number of consumers in the group.
- BROADCASTING (broadcast mode): every consumer in the same ConsumerGroup consumes all messages of the subscribed topic; in other words, each message is dispatched multiple times and consumed by multiple consumers.
Consumption status
- ConsumeConcurrentlyStatus.CONSUME_SUCCESS: the message was consumed successfully.
- ConsumeConcurrentlyStatus.RECONSUME_LATER: the broker schedules redelivery according to the configured messageDelayLevel (the listener's default delay level of 0 leaves the actual delay to the broker), retrying up to 16 times by default (a listener sketch follows this list).
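In practice the listener decides which of the two statuses to return. A minimal sketch (handleOrder is a hypothetical business method, and giving up after three retries is just an illustrative policy; consumer is the DefaultMQPushConsumer from the earlier example):
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        try {
            handleOrder(msg); // hypothetical business logic
        } catch (Exception e) {
            if (msg.getReconsumeTimes() >= 3) {
                // illustrative policy: after 3 retries, record the message elsewhere and stop retrying
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER; // let the broker redeliver later
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});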
Overall flow diagram
Analyzing the Push principle and source code through DefaultMQPushConsumerImpl
It all begins with registering the consumer's message listener: consumer.registerMessageListener.
/**
* Register a callback to execute on message arrival for concurrent consuming.
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
From the example code above we can see that the main implementation lives in DefaultMQPushConsumerImpl: calling consumer.start() invokes the synchronized start method of DefaultMQPushConsumerImpl.
Analyzing DefaultMQPushConsumerImpl's start method
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
Let's focus on the core objects wired up inside this method.
Core objects used by DefaultMQPushConsumerImpl's start method
- RebalancePushImpl: contains the logic that decides which queues the current consumer should consume from;
- PullAPIWrapper: holds the long-lived connection used to pull messages from the broker; the pulled messages are then handed to ConsumeMessageService, which calls back the user's Listener to run the consumption logic;
- ConsumeMessageService: implements the so-called "push / passive" consumption mechanism; messages pulled from the Broker are wrapped into ConsumeRequests and submitted to ConsumeMessageService, which calls back the user's Listener to consume them;
- OffsetStore: maintains the current consumer's consumption progress (offset);
- There are two implementations: Local and Remote.
- Local stores the offsets on local disk and is used for the BROADCASTING consumption mode;
- Remote stores the consumption progress on the Broker and is used for the CLUSTERING consumption mode;
- MQClientFactory (an MQClientInstance): manages clients (consumers and producers) and exposes a variety of interfaces for the individual services (Rebalance, PullMessage, etc.) to call; most of the logic is completed in this class;
What does mQClientFactory.start do?
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
Several services are started in this method; the one we focus on is pullMessageService.start(). This is where we can see that RocketMQ's Push mode is, underneath, also implemented through pull. Next, let's look at what logic pullMessageService handles:
Analyzing the pull logic behind pullMessageService.start
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
As we can see, it still delegates the actual message handling to the pullMessage method of DefaultMQPushConsumerImpl.
Analyzing how PullRequest drives pulling
A note on PullRequest: as already mentioned, RocketMQ's Push mode is implemented as a wrapper over the pull mode, and the PullRequest mechanism achieves the push effect through long polling.
Long polling combines the advantages of both
Long polling combines the advantages of pull with the real-time responsiveness of push. Let's recap the differences and trade-offs of push and pull.
- Push: the server pushes a message to the client as soon as it receives it, so latency is low. The downsides are that the server does more work, which hurts its performance, and that clients differ in processing capacity and their state is outside the server's control; if a client cannot keep up, messages pile up and normal business is affected.
- Pull: the client fetches messages from the server in a loop, so the initiative stays with the client, which processes one batch before pulling the next. The downside is that the loop interval is hard to choose: too short and the client busy-waits and wastes CPU, too long and processing capacity drops and some messages are handled late (a pull-style code sketch follows this list).
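For comparison, here is a minimal sketch of the pull-style API using DefaultLitePullConsumer, which is available in newer 4.x clients; the group, topic and address are illustrative, and by default the consumer auto-commits the offsets of polled messages:
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("rocketMqGroup1");
pullConsumer.setNamesrvAddr("127.0.0.1:9876");
pullConsumer.subscribe("order-topic", "*");
pullConsumer.start();
try {
    while (true) {
        // poll() blocks briefly and returns the next batch of messages (possibly empty)
        List<MessageExt> messages = pullConsumer.poll();
        messages.forEach(m -> System.out.println(new String(m.getBody())));
    }
} finally {
    pullConsumer.shutdown();
}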
Analyzing the execution flow of the pullMessage source code
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// Next we follow this method further; this is where the client actually pulls the messages
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
// The communication mode for the pull is asynchronous
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
- First, check whether the dropped flag of the ProcessQueue held by the PullRequest is true. (The RebalanceService thread maintains a ProcessQueue for every MessageQueue of a topic when it creates pull requests; if the Consumer no longer subscribes to the topic, dropped is set to true.) If it is true, the request is treated as already cancelled and the method returns immediately; the corresponding source is below.
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
Update the lastPullTimestamp of the ProcessQueue held by the PullRequest to the current time; the corresponding source is below.
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
Then check whether the Consumer is running, i.e. whether DefaultMQPushConsumerImpl.serviceState is RUNNING; if it is not running, or it is paused (DefaultMQPushConsumerImpl.pause == true), call PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay) to pull again later, with timeDelay = 3000;
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
The purpose of that call is to put the PullRequest back into the PullMessageService.pullRequestQueue after 3 seconds and then return from the method;
Next comes flow control: if the ProcessQueue's msgCount exceeds the consumer-side flow-control threshold (DefaultMQPushConsumer.pullThresholdForQueue, default 1000), PullMessageService.executePullRequestLater is called to put the PullRequest back into PullMessageService.pullRequestQueue after 50 milliseconds, and the method returns;
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
Next, if this is not orderly consumption (i.e. DefaultMQPushConsumerImpl.consumeOrderly == false), check the span between the first key and the last key of the ProcessQueue's msgTreeMap: TreeMap<Long, MessageExt>; these keys are the queue offsets (queueOffset) of the cached messages;
processQueue.getMaxSpan()
- If the span is larger than the threshold (DefaultMQPushConsumer.consumeConcurrentlyMaxSpan, default 2000), PullMessageService.executePullRequestLater is called to put the PullRequest back into PullMessageService.pullRequestQueue after 50 milliseconds and the method returns; the source is below (these thresholds are configurable, see the sketch after the code).
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}
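All of the thresholds checked above are tunable on DefaultMQPushConsumer. A minimal sketch with the documented defaults written out explicitly (consumer is the DefaultMQPushConsumer from the earlier example):
// Max number of cached messages per queue before pulls are throttled (default 1000)
consumer.setPullThresholdForQueue(1000);
// Max cached message size per queue, in MiB, before pulls are throttled (default 100)
consumer.setPullThresholdSizeForQueue(100);
// Max offset span between the first and last cached message for concurrent consumption (default 2000)
consumer.setConsumeConcurrentlyMaxSpan(2000);
// Interval between pulls in milliseconds; 0 (default) means pull again immediately
consumer.setPullInterval(0);
// Number of messages requested per pull (default 32)
consumer.setPullBatchSize(32);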
Next, the topic of PullRequest.messageQueue is used as the key to look up the corresponding SubscriptionData object in RebalanceImpl.subscriptionInner: ConcurrentHashMap<String, SubscriptionData>. If the object is null (which can happen because of concurrent rebalancing), executePullRequestLater is called to retry later and the method returns;
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
If this is orderly consumption, we first have to check whether the consume queue is locked, because a rebalance may be adjusting queue ownership. If the ProcessQueue is locked and this PullRequest has not yet gone through its first-pull fix-up, the consumer recomputes the pull offset for the queue and compares it with the request's next offset: if the request is ahead of what the broker says should be pulled next (for example, the client expects to pull from 200 while the broker computes 180, meaning the broker's consume offset lags behind), this is logged as brokerBusy, the request is marked with setLockedFirst(true) so the fix-up is not repeated, and its next offset is corrected to the offset that can actually be pulled. If the ProcessQueue is not locked, the pull is simply delayed and retried later. The code is below.
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
If the message model is clustering (RebalanceImpl.messageModel == CLUSTERING), the readOffset(MessageQueue mq, ReadOffsetType type) method of DefaultMQPushConsumerImpl.offsetStore (initialized as a RemoteBrokerOffsetStore) is called with the PullRequest's MessageQueue and type = READ_FROM_MEMORY to read the consume progress offset from local memory.
- If that offset is greater than 0, the temporary variable commitOffsetEnable is set to true, otherwise false. The offset is passed as the commitOffset parameter of pullKernelImpl; after the Broker serves the pull, the commitOffsetEnable flag decides whether this offset is used to update the consumption progress on the Broker.
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
- The logic of readOffset is: use the MessageQueue argument as the key to look up the consume offset in RemoteBrokerOffsetStore.offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong>; if the offset is not null, return it, otherwise return -1;
- If the subscription should be re-sent with every pull (DefaultMQPushConsumer.postSubscriptionWhenPull, default false) and the SubscriptionData looked up from RebalanceImpl.subscriptionInner by topic has classFilterMode == false (the default), then the 3rd flag bit of sysFlag is set to 1, otherwise 0;
- The 1st flag bit of sysFlag is set to the value of commitOffsetEnable; the 2nd bit (the suspend flag) is set to 1; the 4th bit is set to the value of classFilterMode (see the sketch below);
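To make the bit layout concrete, here is a simplified re-implementation of how the four flags are combined into one int, mirroring the description above; the real logic lives in org.apache.rocketmq.common.sysflag.PullSysFlag, and the bit positions shown are the ones just described:
// Simplified sketch of the sysFlag layout described above
static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
    int flag = 0;
    if (commitOffset) flag |= 1;        // bit 1: commit the consume offset along with this pull
    if (suspend)      flag |= 1 << 1;   // bit 2: allow the broker to hold (long-poll) the request
    if (subscription) flag |= 1 << 2;   // bit 3: the request carries the subscription expression
    if (classFilter)  flag |= 1 << 3;   // bit 4: class-filter mode
    return flag;
}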
How PullCallback is executed
An anonymous PullCallback is created, implementing onSuccess/onException. These methods are only called back for asynchronous requests, after the low-level pull API has been invoked:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
Sending the remote request to pull messages
In MQClientAPIImpl.pullMessage, the communicationMode argument selects between the asynchronous and the synchronous pulling path.
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
Whether the pull is asynchronous or synchronous, before the pull request is sent a ResponseFuture object is constructed and stored in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer /* opaque */, ResponseFuture>, keyed by the request's sequence number. This table is touched in several situations:
- If the send fails, the corresponding entry is removed from responseTable immediately;
- When a response is received, its sequence number (echoed back unchanged by the server from the request) is used to look up the ResponseFuture in responseTable, and that object's responseCommand field is set.
- For a synchronous send, this wakes up the wait in ResponseFuture.waitResponse;
- For an asynchronous send, ResponseFuture.executeInvokeCallback() is called to run the callback logic;
When NettyRemotingClient.start() runs, it also schedules a task that scans responseTable every second, iterating over the ResponseFuture objects and checking whether a wait has timed out; if so, ResponseFuture.executeInvokeCallback() is called and the entry is removed from responseTable. A simplified model of this correlation pattern follows.
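The correlation pattern described above boils down to a table of pending futures keyed by the request id. The self-contained sketch below models it with illustrative names only; the real classes are NettyRemotingAbstract, ResponseFuture and RemotingCommand:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Conceptual model of request/response correlation keyed by the request's "opaque" sequence number
class CorrelationTableSketch {
    private final ConcurrentHashMap<Integer, CompletableFuture<String>> responseTable = new ConcurrentHashMap<>();

    // Sender side: register a future keyed by the request id before the request is written out
    CompletableFuture<String> register(int opaque) {
        CompletableFuture<String> future = new CompletableFuture<>();
        responseTable.put(opaque, future);
        return future;
    }

    // IO thread: a response arrives carrying the opaque id the server echoed back
    void onResponse(int opaque, String response) {
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future != null) {
            future.complete(response); // wakes up a synchronous waiter or fires the async callback
        }
    }

    // Housekeeping: remove timed-out entries, mirroring the 1-second responseTable scan described above
    void expire(int opaque) {
        CompletableFuture<String> future = responseTable.remove(opaque);
        if (future != null) {
            future.completeExceptionally(new TimeoutException("response timeout"));
        }
    }
}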
Synchronous pulling: pullMessageSync
For synchronous sending, MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis) is called; the rough steps are:
- Call RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis):
- Get the Channel for the Broker address: look up the ChannelWrapper in NettyRemotingClient.channelTables: ConcurrentHashMap<String, ChannelWrapper> by broker address and return its Channel field; if there is no ChannelWrapper yet, establish a new connection to the broker address and store it in channelTables for later reuse;
- If NettyRemotingClient.rpcHook: RPCHook is not null (the hook is supplied when the application initializes DefaultMQPushConsumer or DefaultMQPullConsumer), call RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request);
- Call NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis), whose logic is as follows:
- A) Initialize a ResponseFuture with the request's sequence number (opaque) and the timeout, and store it in NettyRemotingAbstract.responseTable: ConcurrentHashMap;
- B) Call Channel.writeAndFlush(Object msg) to send the RemotingCommand to the Broker, then call addListener(GenericFutureListener<? extends Future<? super Void>> listener) with an anonymous inner class that implements ChannelFutureListener.operationComplete; when the send completes this listener is invoked: it first checks ChannelFuture.isSuccess(); if the send succeeded it sets sendRequestOK = true on the ResponseFuture and exits the callback to wait for the response; otherwise it sets sendRequestOK = false, removes the entry for this sequence number (opaque) from NettyRemotingAbstract.responseTable, sets responseCommand = null on the ResponseFuture, and wakes up the wait in ResponseFuture.waitResponse(long timeoutMillis);
- C) Call ResponseFuture.waitResponse(long timeoutMillis) to wait for the result; the wait is woken up when the send fails, when a response is received (see Section 5.10.3), or when the timeout expires, and the value of ResponseFuture.responseCommand is returned;
- D) If the returned responseCommand is null, throw an exception: RemotingTimeoutException if ResponseFuture.sendRequestOK is true, otherwise RemotingSendRequestException;
- E) If the returned responseCommand is not null, return it;
- If NettyRemotingClient.rpcHook: RPCHook is not null, call RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request);
- Call MQClientAPIImpl.processPullResponse(RemotingCommand response) with the RemotingCommand returned in the previous step; it parses and wraps the response into a PullResultExt object returned to the caller, mapping the response status as follows:
- If the RemotingCommand's Code is SUCCESS, then PullResultExt.pullStatus = FOUND;
- If the Code is PULL_NOT_FOUND, then PullResultExt.pullStatus = NO_NEW_MSG;
- If the Code is PULL_RETRY_IMMEDIATELY, then PullResultExt.pullStatus = NO_MATCHED_MSG;
- If the Code is PULL_OFFSET_MOVED, then PullResultExt.pullStatus = OFFSET_ILLEGAL (sketched as a switch below).
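The status mapping in the last four bullets amounts to a simple switch. A simplified sketch of what processPullResponse does, where response is the RemotingCommand returned by invokeSync and error handling is reduced to the essentials:
PullStatus pullStatus;
switch (response.getCode()) {
    case ResponseCode.SUCCESS:
        pullStatus = PullStatus.FOUND;
        break;
    case ResponseCode.PULL_NOT_FOUND:
        pullStatus = PullStatus.NO_NEW_MSG;
        break;
    case ResponseCode.PULL_RETRY_IMMEDIATELY:
        pullStatus = PullStatus.NO_MATCHED_MSG;
        break;
    case ResponseCode.PULL_OFFSET_MOVED:
        pullStatus = PullStatus.OFFSET_ILLEGAL;
        break;
    default:
        // any other code is treated as a broker-side error
        throw new MQBrokerException(response.getCode(), response.getRemark());
}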
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
getMQClientAPIImpl().pullMessage finally writes and flushes the request onto the channel. On the server side the rough logic is: when the broker receives a pull request and the queue has no messages, it does not return immediately; instead it loops, waiting (waitForRunning, 5 seconds by default) and then checking again. If there is still nothing new and the accumulated wait exceeds SuspendMaxTimeMills, an empty result is returned; if new messages arrive while waiting, notifyMessageArriving is called right away to return the result of the request. The core of "long polling" is that the Broker holds the client's request for a short period, and if new messages arrive within that period they are returned to the Consumer immediately over the existing connection. The initiative in long polling stays with the consumer: even if the broker has a large backlog of messages, it will not push them to the consumer on its own.
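To close, here is a deliberately simplified, self-contained analogy of the broker-side hold logic just described. It is not the broker's actual code (which lives in PullRequestHoldService / PullMessageProcessor); the string queue and the 5-second check interval are illustrative:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

class LongPollingSketch {
    // Hold the "request" until new messages arrive or the suspend timeout elapses, re-checking periodically
    static List<String> holdAndPoll(BlockingQueue<String> queue, long suspendTimeoutMs) throws InterruptedException {
        long checkIntervalMs = 5_000; // analogous to the 5-second waitForRunning described above
        long deadline = System.currentTimeMillis() + suspendTimeoutMs;
        List<String> batch = new ArrayList<>();
        while (System.currentTimeMillis() < deadline) {
            queue.drainTo(batch);      // "new messages arrived" -> return them on the held connection
            if (!batch.isEmpty()) {
                return batch;
            }
            Thread.sleep(Math.min(checkIntervalMs, Math.max(1, deadline - System.currentTimeMillis())));
        }
        return batch;                  // still empty after the hold -> corresponds to PULL_NOT_FOUND
    }
}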