首页 > 其他分享 >日志之MDC和异步多线程间传递线程id

日志之MDC和异步多线程间传递线程id

时间:2023-03-23 20:00:52浏览次数:75  
标签:return MDC 线程 import 日志 多线程 public

目录
日志追踪对于接口故障排查非常重要,可以有效、快捷的定位故障点,但在多线程环境中,若没有相关框架的支持,想要实现日志追踪,就需要编码实现将主线程的日志参数传递给子线程,本文就在线程池场景下借助MDC实现了traceId参数的透传

1 MDC

1.1 简介

MDCMapped Diagnostic Context,映射调试上下文)是 log4jlogback 提供的一种方便在多线程条件下记录日志的功能。某些应用程序采用多线程的方式来处理多个用户的请求。在一个用户的使用过程中,可能有多个不同的线程来进行处理。典型的例子是 Web 应用服务器。当用户访问某个页面时,应用服务器可能会创建一个新的线程来处理该请求,也可能从线程池中复用已有的线程。在一个用户的会话存续期间,可能有多个线程处理过该用户的请求。这使得比较难以区分不同用户所对应的日志。当需要追踪某个用户在系统中的相关日志记录时,就会变得很麻烦。

一种解决的办法是采用自定义的日志格式,把用户的信息采用某种方式编码在日志记录中。这种方式的问题在于要求在每个使用日志记录器的类中,都可以访问到用户相关的信息。这样才可能在记录日志时使用。这样的条件通常是比较难以满足的。MDC 的作用是解决这个问题。MDC 可以看成是一个与当前线程绑定的哈希表,可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容。当需要记录日志时,只需要从 MDC 中获取所需的信息即可。MDC 的内容则由程序在适当的时候保存进去。对于一个 Web 应用来说,通常是在请求被处理的最开始保存这些数据

1.2 MDC坐标和使用

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
  </dependency>

log4j.xml配置样例,追踪日志自定义格式主要在name="traceId"layout里面进行设置,我们使用%X{traceId}来定义此处会打印MDC里面keytraceIdvalue,如果所定义的字段在MDC不存在对应的key,那么将不会打印,会留一个占位符
点击了解Loback.xml文件解释

1.3 主要方法

API 说明:

  • clear():移除所有MDC
  • get (String key):获取当前线程 MDC 中指定 key 的值
  • getCopyOfContextMap():将MDC从内存获取出来,再传给线程
  • put(String key, Object o):往当前线程的 MDC 中存入指定的键值对
  • remove(String key):删除当前线程 MDC 中指定的键值对
  • setContextMap():将父线程的MDC内容传给子线程

MDC异步线程间传递:
MDC的put时,子线程在创建的时候会把父线程中的inheritableThreadLocals变量设置到子线程的inheritableThreadLocals中,而MDC内部是用InheritableThreadLocal实现的,所以会把父线程中的上下文带到子线程中
但在线程池中,由于线程会被重用,但是线程本身只会初始化一次,所以之后重用线程的时候,就不会进行初始化操作了,也就不会有父线程inheritableThreadLocals拷贝到子线程中的过程了,这个时候如果还想传递父线程的上下文的话,就要使用getCopyOfContextMap方法

2 多线程间使用

2.1 MDC工具类

定义MDC工具类,支持RunnableCallable两种,目的就是为了把父线程的traceId设置给子线程

import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

import java.util.Map;
import java.util.concurrent.Callable;

/**
 * @Description 封装MDC用于向线程池传递
 */
public class MDCUtil {
    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (CollectionUtils.isEmpty(context)) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            try {
                return callable.call();
            } finally {//清除子线程的,避免内存溢出,就和ThreadLocal.remove()一个原因
                MDC.clear();
            }
        };
    }

 public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }

    public static void setMDCContextMap(final Map<String, String> context) {
        if (CollectionUtils.isEmpty(context)) {
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }

}

2.2 拦截器定义和配置

package demo;

import org.slf4j.MDC;
import org.springframework.lang.Nullable;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Arrays;
import java.util.List;

public class RequestInterceptor extends HandlerInterceptorAdapter {

    private static final List<String> paramSet = Arrays.asList("traceId");

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        this.setParam(request);
        return super.preHandle(request, response, handler);
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception {
        MDC.clear();
    }

    private void setParam(HttpServletRequest request) {
        // 设置要放到MDC中的参数
        for (String key : paramSet) {
            String val = request.getHeader(key);
            if (!StringUtils.isEmpty(val)) {
                MDC.put(key, val);
            }
        }
    }

}

拦截器配置

import demo.RequestInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * 拦截WEB请求
 */
@Configuration
public class InterceptorConfig implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new RequestInterceptor());
    }

}

2.3 Java线程池中使用

2.3.1 配置线程池

@Configuration
public class ThreadPoolService {
	@Bean
	public ThreadPoolExecutor threadPoolExecutor() {
	    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
	            (4, 8, 10,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5536),
                new ScheduledThreadFactory("demo-"), new ThreadPoolExecutor.CallerRunsPolicy());
	    return threadPoolExecutor;
	}	
}

点击了解线程池相关信息

2.3.2 使用ExecutorCompletionService方式

使用ExecutorCompletionService实现多线程调用
点击了解更多关于ExecutorCompletionService信息

/**
 * 使用MDC传递traceId
 */
public class Demo {

	@Autowired
	private ThreadPoolExecutor threadPoolExecutor;

	public void demo() {
	    ExecutorCompletionService ecs = new ExecutorCompletionService(threadPoolExecutor);
	    ecs.submit(MDCUtil.wrap(new TestMDC(), MDC.getCopyOfContextMap()));
	}
	
	class TestMDC implements Callable {
	    @Override
	    public Object call() throws Exception {
	        // TODO 代码逻辑
	        return null;
	    }
	}
}

2.3.3 使用CompletableFuture方式

使用CompletableFuture实现多线程调用,其中收集CompletableFuture运行结果,
点击了解更多关于CompletableFuture信息

public class Result {}
/**
 * 使用MDC传递traceId
 */
public class Demo {

	@Autowired
	private ThreadPoolExecutor threadPoolExecutor;

    private CompletableFuture<Result> test() {
    
        Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
        
        return CompletableFuture.supplyAsync(() -> {
        
            // 必须在打印日志前设置
            MDCUtil.setMDCContextMap(copyOfContextMap);
            //MDC.put("subTraceId",''); //如果需要对子线程进行加线程跟踪号,可在此处设定
            // TODO 业务逻辑
            return new Result();
            
        }, threadPoolExecutor).exceptionally(new Function<Throwable, Result>() {
            /**捕捉异常,不会导致整个流程中断**/
            @Override
            public Result apply(Throwable throwable) {
                log.error("线程[{}]发生了异常[{}], 继续执行其他线程", Thread.currentThread().getName(), throwable.getMessage());
                return null;
            }
        });
    }
}

2.4 Spring线程池中使用

2.4.1 继承ThreadPoolTaskExecutor

public class ThreadPoolMdcWrapper extends ThreadPoolTaskExecutor {

    public ThreadPoolMdcWrapper() {

    }

    @Override
    public void execute(Runnable task) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}

2.4.2 配置线程池

继承ThreadPoolTaskExecutor ,重写线程执行的方法。到这我们就做完了大部分的准备工作,还剩下最关键的就是让程序用到我们封装后的线程池。我们可以在声明线程池的时候,直接使用我们封装好的线程池(因为继承了ThreadPoolTaskExecutor)
点击了解Spring线程池配置

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolMdcWrapper();
    //核心线程数,默认为1
    taskExecutor.setCorePoolSize(1);
    //最大线程数,默认为Integer.MAX_VALUE
    taskExecutor.setMaxPoolSize(200);
    //队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE
    taskExecutor.setQueueCapacity(2000);
    //线程池维护线程所允许的空闲时间,默认为60s
    taskExecutor.setKeepAliveSeconds(60);
    //线程池对拒绝任务(无线程可用)的处理策略
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    // 初始化线程池
    taskExecutor.initialize();
    return  taskExecutor;
}

到这我们所做的准备工作,改造工作也就结束了,剩下的就是使用了。只要在程序异步调用时,利用声明好的taskExecutor线程池进行调用,就可以在线程上下文正确传递Traceid了

标签:return,MDC,线程,import,日志,多线程,public
From: https://www.cnblogs.com/jingzh/p/17248886.html

相关文章

  • Java多线程之ExecutorCompletionService
    目录1ExecutorCompletionService1.1简介1.2原理1.3Demo示例1.3.1未使用ExecutorCompletionService1.3.2使用ExecutorCompletionService1.4深入分析说明1.4.1所有方......
  • python创建线程和关闭线程
    importthreadingimportinspectimportctypesimporttimedeftask1(): whileTrue: print("hello") time.sleep(1)deftask2(): whileTrue:......
  • qt 多线程 moveToThread 的一个骚操作
    moveToThread 相当于是一个多线程的阻塞函数,本案例可多次点击按钮,多次触发,这个信号触发类似于内部建立一个队列,处理函数会按照顺序处理信号 test_moveToThread.p......
  • 进程管理 一 进程与线程
    为什么要引入进程?多道程序环境下,程序之间因共享资源而相互制约着运行,因此体现出间断性的特征。传统的程序是一组指令的集合,体现不出其在内存中的运行情况(间断性导致的何......
  • 线程池
    publicstaticExecutorServicetaskExecutor=Executors.newFixedThreadPool(5); publicstaticExecutorServicetaskExecutor=newThreadPoolExecutor(2,5,......
  • Linux线程 | 创建 终止 回收 分离
    一、线程简介线程是参与系统调度的最小单位。它被包含在进程之中,是进程中的实际运行单位。一个进程中可以创建多个线程,多个线程实现并发运行,每个线程执行不同的任务......
  • 爬虫进阶之多线程爬虫问题详解
    大多数正常人在下载图片的时候都是一个一个点击保存,图片越多花费的时间越多,大大的降低了工作效率。如果是学了爬虫的,一定会想到多线程来自动下载保存图片。多线程介绍:多......
  • 线程通信-采用标志位
    packagecom.Java;publicclassTestflag{publicstaticvoidmain(String[]args){TVtv=newTV();newPlayer(tv).start();newWatcher......
  • NodeJS 多线程编程
    一、开发环境Node.JSv14.8.0二、快速开始-worker_threadsjs和nodejs一直都是单线程,直到官方推出了worker_threads模块,用来解决CPU密集型计算场景。可以通过......
  • C# 多线程访问之 SemaphoreSlim(信号量)【进阶篇】
    C#多线程访问之SemaphoreSlim(信号量)【进阶篇】 阅读目录一、简介二、用法示例 三、属性or函数or方法释义属性-AvailableWaitHandle属性-CurrentCount......