文章目录
- 一致性问题
- ThreadLocal
- ThreadLocal 在一些开源框架中的应用
- ThreadLocal 扩展
一致性问题
一致性问题简介
在多线程或分布式系统中,数据一致性问题通常发生在多个主体(如线程或服务器节点)试图对同一份数据进行访问和修改时。这时,如果没有合理的机制来协调这些操作,就可能导致数据不一致的情况。数据一致性问题的关键在于多个主体无法就数据的状态达成一致意见。
解决一致性问题的常见方法
-
排队(序列化访问)
排队是解决一致性问题的一种简单而直接的方式。通过将多个主体对同一数据的访问排成队列,一个接一个地进行处理,可以确保每个主体都能看到前一个主体的修改结果。这种方式虽然能有效保证数据的一致性,但由于需要等待队列中的其他主体完成操作,性能会受到一定影响。操作系统中的锁、互斥量、管程等机制都是基于排队思想来解决一致性问题的。 -
投票(共识算法)
投票机制允许多个主体同时参与决策或修改数据,但最终通过投票来决定谁的修改生效。这种方式效率较高,但也带来了复杂性,尤其是在分布式系统中。网络中断、欺诈行为等问题可能影响投票的准确性。解决这些问题通常需要数学上的严格证明和信使节点传递消息来达成共识。Paxos和Raft是两种常见的分布式一致性算法,它们都采用了投票机制来解决一致性问题。 -
避免(局部化修改)
避免一致性问题的思路是通过设计,尽量减少或消除多个主体同时修改同一数据的情况。 ThreadLocal是一种避免一致性问题的编程手段,它通过为每个线程提供独立的变量副本,防止多个线程间的冲突,从而避免一致性问题的产生。
这些方法各有优劣,实际应用中通常需要根据具体场景选择合适的方案来处理数据一致性问题。
ThreadLocal
什么是 ThreadLocal
ThreadLocal 是 Java 中用于创建线程局部变量的类。通过 ThreadLocal,每个线程都可以拥有自己独立的变量副本,这些副本在多个线程之间是彼此隔离的。这样,每个线程对该变量的操作不会影响到其他线程,避免了线程间共享数据引发的一致性问题。
ThreadLocal 的 线程模型
- 进程:左边的黑色大圆圈代表一个进程。一个进程可以包含多个线程。
- 线程表:进程中包含一个线程表,每个线程(红色波浪线)都被分配一定的资源。
- 独占数据:每个线程都有自己独占的数据,这些数据是由进程分配的。在 Java 中,很多独占数据是通过
Thread
类分配的。 - ThreadLocalMap:每个线程内部都有一个
ThreadLocalMap
对象,这实际上是一个哈希表,用于存储线程的局部变量。ThreadLocal 的核心机制就是通过ThreadLocalMap
来实现的。
// Thread类里的变量:
ThreadLocal.ThreadLocalMap threadLocals = null;
// ThreadLocalMap的定义:
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ...
}
ThreadLocal 的工作原理
- ThreadLocal 变量的存储:每个线程都有自己的
ThreadLocalMap
,ThreadLocal
变量作为键(key)被存储在该 Map 中,对应的值是该线程独享的变量副本。 - 独立副本:由于每个线程都有独立的
ThreadLocalMap
,因此不同线程对 ThreadLocal 变量的操作是互不影响的。即使多个线程访问同一个 ThreadLocal 变量,它们获取到的也是各自的副本。
使用场景
ThreadLocal 常用于需要线程独立的场景,比如用户会话、数据库连接、事务管理等场景。在这些场景中,不同线程之间的数据互不干扰,可以避免不必要的同步操作,从而提高程序的性能。
ThreadLocal 的使用相对简单,但需要谨慎处理,以防止内存泄漏等问题,尤其是在使用完 ThreadLocal 变量后,建议显式地调用 remove()
方法清理数据。
ThreadLocal 的基本 API
1. 构造函数 ThreadLocal()
- 用途: 创建一个新的 ThreadLocal 对象。它是泛型的,传入的类型是你想在每个线程中独立保存的变量类型。
2. 初始化方法 initialValue()
- 用途: 当调用
get()
方法且没有调用过set()
方法时,initialValue()
会被调用来提供一个初始值。默认情况下,initialValue()
返回null
,但可以通过重写这个方法来指定默认值。
3. 访问器 get()
和 set()
get()
方法: 获取当前线程的局部变量值。如果该线程没有为这个 ThreadLocal 设置值,会调用initialValue()
获取默认值。set(T value)
方法: 设置当前线程的局部变量值。调用set()
后,再次调用get()
会返回通过set()
设置的值,而不会调用initialValue()
方法。
4. 回收方法 remove()
- 用途: 删除当前线程对应的 ThreadLocal 值,清除后再调用
get()
时,将重新调用initialValue()
获取默认值。
代码示例与输出分析
public class ThreadLocalDemo {
public static final ThreadLocal<String> THREAD_LOCAL = ThreadLocal.withInitial(() -> {
System.out.println("invoke initial value");
return "default value";
});
public static void main(String[] args) throws InterruptedException {
new Thread(() ->{
THREAD_LOCAL.set("first thread");
System.out.println(THREAD_LOCAL.get());
}).start();
new Thread(() ->{
THREAD_LOCAL.set("second thread");
System.out.println(THREAD_LOCAL.get());
}).start();
new Thread(() ->{
THREAD_LOCAL.set("third thread");
THREAD_LOCAL.remove();
System.out.println(THREAD_LOCAL.get());
}).start();
new Thread(() ->{
System.out.println(THREAD_LOCAL.get());
}).start();
SECONDS.sleep(1L);
}
}
输出结果:
first thread
second thread
invoke initial value
default value
invoke initial value
default value
分析:
-
第一个线程: 调用了
THREAD_LOCAL.set("first thread")
,因此get()
返回 “first thread”。 -
第二个线程: 调用了
THREAD_LOCAL.set("second thread")
,因此get()
返回 “second thread”。 -
第三个线程: 调用了
THREAD_LOCAL.set("third thread")
设置了值,然后调用了remove()
清除值。之后再调用get()
,由于值已被移除,所以调用initialValue()
返回并打印了 “default value”。 -
第四个线程: 没有调用
set()
,直接调用了get()
,因此initialValue()
被调用并返回 “default value”。
每个线程都有自己的独立变量副本,互不干扰,因此输出了线程独立的结果。
源码解析
ThreadLocalMap 概述
ThreadLocalMap
是 ThreadLocal 的一个内部静态类,用于存储每个线程对应的局部变量。它是一个自定义的哈希表,只适用于维护线程局部变量。
关键特点
-
仅由 ThreadLocal 维护:
ThreadLocalMap
仅供ThreadLocal
使用,它不会对外公开任何操作接口。该类的访问权限是包私有的,因此只能在Thread
类中声明字段。也就是说,ThreadLocalMap
是一种专门为ThreadLocal
设计的数据结构。 -
弱引用 (WeakReferences):
ThreadLocalMap
中的键 (key) 是使用弱引用保存的。这意味着,如果一个ThreadLocal
对象只有弱引用指向它(即没有其他强引用),那么在下一次垃圾回收 (GC) 时,该对象将会被回收。当某个键被回收后,ThreadLocalMap
中对应的条目 (entry) 就会变成所谓的 “stale entries” (陈旧条目),这些条目将在哈希表空间不足时被清除。
Entry 定义
在 ThreadLocalMap
中,数据存储在 Entry
对象中。Entry
是一个继承自 WeakReference
的内部静态类。
关键特点
-
弱引用的键:
Entry
类的键是一个弱引用,引用的是ThreadLocal
对象。这意味着如果ThreadLocal
对象被垃圾回收,那么该Entry
的键就会被回收。此时,Entry
会被标记为陈旧条目,可以从表中清除。 -
值 (value):
每个Entry
除了持有一个弱引用的键外,还保存了一个value
对象,这个value
是线程局部变量的实际值。
弱引用 (WeakReferences)
- 定义: 弱引用是一种特殊的引用类型。与强引用不同,弱引用不会阻止垃圾回收。当一个对象只有弱引用指向它时,该对象就可以在垃圾回收时被回收。
- 在
ThreadLocalMap
中的作用:ThreadLocalMap
使用弱引用来保存ThreadLocal
对象的引用。这确保了即使ThreadLocal
对象不再被使用,垃圾回收器仍然可以回收它,而不会因为ThreadLocalMap
中的引用而导致内存泄漏。
代码解析
static class ThreadLocalMap {
// ThreadLocalMap 实际存储数据的地方是 Entry 类,Entry 的键使用弱引用 (WeakReferences)
static class Entry extends WeakReference<ThreadLocal<?>> {
/** 与此 ThreadLocal 关联的值 */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k); // 通过弱引用引用 ThreadLocal 键
value = v; // 线程局部变量的值
}
// 其他实现细节省略
}
}
- Entry 继承自
WeakReference<ThreadLocal<?>>
:Entry
的键是一个弱引用,引用的是ThreadLocal
对象。 - value 字段:
value
是与ThreadLocal
关联的实际值,这个值是每个线程独立持有的。
ThreadLocalMap
是一个专门用于存储线程局部变量的哈希表。它使用弱引用作为键,确保即使ThreadLocal
对象被垃圾回收后,哈希表中的陈旧条目也能够被清除。这种设计有助于减少内存泄漏风险,特别是在长生命周期的应用中。
Thread、ThreadLocal、ThreadLocalMap、Entry 的关系
如下图所示:
set() 方法
1. set(T value)
方法概述
set(T value)
方法用于将当前线程的局部变量设置为指定的值。其主要操作流程如下:
- 获取当前线程 (
Thread.currentThread()
). - 从当前线程获取
ThreadLocalMap
对象。如果该对象存在,则将value
存入该ThreadLocalMap
中。 - 如果
ThreadLocalMap
不存在,则创建一个新的ThreadLocalMap
对象,并将当前的ThreadLocal
对象和value
存入其中。
public void set(T value) {
Thread t = Thread.currentThread(); // 1. 拿到当前的线程
ThreadLocalMap map = getMap(t); // 2. 根据当前的线程拿到ThreadLocalMap
if (map != null) { // 3. 如果map不为空就set value
map.set(this, value);
} else {
createMap(t, value); // 4. 否则创建一个新的ThreadLocalMap,并且set value
}
}
2. getMap(Thread t)
方法解析
getMap(Thread t)
方法用于获取当前线程的 ThreadLocalMap
对象。每个线程都有一个私有的 ThreadLocalMap
,该对象用于存储该线程的所有 ThreadLocal
变量及其对应的值。
ThreadLocalMap getMap(Thread t) {
return t.threadLocals; // 返回当前线程的 threadLocals 参数
}
在 Thread
类中,threadLocals
是一个类型为 ThreadLocalMap
的字段,用于存储当前线程的局部变量。其默认值为 null
,只有在第一次调用 set()
方法时才会被初始化。
// java.lang.Thread
ThreadLocal.ThreadLocalMap threadLocals = null; // 初始化为 null
3. createMap(Thread t, T firstValue)
方法解析
createMap(Thread t, T firstValue)
方法用于在当前线程中创建一个新的 ThreadLocalMap
,并将当前的 ThreadLocal
对象作为键(key),将传入的 firstValue
作为值(value)存储在该 ThreadLocalMap
中。
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue); // 创建新的ThreadLocalMap,并赋值给当前线程
}
在 createMap()
方法中,ThreadLocalMap
的键 (key
) 是当前的 ThreadLocal
对象 (this
),值 (value
) 是传入的 firstValue
。
代码执行流程总结
- 获取当前线程:
Thread.currentThread()
获取当前正在执行的线程对象。 - 获取
ThreadLocalMap
: 通过getMap(t)
方法获取当前线程的ThreadLocalMap
,该对象存储了线程局部变量。 - 判断
ThreadLocalMap
是否存在:- 如果存在,则将当前
ThreadLocal
对象和指定的值 (value
) 存储在ThreadLocalMap
中。 - 如果不存在,则调用
createMap()
方法,创建新的ThreadLocalMap
并进行初始化,将当前ThreadLocal
对象和值存入其中。
- 如果存在,则将当前
小结
ThreadLocal
的 set(T value)
方法通过在当前线程中存储和管理局部变量,使得每个线程都可以拥有自己独立的变量副本。通过这种方式,可以避免线程间共享数据时的一致性问题,确保每个线程的数据独立性。
get()
方法
1. get()
方法概述
get()
方法用于获取当前线程的局部变量值。如果当前线程的 ThreadLocalMap
中存在与当前 ThreadLocal
对象相关联的值,则返回该值;如果不存在,则调用 initialValue()
方法初始化一个值,并将其存入 ThreadLocalMap
中。
public T get() {
Thread t = Thread.currentThread(); // 1. 获取当前线程
ThreadLocalMap map = getMap(t); // 2. 获取当前线程的 ThreadLocalMap 对象
if (map != null) { // 3. 如果当前线程的 ThreadLocalMap 对象不为空
ThreadLocalMap.Entry e = map.getEntry(this); // 4. 从当前的 ThreadLocalMap 对象中获取 key 为当前 ThreadLocal 对象的 Entry
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value; // 5. 获取 Entry 的值
return result;
}
}
return setInitialValue(); // 6. 如果不存在 Entry,则返回初始值
}
2. setInitialValue()
方法解析
当 get()
方法在当前线程的 ThreadLocalMap
中找不到对应的 Entry
时,会调用 setInitialValue()
方法进行初始化。setInitialValue()
方法通过 initialValue()
方法获取初始值,将其存入当前线程的 ThreadLocalMap
中,然后返回该初始值。
private T setInitialValue() {
T value = initialValue(); // 1. 调用 initialValue() 获取初始值
Thread t = Thread.currentThread(); // 2. 获取当前线程
ThreadLocalMap map = getMap(t); // 3. 获取当前线程的 ThreadLocalMap 对象
if (map != null)
map.set(this, value); // 4. 如果 map 不为空,将初始值存入 map
else
createMap(t, value); // 5. 如果 map 为空,创建新的 ThreadLocalMap,并存入初始值
return value; // 6. 返回初始值
}
代码执行流程解析
-
获取当前线程:
Thread.currentThread()
获取当前正在执行的线程对象。 -
获取
ThreadLocalMap
:
通过getMap(t)
方法获取当前线程的ThreadLocalMap
对象,该对象存储了当前线程所有的ThreadLocal
变量及其对应的值。 -
检查
ThreadLocalMap
是否为空:- 如果不为空,使用当前
ThreadLocal
对象作为键,从ThreadLocalMap
中获取对应的Entry
。 - 如果
Entry
存在,则返回Entry
的值。 - 如果
Entry
不存在,或ThreadLocalMap
为空,则调用setInitialValue()
方法获取初始值并存入ThreadLocalMap
。
- 如果不为空,使用当前
-
调用
setInitialValue()
初始化值:- 调用
initialValue()
方法获取一个初始值。 - 将该初始值存入当前线程的
ThreadLocalMap
中,以当前ThreadLocal
对象为键,以初始值为值。 - 返回初始值。
- 调用
小结
get()
方法的作用是从当前线程的局部变量中获取值,如果该值不存在,则通过调用 setInitialValue()
方法来初始化一个值并存入当前线程的 ThreadLocalMap
中。这种机制确保了每个线程都有独立的变量副本,并且变量的初始值只在需要时才会被创建。
remove()
方法
1. remove()
方法概述
remove()
方法用于显式地移除当前线程中与某个 ThreadLocal
对象相关联的值。虽然 ThreadLocalMap
中的 Entry
是使用弱引用来引用 ThreadLocal
对象,但在某些情况下,比如线程池的线程复用场景,显式调用 remove()
方法可以避免数据混淆和内存泄露问题。
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread()); // 获取当前线程的 ThreadLocalMap 对象
if (m != null) {
m.remove(this); // 从 ThreadLocalMap 中移除与当前 ThreadLocal 对象相关联的 Entry
}
}
2. ThreadLocalMap
中的 remove(ThreadLocal<?> key)
方法
remove(ThreadLocal<?> key)
方法用于从 ThreadLocalMap
中删除与指定 ThreadLocal
对象相关联的 Entry
。其主要步骤如下:
-
定位
Entry
:
通过ThreadLocal
的threadLocalHashCode
计算出索引位置,并尝试找到对应的Entry
。 -
清除
Entry
:
如果找到了匹配的Entry
,则调用Entry
的clear()
方法将其清除。clear()
方法会将Entry
的key
(即ThreadLocal
对象)设置为null
,从而让Entry
成为一个“过期的条目”(stale entry),等待后续的清理。 -
清理过期条目:
调用expungeStaleEntry(int staleSlot)
方法清理该过期条目,并对后续位置的条目进行再散列处理,以维持ThreadLocalMap
的正确性。
private void remove(ThreadLocal<?> key) {
Entry[] tab = table; // 获取 ThreadLocalMap 的 Entry 数组
int len = tab.length;
int i = key.threadLocalHashCode & (len-1); // 通过 hash 计算索引位置
for (Entry e = tab[i]; // 遍历链地址法冲突处理的链表
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) { // 找到与当前 ThreadLocal 对象匹配的 Entry
e.clear(); // 清除 Entry
expungeStaleEntry(i); // 清理过期条目并再散列处理
return;
}
}
}
3. 为什么需要 remove()
方法
虽然 ThreadLocalMap
使用了弱引用,可以在 ThreadLocal
对象被回收时自动清除相关的 Entry
,但在以下情况下,显式调用 remove()
方法非常重要:
-
线程复用:
在线程池中,线程会被重复使用。如果在前一个任务执行完毕后不及时清理ThreadLocal
中的值,后续任务可能会读取到前一个任务留下的数据,导致逻辑错误。 -
避免内存泄露:
如果ThreadLocal
的键被垃圾回收了,但其对应的值(即Entry
中的value
)仍然存在且无法被回收,这就会导致内存泄露。显式调用remove()
可以及时清理这些值,避免内存泄露问题。
代码执行流程总结
-
获取当前线程的
ThreadLocalMap
对象:
通过getMap(Thread t)
方法获取当前线程的ThreadLocalMap
对象。 -
移除相关联的
Entry
:
在ThreadLocalMap
中定位与当前ThreadLocal
对象相关联的Entry
,并将其清除。 -
清理过期条目:
清理过期条目并对后续位置的条目进行再散列处理,确保ThreadLocalMap
的数据结构正确性。
小结
remove()
方法在多线程编程尤其是使用线程池时显得尤为重要。它不仅能防止数据混淆,避免任务之间的相互干扰,还能有效防止内存泄露问题。因此,在使用 ThreadLocal
时,开发者应养成在任务结束后显式调用 remove()
方法的习惯。
使用ThreadLocal的注意事项
ThreadLocal 内存泄露
ThreadLocal 为什么会内存泄露
示例代码如下:
public class ThreadLocalOOMDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
ThreadLocal<RedSpider> threadLocal = new ThreadLocal<>();
RedSpider redSpider = new RedSpider();
threadLocal.set(redSpider);
//threadLocal=null //将threadLocal 引用赋值为空
});
}
}
上述代码逻辑图如下:
我们在前面介绍ThreadLocalMap时已知:ThreadLocalMap中,Entry的key为 WeakReference,当我们给 threadLocal=null 时,逻辑图中 强引用 ② 会消失,这样 ThreadLocal对象实例只有一个 Entry中的 key 的一个 弱引用③。因为弱引用的性质,在下一次 GC 时就会回收 ThreadLocal对象实例。这时逻辑图中的引用只剩下 强引用① 和 强引用④ 。如果当前线程迟迟不断掉的话就会一直存在一条强引用链:thread(ref)->Thread->ThreadLocalMap->Entry->redSpider(ref)
。所以ThreadLocal 内存泄露的原因也就找到了:
- 堆中有一个强引用指向 RedSpider对象实例,该实例没法被 GC。
- 因为 Entry 的 key 为null ,所以没有任何途径能够接触到redSpider(ref),因此也不能访问到 RedSpider对象实例。
ThreadLocal 解决方法
由上图 源码解析中: remove()
方法 中调用了一个expungeStaleEntry()
方法,这个方法是解决问题的关键。
// staleSlot index of slot known to have null key;
// java.lang.ThreadLocal.ThreadLocalMap#expungeStaleEntry
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// ...(省略)
return i;
}
入参:staleSlot
index of slot known to have null key;
该方法的逻辑:
- 将entry里值的强引用(强引用④)置为null(这样值对象就对被GC回收)。
- 将entry对应引用(弱引用③)置为null(这样Entry就能被GC回收)。
这样逻辑图中只剩下强引用①
和强引用③
,这样RedSpider对象实例
和对应的 Entry
实例就可以被回收掉了。
因此,只要调用了expungeStaleEntry()
就能将无用 Entry 回收清除掉。
ThreadLocalMap 中 get()
方法 ,set()
方法,间接的调用了该方法。remove()
方法直接调用了该方法,以下为expungeStaleEntry() 方法的调用链。
综上所述:针对ThreadLocal 内存泄露的原因,我们可以从两方面去考虑:
- 删除无用 Entry 对象,断掉指向ThreadLocal实例的弱引用。即 用完ThreadLocal后手动调用remove()方法。
- 可以让ThreadLocal 的强引用一直存在,保证任何时候都可以通过 ThreadLocal 的弱引用访问到 Entry的 value值。即 将ThreadLocal 变量定义为 private static
ThreadLocal 父子线程传值
InheritableThreadLocal
public class InheritableThreadLocalDemo {
public static void main(String[] args) throws InterruptedException{
Thread a = new Thread(() -> {
InheritableThreadLocal<String> itl = new InheritableThreadLocal<>(); ①
itl.set("InheritableThreadLocal");
Thread b = new Thread(() -> {
String str = itl.get();
//拿到父线程放进去的“InheritableThreadLocal”,因为tl是InheritableThreadLocal
System.out.println(str);
});
b.start();
//确保子线程b执行完毕
Thread.sleep(10);
itl.remove();
});
a.start();
}
}
//执行结果
InheritableThreadLocal
//如果 ① 处定义的是 ThreadLocal 执行结果则为
null
可以看到使用了 InheritableThreadLocal 后,子线程b 获取到了父线程a set 的值。
原理:
首先我们看ThreadLocal 类的 set(T value)
方法
//java.lang.ThreadLocal#set
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value); //如果map为空,调用createMap 方法
}
//java.lang.ThreadLocal#createMap
void createMap(Thread t, T firstValue) {
//ThreadLocal 类给当前线程的threadLocals变量赋值
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
//java.lang.InheritableThreadLocal#createMap
void createMap(Thread t, T firstValue) {
//InheritableThreadLocal的 inheritableThreadLocals 变量赋值
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
//与此线程有关的 ThreadLocal 值,由ThreadLocal 类维护
//java.lang.Thread
ThreadLocal.ThreadLocalMap threadLocals = null;
//与此线程有关的 InheritableThreadLocal 值,由InheritableThreadLocal 类维护
//java.lang.Thread
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
可以看到 当我们调用 set()
方法时,如果声明的是 InheritableThreadLocal 类时,会给当前线程的 inheritableThreadLocals 变量赋值。
在Thread 类中只有两处使用到 inheritableThreadLocals
变量。分别是 init()
方法和 exit()
方法。exit()
方法是给当前线程的一写变量赋值为null,这里不做过多阐述。init()
方法是什么?通过查看调用该方法的地方可以看到,Thread 类的所有构造函数都调用了init() 方法。即当我们新建一个线程时,就会调用init()
方法,并给线程的inheritableThreadLocals
变量赋值。相关代码如下:
//java.lang.Thread#init
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
//省略 ...
Thread parent = currentThread(); //这里的parent指的是调用init()方法的线程,即所谓的父线程
//省略 ...
//如果inheritThreadLocals 为ture 并且当前线程的inheritableThreadLocals变量不为空
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); //这里传入的是当前线程的 inheritableThreadLocals 变量
//省略 ...
}
//java.lang.ThreadLocal#createInheritedMap
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
//java.lang.ThreadLocalMap Constructor
private ThreadLocalMap(ThreadLocalMap parentMap) {
//父线程 Entry
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
//每个Entry都赋值到子线程的Entry
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
//关键的一行 e.value是父线程Entry中的值,childValue()是一个可重载的方法,
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
//java.lang.ThreadLocal#childValue
T childValue(T parentValue) {
throw new UnsupportedOperationException();
}
//java.lang.InheritableThreadLocal#childValue 对于 InheritableThreadLocal 来说,返回了传入的值。
protected T childValue(T parentValue) {
return parentValue;
}
当我们再调用 get()
方法获取时
//java.lang.ThreadLocal#get
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); //InheritableThreadLocal 对象调用getMap()方法时,会返回当前线程的inheritableThreadLocals 变量
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
//java.lang.InheritableThreadLocal#getMap
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
综上可得:当我们使用 InheritableThreadLocal 时,当前线程的数据会存储在 inheritableThreadLocals变量赋值,将数据存储在inheritableThreadLocals 指向的 ThreadLocalMap 中。当新建一个子线程时,会将当前线程的 inheritableThreadLocals里的 ThreadLocalMap 赋值给 子线程的 inheritableThreadLocals 变量。然后子线程 通过调用InheritableThreadLocal 对象的 get()
方法可以得到相应的值。
InheritableThreadLocal 无法向线程池中的子线程传递数据。
平常我们开发时很少新建线程来并发编程,一般都是使用线程池。但是 InheritableThreadLocal 无法向线程池中的子线程传递数据
示例代码:
public class InheritableThreadLocalDemo2 {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
InheritableThreadLocal<String> itl = new InheritableThreadLocal<>();
itl.set("first");
executorService.execute(() -> {
String firstValue = itl.get();
System.out.println(firstValue);
});
//确保线程池任务执行完
Thread.sleep(10);
itl.remove();
itl.set("second");
executorService.execute(() -> {
String secondValue = itl.get();
System.out.println(secondValue);
});
//确保线程池任务执行完
Thread.sleep(10);
itl.remove();
}
}
执行结果:
first
first
我们预期得到的结果是 first second,然而现在输出的却是 first first。说明我们 InheritableThreadLocal 对象第二次调用 set()方法失效。 在上面原理中讲过,Thread 对象的 inheritableThreadLocals 变量只有在新建线程时会从父线程的inheritableThreadLocals 变量中拷贝过来。所以后续阶段,没有能使 子线程的inheritableThreadLocals变量变化的地方,所以第二次调用 get()
方法输出的还是 first。在线程池中,由于线程复用所以我们也不能通过 InheritableThreadLocal 类来传递值。关于线程池线程复用,详见 12.2.4 ThreadPoolExecutor如何做到线程复用的? alibaba 提供了
transmittable-thread-local 框架来解决了 在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal值问题。
transmittable-thread-local
对于 transmittable-thread-local 不做过多阐述,只介绍核心思想。
由上文可知:InheritableThreadLocal 是在创建线程时将父类线程赋值到Thread的 inheritableThreadLocals 变量中,当子线程调用对应的get方法时读取inheritableThreadLocals 变量,这样就完成了父子线程之间变量的传递,但是由于我们使用线程池提交任务的时候并非总是新建一个线程来执行任务,这导致InheritableThreadLocal 的工作原理不适用于 线程池。但我们思考一下,InheritableThreadLocal 是父线程创建子线程的时候将值传递,每创建一下新线程都会在 init()方法中进行操作,而我们使用线程池的时候,有那些操作是这种“新建” 的呢? 没错,线程池每次提交的任务 Runnable对象 就是一个新的,不会重复的。所以我们新建一个类实现Runnable接口,然后将父线程得知封装在我们新建的类中。
public class TransmittableThreadLocalDemo {
static ExecutorService executorService = Executors.newFixedThreadPool(1);
static ThreadLocal<String> context = new TransmittableThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
// 在父线程中设置
context.set("first");
//创建runnable 对象
Runnable firstRunnable = () -> {
String firstValue = context.get();
System.out.println(firstValue);
};
//封装为TtlRunnable对象
TtlRunnable firstTtlRunnable = TtlRunnable.get(firstRunnable);
//提交任务
executorService.execute(firstTtlRunnable);
//确保线程池任务执行完
Thread.sleep(10);
context.set("second");
//创建runnable 对象
Runnable secondRunnable = () -> {
String secondValue = context.get();
System.out.println(secondValue);
};
//封装为TtlRunnable对象
TtlRunnable secondTtlRunnable = TtlRunnable.get(secondRunnable);
//提交任务
executorService.execute(secondTtlRunnable);
Thread.sleep(10);
context.remove();
}
}
输出结果:
first
second
由此可见,使用TransmittableThreadLocal,将 Runnable 封装成 TtlRunnable 对象完成了线程池中父子线程中值的传递。
首先我们先看看TransmittableThreadLocal 类:
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {}
可以看到 TransmittableThreadLocal类 继承了 InheritableThreadLocal。
然后我们来看看相关方法:
set()方法
//com.alibaba.ttl.TransmittableThreadLocal#set
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
//赋值,将value赋值给Thread的 inheritableThreadLocals 变量
super.set(value);
//将当前对象塞到 TransmittableThreadLocal类 的holder 变量中
addThisToHolder();
}
}
//java.lang.ThradLocal#set
public void set(T value) {
Thread t = Thread.currentThread();
//调用的是InheritableThreadLocal的getMap()方法
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
//调用的是InheritableThreadLocal的createMap()方法
createMap(t, value);
}
//java.lang.InheritableThreadLocal#getMap
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
java.lang.InheritableThreadLocal#createMap
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
//com.alibaba.ttl.TransmittableThreadLocal#addThisToHolder
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
由上可知,调用TransmittableThreadLocal 的set(T value)
方法会将value 赋值给Thread(调用线程) 的inheritableThreadLocals 变量。然后将当前 TransmittableThreadLocal 对象存了起来。
// Note about the holder:
// 1. holder self is a InheritableThreadLocal(a *ThreadLocal*).
// 2. The type of value in the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
// 2.1 but the WeakHashMap is used as a *Set*:
// the value of WeakHashMap is *always* null, and never used.
// 2.2 WeakHashMap support *null* value.
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
ThreadLocal 在一些开源框架中的应用
Quartz
Quartz是一个非常知名的开源任务调度系统。以 2.3.2
版本为例
我们要看的源码是Quartz的SimpleSemaphore这个类。它是一个信号量的实现,在生产者-消费者模型里,信号量代表的就是队列里有多少item需要处理。
在信号量的模型里面有一个“等待”操作。当消费者消费完后,会轮询等待。SimpleSemaphore有一个获取锁的方法**obtainLock()**
,我们要看的也是这个方法的内部代码:
public class SimpleSemaphore implements Semaphore {
ThreadLocal<HashSet<String>> lockOwners = new ThreadLocal<HashSet<String>>();
HashSet<String> locks = new HashSet<String>();
private HashSet<String> getThreadLocks() {
HashSet<String> threadLocks = lockOwners.get();
if (threadLocks == null) {
threadLocks = new HashSet<String>();
lockOwners.set(threadLocks);
}
return threadLocks;
}
/**
* Grants a lock on the identified resource to the calling thread (blocking
* until it is available).
*
* @return true if the lock was obtained.
*/
public synchronized boolean obtainLock(Connection conn, String lockName) {
lockName = lockName.intern();
if(log.isDebugEnabled()) {
log.debug(
"Lock '" + lockName + "' is desired by: "
+ Thread.currentThread().getName());
}
if (!isLockOwner(lockName)) {
if(log.isDebugEnabled()) {
log.debug(
"Lock '" + lockName + "' is being obtained: "
+ Thread.currentThread().getName());
}
while (locks.contains(lockName)) {
try {
this.wait();
} catch (InterruptedException ie) {
if(log.isDebugEnabled()) {
log.debug(
"Lock '" + lockName + "' was not obtained by: "
+ Thread.currentThread().getName());
}
}
}
if(log.isDebugEnabled()) {
log.debug(
"Lock '" + lockName + "' given to: "
+ Thread.currentThread().getName());
}
getThreadLocks().add(lockName);
locks.add(lockName);
} else if(log.isDebugEnabled()) {
log.debug(
"Lock '" + lockName + "' already owned by: "
+ Thread.currentThread().getName()
+ " -- but not owner!",
new Exception("stack-trace of wrongful returner"));
}
return true;
}
}
36行的while循环就是去进行轮询操作,while里面的locks是一个HashSet,为true代表这个lockName对应的锁正在被别的线程持有,所以当前线程需要等待。
我们看到,在while循环的外层30行,有一个判断,其实是用到了ThreadLocal。这个外层的判断起什么作用呢?其实是判断当前线程是否已经持有了这个锁。如果持有了,那就直接跳到最后return true了。因为同一个线程,可能有多个程序片段会调用这个获取锁的方法。
可以看到,使用ThreadLocal可以非常高效地判断当前线程的状态,可以快速检测出当前线程是否已经获取了锁,避免了后续锁的检测和争用。
MyBatis
Mybatis不用多说,搞Java的应该都听过或者用过。我们今天要介绍的是它的SqlSessionManager。Mybatis是一个持久化框架。持久化框架,必然会面临事务的问题。我们的数据库(比如MySQL)可以保证本地事务,但也要求必须在同一个连接才行。
应用程序使用MyBatis,可能会在多个程序片段去访问数据库,做一些增删改查的操作。它们可能需要在同一个事务里面。举个例子,我们修改完订单状态后,可能还需要修改积分,它们应该在同一个事务里。Mybatis使用SqlSessionManager保证了我们同一个线程取出来的连接总是同一个。它是如何做到的呢?其实很简单,就是内部使用了一个ThreadLocal。然后所有的创建连接、取连接都是通过这个ThreadLocal变量的get/set方法进行操作。
public class SqlSessionManager implements SqlSessionFactory, SqlSession {
private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal();
// 创建连接
public void startManagedSession() {
this.localSqlSession.set(openSession());
}
// 取连接
@Override
public Connection getConnection() {
final SqlSession sqlSession = localSqlSession.get();
if (sqlSession == null) {
throw new SqlSessionException("Error: Cannot get connection. No managed session is started.");
}
return sqlSession.getConnection();
}
}
ThreadLocal 扩展
Netty
中的FastThreadLocal
和 Dubbo
中的InternalThreadLocal
都对JDK的 ThreadLocal
进行了增强,不做过多阐述。