一、ThreadLocal
多线程是Java实现多任务的基础,Thread
对象代表一个线程,我们可以在代码中调用Thread.currentThread()
获取当前线程。例如,打印日志时,可以同时打印出当前线程的名字,
对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:
public void process(User user) {
checkPermission();
doWork();
saveStatus();
sendResponse();
}
然后,通过线程池去执行这些任务。
观察process()
方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?
process()
方法需要传递的状态就是User
实例。有的童鞋会想,简单地传入User
就可以了:
public void process(User user) {
checkPermission(user);
doWork(user);
saveStatus(user);
sendResponse(user);
}
但是往往一个方法又会调用其他很多方法,这样会导致User
传递到所有地方:
void doWork(User user) {
queryStatus(user);
checkStatus();
setNewStatus(user);
log();
}
这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。
给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User
对象就传不进去了。
Java标准库提供了一个特殊的ThreadLocal
,它可以在一个线程中传递同一个对象。
ThreadLocal
实例通常总是以静态字段初始化如下:
static ThreadLocal<User> threadLocalUser = new ThreadLocal<>();
它的典型使用方式如下:
void processUser(user) {
try {
threadLocalUser.set(user);
step1();
step2();
} finally {
threadLocalUser.remove();
}
}
通过设置一个User
实例关联到ThreadLocal
中,在移除之前,所有方法都可以随时获取到该User
实例:
void step1() {
User u = threadLocalUser.get();
log();
printUser();
}
void log() {
User u = threadLocalUser.get();
println(u.name);
}
void step2() {
User u = threadLocalUser.get();
checkUser(u.id);
}
注意到普通的方法调用一定是同一个线程执行的,所以,step1()
、step2()
以及log()
方法内,threadLocalUser.get()
获取的User
对象是同一个实例。
实际上,可以把ThreadLocal
看成一个全局Map<Thread, Object>
:每个线程获取ThreadLocal
变量时,总是使用Thread
自身作为key:
Object threadLocalValue = threadLocalMap.get(Thread.currentThread());
因此,ThreadLocal
相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal
关联的实例互不干扰。
最后,特别注意ThreadLocal
一定要在finally
中清除:
try {
threadLocalUser.set(user);
...
} finally {
threadLocalUser.remove();
}
这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal
没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal
关联的实例,我们可以通过AutoCloseable
接口配合try (resource) {...}
结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal
可以封装为一个UserContext
对象:
public class UserContext implements AutoCloseable {
static final ThreadLocal<String> ctx = new ThreadLocal<>();
public UserContext(String user) {
ctx.set(user);
}
public static String currentUser() {
return ctx.get();
}
@Override
public void close() {
ctx.remove();
}
}
使用的时候,我们借助try (resource) {...}
结构,可以这么写:
try (var ctx = new UserContext("Bob")) {
// 可任意调用UserContext.currentUser():
String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象
这样就在UserContext
中完全封装了ThreadLocal
,外部代码在try (resource) {...}
内部可以随时调用UserContext.currentUser()
获取当前线程绑定的用户名。
ThreadLocal只是线程传递,但是当在本线程中创建了一个新的线程,比如说又new了一个Thread的情况下,就不能把ThreadLocal中的数据传递给子线程。此时解决办法是通过InheritThreadLocal来解决
二、InheritThreadLocal
同一个ThreadLocal变量在父线程中被设置值后,在子线程中是获取不到的。而子类InheritableThreadLocal提供了一个特性,就是让子线程可以访问在父线程中设置的本地变量
当提交一个新任务到线程池时,线程池的处理流程如下:
- 线程池判断线程数是否达到核心线程数且线程都处于工作状态。如果不是,则创建一个新的工作线程来执行任务。否则进入下个流程
- 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。否则进入下个流程
- 线程池判断线程数是否达到最大线程数且线程都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。否则按照策略处理无法执行的任务
如果使用InheritableThreadLocal+线程池,提交任务时导致线程池创建了新的工作线程,此时工作线程(子线程)能够访问到父线程(提交任务的线程)的本地变量;如果提交任务复用了已经创建的工作线程,此时工作线程(子线程)访问的本地变量来源于第一个提交任务给该工作线程的外部线程,造成线程本地变量混乱
public class InheritableThreadLocalDemo {
/**
* 模拟tomcat线程池
*/
private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
/**
* 业务线程池,默认Control中异步任务执行线程池
*/
private static ExecutorService businessExecutors = Executors.newFixedThreadPool(5);
/**
* 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值
*/
private static InheritableThreadLocal<Integer> requestIdThreadLocal = new InheritableThreadLocal<>();
/**
* 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称,
* 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; ++i) {
tomcatExecutors.submit(new ControlThread(i));
}
TimeUnit.SECONDS.sleep(10);
businessExecutors.shutdown();
tomcatExecutors.shutdown();
}
/**
* 模拟Control任务
*/
static class ControlThread implements Runnable {
private int i;
public ControlThread(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + i);
requestIdThreadLocal.set(i);
//使用线程池异步处理任务
businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
}
}
/**
* 业务任务,主要是模拟在Control控制层,提交任务到线程池执行
*/
static class BusinessTask implements Runnable {
private String parentThreadName;
public BusinessTask(String parentThreadName) {
this.parentThreadName = parentThreadName;
}
@Override
public void run() {
//如果与上面的能对应上来,则说明正确,否则失败
System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
}
}
}
其中一次执行结果如下:
pool-1-thread-1:0
pool-1-thread-4:3
pool-1-thread-3:2
pool-1-thread-2:1
pool-1-thread-6:5
pool-1-thread-5:4
pool-1-thread-7:6
pool-1-thread-8:7
pool-1-thread-9:8
pool-1-thread-10:9
parentThreadName:pool-1-thread-1:0
parentThreadName:pool-1-thread-4:3
parentThreadName:pool-1-thread-8:3
parentThreadName:pool-1-thread-7:6
parentThreadName:pool-1-thread-10:0
parentThreadName:pool-1-thread-3:3
parentThreadName:pool-1-thread-5:4
parentThreadName:pool-1-thread-2:1
parentThreadName:pool-1-thread-9:6
parentThreadName:pool-1-thread-6:0
在子线程中出现出现了线程本地变量混乱的现象
InheritThreadLocal解决了创建新的子线程的传递问题。但是如果我们使用的并不是通过new Thread的办法异步创建线程,而是通过线程池来进行异步来解决。如果线程池新建线程的话,使用InheritThreadLocal可以保证数据的传递。但是线程池中的线程是重复使用的,当重复使用线程的时候,重复使用的线程中的InheritThreadLocal仍然是上次创建的数据。此时解决办法可以参考阿里的TransmittableThreadLocal
三、TransmittableThreadLocal
依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.12.1</version>
</dependency>
使用
public class TransmittableThreadLocalDemo {
/**
* 模拟tomcat线程池
*/
private static ExecutorService tomcatExecutors = Executors.newFixedThreadPool(10);
/**
* 业务线程池,默认Control中异步任务执行线程池 使用ttl线程池
*/
private static ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
/**
* 线程上下文环境,模拟在Control这一层,设置环境变量,然后在这里提交一个异步任务,模拟在子线程中,是否可以访问到刚设置的环境变量值
*/
private static TransmittableThreadLocal<Integer> requestIdThreadLocal = new TransmittableThreadLocal<>();
/**
* 模式10个请求,每个请求执行ControlThread的逻辑,其具体实现就是,先输出父线程的名称,
* 然后设置本地环境变量,并将父线程名称传入到子线程中,在子线程中尝试获取在父线程中的设置的环境变量
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; ++i) {
tomcatExecutors.submit(new ControlThread(i));
}
TimeUnit.SECONDS.sleep(10);
businessExecutors.shutdown();
tomcatExecutors.shutdown();
}
/**
* 模拟Control任务
*/
static class ControlThread implements Runnable {
private int i;
public ControlThread(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + i);
requestIdThreadLocal.set(i);
//使用线程池异步处理任务
businessExecutors.submit(new BusinessTask(Thread.currentThread().getName()));
}
}
/**
* 业务任务,主要是模拟在Control控制层,提交任务到线程池执行
*/
static class BusinessTask implements Runnable {
private String parentThreadName;
public BusinessTask(String parentThreadName) {
this.parentThreadName = parentThreadName;
}
@Override
public void run() {
//如果与上面的能对应上来,则说明正确,否则失败
System.out.println("parentThreadName:" + parentThreadName + ":" + requestIdThreadLocal.get());
}
}
}
执行结果符合预期结果
阿里的解决方式仍然是使用的InheritThreadLocal,不同的是,阿里通过Javaagent修改了线程池的字节码,在线程池创建Runnable或者Callable的时候进行了包装,我们就叫RunnableWrapper。把需要传递的数据在new RunnableWrapper的时候就传递到了RunnableWrapper的成员变量中。在RunnableWrapper执行run方法的时候,先将成员变量的数据重新放一遍ThreadLocal,然后再真正执行被包装的Runnable的run方法。这样在真正的run方法中就可以拿到ThreadLocal的数据
实际项目使用TransmittableThreadLocal时,需要对线程池进行封装才可以,比如常用的线程池创建方式:
1、ExecutorService ExecutorService service = Executors.newFixedThreadPool(5);
使用TransmittableThreadLocal,需要对创建的ExecutorService封装。
ExecutorService businessExecutors = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
2、定义一个TtlThreadPoolTaskExecutor,继承ThreadPoolTaskExecutor,重写submit/execute方法,在初始化线程池时,使用TtlThreadPoolTaskExecutor即可
public class TtlThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
@Override
public void execute(Runnable command) {
Runnable ttlRunnable = TtlRunnable.get(command);
super.execute(ttlRunnable);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
Callable ttCallable = TtlCallable.get(task);
return super.submit(ttCallable);
}
@Override
public Future<?> submit(Runnable task) {
Runnable ttlRunnable = TtlRunnable.get(task);
return super.submit(ttlRunnable);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task){
Runnable ttlRunnable = TtlRunnable.get(task);
return super.submitListenable(ttlRunnable);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
Callable ttlCallable = TtlCallable.get(task);
return super.submitListenable(ttlCallable);
}
}
要使用的地方
ThreadPoolTaskExecutor threadPoolTaskExecutor = new TtlThreadPoolTaskExecutor();
spring容器中可以定义为bean全局使用
@Configuration
public class ThreadPoolConfig {
@Value("${config.thread.pool.core_pool_size:20}")
private Integer corePoolSize;
@Value("${config.thread.pool.max_pool_size:100}")
private Integer maxPoolSize;
@Value("${config.thread.pool.keep_alive_seconds:2}")
private Integer keepAliveSeconds;
@Value("${config.thread.pool.queue_capacity:200}")
private Integer queueCapacity;
@Value("${config.thread.pool.allow_core_thread_timeout:true}")
private String allowCoreThreadTimeOut;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new TtlThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);
threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(Boolean.parseBoolean(allowCoreThreadTimeOut));
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor;
}
}
依赖注入使用
@Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor;
threadPoolTaskExecutor.submit(() -> { //任务 });
标签:thread,parentThreadName,ThreadLocal,线程,TransmittableThreadLocal,InheritThreadLoc From: https://blog.51cto.com/u_11334685/5740093