前言
在项目中,对于每一次请求,我们都需要一个 traceId 将整个请求链路串联起来,这样就会很方便我们根据日志排查问题。但是如果每次打印日志都需要手动传递 traceId 参数,也会很麻烦, MDC 就是为了解决这个场景而使用的。
注:这里我们使用 slf4j + logback
logback 配置
logback.xml
<appender name="APPLICATION"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<!--定义日志输出的路径-->
<file>${LOG_FILE}</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%X{traceId}] [%thread] %logger{50} - %msg%n</pattern>
<charset>utf8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<maxHistory>7</maxHistory>
<maxFileSize>50MB</maxFileSize>
<totalSizeCap>20GB</totalSizeCap>
</rollingPolicy>
</appender>
[%X{traceId}] 就是我们系统中需要使用的唯一标识,配置好之后日志中就会将这个标识打印出来。如果不存在,就是空字符串,也不影响。
使用过滤器拦截每个请求
也可以使用使用 AOP 来实现,这里我们使用了过滤器
public class TraceIdFilter extends OncePerRequestFilter {
public static final String TRACE_ID = "traceId";
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
try {
String traceId = request.getHeader(TRACE_ID);
if (StringUtils.isBlank(traceId)) {
traceId = UUID.randomUUID().toString().replace("-", "");
}
MDC.put(TRACE_ID, traceId);
filterChain.doFilter(request, response);
} finally {
MDC.clear();
}
}
}
向MDC中存放traceId的值,提供给Logger使用。
@Configuration
public class WebConfig implements WebMvcConfigurer {
/**
* 添加日志traceId过滤器
*
* @return
*/
@Bean
public FilterRegistrationBean<TraceIdFilter> traceIdFilter() {
FilterRegistrationBean<TraceIdFilter> registration = new FilterRegistrationBean<>();
registration.setFilter(new TraceIdFilter());
registration.addUrlPatterns("/*");
registration.setName("traceIdFilter");
registration.setOrder(Ordered.HIGHEST_PRECEDENCE);
return registration;
}
}
支持Feign
/**
* feign请求拦截器
*
* @return
*/
@Bean
public RequestInterceptor requestInterceptor() {
return requestTemplate -> {
String traceId = MDC.get(TraceIdFilter.TRACE_ID);
if (Objects.nonNull(traceId)) {
requestTemplate.header(TraceIdFilter.TRACE_ID, traceId);
}
};
}
如果项目中使用了 feign 调用其他系统,也需要将traceId传递过去。
支持RestTemplate
@Component
public class RestTemplateBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RestTemplate) {
((RestTemplate) bean).getInterceptors()
.add((request, body, execution) -> {
String traceId = MDC.get(TraceIdFilter.TRACE_ID);
if (Objects.nonNull(traceId)) {
request.getHeaders().add(TraceIdFilter.TRACE_ID, traceId);
}
return execution.execute(request, body);
});
}
return bean;
}
}
如果项目中使用了 RestTemplate 调用其他系统,也需要将traceId传递过去。
支持@Async及线程池
如果我们项目中使用了线程池,也需要将本线程的 traceId 传递到 新线程中去,不然新老线程的日志就没办法通过 traceId 关联起来。
/**
* 线程池配置
*/
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
@Bean(value = "threadPoolExecutor")
public ThreadPoolTaskExecutor threadPoolExecutor() {
log.info("start threadPoolExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
/**
* 所有线程都会委托给这个execute方法,在这个方法中我们把父线程的MDC内容赋值给子线程
* https://logback.qos.ch/manual/mdc.html#managedThreads
*
* @param runnable runnable
*/
@Override
public void execute(Runnable runnable) {
// 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null
Map<String, String> context = MDC.getCopyOfContextMap();
super.execute(() -> {
// 将父线程的MDC内容传给子线程
if (context != null) {
MDC.setContextMap(context);
}
try {
// 执行异步操作
runnable.run();
} finally {
// 清空MDC内容
MDC.clear();
}
});
}
@Override
public <T> Future<T> submit(Callable<T> task) {
// 获取父线程MDC中的内容,必须在run方法之前,否则等异步线程执行的时候有可能MDC里面的值已经被清空了,这个时候就会返回null
Map<String, String> context = MDC.getCopyOfContextMap();
return super.submit(() -> {
// 将父线程的MDC内容传给子线程
if (context != null) {
MDC.setContextMap(context);
}
try {
// 执行异步操作
return task.call();
} finally {
// 清空MDC内容
MDC.clear();
}
});
}
};
executor.setCorePoolSize(15);
// 配置最大线程数
executor.setMaxPoolSize(100);
// 空线程回收时间15s
executor.setKeepAliveSeconds(15);
Executors.defaultThreadFactory();
// 配置队列大小
executor.setQueueCapacity(3000);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-order-service-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
重写execute()方法支持我们自己通过ThreadPoolTaskExecutor来手动调用 execute
threadPoolTaskExecutor.execute(() -> {
log.info("threadPoolTaskExecutor.execute()");
});
重写submit()方法来支持@Async注解和我们手动调用 submit
threadPoolTaskExecutor.submit(() -> {
log.info("threadPoolTaskExecutor.execute()");
});
支持线程池的更方便写法
/**
* 线程池配置
*/
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {
@Bean(value = "threadPoolExecutor")
public ThreadPoolTaskExecutor threadPoolExecutor() {
log.info("start threadPoolExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setTaskDecorator(runnable -> {
Map<String, String> context = MDC.getCopyOfContextMap();
return () -> {
if (context != null) {
MDC.setContextMap(context);
}
runnable.run();
};
});
executor.setCorePoolSize(15);
// 配置最大线程数
executor.setMaxPoolSize(100);
// 空线程回收时间15s
executor.setKeepAliveSeconds(15);
Executors.defaultThreadFactory();
// 配置队列大小
executor.setQueueCapacity(3000);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-order-service-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
使用包装器来替代方法的重写,更好用,以下3种都支持。
threadPoolTaskExecutor.execute(() -> {
log.info("threadPoolTaskExecutor.execute()");
});
threadPoolTaskExecutor.submit(() -> {
log.info("threadPoolTaskExecutor.submit()");
});
threadPoolTaskExecutor.submit(() -> {
log.info("threadPoolTaskExecutor.submit(Callable)");
return 12;
});
支持 MQ 消息
注:这些我们使用 RabbitMQ
@Configuration
public class RabbitMqConfig {
@Autowired
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
// 消息发送时,携带 traceId
rabbitTemplate.setBeforePublishPostProcessors(new RabbitTemplateSendTraceIdPostProcessor());
}
@Autowired
public void configureSimpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactory containerFactory) {
// 消息消费时,获取 traceId
containerFactory.setAfterReceivePostProcessors(new RabbitTemplateReceiveTraceIdPostProcessor());
}
public static class RabbitTemplateSendTraceIdPostProcessor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
String traceIdKey = TraceIdFilter.TRACE_ID;
String traceId = MDC.get(traceIdKey);
headers.putIfAbsent(traceIdKey, traceId);
return message;
}
}
public static class RabbitTemplateReceiveTraceIdPostProcessor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
String traceIdKey = TraceIdFilter.TRACE_ID;
if (headers.containsKey(traceIdKey)) {
MDC.put(traceIdKey, headers.get(traceIdKey).toString());
}
return message;
}
}
}
MDC 原理分析
MDC 底层也是通过 ThreadLocal 来实现线程间数据隔离的。
参考
java 注解结合 spring aop 实现日志traceId唯一标识
RabbitMQ消息的链路跟踪