Java并发
-
什么是线程,Java中实现线程的常用方式有几种,如何创建线程,终止线程有哪些方式
线程是操作系统调度的最小单元;
java 创建线程:执行new Thread().start() 方法调用操作系统native方法 start0();
java常用创建的方式有四种:
- 继承Thread 类,继承 Thread类中 start() 方法
- 实现 Runnable方法,在实现Thread 对象
- 实现 Callable 接口,并结合 Future 实现
- 通过线程池来创建线程
有三种方法可以结束线程:
-
1.设置退出标志,使线程正常退出
-
2.使用interrupt()方法中断线程,在线程中使用isInterrupted()来判断是否存在中断标志
-
3.使用stop方法强行终止线程(不推荐使用,Thread.stop, Thread.suspend, Thread.resume 和Runtime.runFinalizersOnExit 这些终止线程运行的方法已经被废弃,使用它们是极端不安全的!)
-
执行线程为什么会调用run()方法:https://heapdump.cn/article/2288557
由 JVM 调用此线程的 run 方法,使线程开始执行。其实这就是一个 JVM 的回调过程
-
线程的生命周期:https://juejin.cn/post/6844903558433734669 https://www.cnblogs.com/vipstone/p/15907280.html
-
初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
-
运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。
线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取CPU的使用权,此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。 -
阻塞(BLOCKED):表示线程阻塞于锁。
-
等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
-
超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
-
终止(TERMINATED):表示该线程已经执行完毕。
-
-
进程、线程、协程的区别
- 进程
进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进行资源分配和调度的一个独立单位。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。由于进程比较重量,占据独立的内存,所以上下文进程间的切换开销(栈、寄存器、虚拟内存、文件句柄等)比较大,但相对比较稳定安全。
- 线程
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相⽐进程不够稳定容易丢失数据。
- 协程
协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。
-
浅谈as-if-serial语义和happen-before原则
-
as-if-serial语义的意思指:不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结果不能被改变。编译器,runtime 和处理器都必须遵守as-if-serial语义。
为了遵守as-if-serial语义,编译器和处理器不会对存在数据依赖关系的操作做重排序,因为这种重排序会改变执行结果。但是,如果操作之间不存在数据依赖关系,这些操作可能被编译器和处理器重排序
-
JVM定义的Happens-Before原则是一组偏序关系:对于两个操作A和B(共享数据),这两个操作可以在不同的线程中执行。如果A Happens-Before B,那么可以保证,当A操作执行完后,A操作的执行结果对B操作是可见的
-
as-if-serial 和 happens-before 的区别
-
as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结果不被改变。
-
as-if-serial语义给编写单线程程序的程序员创造了一个幻境:单线程程序是按程序的顺序来执行的。happens-before关系给编写正确同步的多线程程序的程序员创造了一个幻境:正确同步的多线程程序是按happens-before指定的顺序来执行的。
-
as-if-serial语义和happens-before这么做的目的,都是为了在不改变程序执行结果的前提下,尽可能地提高程序执行的并行度
-
-
-
什么是线程安全,什么是线程的竞态条件
线程安全
在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况
线程的竞态条件
竞态条件(race condition)
竞态条件(race condition)指的是两个或者以上进程或者线程并发执行时,其最终的结果依赖于进程或者线程执行的精确时序。竞争条件会产生超出预期的情况,一般情况下我们都希望程序执行的结果是符合预期的,因此竞争条件是一种需要被避免的情形。
竞争条件分为两类:
- Mutex(互斥):两个或多个进程彼此之间没有内在的制约关系,但是由于要抢占使用某个临界资源(不能被多个进程同时使用的资源,如打印机,变量)而产生制约关系。
- Synchronization(同步):两个或多个进程彼此之间存在内在的制约关系(前一个进程执行完,其他的进程才能执行),如严格轮转法。
要阻止出现竞态条件的关键就是不能让多个进程/线程同时访问那块共享变量。访问共享变量的那段代码就是临界区(critical section)。所有的解决方法都是围绕这个临界区来设计的。
-
什么是类锁,什么是对象锁,它们有哪些区别:https://www.cnblogs.com/fengzheng/p/12066239.html
-
类锁
由于一个class不论被实例化多少次,其中的静态方法和静态变量在内存中都只有一份。所以,一旦一个静态的方法被申明为synchronized。此类所有的实例化对象在调用此方法,共用同一把锁,我们称之为类锁
-
对象锁
类声明后,我们可以 new 出来很多的实例对象。这时候,每个实例在 JVM 中都有自己的引用地址和堆内存空间,这时候,我们就认为这些实例都是独立的个体,很显然,在实例上加的锁和其他的实例就没有关系
-
-
volatile关键字的作用,为什么不能保证原子性 :https://www.zhihu.com/question/329746124
volitale是Java虚拟机提供的一种轻量级的同步机制
三大特性:
-
保证可见性
可见性主要存于JMM的内存模型当中,指当一个线程改变其内部的工作内存当中的变量后,其他内存是否可以感知到,因为不同的工作线程无法访问到对方的工作内存,线程间的通信必须依靠主内存进行同步
-
不保证原子性
当在多线程改变变量时,需要将变量同步到工作线程中;当线程A对变量修改时,还没同步到主内存中线程挂起,线程B也对变量进行修改,这时线程A进行执行,就会覆盖线程B的值,由于可见性,这是线程B也会变成线程A修改的值,导致一致性问题
-
禁止指令重排
在本线程内观察,所有操作都是有序的(即指令重排不会导致单线程程序执行结果与排序前有任何差别)。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
想要线程安全必须保证原子性,可见性,有序性。而volatile只能保证可见性和有序性
在说明这个问题之前,首先还是要说明下cpu和内存,cpu和内存直接是有高速缓存的,一般分为多级。cpu首先是要从内存中读取一个数据进缓存,然后从缓存中读取进行操作,将结果返回给缓存,再把缓存写回内存
-
-
Object对象wait()方法和Thread.sleep()方法的区别,wait()方法的作用域
-
相同点
sleep()和wait()都可以暂停线程的执行。
-
异同点
-
所在类不同
sleep()是Thread类的静态方法。wait()是Object类的方法。 -
锁释放不同
sleep()是不释放锁的。wait()是释放锁的。 -
用途不同
sleep()常用于一定时间内暂停线程执行。
wait()常用于线程间交互和通信。 -
用法不同
sleep()方法睡眠指定时间之后,线程会自动苏醒。
wait()方法被调用后,可以通过notify()或notifyAll()来唤醒wait的线程。
wait()方法的作用域:wait()方法在Object类中,只有当该类获取锁,才能调用该方法;所以wait() 方法必须在同步代码块中调用,否则会抛出异常IllegalMonitorStateException
wait() 方法为什么要释放锁:https://blog.csdn.net/small_love/article/details/110959097
线程在运行的时候占用了计算机的共享资源,当调用wait()方法,线程进入阻塞状态,让出系统资源
-
-
-
如何使用wait()和notify()实现阻塞队列:https://juejin.cn/post/6844903817843064846
public class BlockingQueue<T> { private Queue<T> queue = new LinkedList<>(); private int capacity; public BlockingQueue(int capacity) { this.capacity = capacity; } public synchronized void put(T element) throws InterruptedException { while (queue.size() == capacity) { wait(); } queue.add(element); notifyAll(); } public synchronized T take() throws InterruptedException { while (queue.isEmpty()) { wait(); } T element = queue.remove(); notifyAll(); return element; } }
在上面的代码中,
put()
方法用于将元素添加到队列中。如果队列已满,则线程将进入等待状态,直到其他线程从队列中删除一个元素并通知它。take()
方法用于从队列中获取元素。如果队列为空,则线程将进入等待状态,直到其他线程将一个元素添加到队列中并通知它。注意,在使用
wait()
和notify()
方法时,线程必须先获取对象的监视器锁。在上面的代码中,我们使用synchronized
关键字来确保方法在同一时间只能由一个线程执行,从而避免了并发问题。另外,请注意,
wait()
和notify()
方法应该始终在循环中使用,以避免因线程调度问题而导致的虚假唤醒。 -
synchronized关键字解析,锁升级的过程:https://blog.csdn.net/weixin_40910372/article/details/107726978
synchronized
关键字是Java中用于实现线程同步的关键字。它可以用来标记方法或代码块,以确保在同一时间只能由一个线程访问它们。在使用synchronized
关键字时,实际上是在获取一个对象的监视器锁(也称为内部锁或互斥锁),从而保证了线程安全性。在Java中,锁可以分为四个级别:无锁、偏向锁、轻量级锁、重量级锁和GC锁。锁的级别会根据竞争情况自动升级,以保证性能和可靠性的平衡。下面是锁升级的过程:
-
无锁:无锁态
-
偏向锁(Biased Locking)
偏向锁是在对象创建时默认启用的锁,它会将锁标记为偏向状态,即锁对象只会被一个线程占用。如果其他线程需要访问锁对象,则会撤销偏向锁,将锁升级为轻量级锁。 -
轻量级锁(Lightweight Locking)
轻量级锁是当多个线程访问同一对象的锁时启用的锁,它的实现基于CAS(Compare And Swap)操作和自旋。在竞争不激烈的情况下,轻量级锁可以避免线程阻塞和上下文切换的开销,提高程序的性能。如果自旋失败,则锁升级为重量级锁。 -
重量级锁(Heavyweight Locking)
重量级锁是在多个线程频繁竞争同一对象的锁时启用的锁,它会将其他线程阻塞,等待持有锁的线程释放锁。重量级锁的实现基于操作系统提供的互斥量(Mutex)或信号量(Semaphore),因此在竞争激烈的情况下,重量级锁的性能会比轻量级锁差很多。 -
GC锁(GC Locking)
GC锁是在Java垃圾收集器执行垃圾回收时启用的锁,它会将所有线程阻塞,等待垃圾收集完成。由于GC锁是由操作系统实现的,因此在竞争激烈的情况下,GC锁的性能也会很差。
总的来说,锁的升级过程会根据竞争情况自动进行,并且会尽可能地避免线程阻塞和上下文切换的开销,以提高程序的性能
-
-
Juc中的阻塞队列
-
ArrayBlockingQueue
基于数组的阻塞队列实现,其内部维护一个定长的数组,用于存储队列元素。线程阻塞的实现是通过ReentrantLock来完成的,数据的插入与取出共用同一个锁,因此ArrayBlockingQueue并不能实现生产、消费同时进行。而且在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
-
LinkedBlockingQueue 由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列(两个独立锁提高并发)
基于单向链表的阻塞队列实现,在初始化LinkedBlockingQueue的时候可以指定大小,也可以不指定,默认类似一个无限大小的容量(Integer.MAX_VALUE),不指队列容量大小也是会有风险的,一旦数据生产速度大于消费速度,系统内存将有可能被消耗殆尽,因此要谨慎操作。另外LinkedBlockingQueue中用于阻塞生产者、消费者的锁是两个(锁分离),因此生产与消费是可以同时进行的。
-
PriorityBlockingQueue 支持优先级排序的无界阻塞队列(compareTo 排序优先)
一个支持优先级排序的无界阻塞队列,进入队列的元素会按照优先级进行排序
-
DelayQueue 使用优先级队列实现的延迟无界阻塞队列(用于缓存失效,定时任务)
DelayQueue是一个支持延时获取元素的无界阻塞队列,里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行,也就是说只有在延迟期到时才能够从队列中取元素
-
SynchronusQueue 不存储元素的阻塞队列,也即单个元素的队列(不存储数据,可用于数据传递)
同步阻塞队列,SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然
-
LinkedTransferQueue
LinkedTransferQueue是基于链表的FIFO无界阻塞队列,它出现在JDK7中,Doug Lea 大神说LinkedTransferQueue是一个聪明的队列,它是ConcurrentLinkedQueue、SynchronousQueue(公平模式下)、无界的LinkedBlockingQueues等的超集,
LinkedTransferQueue
包含了ConcurrentLinkedQueue、SynchronousQueue、LinkedBlockingQueues
三种队列的功能 -
LinkedBlockDeque 由链表结构组成的双向阻塞队列
-
ConcurrentLinkedQueue 它是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,是个高性能无锁队列
-
-
重入锁,自旋锁,排它锁,公平锁,非公平锁常用锁语义描述
- 重入锁(ReentrantLock):重入锁是一种可重入的独占锁,可以被同一个线程多次获取。在同一线程中,多次获取锁不会阻塞,只需确保每次获取锁后都必须释放锁。重入锁支持公平锁和非公平锁两种模式,默认情况下为非公平锁。
- 自旋锁(Spinlock):自旋锁是一种基于忙等待的锁,当一个线程请求锁时,如果锁已经被其他线程占用,则该线程会一直循环等待直到锁被释放。自旋锁适用于锁被占用的时间较短的场景,避免了线程上下文切换的开销。
- 排它锁(Exclusive Lock):排它锁是一种独占锁,同一时间只允许一个线程持有锁。在Java中,重入锁和synchronized都是排它锁。
- 公平锁(Fair Lock):公平锁是一种保证锁获取的顺序与线程请求锁的顺序相同的锁。当多个线程同时请求锁时,公平锁会按照请求的顺序依次分配锁。在高并发场景下,公平锁可能会导致线程上下文切换频繁,因此在性能要求高的场景下,通常使用非公平锁。
- 非公平锁(Non-Fair Lock):非公平锁是一种不保证锁获取顺序的锁,允许“插队”获取锁。在高并发场景下,非公平锁相比于公平锁具有更好的性能,因为线程不必等待已经请求锁的线程释放锁。
总之,不同的锁具有不同的语义和特点,在实际应用中应根据具体的场景选择合适的锁。例如,如果并发量较大、竞争比较激烈,可以选择自旋锁或非公平锁以提高性能;如果需要保证公平性,可以选择公平锁。同时,应尽量避免使用过多的锁,以避免锁竞争、死锁等问题。
-
浅谈AQS模型和实现原理:AQS源码解析:https://javadoop.com/post/AbstractQueuedSynchronizer
AQS(AbstractQueuedSynchronizer)是Java.util.concurrent(JUC)包中提供的一个用于实现锁和同步器的框架。AQS实现了一个FIFO队列,用于管理等待获取锁的线程,同时提供了一组方法来管理和操作队列。
AQS的核心思想是使用一个volatile变量state来表示锁的状态,当state为0时表示锁空闲,当state大于0时表示锁已被占用,小于0时表示锁已被占用且有线程等待获取锁。AQS通过对state的原子性操作来实现锁和同步器。
AQS的具体实现基于一个双向链表,通过CAS操作来实现加锁和解锁操作。当线程请求获取锁时,如果锁已经被其他线程占用,则将该线程包装成一个节点并加入等待队列中,然后阻塞该线程。当持有锁的线程释放锁时,AQS会将队列头节点的线程唤醒,使其尝试获取锁。由于AQS是一个公共的框架,可以通过继承AQS来实现自己的同步器。
具体来说,如果需要实现一个独占锁,只需要继承AQS并实现tryAcquire()和tryRelease()方法即可。其中,tryAcquire()方法尝试获取锁,如果成功则返回true,失败则返回false;tryRelease()方法尝试释放锁,如果释放成功则返回true,失败则返回false。同时,AQS还提供了一些方法来操作等待队列,如addWaiter()、unparkSuccessor()等。
总之,AQS是Java并发编程中一个重要的工具,通过使用AQS,我们可以快速、高效地实现各种类型的锁和同步器。然而,由于AQS对并发的细节进行了非常细致的处理,因此在使用AQS时需要注意一些细节问题,如死锁、饥饿等问题。
-
什么是Exclusive(独占锁),什么是Shard(共享锁)
Exclusive Lock(独占锁)是一种互斥锁,同一时间只有一个线程可以获取该锁。当线程A获取到Exclusive Lock后,其他线程B、C等就不能再获取该锁,只能等待线程A释放锁后才能尝试获取。在Java中,ReentrantLock就是一种独占锁。Shared Lock(共享锁)是一种允许多个线程同时获取的锁,通常用于读取操作,可以提高并发读取效率。当线程A获取到Shared Lock后,其他线程B、C等仍然可以获取该锁,只有当所有线程都释放了该锁后,该锁才算被完全释放。在Java中,ReadWriteLock就是一种支持读写分离的锁,其中读锁是共享锁,写锁是独占锁。
总之,独占锁和共享锁是实现并发控制的两种常用机制,独占锁用于保证同一时间只有一个线程能够修改共享资源,而共享锁则用于允许多个线程同时读取共享资源,从而提高读取效率。在选择锁的机制时,需要根据具体应用场景来进行选择,以便获得更好的性能和可靠性。
-
什么是原子类,JDK unsafe类如何来实现原子操作
原子类是Java中提供的一种线程安全的数据类型,可以保证多线程并发访问时的数据一致性。在Java中,原子类通过使用CAS(Compare-And-Swap)算法来保证数据的原子性。
JDK Unsafe类是Java提供的一个不安全的类,它提供了一些直接操作内存和线程的方法,可以用来实现原子操作。Unsafe类中的方法基本上都是native方法,使用的是底层的操作系统API,可以直接访问内存并进行操作。
CAS算法是实现原子操作的关键,它是一种无锁算法,使用乐观锁的思想,先比较目标值与期望值是否相等,如果相等则使用CAS指令将新值写入内存;否则就认为操作失败,重新尝试。在JDK中,原子类的实现都是基于CAS算法来实现的。
使用JDK Unsafe类实现原子操作的步骤一般如下:
- 使用Unsafe类的静态方法getUnsafe()来获取Unsafe实例;
- 使用Unsafe类提供的allocateMemory()方法来分配内存;
- 使用Unsafe类提供的putXXX()方法将数据写入内存;
- 使用Unsafe类提供的compareAndSwapXXX()方法来实现CAS算法。
需要注意的是,Unsafe类提供的方法具有高度的危险性,不当的使用可能导致内存泄漏和其他严重的后果。因此,在使用Unsafe类时需要格外小心,并严格遵循相关的使用规范。另外,由于Unsafe类是JDK内部的实现类,不建议在业务代码中直接使用Unsafe类来实现原子操作,而应该使用JDK提供的原子类来实现
-
ReententLock和 Synchronized的区别
ReentrantLock和Synchronized都是Java中用于实现线程同步的机制,它们的作用是保证多个线程对共享资源的访问顺序和执行结果的一致性,从而避免数据竞争和并发错误。下面是ReentrantLock和Synchronized的主要区别:
- 可重入性:ReentrantLock是可重入锁,同一线程可以多次获得同一个锁,而Synchronized也是可重入锁,但是它是基于Java对象监视器的实现,同一线程多次获得锁是基于监视器计数器的实现。
- 锁的类型:ReentrantLock提供了公平锁和非公平锁两种锁的实现,而Synchronized只提供了非公平锁的实现。
- 可中断性:ReentrantLock提供了可中断的锁和不可中断的锁,而Synchronized只提供了不可中断的锁。
- 条件变量:ReentrantLock支持条件变量,可以通过条件变量来实现复杂的线程同步机制,而Synchronized不支持条件变量。
- 性能:在低并发的情况下,Synchronized的性能比ReentrantLock更好,因为它的实现比较简单;但在高并发的情况下,ReentrantLock的性能比Synchronized更好,因为它提供了更细粒度的控制,可以更好地支持高并发场景。
总之,ReentrantLock和Synchronized都是实现线程同步的重要机制,它们各有优劣,在具体应用中需要根据实际情况选择合适的机制,以达到最好的性能和效果。
-
CountDownLatch、CyclicBarrier、Semphore的用法和作用:https://juejin.cn/post/7107855826945048584
CountDownLatch、CyclicBarrier和Semaphore都是Java中用于多线程编程的同步工具,它们可以协调多个线程的执行,控制线程的并发数量,保证线程的安全和一致性。下面是它们的用法和作用:
-
CountDownLatch(倒计时门栓):
CountDownLatch是一种非常简单的同步工具,它可以让一个或多个线程等待其他线程执行完毕后再执行。CountDownLatch初始化时传入一个计数器,每个线程执行完毕后会将计数器减1,直到计数器为0时,所有等待的线程才会继续执行。主要用于控制线程的执行顺序。
-
CyclicBarrier(循环栅栏):
CyclicBarrier是一种更加复杂的同步工具,它可以让多个线程在某个屏障处等待,直到所有线程都到达屏障处才继续执行。CyclicBarrier初始化时需要指定线程数量和回调函数,每个线程执行完毕后会到达屏障处并调用回调函数,直到所有线程都到达屏障处后才会继续执行。主要用于控制线程的并发数量和协调线程的执行顺序。
-
Semaphore(信号量):
Semaphore是一种更加复杂的同步工具,它可以控制同时访问某个资源的线程数量,保证多个线程可以同时访问该资源。Semaphore初始化时需要指定信号量的数量,每个线程访问该资源时会尝试获取一个信号量,如果获取失败则会被阻塞等待其他线程释放信号量。主要用于控制线程的并发数量和资源的访问顺序。
总之,CountDownLatch、CyclicBarrier和Semaphore都是非常有用的同步工具,它们可以协调多个线程的执行顺序,保证线程的安全和一致性。在实际开发中,需要根据具体的应用场景选择合适的同步工具,以达到最好的性能和效果
-
-
什么是CAS,在CAS中如何解决ABA问题
CAS(Compare and Swap)是一种基于原子操作的并发控制机制,用于实现多线程之间的同步。CAS操作包含三个操作数,分别为内存位置V、期望值A和新值B。当且仅当V的值等于A时,CAS将V的值设为B,否则不做任何操作。
在多线程编程中,常常会遇到ABA问题。简单来说,ABA问题就是指线程A读取了共享变量V的值,然后线程B将V的值改为了其他值,再次改回了原来的值,然后线程A又进行了写操作。这种情况下,线程A会认为V的值没有发生变化,但实际上V的值已经发生了变化。
为了解决ABA问题,Java中提供了一个带有时间戳的原子类AtomicStampedReference。AtomicStampedReference类可以通过增加时间戳来解决ABA问题,时间戳的作用是记录每一次变量的修改操作,使得在比较并交换时不仅需要比较变量的值,还需要比较变量的时间戳是否相同。这样就可以避免ABA问题的发生。
具体来说,AtomicStampedReference类中的compareAndSet方法不仅会比较当前值和期望值是否相等,还会比较当前的时间戳是否相等。只有当前值和时间戳都相等时,才会执行CAS操作,否则不会执行。
总之,CAS是一种基于原子操作的并发控制机制,可以用来实现多线程之间的同步。为了解决ABA问题,Java中提供了一个带有时间戳的原子类AtomicStampedReference,通过比较时间戳来避免ABA问题的发生。
-
为什么CAS操作是原子性的
CAS操作是原子性的,是因为在CPU底层硬件支持下实现的。
在CPU的底层,有一种特殊的指令称为“比较并交换指令”(Compare and Swap,CAS),它可以原子地进行三个操作:读取一个内存位置的值、比较这个值是否等于预期值,如果相等,则将这个内存位置的值修改为新值。在执行过程中,由于底层硬件的支持,CAS操作是原子性的,即执行CAS操作时不会被其他线程中断。
因此,Java中的CAS操作实际上是利用了底层硬件的支持来实现的,确保了CAS操作的原子性。这种底层硬件支持的实现方式非常高效,因此CAS操作也被广泛地应用于多线程编程中。
-
常用的线程池实现方式有哪四种
Java中常用的线程池实现方式有以下四种:
- FixedThreadPool:固定大小的线程池。线程池中的线程数固定不变,不会动态地增加或减少。
- CachedThreadPool:缓存线程池。线程池中的线程数根据任务的数量动态地调整,如果有空闲的线程则会被复用,否则会创建新的线程。
- ScheduledThreadPool:定时任务线程池。线程池中的线程可以定时执行任务,也可以周期性地执行任务。
- SingleThreadExecutor:单线程线程池。线程池中只有一个线程在工作,它保证所有的任务按顺序执行,即遵循队列的入队和出队顺序执行。
以上四种线程池实现方式都是通过Executors工厂类来创建的,通过Executors提供的工厂方法可以方便地创建不同类型的线程池。同时,Java中也提供了ThreadPoolExecutor类,通过ThreadPoolExecutor可以更加灵活地配置线程池的参数,例如核心线程数、最大线程数、任务队列类型等。
-
线程池的核心参数有哪些,如何实现一个自定义线程池(需要注意的参数)
线程池的核心参数包括以下几个:
- 核心线程数(corePoolSize):线程池中保留的核心线程数,即使它们处于空闲状态。
- 最大线程数(maximumPoolSize):线程池中允许的最大线程数。
- 队列容量(workQueue):用于保存等待执行的任务的阻塞队列。
- 线程空闲时间(keepAliveTime):当线程池中的线程数大于核心线程数时,空闲线程的存活时间,超过这个时间就会被回收。
- 线程工厂(threadFactory):用于创建新线程的工厂。
- 拒绝策略(RejectedExecutionHandler):当线程池无法执行新的任务时的处理策略。
以下是一个简单的自定义线程池的实现示例:
import java.util.concurrent.*; public class MyThreadPool { private ThreadPoolExecutor threadPool; public MyThreadPool(int corePoolSize, int maximumPoolSize, int keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public void execute(Runnable task) { threadPool.execute(task); } public void shutdown() { threadPool.shutdown(); } }
在这个自定义线程池中,通过ThreadPoolExecutor来创建线程池,同时提供了execute方法和shutdown方法供用户使用。用户可以在创建线程池时指定不同的参数来配置线程池的行为。
-
线程池的拒绝策略有哪几种,如何实现一个自定义拒绝策略
线程池的拒绝策略有以下几种:
- AbortPolicy(默认):当任务添加到线程池中被拒绝时,直接抛出RejectedExecutionException异常。
- CallerRunsPolicy:当任务添加到线程池中被拒绝时,在主线程中直接执行该任务。
- DiscardOldestPolicy:当任务添加到线程池中被拒绝时,丢弃队列中最旧的任务,然后重新提交当前任务。
- DiscardPolicy:当任务添加到线程池中被拒绝时,直接丢弃该任务。
用户也可以通过实现RejectedExecutionHandler接口来自定义拒绝策略,只需要实现rejectedExecution方法即可,该方法会在任务被拒绝时被调用。
以下是一个简单的自定义拒绝策略的实现示例:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 自定义拒绝策略 System.out.println("Task rejected: " + r.toString()); } }
在这个自定义拒绝策略中,实现了RejectedExecutionHandler接口,并重写了rejectedExecution方法。在方法中,可以自定义拒绝策略的行为,例如记录日志、发送通知等。然后,在创建线程池时,将该自定义拒绝策略传递给ThreadPoolExecutor即可。例如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new MyThreadFactory(), new MyRejectedExecutionHandler());
-
线程池是如何创建一个线程的,从源码的角度进行分析:https://javadoop.com/post/java-thread-pool
在ThreadPoolExecutor的构造方法中,会创建一个线程池管理器,它是一个线程,用于管理ThreadPoolExecutor中的线程。线程池管理器会在后台一直运行,直到线程池被shutdown()关闭为止。
线程池管理器主要工作如下:
- 当线程池中的任务数小于corePoolSize时,创建新线程并立即执行任务。
- 当线程池中的任务数大于corePoolSize时,将任务添加到workQueue中等待执行。
- 当workQueue已满且线程池中的线程数小于maximumPoolSize时,创建新线程执行任务。
- 当workQueue已满且线程池中的线程数等于maximumPoolSize时,采取拒绝策略处理任务。
ThreadPoolExecutor创建线程的具体实现是在addWorker方法中完成的。该方法会创建一个新的Worker对象,用于执行任务。Worker是ThreadPoolExecutor中的内部类,实现了Runnable接口,用于执行任务。
在创建Worker对象时,会先通过ThreadFactory创建一个新的线程,然后将该线程传递给Worker对象。Worker对象会调用线程的start()方法启动线程,并调用runWorker()方法开始执行任务。
具体的实现细节还有很多,包括任务的执行、线程的创建和销毁等,这些都是ThreadPoolExecutor的核心功能。了解这些实现细节可以更深入地理解线程池的工作原理。
-
线程池execute()和submit() 的区别,为什么submit() 能获取返回值,从FutureTask源码进行解析
线程池的
execute()
和submit()
都可以将一个任务提交到线程池中进行执行,但二者有以下几点不同:execute()
方法只能接收一个Runnable
类型的任务,而submit()
方法既可以接收Runnable
类型的任务,也可以接收Callable
类型的任务。submit()
方法会返回一个Future
对象,通过这个Future
对象可以获取任务执行的结果或取消任务的执行。而execute()
方法没有返回值,无法获取任务的执行结果或取消任务的执行。- 在线程池中,当任务执行失败或抛出异常时,
execute()
方法只能通过线程池的UncaughtExceptionHandler
进行处理,无法得知任务执行的具体结果。而submit()
方法可以通过Future
对象获取任务执行的结果或抛出的异常。
至于为什么
submit()
方法能够获取返回值,这是因为在submit()
方法内部,会将Callable
任务转换为一个FutureTask
对象,然后将这个FutureTask
对象提交到线程池中进行执行。FutureTask
是Future
接口的实现类,实现了获取任务执行结果和取消任务执行的方法。submit()
方法会返回一个Future
对象,这个Future
对象就是包装了FutureTask
对象的结果。 -
线程池Shutdown和ShutdownNow的区别,从源码角度分析:https://juejin.cn/post/6844903927960305671
在Java中,线程池是一种重要的并发编程技术,它可以管理和重用多个线程,以减少创建线程的开销和提高系统的性能。Java中的线程池框架提供了两种方法来关闭线程池:shutdown()和shutdownNow()。
- shutdown()方法:
shutdown()
方法将线程池的状态设置为“正在关闭”(RUNNING -> SHUTDOWN
),并在适当的时候中断所有空闲线程,直到所有任务都被处理完毕。线程池会拒绝所有新的任务,并将它们放入一个任务队列中,直到所有已提交的任务都已完成。这个方法并不会强制中断正在执行的任务,而是等待它们自己完成。这个方法返回后,线程池将不再接受新的任务。如果在关闭线程池的过程中,有新的任务提交,那么这些任务将被拒绝。- shutdownNow()方法:
shutdownNow()
方法会立即将线程池的状态设置为“正在关闭”(RUNNING -> STOP
),并尝试中断所有正在执行的任务,以及所有还未执行的任务。这个方法返回一个包含所有未执行任务的列表。这个方法会强制中断所有线程,包括正在执行的任务和空闲线程。如果线程池中存在被阻塞的任务,那么这些任务将无法被中断,并且这个方法返回后,线程池可能仍然有一些线程在运行。从源码的角度来看,
shutdown()
和shutdownNow()
方法的区别主要在于它们对线程池中正在执行的任务的处理方式。shutdown()
方法会将状态设置为“正在关闭”,然后使用ThreadPoolExecutor
类中的interruptIdleWorkers()
方法中断所有空闲线程,并等待所有任务完成。而shutdownNow()
方法会将状态设置为“正在关闭”,然后使用ThreadPoolExecutor
类中的interrupt()
方法中断所有线程,包括正在执行的任务和空闲线程,然后返回未执行的任务列表。下面是一个简单的例子,展示了如何使用线程池的
shutdown()
和shutdownNow()
方法:ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // submit some tasks for (int i = 0; i < 100; i++) { executor.submit(new MyTask()); } // shutdown the executor gracefully executor.shutdown(); // shutdown the executor immediately List<Runnable> tasks = executor.shutdownNow();
在这个例子中,我们首先创建一个
ThreadPoolExecutor
对象,然后向线程池中提交一些任务。接着我们使用shutdown()
方法来关闭线程池,这个方法会等待所有任务完成后再返回。最后,我们使用shutdownNow()
方法来立即关闭线程池,并返回未执行的任务列表。 -
线程如何中断:https://www.cnblogs.com/myseries/p/10918819.html
- 在Java中,可以通过调用线程的
interrupt()
方法来中断线程。interrupt()
方法并不是直接停止线程的执行,而是向线程发出一个中断信号,通知线程需要停止执行。
当调用
interrupt()
方法时,如果线程正在阻塞(如在sleep()
、wait()
或join()
方法中),那么它会抛出一个InterruptedException
异常。如果线程没有被阻塞,那么它会设置一个中断标志位(即将线程的interrupted
属性设置为true
),表示线程已经被中断了。线程可以通过调用isInterrupted()
方法来检查自己是否被中断。通常情况下,在
run()
方法中通过检查中断标志位来判断线程是否应该停止执行。例如:public void run() { while (!Thread.currentThread().isInterrupted()) { // do some work } }
在上面的代码中,线程在每次循环开始时检查自己是否被中断。如果中断标志位为
true
,则退出循环,线程停止执行。另外,当一个线程调用
join()
方法等待另一个线程执行完毕时,也可以通过调用interrupt()
方法来中断线程。例如:Thread t1 = new Thread(() -> { // do some work }); Thread t2 = new Thread(() -> { t1.join(); // do some work }); t1.start(); t2.start(); // Interrupt t2 after 5 seconds Thread.sleep(5000); t2.interrupt();
在上面的代码中,线程
t2
等待线程t1
执行完毕后再执行。我们可以在等待一段时间后调用t2
的interrupt()
方法来中断它的执行,以便尽早结束等待。需要注意的是,
interrupt()
方法并不会直接停止线程的执行,而是通过设置中断标志位来通知线程需要停止执行。因此,在编写线程的run()
方法时,需要定期检查中断标志位,以确保能够及时退出线程。此外,在使用interrupt()
方法中断线程时,需要注意处理InterruptedException
异常,以避免出现意外情况。 - 在Java中,可以通过调用线程的
-
为什么要中断线程
在Java多线程编程中,中断线程是一种常用的技术手段,其主要原因有以下几个:
- 响应外部事件:线程通常是在执行一个任务,如果在任务执行过程中需要响应外部事件(如用户输入、网络数据到达等),那么可以通过中断线程的方式来实现响应。
- 程序设计错误:在程序中可能会出现死锁、无限循环、资源竞争等问题,导致线程无法正常执行。在这种情况下,可以通过中断线程来使其停止执行,以避免程序卡死或资源耗尽等问题。
- 优雅的停止:在某些情况下,需要让线程优雅地停止执行。例如,当程序即将关闭时,需要让所有线程停止执行并释放资源,这时可以通过中断线程来实现优雅的停止。
总之,中断线程是一种有效的控制线程执行的手段,可以帮助我们实现更加可靠、高效的多线程程序。但需要注意的是,中断线程应该是一种协作式的行为,需要线程自己检查中断标志位并进行合适的处理,而不能强制停止线程的执行,以避免出现数据不一致或资源泄漏等问题。
-
如何实现动态线程池
动态线程池是指能够根据任务负载动态调整线程数量的线程池。它可以根据实际需要动态增加或减少线程数量,以提高任务执行的效率和性能。
下面是实现动态线程池的基本步骤:
- 创建一个
ThreadPoolExecutor
对象,并设置核心线程数、最大线程数、空闲线程存活时间、任务队列等参数。例如:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
其中,5为核心线程数,10为最大线程数,60为空闲线程存活时间,
LinkedBlockingQueue
为任务队列。- 在程序运行过程中,根据当前任务负载动态调整线程池大小。例如,可以使用如下代码实现线程池动态调整:
// 获取当前任务队列中的任务数量 int taskCount = threadPool.getQueue().size(); // 获取当前线程池中线程的数量 int threadCount = threadPool.getPoolSize(); if (taskCount > threadCount && threadCount < 10) { // 如果任务队列中的任务数量大于当前线程数,并且当前线程数小于最大线程数,就增加线程数量 threadPool.setCorePoolSize(threadCount + 1); threadPool.setMaximumPoolSize(threadCount + 1); } else if (taskCount < threadCount && threadCount > 5) { // 如果任务队列中的任务数量小于当前线程数,并且当前线程数大于最小线程数,就减少线程数量 threadPool.setCorePoolSize(threadCount - 1); threadPool.setMaximumPoolSize(threadCount - 1); }
在上述代码中,通过比较当前任务队列中的任务数量和当前线程池中线程的数量,动态调整线程池的大小。如果任务数量过多,就增加线程数量;如果任务数量较少,就减少线程数量。同时,需要注意设置线程池的最大线程数和最小线程数,以避免线程池数量过多或过少。
- 在程序结束时,调用线程池的
shutdown()
方法关闭线程池。例如:
threadPool.shutdown();
需要注意的是,在使用动态线程池时,需要根据实际情况调整线程池的参数,以达到最优的性能和效率。同时,也需要注意避免线程池数量过多或过少,以免影响程序的运行效果。
- 创建一个
-
ThreadLocal源码分析,为什么ThreadLocal存在内存泄漏的问题:https://juejin.cn/post/7065931214120550413
每个Thread维护了一个ThreadLocalMap,这个映射表的Key是ThreadLocal,Value是要存的值,同时Key是作为弱引用使用,弱引用对象会在GC时被回收。ThreadLocal对象作为key使用,而ThreadLocalMap和Thread生命周期一致,当ThreadLocal对象被回收时如果当前线程未结束,就会存在key已经为null,value访问不到的情况而导致内存泄漏。这里弱引用的使用会在ThreadLocal的get或者set方法在某些时候被调用时会调用expungeStaleEntry方法用来清除Entry中Key为null的Value,但是这是不及时的,也不一定每次都能执行,所以需要在使用之后调用remove()方法来显示调用expungeStaleEntry方法进行回收。其本质问题是ThreadLocalMap的生命周期和Thread一样长,没及时remove时线程没有结束就会导致内存泄漏。弱引用的使用增加了回收的机会,一点程度上避免了泄漏。当使用线程池和ThreadLocal时要注意线程是不断重复的,不手动删除会导致value的积累
-
ThreadLocal,InheritableThreadLocal和MisttableThreadLocal的区别:https://juejin.cn/post/7106793786293878791
ThreadLocal解决的是每个线程可以拥有自己线程的变量实例。可以从隔离的角度解决变量线程安全,相信大家也十分熟悉,此处不再演示。
但是它并不支持子线程,因为父线程与子线程并不是同一个Thread,例:
public class UserContext { private static ThreadLocal<String> userHolder = new ThreadLocal<>(); public static String getUser() { return userHolder.get(); } public static void setUser(String user) { userHolder.set(user); } public static void clean() { userHolder.remove(); } } 复制代码 public class Test { @SneakyThrows public static void main(String[] args) { UserContext.setUser("小明"); System.out.println("父线程获取:" + UserContext.getUser()); new Thread(() -> { // 无法获取父线程的ThreadLocal System.out.println("子线程获取:" + UserContext.getUser()); }).start(); Thread.sleep(1000); } } 复制代码
JDK为此提供另一个类解决此问题 ↓
InheritableThreadLocal
我们只需将上面的演示代码
new ThreadLocal<>()
替换成new InheritableThreadLocal<>()
即可解决问题。原理: 点开InheritableThreadLocal,实没多少代码,原理的码在此,
java.lang.Thread#init(java.lang.ThreadGroup, java.lang.Runnable, java.lang.String, long, java.security.AccessControlContext, boolean)
init方法末尾初:
... if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); 复制代码
可以看到inheritableThreadLocals还是一个ThreadLocalMap,只不过是在Thread的init方法中把父Thread的inheritableThreadLocals变量copy了一份给自己。同样借助ThreadLocalMap子线程可以获取到父线程的所有变量。
根据它的实现,我们也可以看到它的缺点,就是Thread的init方法是在线程构造方法中copy的;
也就是说inheritThreadLocals 一旦创建就会copy父线程的信息,但是在线程池这种复用线程的场景下,线程被多次复用,而inheritThreadLocals还是原来的,这样就会有问题了。如下
public class Test { @SneakyThrows public static void main(String[] args) { ExecutorService ttlExecutorService = Executors.newFixedThreadPool(1); UserContext.setUser("小明"); ttlExecutorService.submit(() -> { System.out.println("第一次,子线程获取:" + UserContext.getUser()); }); UserContext.setUser("小红"); ttlExecutorService.submit(() -> { //此处由于是复用线程,所以获得的用户依然是小明 System.out.println("第二次,子线程获取:" + UserContext.getUser()); }); Thread.sleep(1000); } } 复制代码
为此阿里开源了一个工具 TransmittableThreadLocal
TransmittableThreadLocal
使用例子:
public class UserContext { private static ThreadLocal<String> userHolder = new TransmittableThreadLocal<>(); public static String getUser() { return userHolder.get(); } public static void setUser(String user) { userHolder.set(user); } public static void clean() { userHolder.remove(); } } 复制代码 public class Test { @SneakyThrows public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(1); //需要手动修饰线程池 //如果仅仅是吧InheritableThreadLocal修改为TransmittableThreadLocal是不起作用的。 executorService = TtlExecutors.getTtlExecutorService(executorService); UserContext.setUser("小明"); executorService.submit(() -> { System.out.println("第一次,子线程获取:" + UserContext.getUser()); }); UserContext.setUser("小红"); executorService.submit(() -> { //不再是小明,而是小红 System.out.println("第二次,子线程获取:" + UserContext.getUser()); }); executorService.shutdown(); Thread.sleep(1000); } } 复制代码
原理:在线程run方法之前复制了父线程的ThreadLocal变量
先看修饰线程池的代码做了啥
@Nullable public static ExecutorService getTtlExecutorService(@Nullable ExecutorService executorService) { return (ExecutorService)(!TtlAgent.isTtlAgentLoaded() && executorService != null && !(executorService instanceof TtlEnhanced) ? new ExecutorServiceTtlWrapper(executorService, true) : executorService); } 复制代码
主要看
ExecutorServiceTtlWrapper
里面的内容提交任务时,使用TtlRunnable创建方法
@NonNull @Override public <T> Future<T> submit(@NonNull Runnable task, T result) { return executorService.submit(TtlRunnable.get(task, false, idempotent), result); } 复制代码
TtlRunnable 在run方法之前复制了父线程的ThreadLocal变量
com.alibaba.ttl.TtlRunnable#run
@Override public void run() { //当前线程ThreadLocal final Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } //父线程ThreadLocal final Object backup = replay(captured); try { runnable.run(); } finally { restore(backup); } }
-
TransmittableThreadLocal源码分析,是如何实现在线程池中值传递的 :https://www.jianshu.com/p/aab6b1e7357d
核心部分源码
在 TransmittableThreadLocal 中,定义了一个全局静态变量 holder,用于存储使用 TransmittableThreadLocal set 的上下文。
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() { protected Map<TransmittableThreadLocal<?>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(); } protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue); } };
- initialValue 方法会在 InheritableThreadLocal 创建时被调用,默认创建一个 WeakHashMap。
- childValue 方法会在创建子线程时,Thread 调用 init 方法,会调用 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals),createInheritedMap 中会创建 ThreadLocalMap,ThreadLocalMap 的构造方法中会调用 childValue 方法。
public class Thread implements Runnable { public Thread() { init(null, null, "Thread-" + nextThreadNum(), 0); } private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { Thread parent = currentThread(); // TODO 忽略其他源码 if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); // TODO 忽略其他源码 } } public class ThreadLocal<T> { static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { return new ThreadLocalMap(parentMap); } private ThreadLocalMap(ThreadLocalMap parentMap) { // TODO 忽略部分代码 for (int j = 0; j < len; j++) { if (e != null) { ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); // TODO 忽略部分代码 } } } } }
可能有人有疑问,为什么使用 WeakHashMap。关于 WeakHashMap 不了解的,大家可以自行查询一下。这里只是阐述一下,为什么 TTL 会使用 WeakHashMap。
- 使用 WeakHashMap 是 “继承” ThreadLocalMap.Entry 的“优良传统”,在没有其它强引用的情况下,下一次GC 时才会被垃圾回收,避免内存泄露。
- 程序中可能会使用到多个 ThreadLocal,所以需要使用 Map 作为容器储存,使用 Map 还能快速 remove 当前 ThreadLocal。
在使用线程池时,需要使用 TTL 提供的 TtlExecutors 包装,如:
TtlExecutors.getTtlExecutor(Executors.newCachedThreadPool());
让我们继续跟进 TtlExecutors.getTtlExecutor 方法中,探究下这个方法里面究竟做了什么?
public final class TtlExecutors { public static Executor getTtlExecutor(Executor executor) { if (null == executor || executor instanceof ExecutorTtlWrapper) { return executor; } return new ExecutorTtlWrapper(executor); } }
- 使用 ExecutorTtlWrapper 包装 Executor。
使用 ExecutorTtlWrapper 包装有什么用呢?那么就继续看看 ExecutorTtlWrapper 里面的实现:
class ExecutorTtlWrapper implements Executor { private final Executor executor; ExecutorTtlWrapper(Executor executor) { this.executor = executor; } public void execute(Runnable command) { executor.execute(TtlRunnable.get(command)); } // TODO 忽略部分代码 }
- 重点是在执行 execute 方法的时候,使用 TtlRunnable 做了线程上下文的处理,再执行真正的 Runnable run 方法。
现在重点介绍一下 TtlRunnable 里面做了什么处理:
public final class TtlRunnable implements Runnable { private final AtomicReference<Object> capturedRef; private final Runnable runnable; private final boolean releaseTtlValueReferenceAfterRun; private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { this.capturedRef = new AtomicReference<Object>(TransmittableThreadLocal.Transmitter.capture()); // TODO 忽略部分代码 } public void run() { Object captured = capturedRef.get(); // TODO 忽略部分代码 Object backup = TransmittableThreadLocal.Transmitter.replay(captured); try { runnable.run(); } finally { TransmittableThreadLocal.Transmitter.restore(backup); } } public static TtlRunnable get(Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { // TODO 忽略部分代码 return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); } }
public static class Transmitter { public static Object capture() { Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>(); for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { captured.put(threadLocal, threadLocal.copyValue()); } return captured; } public static Object replay(Object captured) { Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // backup backup.put(threadLocal, threadLocal.get()); // clear the TTL values that is not in captured // avoid the extra TTL values after replay when run task if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set values to captured TTL setTtlValuesTo(capturedMap); // call beforeExecute callback doExecuteCallback(true); return backup; } public static void restore(Object backup) { Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // call afterExecute callback doExecuteCallback(false); for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); // clear the TTL values that is not in backup // avoid the extra TTL values after restore if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // restore TTL values setTtlValuesTo(backupMap); } private static void setTtlValuesTo(Map<TransmittableThreadLocal<?>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) { TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); } } }
- TtlRunnable 是实现于 Runnable,所以线程池执行的是 TtlRunnable,但是在 TtlRunnable run 方法中会执行 Runnable run 方法。
- 线程池执行时,执行了 ExecutorTtlWrapper execute 方法,execute 方法中调用了 TtlRunnable.get(command) ,get 方法中创建了一个 TtlRunnable 对象返回了。
- TtlRunnable 构造方法中,调用了 TransmittableThreadLocal.Transmitter.capture() 获取当前线程中所有的上下文,并储存在 AtomicReference 中。
- 当线程执行时,调用 TtlRunnable run 方法,TtlRunnable 会从 AtomicReference 中获取出调用线程中所有的上下文,并把上下文给 TransmittableThreadLocal.Transmitter.replay 方法把上下文复制到当前线程。并把上下文备份。
- 当线程执行完,调用 TransmittableThreadLocal.Transmitter.restore 并把备份的上下文传入,恢复备份的上下文,把后面新增的上下文删除,并重新把上下文复制到当前线程