TransmittableThreadLocal原理浅析
1.从ThreadLocal到TransmittableThreadLocal
TTL(TransmittableThreadLocal)是继承自InheritableThreadLocal,本质上也是一个ThreadLocal,可以理解为是一个可以在线程之间传递上下文的ThreadLocal。
1.1 ThreadLocal
ThreadLocal中填充的变量属于当前线程,该变量对其他线程而言是隔离的,也就是说该变量是当前线程独有的变量。ThreadLocal为变量在每个线程中都创建了一个副本,每个线程可以访问自己内部的副本变量。
ThreadLoal 的特点就是线程隔离,同一个 ThreadLocal 所包含的对象,在不同的 Thread 中有不同的副本。因为每个 Thread 内有自己的实例副本,且该副本只能由当前 Thread 使用,且其它 Thread 不可访问,自然不存在多线程间共享的问题。
Thread类中持有2个ThreadLocal.ThreadLocalMap
对象分别是threadLocals和inheritableThreadLocals,在ThreadLocal中提供了get和set方法,实际上也是对Thread中持有的ThreadLocal.ThreadLocalMap
操作:
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
ThreadLocal存在什么副作用?脏数据和内存泄漏问题,这两个问题更多的是开发时产生的问题。在每次使用 ThreadLocal 时,一定要记得在结束前及时调用 remove()方法清理数据。
ThreadLocal存在什么局限性?无法传递变量到子线程。这个问题将由其子类InheritableThreadLocal解决。
1.2 InheritableThreadLocal
InheritableThreadLocal类解决了父线程到子线程的值传递,InheritableThreadLocal的代码实现很简单
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
public InheritableThreadLocal() {}
// 方便覆盖浅拷贝
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
值传递主要体现在ThreadLocalMap的构造方法上:
可以看出InheritableThreadLocal默认实现只是对父线程的值浅拷贝到子线程,可以覆盖实现深拷贝。
那么这个构造方法在哪里用到了呢?答案就是Thread类的构造方法new Thread(() - >{})
,可以看到如果父线程的inheritThreadLocals不为null 且 inheritThreadLocals=true,则调用ThreadLocal.createInheritedMap
方法进而调用上述构造方法实现为子线程赋值。
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// ...
// 获取父线程 - 当前线程
Thread parent = currentThread();
// ...
// 如果父线程的inheritThreadLocals不为null 且 inheritThreadLocals=true
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 设置子线程中的inheritableThreadLocals变量
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
// ...
}
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
InheritableThreadLocal存在什么问题?在使用线程池等会池化复用线程的执行组件情况下,异步执行执行任务,复用的线程间将无法传递上下文。
1.3 TransmittableThreadLocal
TransmittableThreadLocal在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal
值的传递功能,解决异步执行时上下文传递的问题。
对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal
值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal
值传递到 任务执行时。
1.3.1 弥补ITL的局限性
考虑下面代码,因为线程池创建线程时当前线程并没有inheritableThreadLocals,所以线程池中线程打印结果为null。
void testInheritableThreadLocal() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {}); // 先进行工作线程创建
final InheritableThreadLocal<String> parent = new InheritableThreadLocal<>();
parent.set("value-set-in-parent");
executor.submit(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get()));
}
// 输出结果:pool-1-thread-1: null
使用TtlRunnable增强传入线程池的Runnable
,参考代码如下:
void testTtlInheritableThreadLocal() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(() -> {}); // 先进行工作线程创建
// 使用TTL
final TransmittableThreadLocal<String> parent = new TransmittableThreadLocal<>();
parent.set("value-set-in-parent");
// 将Runnable通过TtlRunnable包装下
executor.submit(TtlRunnable.get(() -> System.out.println(Thread.currentThread().getName() + ": " + parent.get())));
}
// 输出结果:pool-1-thread-1: value-set-in-parent
1.3.2 修饰Runnable/Callable
使用TtlRunnable
和TtlCallable
来修饰传入线程池的Runnable
和Callable
。
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);
// Task中可以读取,值是"value-set-in-parent"
String value = context.get();
注意:
即使是同一个Runnable
任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task)
)以抓取这次提交时的TransmittableThreadLocal
上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的TtlRunnable
,则提交的任务运行时会是之前修饰操作所抓取的上下文。
// 第一次提交
Runnable task = new RunnableTask();
executorService.submit(TtlRunnable.get(task));
// ...业务逻辑代码,
// 并且修改了 TransmittableThreadLocal上下文 ...
context.set("value-modified-in-parent");
// 再次提交
// 重新执行修饰,以传递修改了的 TransmittableThreadLocal上下文
executorService.submit(TtlRunnable.get(task));
1.3.3 修饰线程池
通过工具类TtlExecutors
省去每次Runnable
和Callable
传入线程池时的修饰,有下面的方法:
getTtlExecutor
:修饰接口Executor
getTtlExecutorService
:修饰接口ExecutorService
getTtlScheduledExecutorService
:修饰接口ScheduledExecutorService
ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();
更多详细使用以及Java Agent修饰线程池实现无侵入使用,参考项目Github主页。
2 TTL的实现原理
TTL的主要原理可以概括为捕获快照、备份快照以及还原快照,核心的三个方法为:
- 在TtlRunnable执行get方法时初始化,进行capture,捕获主线程的TransmittableThreadLocal变量
- 在run方法调用runnable.run()前进行replay得到执行任务前的ThreadLocalMaps快照backup,并且将captured设置到当前线程中;
- 在run方法调用runnable.run()后进行restore,根据备份的快照还原上下文;
通过整个过程的完整时序图可以看到这三个方法作用的时机:
首先讲一下TtlRunnable
类中的两个重要的成员变量:holder
和 threadLocalHolder
,
// holder本身是一个InheritableThreadLocal,其泛型为WeakHashMap,但是值始终为null,被用作Set
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
// Register the ThreadLocal(including subclass InheritableThreadLocal) instances to enhance the Transmittable ability for the existed ThreadLocal instances.
// 通过作者对registerThreadLocal方法的注释可以知道,该方法是为了将已经存在的threadLocal注册到threadLocalHolder中,在capture时增强其传播能力
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder =
new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
// TTL的set方法
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
// 加入threadlocalMaps 且 加入holder
super.set(value);
addThisToHolder();
}
}
// 将ttl加入holder
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null);
}
}
下面我们从TtlRunnable.get
方法开始分析上下文在线程间传递的过程:
// =================== 主线程中 ===================
// ------------------ Main -----------------
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);
// ----------------- TtlRunnable#get ----------------
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
// 执行 TtlRunnable 的构造方法
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
// capturedRef 是主线程传递下来的ThreadLocal的值,关键在这个AtomicReference原子引用,用volatile保证其内部持有value的可见性,所以在主线程中capture到的数据可以被其他线程进入此TtlRunnable对象后读取到
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
// ----------------- TransmittableThreadLocal.Transmitter#capture ----------------
// TransmittableThreadLocal类中定义静态内部类Transmitter,其中包含capture、replay以及restore
// 捕获holder副本,又分为ttl缓存抓取和ThreadLocal缓存抓取
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 其中Snapshot类持有ttl缓存和threadlocal缓存
private static class Snapshot {
final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;
private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value,
HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}
// 抓取ttl的缓存
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
// 返回值,即ttlValue是一个key为ttl,value为所存值的Map对象
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value =
new HashMap<TransmittableThreadLocal<Object>, Object>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
// 遍历holder,拷贝副本,注意这里的copyValue,会将当前ttl加入holder并且得到threadlocalMap中的对应值
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
// 抓取ThreadLocal的缓存
private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
// 返回值,即 threadLocal2Value 是一个key为ThreadLocal,value为所存值的Map对象
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
// 遍历threadLocalHolder,拷贝副本
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
// =================== pool-1-thread-1中 ===================
// -------------- ExecutorService#submit && TtlRunnable#run ------------------
public void run() {
final Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun &&
!capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 将当前线程中捕获到的ThreadLocal值在其他线程中set进去,并且得到backup备份值
final Object backup = replay(captured);
try {
// 线程执行
runnable.run();
} finally {
// 快照备份复原到当前线程中
restore(backup);
}
}
// -------------------- TransmittableThreadLocal.Transmitter#replay ------------------
public static Object replay(@NonNull Object captured) {
final TransmittableThreadLocal.Transmitter.Snapshot capturedSnapshot =
(TransmittableThreadLocal.Transmitter.Snapshot) captured;
return new
TransmittableThreadLocal.Transmitter.Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
// replayTtlValues 备份pool-1-thread-1线程的threadLocal数据
private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>();
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// backup
backup.put(threadLocal, threadLocal.get());
// 清除不在captured的ttl值,避免在捕获后holder中有新的ttl加入
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set TTL values to captured
setTtlValuesTo(captured);
// 执行前的回调函数,可以覆盖
doExecuteCallback(true);
return backup;
}
// -------------------- TransmittableThreadLocal.Transmitter#restore ------------------
public static void restore(@NonNull Object backup) {
final TransmittableThreadLocal.Transmitter.Snapshot backupSnapshot =
(TransmittableThreadLocal.Transmitter.Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
下面详细展示IDEA打断点跟踪TTL的上下文传递过程:
1.首先在主线程中创建ttl实例,并set值
2.进入TTL的set方法中
2.1.进入到TL的set方法中,当前线程下没有threadlocalMap,创建map
2.2 进入到addThisToHolder方法中,调用holder.get方法,执行map.getEntry,此时map中没有以holder为key的记录,所以执行setInitialValue方法,此时将holder加入inheritableThreadLocals中,
3.设置完值后,执行TtlRunnable.get包装Runnable对象,调用到TtlRunnable的构造方法
3.1 AtomicReference原子引用,用volatile关键字保证其内部持有的value的可见性,所以在主线程中capture到的数据可以被其他线程进入此TtlRunnable对象后读取到。
4.执行capture方法,此时capture方法捕获的是main线程的thredLocal信息,分别拷贝holder和threadLocalHolder中的ThreadLocal对象,值得注意的是threadLocal.copyValue方法,以及copier.copy(threadLocal.get())方法,都是到threadlocalmap中拷贝tl对应的值。
5,在获取到值组成快照并返回后,TtlRunnable.get方法执行结束,此时线程池执行任务,执行到TtlRunnable.run方法,此时已经由pool-1-thread-1执行,注意此线程中是没有holder等tl的,ITL的map也为null,TTL要做的事情就是把主线程捕获的快照通过capturedRef这个原子引用为媒介,传递到此线程(pool-1-thread-1),再将此线程的threadLocals和inheritableThreadLocals拷贝下来作为backup,等任务执行完成后再根据快照恢复,以保证任务的执行不会污染此线程的threadLocals。
6.类似的replay方法也分为replayTtlValues和replayThreadLocalValues方法,将threadlocal和其对应值拷贝到backup中
7.注意setTtlValuesTo方法,因为前面capture得到的是一个HashMap<TransmittableThreadLocal<Object>, Object>
类型的map,ttl对应的值存在map的value中,所以这个方法实际上就是把value提出来set到当前线程的inheritableThreadLocals中去(因为TTL继承自ITL),并且将此ttl实例加入到当前线程(pool-1-thread-1)的holder中。
执行完setTtlValuesTo(captured)
后,run方法中就可以获取到主线程传递的值了
8.接着执行run方法
9.接着进入restore方法,同样的restore方法也分为restoreTtlValues和restoreThreadLocalValues
10.进入restoreTtlValues方法,去除执行任务前线程(pool-1-thread-1)中不存在的threadlocal,然后同理给backup中的ttl设置值,给备份的backup中的hashmap中的ttl赋值,此时子线程的inheritableThreadLocals恢复到了执行任务前的状态
至此,一次线程池中的TransmittableThreadLocal对象的上下文传递过程就结束了。
本博客内容仅供个人学习使用,禁止用于商业用途。转载需注明出处并链接至原文。
标签:set,get,ThreadLocal,线程,TransmittableThreadLocal,原理,TtlRunnable,浅析 From: https://www.cnblogs.com/zhaobo1997/p/18226067