TransmittableThreadLocal 相信很多人用过,一个在多线程情况下共享线程上下文的利器
名字好长,以下简称 ttl
本文以两年前一个真实项目开发遇到的问题,从源码的角度分析并解决
环境
item | version |
---|---|
java | 8 |
springboot | 2.2.2.RELEASE |
ttl | 2.11.4 |
代码如下,主线程并行启复数任务丢给线程池处理
点击查看代码
List<ProcDef> defs = createsValidate(procCreate);
List<CompletableFuture<CreateResult>> cfs = Lists.newArrayListWithCapacity(defs.size());
defs.forEach(def -> {
AbstractTransientVariable variable = procCreate.getProcDefKeyVars().get(def.getProcDefKey());
CompletableFuture<CreateResult> cf = CompletableFuture.supplyAsync(() -> create(def, variable), threadPoolTaskExecutor)
.handle((r, e) -> {
if (e != null) {
expHandle(def.getProcDefKey(), procCreate.getUserId(), variable, e);
}
return r;
});
cfs.add(cf);
});
List<CreateResult> results = cfs.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList());
if (defs.size() != results.size()) {
log.error("Process create fail exist: [{}]", JacksonUtil.toJsonString(procCreate));
}
第一行 createsValidate 方法在主线程设置了当前用户的上下文
将用户信息放入了 ttl,方便后续子线程使用
测试的时候, 线程池在处理任务的时候,有时会获取不到主线程 ttl 信息
很奇怪,之前也是一直这样使用,为什么没有问题
于是本地main方法模拟
点击查看代码
UserContext.set(new ContextUser().setUserId("mycx26"));
IntStream.range(0, 10).forEach(e -> {
Supplier<Void> supplier = () -> {
String userId = UserContext.get() != null ? UserContext.getUserId() : null;
System.out.println(Thread.currentThread().getName() + " get: " + userId);
return null;
};
CompletableFuture.supplyAsync(supplier);
});
Thread.currentThread().join();
点击查看代码
ForkJoinPool.commonPool-worker-9 get: mycx26
ForkJoinPool.commonPool-worker-6 get: mycx26
ForkJoinPool.commonPool-worker-13 get: mycx26
ForkJoinPool.commonPool-worker-4 get: mycx26
ForkJoinPool.commonPool-worker-11 get: mycx26
ForkJoinPool.commonPool-worker-2 get: mycx26
ForkJoinPool.commonPool-worker-6 get: mycx26
ForkJoinPool.commonPool-worker-15 get: mycx26
ForkJoinPool.commonPool-worker-8 get: mycx26
ForkJoinPool.commonPool-worker-9 get: mycx26
将相同的代码放到工程的单元测试方法里跑
点击查看代码
ForkJoinPool.commonPool-worker-10 get: null
ForkJoinPool.commonPool-worker-15 get: null
ForkJoinPool.commonPool-worker-9 get: null
ForkJoinPool.commonPool-worker-1 get: null
ForkJoinPool.commonPool-worker-13 get: null
ForkJoinPool.commonPool-worker-8 get: null
ForkJoinPool.commonPool-worker-3 get: null
ForkJoinPool.commonPool-worker-2 get: null
ForkJoinPool.commonPool-worker-11 get: null
ForkJoinPool.commonPool-worker-15 get: null
找到 ttl 的 github readme 阅读
要保证线程池中传递值,一种方式是修饰 Runnable 和 Callable,Supplier 也有类似的包装器
于是修改代码重新测试,测试通过
虽然问题是解决了,但是原因却无从得知,等于还是绕过了问题
下次遇到 ttl 的问题,不知道原理还是无从下手
找到了一个已经 closed 类似的 issue
https://github.com/alibaba/transmittable-thread-local/issues/138
但还是没有解决我的疑问
没有办法,只能看源码了,问题还是要一个一个解决
一. main方法没有修饰的任务为什么能跨越线程池传递 ttl
1.1 首先看看 ttl 的 set 方法做了什么
点击查看代码
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
点击查看代码
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
先看一下 ttl 的继承体系
ttl 继承 InheritableThreadLocal,InheritableThreadLocal 继承 ThreadLocal
InheritableThreadLocal 可以让子线程访问父线程设置的本地变量
点击查看代码
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
通过重写 getMap 和 createMap 方法将 ThreadLocal 的维护职责
由 threadLocals 转移给了 inheritableThreadLocals
threadLocals 和 inheritableThreadLocals 类型一样
是 ThreadLocal 中的静态内部类 ThreadLocalMap
为了维护线程本地变量定制化的哈希map, 两者由 Thread 持有
回到上文 TheadLocal set方法
首先获取当前线程,入参调用 getMap 方法获取当前线程的 inheritableThreadLocals
- map不为null
将 ttl 做为 key,value 作为值,放入当前线程的 inheritableThreadLocals
- map为null
将 ttl 和 value 构造一个新的 ThreadLocalMap,初始化当前线程的 inheritableThreadLocals
1.2 接下来看 CompletableFuture 的 supplyAsync 方法
这个方法调用栈很深,如果多线程功力不深,基本看不懂
但这不妨碍排查这个问题
supplyAsync 默认用的 ForkJoinPool 跑任务
那么必然会启一个线程
即必然会调用 Thread 的 init 方法初始化线程
首先将断点加到 CompletableFuture.supplyAsync(supplier); 这行
debug跑起来
然后将断点加到 Thread init 方法的第一行
(防止jvm启动初始化的线程产生干扰,比如 c2 complier thread)
点击查看代码
Thread parent = currentThread();
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
重点是这里,判断父线程的 inheritableThreadLocals 如果不为 null
就把父线程的 inheritableThreadLocals 复制到子线程
1.3 接着看 ttl 的 get 方法
点击查看代码
public final T get() {
T value = super.get();
if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
return value;
}
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
同样走的 ThreadLocal 的 get 方法
首先获取当前线程,getMap 获取其对应的 inheritableThreadLocals
顺利拿到之前父线程设置的变量
到这里,第一个问题算是解了
二. 同样的代码跑在单元测试,没有修饰的任务为什么不能跨越线程池传递 ttl
这里我有理由怀疑是 spring 容器在拉起的时候,提前用到了 ForkJoinPool 的 commonPool
但是项目依赖众多,如何定位
既然用到了,那么将断点加在 ForkJoinPool 启动线程
然后沿着调用栈帧一直向上找不就行了
将断点加在 ForkJoinPool 的 createWorker方法的第一行
开始找
点击查看代码
/* 检查逻辑删除字段只能有最多一个 */
Assert.isTrue(fieldList.parallelStream().filter(TableFieldInfo::isLogicDelete).count() < 2L,
String.format("annotation of @TableLogic can't more than one in class : %s.", clazz.getName()));
果然,熟悉的身影,mybatis plus
spring容器拉起时在初始化 SqlSessionFactory 时
会调用 TableInfoHelper 的 initTableFields 方法初始化表主键和字段
注意这里用的 stream 的并行流 parallel stream,很熟悉了
底层默认用的 ForkJoinPool 的 commonPool
那么在主线程设置的 TTL,线程池中的线程之前已经初始化,当然就拿不到了
好,这是第二个问题
三. 为什么项目中自定义线程池获取不到前面主线程创建的 ttl
通过排查发现,执行操作前,线程池已经被调度执行任务了
线程既然已经池化,那么后续在跑异步任务时就没有父子线程之说了
那么现在只剩最后一个问题
四. 为什么项目中任务加了包装器后又拿到了
点击查看代码
TtlWrappers.wrap(() -> create(def, variable))