首页 > 其他分享 >好烦呀为什么rocketmq监听打印的日志没有traceId啊

好烦呀为什么rocketmq监听打印的日志没有traceId啊

时间:2023-07-26 13:56:28浏览次数:37  
标签:traceId String MDC void public 好烦 payload rocketmq

在使用springboot整合rocketmq使用方便使用注解即可实现消费,十分简洁。随之而来的是问题怎么追踪,一个链路的日志能看到是多么美好的事情。搜寻很久没有找到解决方案,于是自己搞吧!

大前提:已经在使用spring-cloud-starter-sleuth,日志格式已经含有traceId、spanId
好的,那么我们来看看吧!

生产者端:生产者产生的消息被消费者消费,那么怎么做到两个日志产生关联呢?有两个比较好的办法,一个是生产者把自己的traceId存入消息头部,消费者获取到拿出来设置自己的traceId。另一个则是生产者只管发消息,消费者把messageId作为自己的traceId也能搜寻。第一种比较适合自产自销,第二种则是A服务产B服务消的情况。我们依次来看看。

1、生产者配置,便捷的使用rocketTemplate,没有其他多余的配置,组合方式使用扩展一下就好
`

@Component
public class MyRocketMqTemplate {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
 * 封装发送失败,进行保存并定时重试
 *
 * @param topic   主题
 * @param payload 发送内容
 */
public void sendAndMakeUp(String topic, Object payload) {
    String traceId = (String) MDC.get(TraceFilter.MDC_TRACE_ID_KEY);
           GenericMessage messageExt = new GenericMessage(payload);
    messageExt.getHeaders().put("traceId",traceId);
    rocketMQTemplate.asyncSend(topic, messageExt, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            try {

              // 异步发送traceId连接
                MDC.put(TraceFilter.MDC_TRACE_ID_KEY, traceId);
                log.info("发送完毕,返回发送结果:{}", sendResult);
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    // 补偿
                    makeUp(topic, payload);
                }
            }finally {
                MDC.clear();
            }

        }

        @Override
        public void onException(Throwable throwable) {
            log.error("参数:{},mq消息发送异常",payload, throwable);
            // 补偿
            makeUp(topic, payload);
        }

        private void makeUp(String topic, Object payload) {
            try {
             // 补偿逻辑
            } catch (Exception e) {
                log.error("参数:{},补偿失败",JSON.toJSONString(payload), e);
            }
        }
    });
}

`

2、消费者:简单使用@RocketMQMessageListener注解就可以开启一个消费。那么需要关注的消费之前把traceId设置进去就可以。

`
@Component
@Slf4j
public class RocketMqTraceListener {

@EventListener
public void onApplicationEvent(ApplicationReadyEvent event) {
    ConfigurableApplicationContext applicationContext = event.getApplicationContext();
    Map<String, DefaultRocketMQListenerContainer> beansOfType = applicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);
    if(beansOfType.isEmpty()){
        return;
    }
    for (Map.Entry<String, DefaultRocketMQListenerContainer> stringDefaultRocketMQListenerContainerEntry : beansOfType.entrySet()) {
        DefaultRocketMQListenerContainer value = stringDefaultRocketMQListenerContainerEntry.getValue();
     value.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook() {
            @Override
            public String hookName() {
                return null;
            }

            @Override
            public void consumeMessageBefore(ConsumeMessageContext context) {
                String traceId = null;
                // 可能是批量消费,这个时候取一个就不合适,那么我们就自定义一个uuid
                List<MessageExt> msgList = context.getMsgList();
                if(CollectionUtils.isNotEmpty(msgList) && msgList.size() == 1){
                    traceId = msgList.get(0).getMsgId();
                }
                // 可以取生产者传递
                if (StringUtils.isEmpty(traceId)) {
                         Map<String, String> props = context.getProps();
                        traceId = props.get("traceId");
                }
                // 自定义
                if (StringUtils.isEmpty(traceId)) {
                    traceId = UUID.randomUUID().toString().replaceAll("-", "");
                }
                MDC.put(TraceFilter.MDC_TRACE_ID_KEY, traceId);
            }

            @Override
            public void consumeMessageAfter(ConsumeMessageContext context) {
                MDC.clear();
            }
        });
    }
}

}
`
特别注意:getDefaultMQPushConsumerImpl()方法在升级版本可能不再提供获取 的入口,考虑自己封装consumer。

源码解读:看源码是一个枯燥的事情,代码路径我大概列一下,感兴趣的可以看看
RocketMQAutoConfiguration -> RocketMQListenerConfiguration->ListenerContainerConfiguration->DefaultRocketMQListenerContainer->DefaultMQPushConsumer->DefaultMQPushConsumerImpl

标签:traceId,String,MDC,void,public,好烦,payload,rocketmq
From: https://www.cnblogs.com/little-xiao/p/17582260.html

相关文章

  • RocketMQ关键技术整理
    form https://gitee.com/apache/rocketmq/tree/master/docs/cn技术架构RocketMQ架构上主要分为四部分,如上图所示:Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。C......
  • RocketMQ 生产端与消费端
    参考:en_oc:https://www.cnblogs.com/enoc/p/rocketmq-so-no-roku.html田守枝(rebalance):https://cloud.tencent.com/developer/article/1554950官方文档:https://rocketmq.apache.org/zh/docs/  发送消息RocketMQ中定义了如下三种消息通信的方式:SYNC:同步发送,生产端会阻塞等......
  • 分布式开放消息系统(RocketMQ)的原理与实践
    备注:1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语2.文中的MQServer与Broker表示同一概念分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:消息的顺序问......
  • 从互联网到云时代,Apache RocketMQ 是如何演进的?
    作者:隆基2022年,RocketMQ5.0的正式版发布。相对于4.0版本而言,架构走向云原生化,并且覆盖了更多业务场景。消息队列演进史操作系统、数据库、中间件是基础软件的三驾马车,而消息队列属于最经典的中间件之一,已经有30多年的历史。消息队列的发展主要经历了以下几个阶段:第一......
  • 关于使用RocketMQ搭建多Master多Slave模式(同步)集群时遇到的问题
    搭建多Master多Slave模式(同步)集群时的java.lang.NullPointerException异常一、运行环境等基本描述(问题产生原因是权限问题,即权限不够导致无法启动broker,甚至broker线程无法通过jps命令查出。下面阐述分析思路)1.1)操作系统:Linux虚拟机:VMwareWorkstation16Pro、WSL ......
  • rocketmq
        ......
  • RocketMQ安装部署
    1.下载安装包解压2.环境配置系统变量3.修改配置conf/broker.confenablePropertyFilter=true#指定nameser的地址,把borker与nameser关联起来namesrvAddr=127.0.0.1:98764.启动先启动mqnamesrv.cmd,再启动mqbroker.cmd5.自动创建topicstartmqbroker.cmd-n1......
  • 2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?
    2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?答案2023-07-16:什么是零拷贝?零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。➢零拷贝技术可以减少数据......
  • 2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?
    2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?答案2023-07-16:什么是零拷贝?零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。➢零拷贝技术可以减少数据拷贝和......
  • 2023-07-14:讲一讲Kafka与RocketMQ中存储设计的异同?
    2023-07-14:讲一讲Kafka与RocketMQ中存储设计的异同?答案2023-07-14:在Kafka中,文件的布局采用了Topic/Partition的方式,每个分区对应一个物理文件夹,且在分区文件级别上实现了顺序写入。然而,当一个Kafka集群拥有大量的主题和每个主题拥有数百个分区时,在高并发写入消息的情况下,IO操作......