RocketMQ通过Consumer消费消息,可并发和顺序的处理消息,这里以并发消费普通消息为例,分析消息下佛诶的整体流程。Consumer的示例代码如下:
1 import com.snails.rmq.common.RMQConstant; 2 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; 5 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; 6 import org.apache.rocketmq.client.exception.MQClientException; 7 import org.apache.rocketmq.common.message.MessageExt; 8 import java.io.UnsupportedEncodingException; 9 import java.util.List; 10 11 public class SyncConsumer { 12 public static void main(String[] args) throws MQClientException { 13 // 实例化消费者组名称 14 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group"); 15 // 指定name server地址 16 consumer.setNamesrvAddr("127.0.0.1:9876"); 17 // 订阅至少一个主题以供消费 18 consumer.subscribe("TestTopic", "*"); 19 // 注册回调,处理从服务端获取的消息 20 consumer.registerMessageListener(new MessageListenerConcurrently() { 21 @Override 22 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { 23 for (MessageExt msg : msgs) { 24 try { 25 System.out.println(msg.getTags()); 26 System.out.println(msg.getKeys()); 27 System.out.println(String.format("线程%s,接收订单:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8"))); 28 } catch (UnsupportedEncodingException e) { 29 // TODO 补偿机制 30 System.out.println(e.getMessage()); 31 } 32 } 33 // 消费消息确认 34 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 35 } 36 }); 37 // 启动消费者实例 38 consumer.start(); 39 System.out.printf("消费者已启动.%n"); 40 } 41 }
根据上述代码,生产者发送消息主要完成以下几件事:
1、创建消费者对象DefaultMQPushConsumer、设置NameServer地址及注册消息回调处理消息
2、启动消费者
1、创建消费者
DefaultMQPushConsumer构造函数详情如下:
1 // 指定消费者组名创建Consumer 2 public DefaultMQPushConsumer(final String consumerGroup) { 3 this(null, consumerGroup, null, new AllocateMessageQueueAveragely()); 4 } 5 6 public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook, 7 AllocateMessageQueueStrategy allocateMessageQueueStrategy) { 8 // 消费者组 9 this.consumerGroup = consumerGroup; 10 this.namespace = namespace; 11 // 选择消息队列策略 12 this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; 13 // 消费消息对象,实际干活的 14 defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); 15 }
与生成者类似,DefaultMQPushConsumer中持有关键属性DefaultMQPushConsumerImpl。DefaultMQPushConsumer是实际干活的,DefaultMQPushConsumer的启动与消息消费都依赖于DefaultMQPushConsumer中方法,利用装饰者模式。
2、注册消息监听
DefaultMQPushConsumer#registerMessageListener 详情如下:1 // 注册消息监听 2 public void registerMessageListener(MessageListenerConcurrently messageListener) { 3 this.messageListener = messageListener; 4 this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); 5 }
主要完成消息回调的属性赋值,针对DefaultMQPushConsumer的messageListener属性、DefaultMQPushConsumerImpl的messageListenerInner属性。
3、启动消费者
DefaultMQPushConsumer#start 详情如下:
实际完成消息消费功能的是 DefaultMQPushConsumerImpl。
1 // 消费者实际执行对象,DefaultMQPushConsumer 使用装饰者模式 2 // 实际调用是委派 DefaultMQPushConsumerImpl 完成消息消费 3 protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
DefaultMQPushConsumerImpl#start 启动详情如下:
1 /** 2 * 消费者的核心代码入口 3 */ 4 public synchronized void start() throws MQClientException { 5 switch (this.serviceState) { 6 case CREATE_JUST: 7 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), 8 this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); 9 this.serviceState = ServiceState.START_FAILED; 10 // 1.检查配置信息 11 this.checkConfig(); 12 13 // 2.加工订阅信息(同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。) 14 this.copySubscription(); 15 16 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { 17 this.defaultMQPushConsumer.changeInstanceNameToPID(); 18 } 19 20 // 3.创建MQClientInstance实例, 21 // 这个实例在一个JVM中消费者和生产者共用,MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc 22 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); 23 24 // 1.1.负载均衡(队列默认分配算法) 25 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); 26 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); 27 // 1.2.队列默认分配算法 28 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); 29 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); 30 31 // 5. 拉取消息(无论是拉模式,还是推模式 :数据都是拉), 32 // pullAPIWrapper拉取消息的API包装类,主要有消息的拉取方法和接受拉取到的消息 33 this.pullAPIWrapper = new PullAPIWrapper( 34 mQClientFactory, 35 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); 36 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); 37 38 // 7.消费进度存储,如果是集群模式,使用远程存储RemoteBrokerOffsetStore,如果是广播模式,则使用本地存储LocalFileOffsetStore 39 if (this.defaultMQPushConsumer.getOffsetStore() != null) { 40 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); 41 } else { 42 switch (this.defaultMQPushConsumer.getMessageModel()) { 43 // 广播模式 44 case BROADCASTING: 45 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); 46 break; 47 // 集群模式 48 case CLUSTERING: 49 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); 50 break; 51 default: 52 break; 53 } 54 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); 55 } 56 // 8.加载消息进度(offsetStore是用来操作消费进度的对象) 57 // push模式消费进度最后持久化在broker端,但是consumer端在内存中也持有消费进度 58 this.offsetStore.load(); 59 60 // 9.判断是顺序消息还是并发消息 61 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { 62 this.consumeOrderly = true; 63 this.consumeMessageService = 64 new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); 65 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { 66 this.consumeOrderly = false; 67 this.consumeMessageService = 68 new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); 69 } 70 71 // 10.消息消费服务并启动 72 this.consumeMessageService.start(); 73 74 // 11.注册消费者 75 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); 76 if (!registerOK) { 77 this.serviceState = ServiceState.CREATE_JUST; 78 this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); 79 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() 80 + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), 81 null); 82 } 83 84 // 12.MQClientInstance启动(第3步中创建了MQClientInstance) 85 mQClientFactory.start(); 86 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); 87 this.serviceState = ServiceState.RUNNING; 88 break; 89 case RUNNING: 90 case START_FAILED: 91 case SHUTDOWN_ALREADY: 92 throw new MQClientException("The PushConsumer service state not OK, maybe started once, " 93 + this.serviceState 94 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), 95 null); 96 default: 97 break; 98 } 99 100 // 更新主题路由信息 101 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); 102 // 检查在Broker上的状态 103 this.mQClientFactory.checkClientInBroker(); 104 // 发送心跳 105 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 106 // 唤醒负载均衡服务 107 this.mQClientFactory.rebalanceImmediately(); 108 }
启动消费者流程:
判断当前客户端的状态,不同的服务状态执行不同的流程,RocketMQ的服务状态ServiceState如下:1 public enum ServiceState { 2 // 刚创建,未启动 3 CREATE_JUST, 4 // 运行中 5 RUNNING, 6 // 关闭 7 SHUTDOWN_ALREADY, 8 // 启动失败 9 START_FAILED; 10 }
这里我们主要看 CREATE_JUST 状态的流程,其他三个状态会抛出MQClientException异常。
3.1、检查配置信息
检查配置信息,DefaultMQPushConsumerImpl#checkConfig。 校验消费者组、消息模型、消息进度、分配消息队列策略、消息监听器、消费线程等信息。3.2、加工订阅信息
DefaultMQPushConsumerImpl#copySubscription 详情如下:1 private void copySubscription() throws MQClientException { 2 try { 3 // 将消费者的订阅数据缓存值 rebalanceImpl 的 subscriptionInner 属性中 4 Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); 5 if (sub != null) { 6 for (final Map.Entry<String, String> entry : sub.entrySet()) { 7 final String topic = entry.getKey(); 8 final String subString = entry.getValue(); 9 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), 10 topic, subString); 11 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); 12 } 13 } 14 15 if (null == this.messageListenerInner) { 16 this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); 17 } 18 19 switch (this.defaultMQPushConsumer.getMessageModel()) { 20 // 广播模式 21 case BROADCASTING: 22 break; 23 // 集群模型 24 case CLUSTERING: 25 // 创建一个重试主题 26 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); 27 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), 28 retryTopic, SubscriptionData.SUB_ALL); 29 this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); 30 break; 31 default: 32 break; 33 } 34 } catch (Exception e) { 35 throw new MQClientException("subscription exception", e); 36 } 37 }
将当前消费者客户端设置在本地缓存RebalanceImpl#subscriptionInner中,若当前消息消费模式是集群模式,创建一个重试Topic。
重试Topic名称的生成规则:MixAll#RETRY_GROUP_TOPIC_PREFIX 加上 消费组名称。
3.3、创建MQClientInstance实例
MQClientInstance实例在一个JVM中消费者和生产者共用,MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc。 MQClientManager#getOrCreateMQClientInstance() 详情如下:
消费者默认的消息队列分配策略:AllocateMessageQueueAveragely - 平均哈希队列策略,分配规则详情 AllocateMessageQueueStrategy#allocate 参数详情如下:
平均哈希队列策略 AllocateMessageQueueAveragely#allocate 实现详情如下:
平均哈希队列算法,首先会获取当前消费者在消费者组中的位置,再计算当前消费者能分配的消息队列的数量,以及分配的消息队列的范围,也就是从哪里开始分配到哪里结束分配,最后分配消息队列的范围给消费者进行分配。
3.5、拉取消息包装类定义
定义RocketMQ拉取消息的包装类PullAPIWrapper,PullAPIWrapper是拉取消息的API包装类。
PullAPIWrapper中包含了实际从MQ中获取消息的方法 pullKernelImpl()。
这里主要定义了拉取消息的对象,实际拉取消息的操作在启动拉取消息服务 PullMessageService 之后才会进行,也就是在启动 PullMessageService 服务之后,拉取消息的描述在启动消费消息服务做消息介绍。
3.6、消费进度存储
根据消费消息的模式,判断加载消费进度的,消费进度对象 DefaultMQPushConsumerImpl#offsetStore,OffsetStore接口有两个子类LocalFileOffsetStore 和 RemoteBrokerOffsetStore,类图如下:
LocalFileOffsetStore:广播模式下,从本地读取消费进度,在该模式下会将offset信息转化成json保留到本地文件中;
RemoteBrokerOffsetStore:集群模式下,获取远程存储信息,从 Broker 中获取消费进度,在该模式下offsetTable将需要提交的MessageQueue的offset信息通过 MQClientAPIImpl提供的接口updateConsumerOffsetOneway()提交到broker进行长久化存储。consumer的shutdown()办法会被动触发一次offset持久化到broker的操作。
根据消息消费模式选择不同的同步方式:广播模式从本地文件中获取偏移量,集群模式会根据偏移量的读取方式选择是读取内存中的偏移量还是Broker磁盘中的偏移量。
将获取的消息偏移量对象赋值给 DefaultMQPushConsumerImpl#offsetStore 属性,后续可通过此对象加载消费进度。
3.7、消费进度加载 根据不同消费消息的模式获取到偏移量对象 DefaultMQPushConsumerImpl#offsetStore后,开始加载消息进度。 加载消费进度,RemoteBrokerOffsetStore中的load()是空实现; LocalFileOffsetStore中的load()方法读取本地文件 offsets.json,本地文件存储路径详情:
3.8、并发消息与顺序消息的判断
并发消费消息与顺序消费消息创建的消费消息服务不同,并发消费消息创建ConsumeMessageConcurrentlyService、顺序消息消息创建ConsumeMessageOrderlyService。
这里仅仅是创建服务对象,未启动。
3.9、启动消费消息服务
3.9.1、顺序消息消费服务
顺序消息是基于JUC的ReadWriteLock、ReentrantLoc实现的,为消息队里加锁,保证同一时间只有一个消费者可以消费该队列的消息,在拉取到消息后,通过MessageListenerOrderly接口的consumeMessage方法来处理消息。
在ConsumeMessageOrderlyService启动时,进行加锁操作;关闭时,释放锁。
1、加锁处理
加锁操作流程如下:客户端通过Netty传递功能号LOCK_BATCH_MQ=41远程调用Broker对消息队列进行加锁,最终将加锁的消息队列存放在RebalanceLockManager#mqLockTable 缓存表中。
Broker对消息队列的加锁处理,RebalanceLockManager#tryLockBatch 详情如下:
1 // 加锁处理 2 public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs, 3 final String clientId) { 4 // 创建存放 加锁的消息队列 与 不加锁的消息队列 的容器 5 Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size()); 6 Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size()); 7 8 // 将消费者组中已加锁的消息队列与未加锁的消息队列区分开,便于对未加锁的消息队列进行加锁处理 9 for (MessageQueue mq : mqs) { 10 if (this.isLocked(group, mq, clientId)) { 11 lockedMqs.add(mq); 12 } else { 13 notLockedMqs.add(mq); 14 } 15 } 16 // 未加锁的消息队列不为空 17 if (!notLockedMqs.isEmpty()) { 18 try { 19 // 加锁 20 this.lock.lockInterruptibly(); 21 try { 22 // 锁对象添加进消息队列缓存表中 23 ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); 24 if (null == groupValue) { 25 groupValue = new ConcurrentHashMap<>(32); 26 this.mqLockTable.put(group, groupValue); 27 } 28 29 // 为未加锁的消息队列,创建 LockEntity 对象,并设置客户端id 30 for (MessageQueue mq : notLockedMqs) { 31 LockEntry lockEntry = groupValue.get(mq); 32 if (null == lockEntry) { 33 lockEntry = new LockEntry(); 34 lockEntry.setClientId(clientId); 35 groupValue.put(mq, lockEntry); 36 } 37 38 // 若已加锁,重置最后更新时间戳,重新设置到 缓存 的 容器中 39 if (lockEntry.isLocked(clientId)) { 40 lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); 41 lockedMqs.add(mq); 42 continue; 43 } 44 45 String oldClientId = lockEntry.getClientId(); 46 // 若锁到期,续期、再重新设置到 缓存 的 容器中 47 if (lockEntry.isExpired()) { 48 lockEntry.setClientId(clientId); 49 lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); 50 lockedMqs.add(mq); 51 continue; 52 } 53 } 54 } finally { 55 // 释放锁 56 this.lock.unlock(); 57 } 58 } catch (InterruptedException e) { 59 log.error("putMessage exception", e); 60 } 61 } 62 return lockedMqs; 63 }
2、释放锁处理
释放锁流程与加锁操作流程类似,客户端通过Netty传递功能号UNLOCK_BATCH_MQ=42远程调用Broker对消息队列进行锁释放,最终遍历RebalanceLockManager#mqLockTable 加锁缓存表消息队列,将缓存表中指定客户端Id的消息队列移除。Broker对消息队列的释放锁的处理,RebalanceLockManager#unlockBatch 详情如下:
1 // 释放锁 2 public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) { 3 try { 4 // 加锁 5 this.lock.lockInterruptibly(); 6 try { 7 ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); 8 if (null != groupValue) { 9 // 移除锁对象中的clientId 10 for (MessageQueue mq : mqs) { 11 LockEntry lockEntry = groupValue.get(mq); 12 if (null != lockEntry) { 13 if (lockEntry.getClientId().equals(clientId)) { 14 groupValue.remove(mq); 15 } 16 } 17 } 18 } 19 } finally { 20 // 释放锁 21 this.lock.unlock(); 22 } 23 } catch (InterruptedException e) { 24 log.error("putMessage exception", e); 25 } 26 }
3.9.2、并发消息消费服务
并发消息消费服务在启动时,会清除过期的消息,根据消息的偏移量移除ProcessQueue#msgTreeMap中的MessageExt,通过读写锁ProcessQueue#lockTreeMap来保证清理消息时的线程安全,ProcessQueue#removeMessage, 核心代码如下:
在拉取到消息后,通过 MessageListenerConcurrently 消息监听处理器的consumeMessage方法来处理消息。
3.10、注册消费者
将消费者注册到消费者缓存表 MQClientInstance#consumerTable ,MQClientInstance#registerConsumer 详情如下:
1 // 消费者缓存表 2 private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); 3 4 // 注册消费者 5 public boolean registerConsumer(final String group, final MQConsumerInner consumer) { 6 if (null == group || null == consumer) { 7 return false; 8 } 9 // 将消费者注册到消费者缓存表中 10 MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer); 11 if (prev != null) { 12 log.warn("the consumer group[" + group + "] exist already."); 13 return false; 14 } 15 16 return true; 17 }
3.11、启动MQClientInstance
1、启动消息发送服务,启动Netty
开启Netty通信的服务,NettyRemotingClient#start(),启动 Netty远程客户端服务。
2、开启定时任务
开启定时任务,MQClientInsatnce#startScheduledTask 详情如下:
1 // 开启定时任务 2 private void startScheduledTask() { 3 // 2min同步NameServer地址服务 4 if (null == this.clientConfig.getNamesrvAddr()) { 5 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 6 @Override 7 public void run() { 8 try { 9 MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); 10 } catch (Exception e) { 11 log.error("ScheduledTask fetchNameServerAddr exception", e); 12 } 13 } 14 }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); 15 } 16 17 // 30s同步NameServer中的Topic路由信息 18 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 19 @Override 20 public void run() { 21 try { 22 MQClientInstance.this.updateTopicRouteInfoFromNameServer(); 23 } catch (Exception e) { 24 log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); 25 } 26 } 27 }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); 28 29 // 30s剔除下线的broker、发送心跳检测到broker 30 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 31 @Override 32 public void run() { 33 try { 34 MQClientInstance.this.cleanOfflineBroker(); 35 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); 36 } catch (Exception e) { 37 log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); 38 } 39 } 40 }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); 41 42 // 5s 持久化消费进度 43 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 44 @Override 45 public void run() { 46 try { 47 MQClientInstance.this.persistAllConsumerOffset(); 48 } catch (Exception e) { 49 log.error("ScheduledTask persistAllConsumerOffset exception", e); 50 } 51 } 52 }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); 53 54 // 调整线程池异步任务 55 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 56 @Override 57 public void run() { 58 try { 59 MQClientInstance.this.adjustThreadPool(); 60 } catch (Exception e) { 61 log.error("ScheduledTask adjustThreadPool exception", e); 62 } 63 } 64 }, 1, 1, TimeUnit.MINUTES); 65 }
3、开启拉取消息服务
启动拉取消息服务 PullMessageService,拉取消息的核心方法 PullMessageService#pullMessage。
执行DefaultMQPushConsumerImpl#pullMessage 方法:
根据不同的发送方式,调用 MQClientAPIImpl#pullMessage 拉取消息 详情如下:
1 public PullResult pullMessage( 2 final String addr, 3 final PullMessageRequestHeader requestHeader, 4 final long timeoutMillis, 5 final CommunicationMode communicationMode, 6 final PullCallback pullCallback 7 ) throws RemotingException, MQBrokerException, InterruptedException { 8 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); 9 10 // 通讯方式的判断 11 switch (communicationMode) { 12 case ONEWAY: 13 assert false; 14 return null; 15 // 异步 16 case ASYNC: 17 this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); 18 return null; 19 // 同步 20 case SYNC: 21 return this.pullMessageSync(addr, request, timeoutMillis); 22 default: 23 assert false; 24 break; 25 } 26 27 return null; 28 }
4、开启负载均衡服务
负载均衡服务RebalanceService,最终执行 RebalanceImpl#doRebalance,详情如下:
1 public void doRebalance(final boolean isOrder) { 2 // 获取订阅数据 3 Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); 4 if (subTable != null) { 5 for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { 6 // 获取Topic 7 final String topic = entry.getKey(); 8 try { 9 // 负载均衡处理 10 this.rebalanceByTopic(topic, isOrder); 11 } catch (Throwable e) { 12 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 13 log.warn("rebalanceByTopic Exception", e); 14 } 15 } 16 } 17 } 18 // 清除未订阅主题的消息队列 19 this.truncateMessageQueueNotMyTopic(); 20 }
5、开启消息推送服务
启动消息推送服务,用于将消息消费结果通知给Broker,将生产者添加进生产者缓存表 MQClientInstance#producerTable。向所有的Broker发送心跳,并移除超时请求,并执行回调方法onException。
DefaultMQProducerImpl#start() 详情如下:
1 /** 2 * 启动生产者 3 * @param startFactory 4 * @throws MQClientException 5 */ 6 public void start(final boolean startFactory) throws MQClientException { 7 switch (this.serviceState) { 8 // 刚创建未启动 9 case CREATE_JUST: 10 this.serviceState = ServiceState.START_FAILED; 11 // todo 检查配置 12 this.checkConfig(); 13 // 更改当前instanceName为进程ID 14 if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { 15 this.defaultMQProducer.changeInstanceNameToPID(); 16 } 17 18 //todo 获取MQ客户端实例 19 //整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表 20 //ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>(); 21 //同一个clientId只会创建一个MQClientInstance。 22 //MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道 23 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); 24 25 // 注册Producer到MQClientInstance客户端实例 26 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); 27 // 未注册成功,抛出异常 28 if (!registerOK) { 29 this.serviceState = ServiceState.CREATE_JUST; 30 throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() 31 + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), 32 null); 33 } 34 35 // 路由信心添加进缓存表中 36 this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); 37 38 // 启动MQ客户端实例 39 if (startFactory) { 40 // todo 最终调用MQClientInstance 41 mQClientFactory.start(); 42 } 43 44 log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), 45 this.defaultMQProducer.isSendMessageWithVIPChannel()); 46 this.serviceState = ServiceState.RUNNING; 47 break; 48 case RUNNING: 49 case START_FAILED: 50 case SHUTDOWN_ALREADY: 51 throw new MQClientException("The producer service state not OK, maybe started once, " 52 + this.serviceState 53 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), 54 null); 55 default: 56 break; 57 } 58 59 // 向所有的broker发送心跳 60 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 61 62 // 扫描并移除超时请求,并执行回调方法onException 63 this.timer.scheduleAtFixedRate(new TimerTask() { 64 @Override 65 public void run() { 66 try { 67 RequestFutureTable.scanExpiredRequest(); 68 } catch (Throwable e) { 69 log.error("scan RequestFutureTable exception", e); 70 } 71 } 72 }, 1000 * 3, 1000); 73 }
消息推送服务,在RocketMQ源码(四):RocketMQ生产者发送消息流程中已经做了详细介绍,此处不再赘述。
6、更新服务状态
更新服务状态为运行中,同时更新订阅信息,并发送心跳。
4、总结
RocketMQ消费者启动流程如下:
标签:加锁,队列,流程,defaultMQPushConsumer,源码,消息,new,final,RocketMQ From: https://www.cnblogs.com/RunningSnails/p/17718386.html