首页 > 编程语言 >RocketMQ之消息发送源码分析

RocketMQ之消息发送源码分析

时间:2023-05-06 19:12:32浏览次数:47  
标签:topic 发送 源码 mq msg null final RocketMQ

一、概述

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。

RocketMQ支持三种消息发送方式:

  • 同步消息发送(sync):当Producer发送消息到Broker时会同步等待消息处理结果;
  • 异步消息发送(async):当Producer发送消息到Broker时会指定一个消息发送成功的回调函数,调用消息发送后立即返回不会阻塞。消息发送成功或者失败会在一个新的线程中进行处理;
  • 单向消息发送(oneway):当Producer发送消息到Broker时直接返回,只管把消息发送出去,并不关心Broker的响应结果。

同步和异步方式均需要Broker返回确认信息,单向发送不需要。

二、Producer与Consumer类体系

从下图可以看出以下几点:

  1. ProducerConsumer的共同逻辑,封装在MQClientInstanceMQClientAPIImplMQAdminImpl这3个(蓝色)类里面。所谓共同的逻辑,比如定期更新NameServer地址列表,定期更新TopicRoute,发送网络请求等。
  2. Consumer有2种Pull和Push。下面会详细讲述这2者的区别。

下面将主要从DefaultMQProducer的启动流程、send发送方法和Broker代理服务器的消息处理三方面分别进行分析和阐述。

三、生产者源码流程

3.1 生产客户端启动

根据官方提供的例子,Producer.java里面使用DefaultMQProducer启动消息生成者,如下:

public class Producer {

    /**
     * The number of produced messages.
     */
    public static final int MESSAGE_COUNT = 1000;
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {
        
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        //指定名称服务器地址
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        //启动实例
        producer.start();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                //构建message,指定topic,tag和message body
                Message msg = new Message(TOPIC, TAG,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
                );

                //发送消息,返回结果
                SendResult sendResult = producer.send(msg);
                
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 释放资源
        producer.shutdown();
    }
}

3.2 DefaultMQProducer启动流程

在客户端发送普通消息的demo代码部分,我们先是将DefaultMQProducer实例启动起来,里面调用了默认生成消息的实现类 — DefaultMQProducerImplstart()方法。

//org.apache.rocketmq.client.producer;
public class DefaultMQProducer extends ClientConfig implements MQProducer {
    //...

    // 实际的生存者调用实现 
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    // 生产者组
    private String producerGroup;
    // 默认 4 个 队列
    private volatile int defaultTopicQueueNums = 4;
    // 发送超时时间
    private int sendMsgTimeout = 3000;
    // 消息体最大大小
    private int compressMsgBodyOverHowmuch = 1024 * 4;
    // 同步重试次数
    private int retryTimesWhenSendFailed = 2;
    // 异步重试次数
    private int retryTimesWhenSendAsyncFailed = 2;

    // 发送失败是否会指定另外的 broker
    private boolean retryAnotherBrokerWhenNotStoreOK = false;
    // 最大消息为 4M 
    private int maxMessageSize = 1024 * 1024 * 4; 
    
    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                logger.warn("trace dispatcher start failed ", e);
            }
        }
    }
}

默认生成消息的实现类 — DefaultMQProducerImpl的启动源码如下:

//org.apache.rocketmq.client.impl.producer;
public class DefaultMQProducerImpl implements MQProducerInner {
    //...
    
    private MQClientInstance mQClientFactory;
    
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                //检查配置
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup()
                        .equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    //修改生产者 id
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 初始化得到MQClientInstance实例对象
                this.mQClientFactory = MQClientManager.getInstance()
                        .getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //注册实例
                boolean registerOK = mQClientFactory.registerProducer(
                        this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                   this.serviceState = ServiceState.CREATE_JUST;
                   throw new MQClientException("The producer group[" 
                           + this.defaultMQProducer.getProducerGroup()
                           + "] has been created before, specify another name please."
                           + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                           null);
                }

                if (startFactory) {
                    // 启动
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}",
                        this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
           case RUNNING:
           case START_FAILED:
           case SHUTDOWN_ALREADY:
              throw new MQClientException("The producer service state not OK, maybe started once, "
                      + this.serviceState
                      + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                      null);
           default:
              break;
        }
        //...
    }
}

主要流程如下:

  1. 初始化得到MQClientInstance实例对象,并注册至本地缓存变量 — producerTable中;
  2. 将默认Topic(“TBW102”)保存至本地缓存变量 — topicPublishInfoTable中;
  3. MQClientInstance实例对象调用自己的start()方法,启动一些客户端本地的服务线程,如拉取消息服务、客户端网络通信服务、重新负载均衡服务以及其他若干个定时任务(包括,更新路由/清理下线Broker/发送心跳/持久化consumerOffset/调整线程池),并重新做一次启动(这次参数为false);
//org.apache.rocketmq.client.impl.factory;
public class MQClientInstance {
   public void start() throws MQClientException {

      synchronized (this) {
         switch (this.serviceState) {
            case CREATE_JUST:
               this.serviceState = ServiceState.START_FAILED;
               // If not specified,looking address from name server
               if (null == this.clientConfig.getNamesrvAddr()) {
                  this.mQClientAPIImpl.fetchNameServerAddr();
               }
               // Start request-response channel
               this.mQClientAPIImpl.start();
               // Start various schedule tasks
               this.startScheduledTask();
               // Start pull service
               this.pullMessageService.start();
               // Start rebalance service
               this.rebalanceService.start();
               // Start push service
               this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
               log.info("the client factory [{}] start OK", this.clientId);
               this.serviceState = ServiceState.RUNNING;
               break;
            case START_FAILED:
               throw new MQClientException("The Factory object[" + this.getClientId()
                       + "] has been created before, and failed.", null);
            default:
               break;
         }
      }
   }
}
  1. 最后向所有的Broker代理服务器节点发送心跳包;

总结起来,DefaultMQProducer的主要启动流程如下:

这里有以下几点需要说明:

  1. 在一个客户端中,一个producerGroup只能有一个实例;
  2. 根据不同的clientId,MQClientManager将给出不同的MQClientInstance
  3. 根据不同的producerGroupMQClientInstance将给出不同的MQProducerMQConsumer(保存在本地缓存变量 —— producerTableconsumerTable中);

3.3 send发送方法的核心流程

通过RocketMQ的客户端模块发送消息主要有以下三种方法:

  1. 同步方式
  2. 异步方式
  3. Oneway方式

其中,使用1、2种方式来发送消息比较常见,具体使用哪一种方式需要根据业务情况来判断。本节内容将结合同步发送方式(同步发送模式下,如果有发送失败的最多会有3次重试(也可以自己设置),其他模式均1次)进行消息发送核心流程的简析。使用同步方式发送消息核心流程的入口如下:

// org.apache.rocketmq.client.producer;
public class DefaultMQProducer extends ClientConfig implements MQProducer {

   protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
   
    //...
   
    /**
     * 同步方式发送消息核心流程的入口,默认超时时间为3s
     *
     * @param msg     发送消息的具体Message内容
     * @param timeout 其中发送消息的超时时间可以参数设置
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult send(Message msg, long timeout) throws MQClientException, 
            RemotingException, MQBrokerException, InterruptedException {
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
}

再看defaultMQProducerImpl()方法

//org.apache.rocketmq.client.impl.producer;
public class DefaultMQProducerImpl implements MQProducerInner {
    
    //...
    
   private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
           final SendCallback sendCallback, final long timeout) throws MQClientException, 
           RemotingException, MQBrokerException, InterruptedException {
       
      //判断生产者是否正常运行
      this.makeSureStateOK();
      //验证topic和body没有问题
      Validators.checkMessage(msg, this.defaultMQProducer);

      final long invokeID = random.nextLong();
      long beginTimestampFirst = System.currentTimeMillis();
      long beginTimestampPrev = beginTimestampFirst;
      long endTimestamp = beginTimestampFirst;
      //根据msg的topic (从nameserver更新topic)的路由信息,这里比较复杂,下面有代码说明
      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
      //已经获取到了topic路由信息
      if (topicPublishInfo != null && topicPublishInfo.ok()) {
         // 最后选择消息要发送到的队列
         boolean callTimeout = false;
         MessageQueue mq = null;
         Exception exception = null;
         // 最后一次发送结果
         SendResult sendResult = null;
         //设置失败重试次数 同步3次 其他都是1次
         int timesTotal = communicationMode == CommunicationMode.SYNC 
                 ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() 
                 : 1;
         // 第几次发送
         int times = 0;
         // 存储每次发送消息选择的broker名
         String[] brokersSent = new String[timesTotal];
         //在重试次数内循环
         for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            //选择其中一个queue,下面有说明
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            //已经有了选中的queue
            if (mqSelected != null) {
               mq = mqSelected;
               brokersSent[times] = mq.getBrokerName();
               try {
                  beginTimestampPrev = System.currentTimeMillis();
                  if (times > 0) {
                     //Reset topic with namespace during resend.
                     msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                  }
                  long costTime = beginTimestampPrev - beginTimestampFirst;
                  if (timeout < costTime) {
                     callTimeout = true;
                     break;
                  }
                  //发送消息到选中的队列
                  sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback,
                          topicPublishInfo, timeout - costTime);
                  endTimestamp = System.currentTimeMillis();
                  this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                  switch (communicationMode) {
                     case ASYNC:
                        return null;
                     case ONEWAY:
                        return null;
                     case SYNC:
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                           if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                              continue;
                           }
                        }
                        return sendResult;
                     default:
                        break;
                  }
               } catch (RemotingException e) {
                  endTimestamp = System.currentTimeMillis();
                  this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                  log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, " +
                          "RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                  log.warn(msg.toString());
                  exception = e;
                  continue;
               } catch (MQClientException e) {
                  endTimestamp = System.currentTimeMillis();
                  this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                  log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, " +
                          "RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                  log.warn(msg.toString());
                  exception = e;
                  continue;
               } catch (MQBrokerException e) {
                  endTimestamp = System.currentTimeMillis();
                  this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                  log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, " +
                          "RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                  log.warn(msg.toString());
                  exception = e;
                  switch (e.getResponseCode()) {
                     case ResponseCode.TOPIC_NOT_EXIST:
                     case ResponseCode.SERVICE_NOT_AVAILABLE:
                     case ResponseCode.SYSTEM_ERROR:
                     case ResponseCode.NO_PERMISSION:
                     case ResponseCode.NO_BUYER_ID:
                     case ResponseCode.NOT_IN_CURRENT_UNIT:
                        continue;
                     default:
                        if (sendResult != null) {
                           return sendResult;
                        }

                        throw e;
                  }
               } catch (InterruptedException e) {
                  endTimestamp = System.currentTimeMillis();
                  this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                  log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, " +
                          "RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                  log.warn(msg.toString());

                  log.warn("sendKernelImpl exception", e);
                  log.warn(msg.toString());
                  throw e;
               }
            } else {
               break;
            }
         }

         if (sendResult != null) {
            return sendResult;
         }

         String info = String.format("Send [%d] times, still failed, cost [%d]ms, " +
                         "Topic: %s, BrokersSent: %s", times,
                 System.currentTimeMillis() - beginTimestampFirst,
                 msg.getTopic(),
                 Arrays.toString(brokersSent));

         info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

         MQClientException mqClientException = new MQClientException(info, exception);
         if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
         }

         if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
         } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
         } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
         } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
         }

         throw mqClientException;
      }

      List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
      if (null == nsList || nsList.isEmpty()) {
         throw new MQClientException("No name server address, please set it."
                 + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null)
                 .setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
      }

      throw new MQClientException("No route info of this topic, " 
              + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null)
              .setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
   }
}    

3.3.1 尝试获取TopicPublishInfo的路由信息

我们一步步debug进去后会发现在sendDefaultImpl()方法中先对待发送的消息进行前置的验证。如果消息的TopicBody均没有问题的话,那么会调用 — tryToFindTopicPublishInfo()方法,根据待发送消息的中包含的Topic尝试从Client端的本地缓存变量 — topicPublishInfoTable中查找,如果没有则会从NameServer上更新Topic的路由信息(其中,调用了MQClientInstance实例的updateTopicRouteInfoFromNameServer方法,最终执行的是MQClientAPIImpl实例的getTopicRouteInfoFromNameServer方法),这里分别会存在以下两种场景:

(1). 生产者第一次发送消息(此时,TopicNameServer中并不存在):因为第一次获取时候并不能从远端的NameServer上拉取下来并更新本地缓存变量 — topicPublishInfoTable成功。因此,第二次需要通过默认Topic—TBW102TopicRouteData变量来构造TopicPublishInfo对象,并更新DefaultMQProducerImpl实例的本地缓存变量——topicPublishInfoTable
另外,在该种类型的场景下,当消息发送至Broker代理服务器时,在SendMessageProcessor业务处理器的sendBatchMessage/sendMessage方法里面的super.msgCheck(ctx, requestHeader, response)消息前置校验中,会调用TopicConfigManagercreateTopicInSendMessageMethod方法,在Broker端完成新Topic的创建并持久化至配置文件中(配置文件路径:{rocketmq.home.dir}/store/config/topics.json)。(ps:该部分内容其实属于Broker有点超本篇的范围,不过由于涉及新Topic的创建因此在略微提了下)
(2). 生产者发送Topic已存在的消息:由于在NameServer中已经存在了该Topic,因此在第一次获取时候就能够取到并且更新至本地缓存变量中topicPublishInfoTable,随后tryToFindTopicPublishInfo方法直接可以return

RocketMQ中该部分的核心方法源码如下(已经加了注释):

//org.apache.rocketmq.client.impl.producer;
public class DefaultMQProducerImpl implements MQProducerInner {

    //...
    
    //根据msg的topic从topicPublishInfoTable获取对应的topicPublishInfo
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //step1.先从本地缓存变量topicPublishInfoTable中先get一次
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //step1.2 然后从nameServer上更新topic路由信息
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        //step2 然后再从本地缓存变量topicPublishInfoTable中再get一次
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {//第一次的时候isDefault为false,第二次的时候default为true,即为用默认的topic的参数进行更新
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true,
                    this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
    
    /**
     * 本地缓存中不存在时从远端的NameServer注册中心中拉取Topic路由信息
     *
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, 
                                                          final long timeoutMillis,
                                                          boolean allowTopicNotExist) 
            throws MQClientException, InterruptedException, RemotingTimeoutException, 
            RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        //设置请求头中的Topic参数后,发送获取Topic路由信息的request请求给NameServer
        requestHeader.setTopic(topic);        
        //这里由于是同步方式发送,所以直接return response的响应
        RemotingCommand request = RemotingCommand.createRequestCommand(
                RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);        
        response != null;
        //如果NameServer中不存在待发送消息的Topic
        switch (response.getCode()) {            
            case ResponseCode.TOPIC_NOT_EXIST: {                
                if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }                
                break;
            }            
            //如果获取Topic存在,则成功返回,利用TopicRouteData进行解码,且直接返回TopicRouteData
            case ResponseCode.SUCCESS: {                
                byte[] body = response.getBody();                
                if (body != null) {                    
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }            
            default:                
                break;
        }        
        throw new MQClientException(response.getCode(), response.getRemark());
    }
}

TopicRouteData转换至TopicPublishInfo路由信息的映射图如下:

Client中TopicRouteData到TopicPublishInfo的映射.jpg

其中,上面的TopicRouteDataTopicPublishInfo路由信息变量大致如下:

TopicPublishInfo类是用于producer端做负载均衡的关键类,producer通过这个类来识别broker并选择broker

//org.apache.rocketmq.client.impl.producer;
public class TopicPublishInfo {
    //topic是有序的
    private boolean orderTopic = false;
    //topic路由消息是有效的
    private boolean haveTopicRouterInfo = false;
    //消息队列集合
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //上次消费的messageQueue记录
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    //topic路由消息集合
    private TopicRouteData topicRouteData;
    
    //...
}

MessageQueue类:

//org.apache.rocketmq.common.message;
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    
    private static final long serialVersionUID = 6191200464116433425L;
    //当前messageQueue的topic
    private String topic;
    //当前messageQueue属于哪个broker
    private String brokerName;
    //当前messageQueue的id
    private int queueId;
    
    //...
} 
  • 描述了单个消息队列的模型;
  • 这个队列用于管理哪个topic以及这个队列在哪个broker里!
//org.apache.rocketmq.remoting.protocol.route;
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    //消息队列集合
    private List<QueueData> queueDatas;
    //broker集合
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
    //...
}    

NameSpace拿路由信息的方法

//org.apache.rocketmq.client.impl.factory;
public class MQClientInstance {
    
    //...
   
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
                                                     DefaultMQProducer defaultMQProducer) {
        // 去 NameSpace 拿路由信息  
        TopicRouteData topicRouteData;
        if (isDefault && defaultMQProducer != null) {
            topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
                        clientConfig.getMqClientApiTimeout());
            if (topicRouteData != null) {
                for (QueueData data : topicRouteData.getQueueDatas()) {
                    int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),
                        data.getReadQueueNums());
                    data.setReadQueueNums(queueNums);
                    data.setWriteQueueNums(queueNums);
                }
            }
        } else {
            topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic,
                    clientConfig.getMqClientApiTimeout());
        }

        if (topicRouteData != null) {
            // 获取路由信息  
            TopicRouteData old = this.topicRouteTable.get(topic);
            // 对比  
            boolean changed = topicRouteData.topicRouteDataChanged(old);
            if (!changed) {
                // 发生变化就改路由信息  
                changed = this.isNeedUpdateTopicRouteInfo(topic);
            } else {
                log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
            }

            if (changed) {

                // 更新 Broker 地址缓存表  
                for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                    this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                }

                // Update endpoint map  
                {
                    ConcurrentMap<MessageQueue, String> mqEndPoints = 
                            topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                    if (!mqEndPoints.isEmpty()) {
                        topicEndPointsTable.put(topic, mqEndPoints);
                    }
                }
                // Update Pub info  
                {
                    // 将 topicRouteData 转换为 TopicPublishInfo  
                    TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                    publishInfo.setHaveTopicRouterInfo(true);
                    for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                        MQProducerInner impl = entry.getValue();
                        if (impl != null) {
                            impl.updateTopicPublishInfo(topic, publishInfo);
                        }
                    }
                }

                // Update sub info  
                if (!consumerTable.isEmpty()) {
                    // 更新 consumer 信息  
                    Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                    for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                        MQConsumerInner impl = entry.getValue();
                        if (impl != null) {
                            impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                        }
                    }
                }
                TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
                log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                this.topicRouteTable.put(topic, cloneTopicRouteData);
                return true;
            } else {
                //....
            }
        }
        return false;
    }
}

3.3.2 选择消息发送的队列

在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下,selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义:

//org.apache.rocketmq.client.latency;
public class MQFaultStrategy {    
    
    //维护每个Broker发送消息的延迟
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();    
    
    //发送消息延迟容错开关
    private boolean sendLatencyFaultEnable = false;    
    
    //延迟级别数组
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    //不可用时长数组
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    
    //...
}

这里通过一个sendLatencyFaultEnable开关来进行选择采用下面哪种方式:

  1. sendLatencyFaultEnable=true:启用Broker故障延迟机制。在随机递增取模的基础上,再过滤掉not availableBroker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L
  2. sendLatencyFaultEnable=false(默认关闭):默认不启用Broker故障延迟机制。采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息。
//org.apache.rocketmq.client.latency;
public class MQFaultStrategy {
    
    //...
   
    /**
     * 根据sendLatencyFaultEnable开关是否打开来分两种情况选择队列发送消息
     * @param tpInfo
     * @param lastBrokerName
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,
                                              final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                //1.在随机递增取模的基础上,再过滤掉not available的Broker代理;对之前失败的,按一定的时间做退避
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            return tpInfo.selectOneMessageQueue();
        }
        //2.采用随机递增取模的方式选择一个队列 (MessageQueue )来发送消息
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

3.3.3 发送封装后的RemotingCommand数据包

在选择完发送消息的队列后,RocketMQ就会调用sendKernelImpl()方法发送消息(该方法为,通过RocketMQRemoting通信模块真正发送消息的核心)。

//org.apache.rocketmq.client.impl.producer;
public class DefaultMQProducerImpl implements MQProducerInner {

    //...
   
    private SendResult sendKernelImpl(final Message msg, final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) 
            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        //获取broker信息
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        //如果没有找到,则更新路由信息
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(),
                    brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                //是否禁用hook
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null 
                            || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer
                        .getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder
                        .messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
                                    this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
                        this.defaultMQProducer.getNamespace()));
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

在该方法内总共完成以下几个步流程:

  1. 根据前面获取到的MessageQueue中的brokerName,调用MQClientInstance实例的findBrokerAddressInPublish()方法,得到待发送消息中存放的Broker代理服务器地址,如果没有找到则更新路由信息;
  2. 如果没有禁用hasSendMessageHook,则发送消息前后会有钩子函数的执行(executeSendMessageHookBefore()/executeSendMessageHookAfter()方法);
  3. MQClientAPIImplsendMessageSync方法中将与该消息相关信息封装成RemotingCommand数据包,其中请求码RequestCode为以下几种之一:
    a. SEND_MESSAGE(普通发送消息)
    b. SEND_MESSAGE_V2(优化网络数据包发送)
    c. SEND_BATCH_MESSAGE(消息批量发送)
  4. 根据获取到的Broke代理服务器地址,将封装好的RemotingCommand数据包发送对应的Broker上,默认发送超时间为3s
  5. 这里,真正调用RocketMQRemoting通信模块完成消息发送是在MQClientAPIImpl实例sendMessageSync()方法中,代码具体如下:
//org.apache.rocketmq.client.impl;
public class MQClientAPIImpl implements NameServerUpdateCallback {

    //...

    public SendResult sendMessage(
           final String addr,
           final String brokerName,
           final Message msg,
           final SendMessageRequestHeader requestHeader,
           final long timeoutMillis,
           final CommunicationMode communicationMode,
           final SendCallback sendCallback,
           final TopicPublishInfo topicPublishInfo,
           final MQClientInstance instance,
           final int retryTimesWhenSendFailed,
           final SendMessageContext context,
           final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {

        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 =
                    SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch
                    ? RequestCode.SEND_BATCH_MESSAGE
                    : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        //发送方式
        switch (communicationMode) {
            //
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync,
                        request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync,
                        request);
            default:
                assert false;
                break;
        }

        return null;
    }
}    
  1. processSendResponse方法对发送正常和异常情况分别进行不同的处理并返回sendResult对象;
  2. 发送返回后,调用updateFaultItem更新Broker代理服务器的可用时间;
  3. 对于异常情况,且标志位 — retryAnotherBrokerWhenNotStoreOK,设置为true时,在发送失败的时候,会选择换一个Broker

在生产者发送完成消息后,客户端日志打印如下:

SendResult [sendStatus=SEND_OK, msgId=020003670EC418B4AAC208AD46930000, offsetMsgId=AC1415A200002A9F000000000000017A, messageQueue=MessageQueue [topic=TopicTest, brokerName=HQSKCJJIDRRD6KC, queueId=2], queueOffset=1]

3.4 Broker代理服务器的消息处理简析

Broker代理服务器中存在很多Processor业务处理器,用于处理不同类型的请求,其中一个或者多个Processor会共用一个业务处理器线程池。对于接收到消息,Broker会使用SendMessageProcessor这个业务处理器来处理。SendMessageProcessor会依次做以下处理:

  1. 消息前置校验,包括broker是否可写、校验queueId是否超过指定大小、消息中的Topic路由信息是否存在,如果不存在就新建一个。这里与上文中“尝试获取TopicPublishInfo的路由信息”一节中介绍的内容对应。如果Topic路由信息不存在,则Broker端日志输出如下:
2018-06-14 17:17:24 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=252, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1528967815569, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]2018-06-14 17:17:24 WARN SendMessageThread_1 - the topic TopicTest not exist, producer: /172.20.21.162:626612018-06-14 17:17:24 INFO SendMessageThread_1 - Create new topic by default topic:[TBW102] config:[TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]] producer:[172.20.21.162:62661]

Topic路由信息新建后,第二次消息发送后,Broker端日志输出如下:

2018-08-02 16:26:13 INFO SendMessageThread_1 - receive SendMessage request command, RemotingCommand [code=310, language=JAVA, version=253, opaque=6, flag(B)=0, remark=null, extFields={a=ProducerGroupName, b=TopicTest, c=TBW102, d=4, e=2, f=0, g=1533198373524, h=0, i=KEYSOrderID188UNIQ_KEY020003670EC418B4AAC208AD46930000WAITtrueTAGSTagA, j=0, k=false, m=false}, serializeTypeCurrentRPC=JSON]2018-08-02 16:26:13 INFO SendMessageThread_1 - the msgInner's content is:MessageExt [queueId=2, storeSize=0, queueOffset=0, sysFlag=0, bornTimestamp=1533198373524, bornHost=/172.20.21.162:53914, storeTimestamp=0, storeHost=/172.20.21.162:10911, msgId=null, commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={KEYS=OrderID188, UNIQ_KEY=020003670EC418B4AAC208AD46930000, WAIT=true, TAGS=TagA}, body=11body's content is:Hello world]]
  1. 构建MessageExtBrokerInner
  2. 调用“brokerController.getMessageStore().putMessage”MessageExtBrokerInner做落盘持久化处理;
  3. 根据消息落盘结果(正常/异常情况),BrokerStatsManager做一些统计数据的更新,最后设置Response并返回;

标签:topic,发送,源码,mq,msg,null,final,RocketMQ
From: https://www.cnblogs.com/ciel717/p/17363795.html

相关文章

  • RocketMQ之消息接收源码分析
    一、概述对于任何一款消息中间件而言,消费者客户端一般有两种方式从消息中间件获取消息并消费:Push方式:由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端......
  • 阿里云短信发送接入
    前提1、开通开通阿里云短信服务2、申请签名、申请模版3、提前创建好阿里云接口访问的AccessKeyID与AccessKeySecret核心依赖<dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>4.6.0</version></dep......
  • RocketMQ单机版安装
    1、下载最新的安装包  github下载地址:https://github.com/apache/rocketmq/releases。本文安装版本为:rocketmq-all-5.1.0-bin-release.zip2、安装JDK3、上传并解压安装包#从本地电脑上传安装包到Linux服务器scpE:\[email protected]......
  • 聊聊怎样快速去阅读JDK源码?
    1.前言之前断断续续读过一部分JDK常用类的源码,这里想把过程中的一些心得和方法记录下来,如果能帮到需要的小伙伴就再好不过了!本文主要分享一下我的阅读工具和阅读顺序。PS:由于当前主流使用的JDK版本仍是1.8,因此源码阅读主要是1.8版本,有些地方可以参考1.7。2.工具......
  • [HiBench] 安装HiBench,测试在Spark上跑PageRank与修改源码测试
    [HiBench]安装HiBench,测试在Spark上跑PageRank与修改源码测试背景:我想在HiBench上测试在Spark上跑PageRank性能,并想要修改PageRank的源码进行测试。本来,HiBench在README里写的已经挺清楚的了,直接照着做就行。奈何我用的服务器没有珂学上网,所以还是遇到了一点小麻烦。下载HiBe......
  • 番外篇:分享一道用Python基础+蒙特卡洛算法实现排列组合的题目(附源码)
    今日鸡汤夕阳无限好,只是近黄昏。    大家好,我是Python进阶者。    是不是觉得很诧异?明明上周刚发布了这篇:分享一道用Python基础+蒙特卡洛算法实现排列组合的题目(附源码),今天又来一篇,名曰番外篇!其实今天是想给大家分享【......
  • 分享一道用Python基础+蒙特卡洛算法实现排列组合的题目(附源码)
    今日鸡汤沙场烽火连胡月,海畔云山拥蓟城。    大家好,我是Python进阶者。这篇文章的题目真的是很难取,索性先取这个了,装个13好了。前言    前几天在才哥交流群里,有个叫【RickXiang】的粉丝在Python交流群里问了一道关于排列组合的问题,初步一看觉得很简单,实际上确实是有难度的......
  • django视图层与cbv源码分析
    目录一、视图层之必会三板斧二、JsonResponse对象两种序列化数据的方式方式一:使用json模块方式二:使用JsonResponse对象使用JsonResponse对象序列化除字典外的数据类型如果给JsonResponse对象内部的json代码传参三、视图层之request对象获取文件四、视图层之FBV与CBV概念介绍五、CB......
  • RocketMQ之消息轨迹
    一、概述消息轨迹是用来跟踪记录消息发送、消息消费的轨迹。如何启用消息轨迹?broker端需要在broker端的配置文件中添加配置项:traceTopicEnable=true,注意:对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与......
  • RocketMQ之通信机制
    一、概述RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。消息生产者Producer作为客户端发送消息时候,需......