你好,我是蜗牛!
在实际开发中需要在父子线程之间传递一些数据,比如用户登录信息使用ThreadLocal存放保证线程隔离,代码如下:
/**
* @author 公众号:woniuxgg
* @description 用户上下文信息
*/
public class UserUtils {
private static final ThreadLocal<String> userLocal=new ThreadLocal<>();
public static String getUserId(){
return userLocal.get();
}
public static void setUserId(String userId){
userLocal.set(userId);
}
public static void clear(){
userLocal.remove();
}}
那么子线程想要获取这个userId如何做呢?
1. 手动设置
代码如下:
public void handlerAsync() {
//1. 获取父线程的userId
String userId = UserUtils.getUserId();
CompletableFuture.runAsync(()->{
//2. 设置子线程的值,复用
UserUtils.setUserId(userId);
log.info("子线程的值:{}",UserUtils.getUserId());
});
}
这样子每次开异步线程都需要手动设置,重复代码太多,看了头疼!
2. 线程池设置TaskDecorator
TaskDecorator是一个执行回调方法的装饰器,主要应用于传递上下文,或者提供任务的监控/统计信息。
那我们该如何去使用?代码如下:
/**
* @author 公众号:woniuxgg
* @description 上下文装饰器
*/
public class CustomTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
String robotId = UserUtils.getUserId();
System.out.println(robotId);
return () -> {
try {
// 将主线程的请求信息,设置到子线程中
UserUtils.setUserId(robotId);
// 执行子线程,这一步不要忘了
runnable.run();
} finally {
// 线程结束,清空这些信息,否则可能造成内存泄漏
UserUtils.clear();
}
}; }
}
TaskDecorator
需要结合线程池使用,实际开发中异步线程建议使用线程池,只需要在对应的线程池配置一下,代码如下:
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor----------------");
//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//使用可视化运行状态的线程池
ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//增加线程池修饰类
executor.setTaskDecorator(new CustomTaskDecorator());
//增加MDC的线程池修饰类
//executor.setTaskDecorator(new MDCTaskDecorator());
//执行初始化
executor.initialize();
log.info("end asyncServiceExecutor------------");
return executor;
}
此时业务代码就不需要去设置子线程的值,直接使用即可,代码如下:
public void handlerAsync() {
log.info("父线程的用户信息:{}", UserUtils.getUserId());
//执行异步任务,需要指定的线程池
CompletableFuture.runAsync(()-> log.info("子线程的用户信息:{}", UserUtils.getUserId()), taskExecutor);
}
来看一下结果,如下图:
这里使用的是CompletableFuture
执行异步任务,使用@Async
这个注解同样是可行的。
注意:无论使用何种方式,都需要指定线程池
3. InheritableThreadLocal
InheritableThreadLocal虽然能够实现父子线程间的复用,但是在线程池中使用会存在复用的问题
这种方案使用也是非常简单,直接用InheritableThreadLocal替换ThreadLocal即可,代码如下:
/**
* @author 公众号:程序员蜗牛g
* @description 用户上下文信息
*/
public class UserUtils {
private static final InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
public static String get(){
return threadLocal.get();
}
public static void set(String userId){
threadLocal.set(userId);
}
public static void clear(){
threadLocal.remove();
}
}
4. TransmittableThreadLocal
TransmittableThreadLocal是由阿里开发的一个线程变量传递工具包,解决了InheritableThreadLocal只能再new Thread的时候传递本地变量,无法应用到线程池的问题。可以应用来作链路追踪,传递变量等用途,下面我们来了解一下原理。
使用起来也是非常简单,添加依赖如下:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.2</version>
</dependency>
UserUtils改造代码如下:
/**
* @author 公众号:程序员蜗牛g
* @description 用户上下文信息
*/
public class UserUtils {
private static final TransmittableThreadLocal<String> threadLocal =new TransmittableThreadLocal<>();
public static String get(){
return threadLocal.get();
}
public static void set(String userId){
threadLocal.set(loginVal);
}
public static void clear(){
threadLocal.remove();
}
}
TransmittableThreadLocal原理
TransmittableThreadLocal继承自InheritableThreadLocal,因此它可以在创建线程的时候将值传递给子线程,那么怎么确保使用线程池的时候也有效呢?我们来看一下源码
1、构造方法
public TransmittableThreadLocal() {
this(false);
}
public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
// 是否忽略null值set,默认false
this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
}
2、set方法
/**
* {@inheritDoc}
*/
@Override
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && value == null) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
先看addThisToHolder方法
@SuppressWarnings("unchecked")
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null);
// WeakHashMap supports null value.
}
}
属性holder又是什么呢?
1、final static修饰的变量,只会存在一份
2、使用了WeakHashMap,弱引用,方便垃圾回收
3、key就是TransmittableThreadLocal对象
remove方法
/**
* {@inheritDoc}
*/
@Override
public final void remove() {
removeThisFromHolder();
super.remove();
}
4、get方法
/**
* {@inheritDoc}
*/
@Override
public final T get() {
T value = super.get();
if (disableIgnoreNullValueSemantics || value != null) addThisToHolder();
return value;
}
5、当我们使用线程池时,需要使用TtlRunnable.get(runnable)对runnable进行包装,或者使用TtlExecutors.getTtlExecutor(executor)对执行器进行包装,才能使线程池的变量传递起效果,那么我们就接着看一下源码的执行流程
TtlExecutors.getTtlExecutor(executor)
public static Executor getTtlExecutor(@Nullable Executor executor) {
if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) {
return executor;
}
// 包装执行器
return new ExecutorTtlWrapper(executor, true);
}
ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
this.executor = executor;
this.idempotent = idempotent;
}
public void execute(@NonNull Runnable command) {
// 实际上也是通过TtlRunnable对原runnable进行包装
executor.execute(TtlRunnable.get(command, false, idempotent));
}
可以看到,两种方式原理一样,我们直接看TtlRunnable.get()
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
// 返回TtlRunnable
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
构建TtlRunnable
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
// 原子引用
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
capture捕获父线程的ttl
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
private static
HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value =
new HashMap<TransmittableThreadLocal<Object>, Object>();
// 遍历了所有holder
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
// copyValue实际上调用了TransmittableThreadLocal的get方法获取线程存储的变量值
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value =
new HashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry :
threadLocalHolder.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
再看TtlRunnable的run方法
public void run() {
// 获取Snapshot对象,里面存储了父线程的值
final Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 传入capture方法捕获的ttl,然后在子线程重放,也就是调用ttl的set方法,
// 这样就会把值设置到当前的线程中去,最后会把子线程之前存在的ttl返回
final Object backup = replay(captured);
try {
// 调用原runnable的run
runnable.run();
} finally {
//
restore(backup);
}
}
总结
上述列举了4种方案,蜗牛这里推荐方案2和方案4,其中两种方案的缺点非常明显,实际开发中也是采用的方案2或者方案4。
最后说一句(求关注!)
如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。
关注公众号:woniuxgg,在公众号中回复:笔记 就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!
标签:get,Spring,Boot,static,线程,executor,new,public From: https://blog.51cto.com/u_16502039/9231014