在使用springboot整合rocketmq使用方便使用注解即可实现消费,十分简洁。随之而来的是问题怎么追踪,一个链路的日志能看到是多么美好的事情。搜寻很久没有找到解决方案,于是自己搞吧!
大前提:已经在使用spring-cloud-starter-sleuth,日志格式已经含有traceId、spanId
好的,那么我们来看看吧!
生产者端:生产者产生的消息被消费者消费,那么怎么做到两个日志产生关联呢?有两个比较好的办法,一个是生产者把自己的traceId存入消息头部,消费者获取到拿出来设置自己的traceId。另一个则是生产者只管发消息,消费者把messageId作为自己的traceId也能搜寻。第一种比较适合自产自销,第二种则是A服务产B服务消的情况。我们依次来看看。
1、生产者配置,便捷的使用rocketTemplate,没有其他多余的配置,组合方式使用扩展一下就好
`
@Component
public class MyRocketMqTemplate {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 封装发送失败,进行保存并定时重试
*
* @param topic 主题
* @param payload 发送内容
*/
public void sendAndMakeUp(String topic, Object payload) {
String traceId = (String) MDC.get(TraceFilter.MDC_TRACE_ID_KEY);
GenericMessage messageExt = new GenericMessage(payload);
messageExt.getHeaders().put("traceId",traceId);
rocketMQTemplate.asyncSend(topic, messageExt, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
try {
// 异步发送traceId连接
MDC.put(TraceFilter.MDC_TRACE_ID_KEY, traceId);
log.info("发送完毕,返回发送结果:{}", sendResult);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 补偿
makeUp(topic, payload);
}
}finally {
MDC.clear();
}
}
@Override
public void onException(Throwable throwable) {
log.error("参数:{},mq消息发送异常",payload, throwable);
// 补偿
makeUp(topic, payload);
}
private void makeUp(String topic, Object payload) {
try {
// 补偿逻辑
} catch (Exception e) {
log.error("参数:{},补偿失败",JSON.toJSONString(payload), e);
}
}
});
}
`
2、消费者:简单使用@RocketMQMessageListener注解就可以开启一个消费。那么需要关注的消费之前把traceId设置进去就可以。
`
@Component
@Slf4j
public class RocketMqTraceListener {
@EventListener
public void onApplicationEvent(ApplicationReadyEvent event) {
ConfigurableApplicationContext applicationContext = event.getApplicationContext();
Map<String, DefaultRocketMQListenerContainer> beansOfType = applicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);
if(beansOfType.isEmpty()){
return;
}
for (Map.Entry<String, DefaultRocketMQListenerContainer> stringDefaultRocketMQListenerContainerEntry : beansOfType.entrySet()) {
DefaultRocketMQListenerContainer value = stringDefaultRocketMQListenerContainerEntry.getValue();
value.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook() {
@Override
public String hookName() {
return null;
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
String traceId = null;
// 可能是批量消费,这个时候取一个就不合适,那么我们就自定义一个uuid
List<MessageExt> msgList = context.getMsgList();
if(CollectionUtils.isNotEmpty(msgList) && msgList.size() == 1){
traceId = msgList.get(0).getMsgId();
}
// 可以取生产者传递
if (StringUtils.isEmpty(traceId)) {
Map<String, String> props = context.getProps();
traceId = props.get("traceId");
}
// 自定义
if (StringUtils.isEmpty(traceId)) {
traceId = UUID.randomUUID().toString().replaceAll("-", "");
}
MDC.put(TraceFilter.MDC_TRACE_ID_KEY, traceId);
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
MDC.clear();
}
});
}
}
}
`
特别注意:getDefaultMQPushConsumerImpl()方法在升级版本可能不再提供获取 的入口,考虑自己封装consumer。
源码解读:看源码是一个枯燥的事情,代码路径我大概列一下,感兴趣的可以看看
RocketMQAutoConfiguration -> RocketMQListenerConfiguration->ListenerContainerConfiguration->DefaultRocketMQListenerContainer->DefaultMQPushConsumer->DefaultMQPushConsumerImpl