首页 > 编程语言 >RocketMQ源码(六):RocketMQ消费者启动流程

RocketMQ源码(六):RocketMQ消费者启动流程

时间:2023-09-20 21:25:04浏览次数:64  
标签:加锁 队列 流程 defaultMQPushConsumer 源码 消息 new final RocketMQ

  RocketMQ通过Consumer消费消息,可并发和顺序的处理消息,这里以并发消费普通消息为例,分析消息下佛诶的整体流程。Consumer的示例代码如下:

 1 import com.snails.rmq.common.RMQConstant;
 2 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 5 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 6 import org.apache.rocketmq.client.exception.MQClientException;
 7 import org.apache.rocketmq.common.message.MessageExt;
 8 import java.io.UnsupportedEncodingException;
 9 import java.util.List;
10 
11 public class SyncConsumer {
12     public static void main(String[] args) throws MQClientException {
13         // 实例化消费者组名称
14         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
15         // 指定name server地址
16         consumer.setNamesrvAddr("127.0.0.1:9876");
17         // 订阅至少一个主题以供消费
18         consumer.subscribe("TestTopic", "*");
19         //  注册回调,处理从服务端获取的消息
20         consumer.registerMessageListener(new MessageListenerConcurrently() {
21             @Override
22             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
23                 for (MessageExt msg : msgs) {
24                     try {
25                         System.out.println(msg.getTags());
26                         System.out.println(msg.getKeys());
27                         System.out.println(String.format("线程%s,接收订单:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
28                     } catch (UnsupportedEncodingException e) {
29                         // TODO 补偿机制
30                         System.out.println(e.getMessage());
31                     }
32                 }
33                 // 消费消息确认
34                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
35             }
36         });
37         // 启动消费者实例
38         consumer.start();
39         System.out.printf("消费者已启动.%n");
40     }
41 }

根据上述代码,生产者发送消息主要完成以下几件事:

  1、创建消费者对象DefaultMQPushConsumer、设置NameServer地址及注册消息回调处理消息

  2、启动消费者

1、创建消费者

  DefaultMQPushConsumer构造函数详情如下:

 1 // 指定消费者组名创建Consumer
 2 public DefaultMQPushConsumer(final String consumerGroup) {
 3     this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
 4 }
 5 
 6 public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
 7     AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
 8     // 消费者组
 9     this.consumerGroup = consumerGroup;
10     this.namespace = namespace;
11     // 选择消息队列策略
12     this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
13     // 消费消息对象,实际干活的
14     defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
15 }

  与生成者类似,DefaultMQPushConsumer中持有关键属性DefaultMQPushConsumerImpl。DefaultMQPushConsumer是实际干活的,DefaultMQPushConsumer的启动与消息消费都依赖于DefaultMQPushConsumer中方法,利用装饰者模式。

2、注册消息监听

  DefaultMQPushConsumer#registerMessageListener 详情如下:
1 // 注册消息监听
2 public void registerMessageListener(MessageListenerConcurrently messageListener) {
3     this.messageListener = messageListener;
4     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
5 }

  主要完成消息回调的属性赋值,针对DefaultMQPushConsumer的messageListener属性、DefaultMQPushConsumerImpl的messageListenerInner属性。

3、启动消费者

  DefaultMQPushConsumer#start 详情如下:

  

   实际完成消息消费功能的是 DefaultMQPushConsumerImpl。

1 //  消费者实际执行对象,DefaultMQPushConsumer 使用装饰者模式
2 //  实际调用是委派 DefaultMQPushConsumerImpl 完成消息消费
3 protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

  DefaultMQPushConsumerImpl#start 启动详情如下:

  1 /**
  2  * 消费者的核心代码入口
  3  */
  4 public synchronized void start() throws MQClientException {
  5     switch (this.serviceState) {
  6         case CREATE_JUST:
  7             log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
  8                 this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
  9             this.serviceState = ServiceState.START_FAILED;
 10             // 1.检查配置信息
 11             this.checkConfig();
 12 
 13             // 2.加工订阅信息(同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。)
 14             this.copySubscription();
 15 
 16             if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
 17                 this.defaultMQPushConsumer.changeInstanceNameToPID();
 18             }
 19 
 20             // 3.创建MQClientInstance实例,
 21             // 这个实例在一个JVM中消费者和生产者共用,MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc
 22             this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
 23 
 24             // 1.1.负载均衡(队列默认分配算法)
 25             this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
 26             this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
 27             // 1.2.队列默认分配算法
 28             this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
 29             this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
 30 
 31             // 5. 拉取消息(无论是拉模式,还是推模式 :数据都是拉),
 32             // pullAPIWrapper拉取消息的API包装类,主要有消息的拉取方法和接受拉取到的消息
 33             this.pullAPIWrapper = new PullAPIWrapper(
 34                 mQClientFactory,
 35                 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
 36             this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
 37 
 38             // 7.消费进度存储,如果是集群模式,使用远程存储RemoteBrokerOffsetStore,如果是广播模式,则使用本地存储LocalFileOffsetStore
 39             if (this.defaultMQPushConsumer.getOffsetStore() != null) {
 40                 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
 41             } else {
 42                 switch (this.defaultMQPushConsumer.getMessageModel()) {
 43                     // 广播模式
 44                     case BROADCASTING:
 45                         this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
 46                         break;
 47                     // 集群模式
 48                     case CLUSTERING:
 49                         this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
 50                         break;
 51                     default:
 52                         break;
 53                 }
 54                 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
 55             }
 56             // 8.加载消息进度(offsetStore是用来操作消费进度的对象)
 57             // push模式消费进度最后持久化在broker端,但是consumer端在内存中也持有消费进度
 58             this.offsetStore.load();
 59 
 60             // 9.判断是顺序消息还是并发消息
 61             if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
 62                 this.consumeOrderly = true;
 63                 this.consumeMessageService =
 64                     new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
 65             } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
 66                 this.consumeOrderly = false;
 67                 this.consumeMessageService =
 68                     new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
 69             }
 70 
 71             // 10.消息消费服务并启动
 72             this.consumeMessageService.start();
 73 
 74             // 11.注册消费者
 75             boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
 76             if (!registerOK) {
 77                 this.serviceState = ServiceState.CREATE_JUST;
 78                 this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
 79                 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
 80                     + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
 81                     null);
 82             }
 83 
 84             // 12.MQClientInstance启动(第3步中创建了MQClientInstance)
 85             mQClientFactory.start();
 86             log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
 87             this.serviceState = ServiceState.RUNNING;
 88             break;
 89         case RUNNING:
 90         case START_FAILED:
 91         case SHUTDOWN_ALREADY:
 92             throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
 93                 + this.serviceState
 94                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
 95                 null);
 96         default:
 97             break;
 98     }
 99 
100     // 更新主题路由信息
101     this.updateTopicSubscribeInfoWhenSubscriptionChanged();
102     // 检查在Broker上的状态
103     this.mQClientFactory.checkClientInBroker();
104     // 发送心跳
105     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
106     // 唤醒负载均衡服务
107     this.mQClientFactory.rebalanceImmediately();
108 }

启动消费者流程:

  判断当前客户端的状态,不同的服务状态执行不同的流程,RocketMQ的服务状态ServiceState如下:
 1 public enum ServiceState {
 2     // 刚创建,未启动
 3     CREATE_JUST,
 4     // 运行中
 5     RUNNING,
 6     // 关闭
 7     SHUTDOWN_ALREADY,
 8     // 启动失败
 9     START_FAILED;
10 }

  这里我们主要看 CREATE_JUST 状态的流程,其他三个状态会抛出MQClientException异常。

3.1、检查配置信息

  检查配置信息,DefaultMQPushConsumerImpl#checkConfig。   校验消费者组、消息模型、消息进度、分配消息队列策略、消息监听器、消费线程等信息。

3.2、加工订阅信息

  DefaultMQPushConsumerImpl#copySubscription 详情如下:
 1 private void copySubscription() throws MQClientException {
 2     try {
 3         // 将消费者的订阅数据缓存值 rebalanceImpl 的 subscriptionInner 属性中
 4         Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
 5         if (sub != null) {
 6             for (final Map.Entry<String, String> entry : sub.entrySet()) {
 7                 final String topic = entry.getKey();
 8                 final String subString = entry.getValue();
 9                 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
10                     topic, subString);
11                 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
12             }
13         }
14 
15         if (null == this.messageListenerInner) {
16             this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
17         }
18 
19         switch (this.defaultMQPushConsumer.getMessageModel()) {
20             // 广播模式
21             case BROADCASTING:
22                 break;
23             // 集群模型
24             case CLUSTERING:
25                 // 创建一个重试主题
26                 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
27                 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
28                     retryTopic, SubscriptionData.SUB_ALL);
29                 this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
30                 break;
31             default:
32                 break;
33         }
34     } catch (Exception e) {
35         throw new MQClientException("subscription exception", e);
36     }
37 }

  将当前消费者客户端设置在本地缓存RebalanceImpl#subscriptionInner中,若当前消息消费模式是集群模式,创建一个重试Topic。

  

  重试Topic名称的生成规则:MixAll#RETRY_GROUP_TOPIC_PREFIX 加上 消费组名称。

3.3、创建MQClientInstance实例

  MQClientInstance实例在一个JVM中消费者和生产者共用,MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc。   MQClientManager#getOrCreateMQClientInstance() 详情如下:

   

   消费者默认的消息队列分配策略:AllocateMessageQueueAveragely - 平均哈希队列策略,分配规则详情 AllocateMessageQueueStrategy#allocate 参数详情如下:

  0

  平均哈希队列策略 AllocateMessageQueueAveragely#allocate 实现详情如下:

    

   平均哈希队列算法,首先会获取当前消费者在消费者组中的位置,再计算当前消费者能分配的消息队列的数量,以及分配的消息队列的范围,也就是从哪里开始分配到哪里结束分配,最后分配消息队列的范围给消费者进行分配。

3.5、拉取消息包装类定义

  定义RocketMQ拉取消息的包装类PullAPIWrapper,PullAPIWrapper是拉取消息的API包装类。

  

   PullAPIWrapper中包含了实际从MQ中获取消息的方法 pullKernelImpl()。

  0

  这里主要定义了拉取消息的对象,实际拉取消息的操作在启动拉取消息服务 PullMessageService 之后才会进行,也就是在启动 PullMessageService 服务之后,拉取消息的描述在启动消费消息服务做消息介绍。

3.6、消费进度存储

  根据消费消息的模式,判断加载消费进度的,消费进度对象 DefaultMQPushConsumerImpl#offsetStore,OffsetStore接口有两个子类LocalFileOffsetStore 和 RemoteBrokerOffsetStore,类图如下:

  0

  LocalFileOffsetStore:广播模式下,从本地读取消费进度,在该模式下会将offset信息转化成json保留到本地文件中;

  RemoteBrokerOffsetStore:集群模式下,获取远程存储信息,从 Broker 中获取消费进度,在该模式下offsetTable将需要提交的MessageQueue的offset信息通过          MQClientAPIImpl提供的接口updateConsumerOffsetOneway()提交到broker进行长久化存储。consumer的shutdown()办法会被动触发一次offset持久化到broker的操作。

  0

  根据消息消费模式选择不同的同步方式:广播模式从本地文件中获取偏移量,集群模式会根据偏移量的读取方式选择是读取内存中的偏移量还是Broker磁盘中的偏移量。

  将获取的消息偏移量对象赋值给 DefaultMQPushConsumerImpl#offsetStore 属性,后续可通过此对象加载消费进度。

 

3.7、消费进度加载 根据不同消费消息的模式获取到偏移量对象 DefaultMQPushConsumerImpl#offsetStore后,开始加载消息进度。 0 加载消费进度,RemoteBrokerOffsetStore中的load()是空实现; LocalFileOffsetStore中的load()方法读取本地文件 offsets.json,本地文件存储路径详情: 0

3.8、并发消息与顺序消息的判断

  并发消费消息与顺序消费消息创建的消费消息服务不同,并发消费消息创建ConsumeMessageConcurrentlyService、顺序消息消息创建ConsumeMessageOrderlyService。

0

  这里仅仅是创建服务对象,未启动。

3.9、启动消费消息服务

3.9.1、顺序消息消费服务

  顺序消息是基于JUC的ReadWriteLock、ReentrantLoc实现的,为消息队里加锁,保证同一时间只有一个消费者可以消费该队列的消息,在拉取到消息后,通过MessageListenerOrderly接口的consumeMessage方法来处理消息。

  在ConsumeMessageOrderlyService启动时,进行加锁操作;关闭时,释放锁。

  0

1、加锁处理

  加锁操作流程如下:客户端通过Netty传递功能号LOCK_BATCH_MQ=41远程调用Broker对消息队列进行加锁,最终将加锁的消息队列存放在RebalanceLockManager#mqLockTable 缓存表中。

  0

  Broker对消息队列的加锁处理,RebalanceLockManager#tryLockBatch 详情如下:

 1 // 加锁处理
 2 public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
 3     final String clientId) {
 4     // 创建存放 加锁的消息队列 与 不加锁的消息队列 的容器
 5     Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
 6     Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
 7     
 8     // 将消费者组中已加锁的消息队列与未加锁的消息队列区分开,便于对未加锁的消息队列进行加锁处理
 9     for (MessageQueue mq : mqs) {
10         if (this.isLocked(group, mq, clientId)) {
11             lockedMqs.add(mq);
12         } else {
13             notLockedMqs.add(mq);
14         }
15     }
16     // 未加锁的消息队列不为空
17     if (!notLockedMqs.isEmpty()) {
18         try {
19             // 加锁
20             this.lock.lockInterruptibly();
21             try {
22                 // 锁对象添加进消息队列缓存表中
23                 ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
24                 if (null == groupValue) {
25                     groupValue = new ConcurrentHashMap<>(32);
26                     this.mqLockTable.put(group, groupValue);
27                 }
28 
29                 // 为未加锁的消息队列,创建 LockEntity 对象,并设置客户端id
30                 for (MessageQueue mq : notLockedMqs) {
31                     LockEntry lockEntry = groupValue.get(mq);
32                     if (null == lockEntry) {
33                         lockEntry = new LockEntry();
34                         lockEntry.setClientId(clientId);
35                         groupValue.put(mq, lockEntry);
36                     }
37 
38                     // 若已加锁,重置最后更新时间戳,重新设置到 缓存 的 容器中
39                     if (lockEntry.isLocked(clientId)) {
40                         lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
41                         lockedMqs.add(mq);
42                         continue;
43                     }
44 
45                     String oldClientId = lockEntry.getClientId();
46                     // 若锁到期,续期、再重新设置到 缓存 的 容器中
47                     if (lockEntry.isExpired()) {
48                         lockEntry.setClientId(clientId);
49                         lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
50                         lockedMqs.add(mq);
51                         continue;
52                     }
53                 }
54             } finally {
55                 // 释放锁
56                 this.lock.unlock();
57             }
58         } catch (InterruptedException e) {
59             log.error("putMessage exception", e);
60         }
61     }
62     return lockedMqs;
63 }

2、释放锁处理

  释放锁流程与加锁操作流程类似,客户端通过Netty传递功能号UNLOCK_BATCH_MQ=42远程调用Broker对消息队列进行锁释放,最终遍历RebalanceLockManager#mqLockTable 加锁缓存表消息队列,将缓存表中指定客户端Id的消息队列移除。   0

  Broker对消息队列的释放锁的处理,RebalanceLockManager#unlockBatch 详情如下:

 1 // 释放锁
 2 public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
 3     try {
 4         // 加锁
 5         this.lock.lockInterruptibly();
 6         try {
 7             ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
 8             if (null != groupValue) {
 9                 // 移除锁对象中的clientId
10                 for (MessageQueue mq : mqs) {
11                     LockEntry lockEntry = groupValue.get(mq);
12                     if (null != lockEntry) {
13                         if (lockEntry.getClientId().equals(clientId)) {
14                             groupValue.remove(mq);
15                         }
16                     }
17                 }
18             } 
19         } finally {
20             // 释放锁
21             this.lock.unlock();
22         }
23     } catch (InterruptedException e) {
24         log.error("putMessage exception", e);
25     }
26 }

3.9.2、并发消息消费服务

  并发消息消费服务在启动时,会清除过期的消息,根据消息的偏移量移除ProcessQueue#msgTreeMap中的MessageExt,通过读写锁ProcessQueue#lockTreeMap来保证清理消息时的线程安全,ProcessQueue#removeMessage, 核心代码如下:

0

  在拉取到消息后,通过 MessageListenerConcurrently 消息监听处理器的consumeMessage方法来处理消息。

3.10、注册消费者

  将消费者注册到消费者缓存表 MQClientInstance#consumerTable ,MQClientInstance#registerConsumer 详情如下:

 1 // 消费者缓存表
 2 private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
 3 
 4 // 注册消费者
 5 public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
 6     if (null == group || null == consumer) {
 7         return false;
 8     }
 9     // 将消费者注册到消费者缓存表中
10     MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
11     if (prev != null) {
12         log.warn("the consumer group[" + group + "] exist already.");
13         return false;
14     }
15 
16     return true;
17 } 

3.11、启动MQClientInstance

1、启动消息发送服务,启动Netty

  开启Netty通信的服务,NettyRemotingClient#start(),启动 Netty远程客户端服务。

2、开启定时任务

  开启定时任务,MQClientInsatnce#startScheduledTask 详情如下:

 1 // 开启定时任务
 2 private void startScheduledTask() {
 3     // 2min同步NameServer地址服务
 4     if (null == this.clientConfig.getNamesrvAddr()) {
 5         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 6             @Override
 7             public void run() {
 8                 try {
 9                     MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
10                 } catch (Exception e) {
11                     log.error("ScheduledTask fetchNameServerAddr exception", e);
12                 }
13             }
14         }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
15     }
16 
17     // 30s同步NameServer中的Topic路由信息
18     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
19         @Override
20         public void run() {
21             try {
22                 MQClientInstance.this.updateTopicRouteInfoFromNameServer();
23             } catch (Exception e) {
24                 log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
25             }
26         }
27     }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
28 
29     // 30s剔除下线的broker、发送心跳检测到broker
30     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
31         @Override
32         public void run() {
33             try {
34                 MQClientInstance.this.cleanOfflineBroker();
35                 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
36             } catch (Exception e) {
37                 log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
38             }
39         }
40     }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
41 
42     // 5s 持久化消费进度
43     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
44         @Override
45         public void run() {
46             try {
47                 MQClientInstance.this.persistAllConsumerOffset();
48             } catch (Exception e) {
49                 log.error("ScheduledTask persistAllConsumerOffset exception", e);
50             }
51         }
52     }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
53 
54     // 调整线程池异步任务
55     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
56         @Override
57         public void run() {
58             try {
59                 MQClientInstance.this.adjustThreadPool();
60             } catch (Exception e) {
61                 log.error("ScheduledTask adjustThreadPool exception", e);
62             }
63         }
64     }, 1, 1, TimeUnit.MINUTES);
65 }

3、开启拉取消息服务

  启动拉取消息服务 PullMessageService,拉取消息的核心方法 PullMessageService#pullMessage。

  0

  执行DefaultMQPushConsumerImpl#pullMessage 方法:

  0

  根据不同的发送方式,调用 MQClientAPIImpl#pullMessage 拉取消息 详情如下:

 1 public PullResult pullMessage(
 2     final String addr,
 3     final PullMessageRequestHeader requestHeader,
 4     final long timeoutMillis,
 5     final CommunicationMode communicationMode,
 6     final PullCallback pullCallback
 7 ) throws RemotingException, MQBrokerException, InterruptedException {
 8     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 9 
10     // 通讯方式的判断
11     switch (communicationMode) {
12         case ONEWAY:
13             assert false;
14             return null;
15         // 异步
16         case ASYNC:
17             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
18             return null;
19         // 同步
20         case SYNC:
21             return this.pullMessageSync(addr, request, timeoutMillis);
22         default:
23             assert false;
24             break;
25     }
26 
27     return null;
28 }

4、开启负载均衡服务

  负载均衡服务RebalanceService,最终执行 RebalanceImpl#doRebalance,详情如下:

 1 public void doRebalance(final boolean isOrder) {
 2     // 获取订阅数据
 3     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 4     if (subTable != null) {
 5         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 6             // 获取Topic
 7             final String topic = entry.getKey();
 8             try {
 9                 // 负载均衡处理
10                 this.rebalanceByTopic(topic, isOrder);
11             } catch (Throwable e) {
12                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
13                     log.warn("rebalanceByTopic Exception", e);
14                 }
15             }
16         }
17     }
18     // 清除未订阅主题的消息队列
19     this.truncateMessageQueueNotMyTopic();
20 }

5、开启消息推送服务

  启动消息推送服务,用于将消息消费结果通知给Broker,将生产者添加进生产者缓存表 MQClientInstance#producerTable。向所有的Broker发送心跳,并移除超时请求,并执行回调方法onException。

  DefaultMQProducerImpl#start() 详情如下:

 1 /**
 2  * 启动生产者
 3  * @param startFactory
 4  * @throws MQClientException
 5  */
 6 public void start(final boolean startFactory) throws MQClientException {
 7     switch (this.serviceState) {
 8         // 刚创建未启动
 9         case CREATE_JUST:
10             this.serviceState = ServiceState.START_FAILED;
11             // todo 检查配置
12             this.checkConfig();
13             // 更改当前instanceName为进程ID
14             if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
15                 this.defaultMQProducer.changeInstanceNameToPID();
16             }
17 
18             //todo 获取MQ客户端实例
19             //整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
20             //ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
21             //同一个clientId只会创建一个MQClientInstance。
22             //MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
23             this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
24 
25             // 注册Producer到MQClientInstance客户端实例
26             boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
27             // 未注册成功,抛出异常
28             if (!registerOK) {
29                 this.serviceState = ServiceState.CREATE_JUST;
30                 throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
31                     + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
32                     null);
33             }
34 
35             // 路由信心添加进缓存表中
36             this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
37 
38             // 启动MQ客户端实例
39             if (startFactory) {
40                 // todo 最终调用MQClientInstance
41                 mQClientFactory.start();
42             }
43 
44             log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
45                 this.defaultMQProducer.isSendMessageWithVIPChannel());
46             this.serviceState = ServiceState.RUNNING;
47             break;
48         case RUNNING:
49         case START_FAILED:
50         case SHUTDOWN_ALREADY:
51             throw new MQClientException("The producer service state not OK, maybe started once, "
52                 + this.serviceState
53                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
54                 null);
55         default:
56             break;
57     }
58 
59     // 向所有的broker发送心跳
60     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
61 
62     // 扫描并移除超时请求,并执行回调方法onException
63     this.timer.scheduleAtFixedRate(new TimerTask() {
64         @Override
65         public void run() {
66             try {
67                 RequestFutureTable.scanExpiredRequest();
68             } catch (Throwable e) {
69                 log.error("scan RequestFutureTable exception", e);
70             }
71         }
72     }, 1000 * 3, 1000);
73 }

  消息推送服务,在RocketMQ源码(四):RocketMQ生产者发送消息流程中已经做了详细介绍,此处不再赘述。

 

6、更新服务状态

  更新服务状态为运行中,同时更新订阅信息,并发送心跳。

  0

4、总结

  RocketMQ消费者启动流程如下:

  

 

 

标签:加锁,队列,流程,defaultMQPushConsumer,源码,消息,new,final,RocketMQ
From: https://www.cnblogs.com/RunningSnails/p/17718386.html

相关文章

  • nacos1.4.X版本服务注册源码分析
     客户端:nacos1.4.1版本服务注册流程1:依赖spring-cloud-starter-alibaba-nacos-discovery2:resources/META-INF/spring.factories自动配置NacosServiceRegistryAutoConfiguration3:自动配置类NacosAutoServiceRegistration继承AbstractAutoServiceRegistration实现Appli......
  • 《Web性能权威指南》高清高质量PDF电子书+源码
    前言阅读第一部分网络技术概览阅读第1章延迟与带宽阅读第2章TCP的构成第3章UDP的构成第4章传输层安全(TLS)第二部分无线网络性能第5章无线网络概览第6章Wi-Fi第7章移动网络第8章移动网络的优化建议第三部分HTTP第9章HTTP简史第10章Web性能要点第11章HTTP......
  • [SpringSecurity5.6.2源码分析十二]:CsrfFilter
    前言• Csrf(跨站伪造请求):指的是用户在A网站认证完成后,A网站Cookie保存在了浏览器中,然后用户在B网站点击了钓鱼链接,使其让钓鱼请求带有了A网站的Cookie,从而让A网站认为这是一次正常的请求• 而SpringSecurity采用的是同步令牌模式(SynchronizerTokenPattern)来预防Csrf攻击•......
  • 《算法的乐趣》高清高质量PDF 电子书 附源码
    本书从一系列有趣的生活实例出发,全面介绍了构造算法的基础方法及其广泛应用,生动地展现了算法的趣味性和实用性。全书分为两个部分,第一部分介绍了算法的概念、常用的算法结构以及实现方法,第二部分介绍了算法在各个领域的应用,如物理实验、计算机图形学、数字音频处理等。其中,既有各种......
  • 《Python深度学习》高清高质量PDF电子书+源码
    Keras之父,TensorFlow机器学习框架贡献者详尽介绍了用Python和Keras进行深度学习的探索实践,包括计算机视觉、自然语言处理、产生式模型等应用。书中包含30多个代码示例,步骤讲解详细透彻。由于本书立足于人工智能的可达性和大众化,读者无须具备机器学习相关背景知识即可展开阅读。在学......
  • javaweb运行tomcat时Jsp文件显示源码
    今天在写javaweb项目的时候出现了运行一直不打开浏览器,如果手动打开浏览器的话,就会出现自己写的jsp文件中的所有源码,具体如图所示我的问题在Servlet中因为要告诉jsp文件servlet在哪里所以要在类名的上一行写上@WebServlet("/Servlet"),但是由于我的粗心写成了@WebServlet("Servl......
  • C++医学影像(PACS)管理系统源码
    PACS(PictureArchivingandCommunicationsSystem)——图像存储与传输系统,和医院信息化及数字化的目标紧密关联,它是专门为现代化医院的影像管理而设计的包括数字化医学图像信息的采集、显示、处理、存储、诊断、输出、管理、查询、信息处理的综合应用系统,是以数字化诊断(无纸化、无......
  • tgt源码阅读
    读懂一个开源项目源码之前,需要先了解该项目的背景知识。背景知识熟悉了,代码只是具体实现手段而已。源码地址:https://github.com/fujita/tgt对于tgt来说,背景知识是块设备、scsi、iscsi协议。众所周知,一条协议一般指的是一个包头,然后把要收发的数据放在包头后面。scsi协议和iscsi......
  • 直播带货源码,评论框自动控制高度
    直播带货源码,评论框自动控制高度HTML <divclass="cont_comment_cont">  <divid="textareaHeight">    <textareaid="textarea"placeholder="在此输入评论内容~"></textarea>  </div>  <div>发布</di......
  • drf(初始drf,restfull规范 ,源码)
    一web开发模式#前后端混合开发(前后端不分离):通过模版语法,在服务器上处理好html的内容(组合字符串),返回给浏览器一堆字符串(字符串封装到respons对象里),浏览器在渲染#前后端分离:只专注于写后端接口,返回json、xml格式#xml比json笨重#补充:什么是动态页面(需要查数据......