
RocketMQ: Message Trace


1. Overview

Message trace tracks and records the path a message takes when it is sent and when it is consumed.

How do you enable message trace?

  • Broker side

Add the configuration item traceTopicEnable=true to the broker's configuration file. Note: when the volume of trace data is large, you can dedicate one broker node in the RocketMQ cluster to storing message traces, so that the physical I/O of ordinary message data and trace data is completely isolated and neither affects the other.
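A minimal sketch of the broker-side setting, assuming a plain broker.conf deployment (the cluster and broker names below are illustrative; in the dedicated-trace-broker setup only the chosen node would carry this switch):

# broker.conf (excerpt)
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# allow this broker to accept and store message trace data
traceTopicEnable=true

After a restart the broker should create the default trace topic RMQ_SYS_TRACE_TOPIC locally.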

  • Client side

Both the producer and the consumer need message trace enabled. Concretely, turn on the message trace switch when constructing the client and, depending on your needs, decide whether to use the default topic for storing trace data.

Producer-side message trace APIs

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace);

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic);

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
    final String customizedTraceTopic);

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, 
    boolean enableMsgTrace, final String customizedTraceTopic);

Consumer-side message trace APIs

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace);

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic);

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, 
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic);

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic);
  • Topic that stores the trace data

By default trace data is stored in RMQ_SYS_TRACE_TOPIC; it can also be stored in a user-defined topic. Note: a custom topic must be created in advance.
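A minimal sketch of pointing both clients at a custom trace topic; the topic name TRACE_TOPIC_ORDER is a made-up example and must already exist on the broker (created in advance, e.g. with mqadmin updateTopic or via the console):

// Producer whose trace data goes to a pre-created custom topic instead of RMQ_SYS_TRACE_TOPIC
DefaultMQProducer producer =
    new DefaultMQProducer("ProducerGroupName", true, "TRACE_TOPIC_ORDER");

// Consumer whose trace data goes to the same custom topic
DefaultMQPushConsumer consumer =
    new DefaultMQPushConsumer("CID_JODIE_1", true, "TRACE_TOPIC_ORDER");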

2. Example

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TraceProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // The second constructor argument enables message trace with the default trace topic
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class TracePushConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Here we use the default message trace topic (RMQ_SYS_TRACE_TOPIC)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Timestamp format is yyyyMMddHHmmss, e.g. 20181109221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
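To see what actually lands in the trace topic, you can subscribe to RMQ_SYS_TRACE_TOPIC with an ordinary push consumer and dump the raw records. A minimal sketch, assuming the default trace topic; the class and group names are made up, and the message bodies are the encoded trace text discussed later in the source-code analysis:

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class TraceTopicDumper {
    public static void main(String[] args) throws Exception {
        // Ordinary consumer (trace disabled) that reads the default trace topic
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TRACE_DUMP_GROUP");
        consumer.subscribe("RMQ_SYS_TRACE_TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // Each body carries one batch of encoded trace records
                    System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Trace topic dumper started.");
    }
}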

3. Source Code Analysis

Let's look at the producer side and then the consumer side to see how trace data is produced and how it is delivered to the broker.

3.1 Producer side

3.1.1 Initializing the producer

During initialization the producer completes the following:

  • initialize the producer itself
  • initialize the AsyncTraceDispatcher object traceDispatcher; AsyncTraceDispatcher is the heart of the message trace feature and is examined in detail below
  • register a SendMessageHook (SendMessageTraceHookImpl) on the producer
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
                TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

3.1.2 Starting the producer

Starting the producer also starts traceDispatcher. Let's look in detail at what happens while traceDispatcher starts:

  • the traceProducer held by AsyncTraceDispatcher is responsible for sending trace data to the broker; it is started first
  • a worker thread is started, whose task is encapsulated in AsyncRunnable
  • a shutdown hook is registered
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

Let's take a closer look at AsyncRunnable. Its run method pulls trace context entries off the traceContextQueue queue, up to batchSize (100 by default) per round, builds an AsyncAppenderRequest from the collected TraceContext objects, and submits it to the traceExecutor thread pool (note: traceContextQueue, batchSize and traceExecutor are all fixed when traceDispatcher is initialized). This raises a question: what does AsyncAppenderRequest actually do? With that in mind, let's continue with AsyncAppenderRequest.

public void run() {
    while (!stopped) {
        List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            TraceContext context = null;
            try {
                //get trace data element from blocking Queue — traceContextQueue
                context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
            if (context != null) {
                contexts.add(context);
            } else {
                break;
            }
        }
        if (contexts.size() > 0) {
            AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
            traceExecutor.submit(request);
        } else if (AsyncTraceDispatcher.this.stopped) {
            this.stopped = true;
        }
    }
}

The run method of AsyncAppenderRequest is very simple: it just calls sendTraceData. That method groups the trace contexts, encodes each one via TraceDataEncoder.encoderFromContextBean, and then calls flushData to send the data. flushData splits the data into batches according to the traceProducer's maxMessageSize and calls sendTraceDataByMQ for each batch. From sendTraceDataByMQ you can see that it is traceProducer that actually ships the trace data; trace data is sent asynchronously, and the request type is the same as for an ordinary message send.

public void run() {
    sendTraceData(contextList);
}

public void sendTraceData(List<TraceContext> contextList) {
    Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
    for (TraceContext context : contextList) {
        if (context.getTraceBeans().isEmpty()) {
            continue;
        }
        // Topic value corresponding to original message entity content
        String topic = context.getTraceBeans().get(0).getTopic();
        String regionId = context.getRegionId();
        // Use  original message entity's topic as key
        String key = topic;
        if (!StringUtils.isBlank(regionId)) {
            key = key + TraceConstants.CONTENT_SPLITOR + regionId;
        }
        List<TraceTransferBean> transBeanList = transBeanMap.get(key);
        if (transBeanList == null) {
            transBeanList = new ArrayList<TraceTransferBean>();
            transBeanMap.put(key, transBeanList);
        }
        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
        transBeanList.add(traceData);
    }
    for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
        String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
        String dataTopic = entry.getKey();
        String regionId = null;
        if (key.length > 1) {
            dataTopic = key[0];
            regionId = key[1];
        }
        flushData(entry.getValue(), dataTopic, regionId);
    }
}


private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
    if (transBeanList.size() == 0) {
        return;
    }
    // Temporary buffer
    StringBuilder buffer = new StringBuilder(1024);
    int count = 0;
    Set<String> keySet = new HashSet<String>();

    for (TraceTransferBean bean : transBeanList) {
        // Keyset of message trace includes msgId of or original message
        keySet.addAll(bean.getTransKey());
        buffer.append(bean.getTransData());
        count++;
        // Ensure that the size of the package should not exceed the upper limit.
        if (buffer.length() >= traceProducer.getMaxMessageSize()) {
            sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
            // Clear temporary buffer after finishing
            buffer.delete(0, buffer.length());
            keySet.clear();
            count = 0;
        }
    }
    // count > 0 means there is leftover trace data that has not reached the size limit; send it here
    if (count > 0) {
        sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
    }
    transBeanList.clear();
}

At this point we know how the producer assembles and sends trace data, but where does the trace data come from in the first place?

3.1.3 Sending messages

Before sending a message, the producer checks whether its sendMessageHookList is empty. Since a SendMessageHook was registered during producer initialization, the list is not empty, so a SendMessageContext object is built and executeSendMessageHookBefore is called, which sets up the TraceContext and its TraceBean list inside the SendMessageContext. The producer then sends the message; once the send completes, the send result is stored in the SendMessageContext and executeSendMessageHookAfter is called, which builds a TraceContext from the SendMessageContext and finally puts it into the traceContextQueue queue. Note that when an exception occurs during the send, the exception is first recorded in the SendMessageContext and then executeSendMessageHookAfter is called.

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

3.2 Consumer side

Like the producer, the consumer initializes an AsyncTraceDispatcher object during its own initialization and registers a hook, except that here the hook is a ConsumeMessageHook; during startup it starts the AsyncTraceDispatcher. In other words, assembling and sending trace data on the consumer side works exactly as on the producer side. So how is trace data produced on the consumer side?

On the consumer side, ConsumeMessageService builds ConsumeRequest tasks and submits them to the consumeExecutor thread pool. Trace data is produced while a ConsumeRequest runs. Unlike the producer, the consumer produces two trace records per ConsumeRequest: executeHookBefore and executeHookAfter are called before and after consumeMessage respectively, each building a TraceContext from the ConsumeMessageContext and adding it to traceContextQueue. One detail worth noting: because the consumer may consume several messages at a time, a TraceBean is built for every message, whereas the producer sends one Message (or one MessageBatch) per call, so its TraceContext contains only a single TraceBean.

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", 
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }     
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

4. Information Stored in a Message Trace

Having analyzed how RocketMQ implements message trace, let's summarize what information a trace record carries. In RocketMQ this information is encapsulated in TraceContext, TraceBean and TraceType.

4.1 TraceContext

TraceContext holds the context of a trace record: the trace type, the region ID and region name of the broker, the group name, the elapsed time, whether the traced message was handled successfully, the request ID, the status code of the consume result, and the list of trace data beans.

/**
 * The context of Trace
 */
public class TraceContext implements Comparable<TraceContext> {

    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;

    //...
}

4.2 TraceBean

TraceBean holds the trace data itself: the message topic, message ID, message offset (offsetMsgId), tags, keys, store host, client host, store time, retry count, body length and message type.

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
}

4.3 TraceType

TraceType indicates the type of a trace record:

  • Pub: message sending
  • SubBefore: before message consumption
  • SubAfter: after message consumption
public enum TraceType {
    Pub,
    SubBefore,
    SubAfter,
}

5. Summary

Finally, the way message trace works can be summed up in three points:

  1. Where trace data comes from:
  • the producer adds a TraceContext to the traceContextQueue queue after sending a message
  • the consumer adds a TraceContext to the traceContextQueue queue both before and after consuming messages
  2. How trace data is processed:

Both the producer and the consumer start a worker thread that fetches a batch of TraceContext objects from the traceContextQueue queue, wraps them in a new AsyncAppenderRequest and submits it to the traceExecutor thread pool; when the AsyncAppenderRequest runs, traceProducer sends the trace data to the broker.

  3. How message trace is implemented:

Both the producer and the consumer build trace data by registering hook functions (a minimal hand-rolled hook sketch follows).
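As a footnote on that hook mechanism, here is a minimal sketch of registering a custom SendMessageHook by hand (a toy hook for illustration only, not the built-in SendMessageTraceHookImpl); it shows the exact point where the trace feature taps into the send path:

import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class CustomSendHookDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // The same registration point used by SendMessageTraceHookImpl
        producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
            @Override
            public String hookName() {
                return "demoSendHook";
            }

            @Override
            public void sendMessageBefore(SendMessageContext context) {
                // Invoked via executeSendMessageHookBefore, before the message is sent
                System.out.println("before send, topic=" + context.getMessage().getTopic());
            }

            @Override
            public void sendMessageAfter(SendMessageContext context) {
                // Invoked via executeSendMessageHookAfter, once the send result (or exception) is known
                System.out.println("after send, result=" + context.getSendResult());
            }
        });
        producer.start();
        // ... send messages as usual, then shut down ...
        producer.shutdown();
    }
}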

