首页 > 编程语言 >RocketMQ源码(四):RocketMQ生产者发送消息流程

RocketMQ源码(四):RocketMQ生产者发送消息流程

时间:2023-08-21 16:34:41浏览次数:53  
标签:MQClientInstance 发送 源码 消息 new null public RocketMQ

  RocketMQ通过Producer发送消息,以同步方式发送普通消息为例,分析发送消息的整体流程。Producer的示例代码如下:

 1 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 2 import org.apache.rocketmq.client.producer.SendResult;
 3 import org.apache.rocketmq.common.message.Message;
 4 import org.apache.rocketmq.remoting.common.RemotingHelper;
 5 // 同步发送
 6 public class SyncProducer {
 7     public static void main(String[] args) throws Exception{
 8         // 实例化消息生产者Producer
 9         DefaultMQProducer producer = new DefaultMQProducer("group_test");
10         try{
11             // 设置NameServer的地址
12             producer.setNamesrvAddr("127.0.0.1:9876");
13             // 启动Producer实例
14             producer.start();
15             for (int i = 0; i < 2; i++) {
16                 // 创建消息,并指定Topic,Tag和消息体
17                 Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
18                 // 发送消息到一个Broker
19                 SendResult sendResult = producer.send(msg);
20                 System.out.printf("%s%n", sendResult);
21             }
22         }finally {
23             //如果不再发送消息,关闭Producer实例。
24             producer.shutdown();
25         }
26     }
27 }

根据上述代码,生产者发送消息主要完成以下几件事:

  1、创建生产者对象DefaultMQProducer并设置NameServer地址

  2、启动生产者

  3、发送消息

一、创建生产者

  DefaultMQProducer构造函数详情如下:

1 public DefaultMQProducer(final String producerGroup) {
2     this(null, producerGroup, null);
3 }
4 
5 public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
6     this.namespace = namespace;
7     this.producerGroup = producerGroup;
8     defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
9 }

  DefaultMQProducer中的持有关键属性DefaultMQProducerImpl。DefaultMQProducerImpl是实际干活的,DefaultMQProducer的启动与消息发送都依赖于DefaultMQProducerImpl中方法,利用装饰者模式。

二、启动生产者

  DefaultMQProducerImpl#start() 核心代码:
 1 // 启动生产者
 2 public void start(final boolean startFactory) throws MQClientException {
 3     switch (this.serviceState) {
 4         // 刚创建未启动
 5         case CREATE_JUST:
 6             this.serviceState = ServiceState.START_FAILED;
 7             // 检查配置
 8             this.checkConfig();
 9             // 更改当前instanceName为进程ID
10             if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
11                 this.defaultMQProducer.changeInstanceNameToPID();
12             }
13 
14             // 获取MQ客户端实例
15             this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
16 
17             // 注册Producer到MQClientInstance客户端实例
18             boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
19             // 未注册成功,抛出异常
20             if (!registerOK) {
21                 this.serviceState = ServiceState.CREATE_JUST;
22                 throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
23                     + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
24                     null);
25             }
26 
27             // 将Topic发布信息添加到topicPublishInfoTable属性中
28             this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
29 
30             // 启动MQ客户端实例
31             if (startFactory) {
32                 // todo 最终还是调用MQClientInstance
33                 mQClientFactory.start();
34             }
35 
36             log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
37                 this.defaultMQProducer.isSendMessageWithVIPChannel());
38             this.serviceState = ServiceState.RUNNING;
39             break;
40         case RUNNING:
41         case START_FAILED:
42         case SHUTDOWN_ALREADY:
43             throw new MQClientException("The producer service state not OK, maybe started once, "
44                 + this.serviceState
45                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
46                 null);
47         default:
48             break;
49     }
50 
51     // 向所有的broker发送心跳
52     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
53 
54     // 扫描并移除超时请求,并执行回调方法onException
55     this.timer.scheduleAtFixedRate(new TimerTask() {
56         @Override
57         public void run() {
58             try {
59                 RequestFutureTable.scanExpiredRequest();
60             } catch (Throwable e) {
61                 log.error("scan RequestFutureTable exception", e);
62             }
63         }
64     }, 1000 * 3, 1000);
65 }

1、检查配置

  DefaultMQProducerImpl#checkConfig() 核心代码:

 1 private void checkConfig() throws MQClientException {
 2     // 组名称校验
 3     Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
 4     // 组名称为null,抛出异常
 5     if (null == this.defaultMQProducer.getProducerGroup()) {
 6         throw new MQClientException("producerGroup is null", null);
 7     }
 8     //不能和系统的分组名冲突(DEFAULT_PRODUCER)
 9     if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
10         throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
11             null);
12     }
13 }

2、获取客户端实例

  DefaultMQProducerImpl#start() 获取客户端实例的代码段:

1 // 获取MQ客户端实例
2 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
3 // 注册Producer到MQClientInstance客户端实例
4 boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

2.1、获取MQClientInstance实例

  MQClientManager是单例对象,整个JVM中只存在一个MQClientManager实例。

  

  在MQClientManager中维护了一个clinetId与MQClientInstance映射的缓存表。获取MQ客户端实例MQClientManager#getOrCreateMQClientInstance() 核心代码:

 1 public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
 2     // 获取客户端ID
 3     String clientId = clientConfig.buildMQClientId();
 4     // 根据客户端id缓存中获取MQClientInstance
 5     MQClientInstance instance = this.factoryTable.get(clientId);
 6     // 若为空,则创建实例并添加到实例表中
 7     if (null == instance) {
 8         // 创建MQClientInstance
 9         instance =
10             new MQClientInstance(clientConfig.cloneClientConfig(),
11                 this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
12         // 添加进factoryTable缓存
13         MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
14         if (prev != null) {
15             instance = prev;
16             log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
17         } else {
18             log.info("Created new MQClientInstance for clientId:[{}]", clientId);
19         }
20     }
21     return instance;
22 }

  一个clientId只会创建一个MQClientInstance,添加进MQClientManager的缓存表。clientId的生成规则IP@instanceName@unitName。

  ClientConfig#buildMQClientId 核心代码:
 1 // IP@instanceName@unitName相同,复用同一个MQClientInstance
 2 public String buildMQClientId() {
 3     StringBuilder sb = new StringBuilder();
 4     // IP
 5     sb.append(this.getClientIP());
 6     sb.append("@");
 7     // 实例名称
 8     sb.append(this.getInstanceName());
 9     // unitName不为null
10     if (!UtilAll.isBlank(this.unitName)) {
11         sb.append("@");
12         sb.append(this.unitName);
13     }
14     return sb.toString();
15 }

  对于RocketMQ而言,消息发送者、消息消费者都属于客户端,每一个客户端就是一个MQClientInstance,每一个ClientConfig对应一个实例。

  

   MQClientInstance通过ClientConfig属性,既关联生产者又关联消费者。

  所以不同的生产者、消费端,如果引用同一个客户端配置(ClientConfig),则它们共享一个MQClientInstance实例。所以在定义的时候如果生产者和消费者如果分组名相同容易导致这个问题。   MQClientInstance构造函数详情如下:

  

   在创建MQClientInstance对象时,会初始化远程客户端MQClientAPIImpl,用于网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道。

2.2、注册组名与生产者实例映射关系

  MQClientInstance#registerProducer() 核心代码:
 1 // 生产者缓存表
 2 private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
 3 
 4 public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
 5     if (null == group || null == producer) {
 6         return false;
 7     }
 8     // 将生产者组名称与生产者实例映射关系 添加进MQClientInstance中
 9     MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
10     // 若MQClientInstanc已存在生产者实例,注册失败
11     if (prev != null) {
12         log.warn("the producer group[{}] exist already.", group);
13         return false;
14     }    
15     return true;
16 }

  将生产者实例注册到客户端实例MQClientInstance缓存表中。

3、启动客户端实例

  启动生产者,MQClientInstance#start() 核心代码:
 1 public void start() throws MQClientException {
 2     synchronized (this) {
 3         switch (this.serviceState) {
 4             // 刚创建未启动
 5             case CREATE_JUST:
 6                 this.serviceState = ServiceState.START_FAILED;
 7                 // 未指定NameServer地址,通过http的方式远程调用NameServer服务,获取NameServer地址
 8                 if (null == this.clientConfig.getNamesrvAddr()) {
 9                     this.mQClientAPIImpl.fetchNameServerAddr();
10                 }
11                 // 开启网络通信  NRC
12                 this.mQClientAPIImpl.start();
13                 // 开启定时执行任务
14                 this.startScheduledTask();
15                 // 开启拉取消息服务
16                 this.pullMessageService.start();
17                 // 开启负载均衡服务
18                 this.rebalanceService.start();
19                 // 开启消息推送服务
20                 this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
21                 // 设置当前服务状态为运行中
22                 this.serviceState = ServiceState.RUNNING;
23                 break;
24             case START_FAILED:
25                 throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
26             default:
27                 break;
28         }
29     }
30 }

1、开启网络通信

2、开启定时任务

  MQClientInstance#startScheduledTask() 核心代码:

 1 /**
 2  * 开启各种定时任务
 3  *  生产者或者消费者不是实时感知Broker的状态,而是会有一定的偏差,
 4  *  所以如果服务器出现宕机,生产者或者消费要自行处理故障
 5  */
 6 private void startScheduledTask() {
 7     // 2min同步NameServer地址服务
 8     if (null == this.clientConfig.getNamesrvAddr()) {
 9         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
10 
11             @Override
12             public void run() {
13                 try {
14                     MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
15                 } catch (Exception e) {
16                     log.error("ScheduledTask fetchNameServerAddr exception", e);
17                 }
18             }
19         }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
20     }
21 
22     // 30s同步NameServer中的Topic路由信息
23     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
24 
25         @Override
26         public void run() {
27             try {
28                 MQClientInstance.this.updateTopicRouteInfoFromNameServer();
29             } catch (Exception e) {
30                 log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
31             }
32         }
33     }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
34 
35     // 30s剔除下线的broker、发送心跳检测到broker
36     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
37         @Override
38         public void run() {
39             try {
40                 MQClientInstance.this.cleanOfflineBroker();
41                 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
42             } catch (Exception e) {
43                 log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
44             }
45         }
46     }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
47 
48     // 5s 持久化消费进度
49     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
50 
51         @Override
52         public void run() {
53             try {
54                 MQClientInstance.this.persistAllConsumerOffset();
55             } catch (Exception e) {
56                 log.error("ScheduledTask persistAllConsumerOffset exception", e);
57             }
58         }
59     }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
60 
61     // 调整线程池异步任务
62     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
63         @Override
64         public void run() {
65             try {
66                 MQClientInstance.this.adjustThreadPool();
67             } catch (Exception e) {
68                 log.error("ScheduledTask adjustThreadPool exception", e);
69             }
70         }
71     }, 1, 1, TimeUnit.MINUTES);
72 }

  同步NameServer地址服务定时任务、同步NameServer中的Topic路由信息、持久化消费进度、发送心跳检测到broker

3、开启消息拉取服务、负载均衡服务

  PullMessageService、RebalanceService服务继承自ServiceThread,类图关系如下:

  

   开启消息拉取服务、负载均衡服务,ServiceThread#start() 核心代码:

 1 public void start() {
 2     log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
 3     if (!started.compareAndSet(false, true)) {
 4         return;
 5     }
 6     stopped = false;
 7     this.thread = new Thread(this, getServiceName());
 8     this.thread.setDaemon(isDaemon);
 9     this.thread.start();
10 }

  此处只是开启了线程,具体执行逻辑在PullMessageService、RebalanceService的重写方法run()中。

三、发送消息

  根据DefaultMQProducer的send方法,最终调到DefaultMQProducerImpl的sendDefaultImpl方法,核心代码段如下。

1、传递数据合法性校验

  DefaultMQProducerImpl#sendDefaultImpl()方法,代码段:

 

   校验传送的消息与Topic,Validators#checkMessage() 核心代码:

 1 // 校验消息核心代码
 2 public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
 3     throws MQClientException {
 4     if (null == msg) {
 5         throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
 6     }
 7     // topic规范检查
 8     Validators.checkTopic(msg.getTopic());
 9     // topic是否能写入消息
10     Validators.isNotAllowedSendTopic(msg.getTopic());
11     // 消息非空检查
12     if (null == msg.getBody()) {
13         throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
14     }
15     // 消息长度检查
16     if (0 == msg.getBody().length) {
17         throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
18     }
19     if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
20         throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
21             "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
22     }
23 }

2、查找路由

  DefaultMQProducerImpl#sendDefaultImpl()方法,代码段:

  

   查找路由信息,DefaultMQProducerImpl#tryToFindTopicPublishInfo() 核心代码:

 1 // topic与topic路由信息映射缓存表
 2 private final ConcurrentMap<String, TopicPublishInfo> topicPublishInfoTable =
 3     new ConcurrentHashMap<String, TopicPublishInfo>();
 4 // MQ客户端实例
 5 private MQClientInstance mQClientFactory;
 6 
 7 // 查找路由信息
 8 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
 9     // 获取缓存表中的topic路由信息
10     TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
11     // 缓存表中路由信息为空,则从NameServer获取topic路由信息
12     if (null == topicPublishInfo || !topicPublishInfo.ok()) {
13         this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
14         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
15         topicPublishInfo = this.topicPublishInfoTable.get(topic);
16     }
17     // 缓存表中存在或Nameserver中存在topic的路由信息,直接返回
18     if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
19         return topicPublishInfo;
20     // 若未找到当前topic的路由信息,则用默认主题继续查找
21     } else {
22         this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
23         topicPublishInfo = this.topicPublishInfoTable.get(topic);
24         return topicPublishInfo;
25     }
26 }

  优先获取缓存表中获取topic的路由信息TopicPublishInfo,若缓存表中存在,返回路由信息。

  若缓存表中不存在,根据topic从nameserver中获取路由信息,若获取到路由信息,设置进缓存表并返回路由信息。

  若nameserver中未查找到topic路由信息,使用默认的主题继续从nameserver中查找路由信息。

3、选择发送消息队列

  DefaultMQProducerImpl#sendDefaultImpl()方法,代码段:

 

  DefaultMQProducerImpl#selectOneMessageQueue() 核心代码:

 1 // 故障延迟标识开关,默认关闭
 2 private boolean sendLatencyFaultEnable = false;
 3 
 4 // 选择消息队列
 5 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
 6     // Broker故障延迟机制,默认关闭
 7     if (this.sendLatencyFaultEnable) {
 8         try {
 9             int index = tpInfo.getSendWhichQueue().getAndIncrement();
10             // 1、对消息队列轮询获取一个队列
11             for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
12                 // 基于index和队列数量取余,确定位置
13                 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
14                 if (pos < 0)
15                     pos = 0;
16                 // 2、获取消息队列
17                 MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
18                 // 3、消息队列所在的broker可用,返回消息队列;不可用,获取下一个消息队列
19                 if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
20                     return mq;
21             }
22 
23             // 4、若预测的所有broker都不可用,则随机选择一个broker,随机选择该Broker下一个队列进行发送
24             final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
25             // 获得Broker的写队列集合
26             int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
27             if (writeQueueNums > 0) {
28                 final MessageQueue mq = tpInfo.selectOneMessageQueue();
29                 if (notBestBroker != null) {
30                     // 获得一个队列,指定broker和队列ID并返回
31                     mq.setBrokerName(notBestBroker);
32                     mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
33                 }
34                 return mq;
35             } else {
36                 latencyFaultTolerance.remove(notBestBroker);
37             }
38         } catch (Exception e) {
39             log.error("Error occurred when selecting message queue", e);
40         }
41 
42         return tpInfo.selectOneMessageQueue();
43     }
44 
45     // 默认执行
46     return tpInfo.selectOneMessageQueue(lastBrokerName);
47 }

  故障延迟处理标识开关sendLatencyFaultEnable默认关闭,开关关闭,RocketMQ选择消息队列采用轮询的方式,轮询算法能保证每一个Queue队列的消息投递数量尽可能均匀。

 1 // 默认使用轮询方式选择消息队列
 2 public MessageQueue selectOneMessageQueue() {
 3     // 使用 ThreadLocal 进行 sendWhichQueue 的自增
 4     int index = this.sendWhichQueue.getAndIncrement();
 5     // 对队列大小取模
 6     int pos = Math.abs(index) % this.messageQueueList.size();
 7     if (pos < 0)
 8         pos = 0;
 9     // 返回对应的队列
10     return this.messageQueueList.get(pos);
11 }

  开关打开,RocketMQ就开启了故障延迟功能,每次向Broker成功或者异常的发送,RocketMQ都会计算出该Borker的可用时间(发送结束时间-发送开始时间,失败的按照30S计算),并且保存,方便下次发送时做broekr不可用时长的判断。

  

   故障规避延时机制,MQFaultStrategy#updateFaultItem() 核心代码:

 1 // 发送延时
 2 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
 3 // 故障规避
 4 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
 5 
 6 public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
 7     if (this.sendLatencyFaultEnable) {
 8         //获取不可用持续时长,在这个时间内,Broker将被规避
 9         long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
10         this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
11     }
12 }
13 // 根据发送延时来定义故障规避的时间
14 private long computeNotAvailableDuration(final long currentLatency) {
15     for (int i = latencyMax.length - 1; i >= 0; i--) {
16         if (currentLatency >= latencyMax[i])
17             return this.notAvailableDuration[i];
18     }
19     return 0;
20 }

  如果消息时长在550ms之内,不可用时长为0;达到550ms,不可用时长为30S;达到1000ms,不可用时长为60S;达到2000ms,不可用时长为120S;达到3000ms,不可用时长为180S;达到15000ms,不可用时长为600S。

  故障延迟机制策略处理:遍历消息队列列表,获取列表中broker可用的消息队列;若列表中没有broker可用的消息队列,随机选择一个broker中的某个队列进行消息发送。

  

  从RocketMQ的策略上可以看到,默认队列选择是轮训策略,而故障延迟选择队列则是优先考虑消息的发送时长短的队列。

  当一个Topic创建在不同的Broker上时,通讯网络较好,推荐默认的轮询策略;通讯网络较差,推荐故障延迟机制,可避免不断向宕机的broker发送消息,实现消息发送的高可用。

4、消息发送

  DefaultMQProducerImpl#sendDefaultImpl()方法,代码段:

  

   DefaultMQProducerImpl#sendKernelImpl()方法,代码段:

  

MQClientAPIImpl#sendMessage()方法,代码段:

  构建远程通信请求对象RemotingCommand,同时为ReuqestCode设置请求码ReuqestCode,RequestCode的具体作用在Broker端处理消息在具体介绍。

  

   根据通讯模式,执行远程的调用:

  

   MQClientAPIImpl#sendMessageSync()方法,代码段:

 

   NettyRemotingClient#invokeSync(),代码段:

  

  在RocketMQ中,客户端与Broker的连接是在发送消息时建立的,即只有在需要与对端进行数据交互时建立的网络通信连接,连接建立之后,发送同步消息。

  NettyRemotingAbstract#invokeSync(),代码段:

  

    通过netty发送消息到broker中,并监听响应结果。

四、生产者消息发送核心流程图

 

标签:MQClientInstance,发送,源码,消息,new,null,public,RocketMQ
From: https://www.cnblogs.com/RunningSnails/p/17646320.html

相关文章

  • (一)Dubbo源码解析:增强SPI
    〇、前言在Dubbo的架构设计中,如何可以通过“类插拔”的方式,对其功能进行灵活的扩展或者削弱,那么,SPI起到了极其关键的作用。本篇文章作为分析Dubbo源码的第一篇文章,我们先暂时放下“服务注册发布流程”、“服务启动流程”、“请求处理流程”……这些功能代码的探索,我们先从最基本的......
  • 使用JMeter模拟设备通过MQTT发送数据
    需求:需要一个工具能够支持MQTT协议发送各种不同的数据。目的:模拟小型温室设备反馈,搭建一个测试环境,根据测试的数据显示硬件的状态和数值。工具:JMeter环境:需要配置Java运行环境。操作步骤:1.下载JMeter运行包下载地址:https://jmeter.apache.org/download_jmeter.cgi,下载后可以解压......
  • 逻辑清晰,详解社交源码Android开发SDK
    前篇我们讲解了有关如何在IOS平台开发集成SDK,那么今天来给大家简单讲解下如何在社交源码Android客户端上开发集成。1.获取SDK:从提供SDK的第三方开发者或公司获得SDK的相关文件和文档。2.导入SDK文件:将SDK的库文件(.jar或.aar格式)拷贝到Android项目的libs文件夹中。3.配置权限:检查并......
  • app直播源码,读取多行文本、读取文件分割多行文本
    app直播源码,读取多行文本、读取文件分割多行文本读取文本 publicfunctiondaoru(){/* *逐行读取TXT文件  */     $rep=str_replace("\n",',',"TD92069E76EC27CA8B66B631CB49A9C6TD5A22D898050393C2F8D5C29C854F1B");    $cont=explode(',',$re......
  • 直播系统源码,实现上滑加载分页(触底加载)
    直播系统源码,实现上滑加载分页(触底加载) //依据分类查询图书  publicfunctionquery_book_by_classid(){    $token=input('token');    $class_id=input('class_id');    $page=input('page');//起始行    $per_page=input('per_page');//......
  • 机房收费管理系统-计算机毕业设计源码+LW文档
    【摘要】作为计算机机房管理的必要组成部分,计算机机房管理系统有助于机房资源的合理分配、统一管理和设备利用率的提高,从而有力地保证了机房的管理质量。现代化、信息化和自动化是计算机机房的发展方向,它们旨在实现无人或少人值守的开放式管理,并减轻管理员的压力。通过自动计费和合......
  • 实验室信息管理系统(LIMS)源码,采用灵活的架构开发,支持多种应用程序和技术
    实验室信息管理系统(LIMS)是指帮助实验室组织和管理实验数据的计算机软件系统,它将实验室操作有机地组织在一起,以满足实验室工作流程的所有要求。它能以不同的方式支持实验室的工作,从简单的过程(如样品采集和入库)到复杂的流程(如教据报告和实验结果分析),完全改变实验室的工作流程,使......
  • StoneDB 源码解读系列|Tianmu 引擎工具类模块源码详解(一)
    StoneDB源码解读系列文章正式开启,预计以周更的形式跟大家见面,请多多支持~本篇源码解读内容已进行直播分享,可在视频号观看直播回放,也可点击阅读原文跳转至B站观看回放视频。PPT内容可在社区论坛中查看下载:https://forum.stonedb.io/t/topic/89各个工具类属于Tianmu引擎......
  • 手机直播源码开发,协议讨论篇(三):RTMP实时消息传输协议
    一、实时消息传输协议RTMP简介RTMP又称实时消息传输协议,是一种实时通信协议。在当今数字化时代,手机直播源码平台为全球用户进行服务,如何才能增加用户,提升用户黏性?就需要让一对一直播平台能够为用户提供优质的体验。而RTMP协议的部署就能为手机直播源码平台提供出低延迟、高质量的......
  • [完结25章]Java七大热门技术框架源码解析
    点击下载:[完结25章]Java七大热门技术框架源码解析提取码:ygnz《Java七大热门技术框架源码解析》已完结25章,任何框架都是有助于更快更好地开发软件解决方案的工具之一。框架的基本原理不必重新发明轮子。框架使开发人员的工作变得更轻松,并帮助他们专注于业务逻辑,而不必担心通用的......