线程池基本概念
概念:线程池主要是控制运行线程的数量,将待处理任务放到等待队列,然后创建线程执行这些任务。如果超过了最大线程数,则等待。
优点:
- 线程复用:不用一直new新线程,重复利用已经创建的线程来降低线程的创建和销毁开销,节省系统资源。
- 提高响应速度:当任务达到时,不用创建新的线程,直接利用线程池的线程。
- 管理线程:可以控制最大并发数,控制线程的创建等。
体系:Executor
→ExecutorService
→AbstractExecutorService
→ThreadPoolExecutor
。ThreadPoolExecutor
是线程池创建的核心类。类似Arrays
、Collections
工具类,Executor
也有自己的工具类Executors
。
线程池三种常用创建方式
newFixedThreadPool:使用LinkedBlockingQueue
实现,
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads
:此线程池中可以同时执行的线程数。0L
,TimeUnit.MILLISECONDS
:代表空闲线程存活时间,这里设置为0毫秒,即只要线程池没有被关闭,空闲线程就会一直存活下去。new LinkedBlockingQueue<Runnable>()
:指定一个无界队列作为任务缓存队列。
newSingleThreadExecutor:使用LinkedBlockingQueue
实现,一池只有一个线程。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool:使用SynchronousQueue
实现,变长线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程池的行为常用方法
JUC中的线程池提供了一系列方法来管理和控制线程池的行为,其中包括:
execute(Runnable task)
:将一个任务提交到线程池中执行。submit(Callable task)
:将一个Callable任务提交到线程池中执行,并返回一个Future对象,可以使用该对象来获取任务的执行结果。shutdown()
:关闭线程池,不再接受新任务,但是会处理已经提交到任务队列中的任务。shutdownNow()
:立即关闭线程池,尝试中断所有正在执行的任务并清空任务队列。isShutdown()
:判断线程池是否已经关闭。isTerminated()
:判断线程池是否已经彻底终止。awaitTermination(long timeout, TimeUnit unit)
:等待线程池终止,直到超时或者所有任务都完成。getActiveCount()
:获取当前活动的线程数。getPoolSize()
:获取当前线程池的大小。getTaskCount()
:获取已经提交到线程池的任务数量。prestartCoreThread()
:预启动一个核心线程,如果线程池中的线程数少于核心线程数,则会创建一个新的线程。prestartAllCoreThreads()
:预启动所有核心线程,如果线程池中的线程数少于核心线程数,则会创建一个或多个新的线程。
需要注意的是,在使用线程池时,需要根据具体情况选取合适的方法来管理和控制线程池的行为。例如,在关闭线程池前需要等待所有任务都执行完毕并且线程池处于终止状态才能安全地关闭线程池。
线程池创建的七个参数
参数 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 线程池核心线程数 |
maximumPoolSize | int | 线程池最大线程数 |
keepAliveTime | long | 空闲线程存活时间 |
unit | TimeUnit | 空闲线程存活时间单位 |
workQueue | BlockingQueue |
线程池等待队列 |
threadFactory | ThreadFactory | 线程创建工厂 |
handler | RejectedExecutionHandler | 拒绝策略处理器 |
-
corePoolSize
:线程池的核心线程数,即线程池中保持活动状态的最小线程数。当提交任务时,线程池会创建新线程来执行任务,直到当前线程数等于核心线程数。 -
maximumPoolSize
:线程池中允许存在的最大线程数。如果当前需要执行的任务数超过了核心线程数,并且等待队列已满,则线程池可以创建更多线程来执行任务,直到达到最大线程数。 -
keepAliveTime
:当线程数大于核心线程数时,空闲线程在终止前等待新任务的最长时间。默认情况下,只有在有超过核心线程数的线程被创建时才会使用此参数。 -
unit
:用于指定keepAliveTime
参数的时间单位。 -
workQueue
:任务等待队列,用于存放未执行的任务。当线程池中的线程数达到核心线程数时,新提交的任务将被加入到等待队列中。如果等待队列已满,则新提交的任务将由新创建的线程执行。 -
threadFactory
:用于创建新线程的工厂类。可以自定义工厂类来对线程进行定制化的设置。 -
handler
:拒绝策略处理器,用于处理无法执行的任务。当等待队列已满并且当前线程数等于最大线程数时,新提交的任务将被拒绝并交给拒绝策略处理器处理。默认情况下,线程池会使用AbortPolicy
策略,该策略会直接抛出异常。
理解:线程池的创建参数,就像一个银行。
corePoolSize
就像银行的“当值窗口“,比如今天有2位柜员在受理客户请求(任务)。如果超过2个客户,那么新的客户就会在等候区(等待队列workQueue
)等待。当等候区也满了,这个时候就要开启“加班窗口”,让其它3位柜员来加班,此时达到最大窗口maximumPoolSize
,为5个。如果开启了所有窗口,等候区依然满员,此时就应该启动”拒绝策略“handler
,告诉不断涌入的客户,叫他们不要进入,已经爆满了。由于不再涌入新客户,办完事的客户增多,窗口开始空闲,这个时候就通过keepAlivetTime
将多余的3个”加班窗口“取消,恢复到2个”当值窗口“。
线程池底层原理
原理图:上面银行的例子,实际上就是线程池的工作原理。
流程图:
新任务到达→
如果正在运行的线程数小于corePoolSize
,创建核心线程;大于等于corePoolSize
,放入等待队列。
如果等待队列已满,但正在运行的线程数小于maximumPoolSize
,创建非核心线程;大于等于maximumPoolSize
,启动拒绝策略。
当一个线程无事可做一段时间keepAliveTime
后,如果正在运行的线程数大于corePoolSize
,则关闭非核心线程。
JUC线程池实现线任务复用的原理
线程池中包含一组工作线程,这些工作线程都处于等待状态,并且在任务队列中获取任务进行处理。当有新的任务被提交时,线程池会按照一定的策略从线程池中选择一个空闲的线程来执行该任务。如果所有的线程都正在执行任务或者任务队列已满,则线程池会根据预设的拒绝策略对新任务进行拒绝或抛出异常。
通过使用线程池技术,可以避免创建大量的线程并减少线程的上下文切换开销,从而提高程序的性能和稳定性。同时,线程池还可以根据应用程序的需求动态调整线程数量、任务队列长度以及其他参数,以适应不同的负载情况和性能要求。
线程池是一种用于管理和复用线程的机制,它可以提高应用程序的性能和稳定性。线程池通常由以下几个组成部分:
- 任务队列:用于存储待执行的任务。线程池中的空闲线程会从任务队列中获取任务并执行。
- 线程池管理器:负责创建、销毁和维护线程池中的线程。
- 工作线程:实际执行任务的线程。当线程池启动时,会创建一些工作线程,并将它们加入到线程池中。
- 任务接口:定义了任务的执行逻辑,通常是一个Runnable或Callable接口的实现类。
- 线程池参数配置:包括线程池大小、任务队列容量、线程池最大大小、线程池最小空闲时间等参数配置。
Executor框架
在Java中,线程池通常由Executor框架来实现。Java中的线程池框架包括以下几个类:
- Executor接口:定义了一个执行任务的方法execute(Runnable)。
- ExecutorService接口:继承自Executor接口,定义了提交任务、关闭线程池shutdown等操作。
- ThreadPoolExecutor类:实现了ExecutorService接口,是Java中最基本的线程池实现类。
- ScheduledExecutorService接口:继承自ExecutorService接口,定义了按照一定的时间间隔周期性地执行任务的功能。
- ScheduledThreadPoolExecutor类:实现了ScheduledExecutorService接口,是Java中最基本的支持周期性任务的线程池实现类。
+-----------+
| Executor |
+-----+-----+
^
|
+-----+-----+
| ExecutorService |
+-----+-----+
^
|
+-----+---------------------+
| ThreadPoolExecutor |
+-----+---------------------+
^
|
+-----+-----------------------------+
| ScheduledThreadPoolExecutor |
+-----------------------------------+
在这个继承和实现关系的流程图中,可以看到Executor
接口作为所有执行器的基本接口,ExecutorService
接口定义了一些更高级别的任务管理方法,并且它扩展了Executor
接口。ScheduledExecutorService
接口扩展了ExecutorService
接口,支持周期性任务。ThreadPoolExecutor
类实现了ExecutorService
接口,是Java中最基本的线程池实现类。ScheduledThreadPoolExecutor
类实现了ScheduledExecutorService
接口,是Java中最基本的支持周期性任务的线程池实现类。
在这个继承和实现关系的流程图中,还可以看到ScheduledFuture
接口表示一个可以周期性执行的延迟计算结果,并且它扩展了Future
接口。FutureTask
类实现了RunnableFuture
接口,也就是同时实现了Runnable
接口和Future
接口,因此它既可以作为一个任务提交给线程池执行,也可以通过调用get()
方法获取计算结果。
Executors工具类
java.util.concurrent.Executors
类是JUC中一个非常重要的工具类,它提供了一组静态方法用于创建不同类型的线程池。Executors
类中定义了一些有用的工厂方法,可以简化线程池的创建过程并提高程序的可读性和可维护性。
下面是Executors
类的一些常用方法:
-
newFixedThreadPool(int n)
:创建一个固定大小的线程池,该线程池中最多同时运行n个线程。如果所有线程都处于活动状态,则新提交的任务将在队列中等待,直到有空闲线程可用。 -
newSingleThreadExecutor()
:创建一个只有一个线程的线程池,该线程池按顺序逐个执行提交的任务。 -
newCachedThreadPool()
:创建一个可以根据需要自动扩展或收缩线程数的线程池。该线程池中的线程在60秒内无活动时将被回收,并且新提交的任务将始终创建新线程。 -
newScheduledThreadPool(int corePoolSize)
:创建一个支持周期性任务执行的线程池,该线程池能够以固定时间间隔周期性地执行任务。 -
newSingleThreadScheduledExecutor()
:创建一个只有一个线程的线程池,该线程池能够以固定时间间隔周期性地执行任务。
使用Executors
类创建线程池不仅可以大大简化线程池的创建过程,并且还能够提高程序的可读性和可维护性。当需要创建一个新的线程池时,开发人员可以选择合适的工厂方法,并使用该方法返回的线程池对象对任务进行管理和执行。
JUC中的线程池状态
RUNNING
:表示线程池处于正常运行状态,接受新任务并处理队列中的任务。SHUTDOWN
:表示线程池正在关闭,不再接受新的任务,但是会继续处理已经提交到任务队列中的任务。STOP
:表示线程池已经停止,不再接受新任务,也不再处理队列中的任务,并且会尝试中断正在执行的任务。TIDYING
:表示线程池正在清理队列和资源,等待所有任务都被完成。TERMINATED
:表示线程池已经彻底终止,所有的任务都已经完成。
这些状态是通过线程池的控制状态位(ctl)来进行设置和管理的。在ThreadPoolExecutor
类中,使用了CAS
操作来更新控制状态位,保证了状态的正确性和一致性。同时,ThreadPoolExecutor
类还提供了一些方法用于获取当前线程池的状态,例如isShutdown()
、isTerminated()
等方法。
线程池的拒绝策略
JUC(Java Util Concurrent)线程池中的拒绝策略,是指当线程池中的执行任务队列已满,并且线程池中的线程数已达到最大线程数时,线程池需要在新的任务提交时采取的策略。
JUC线程池提供了以下四种默认的拒绝策略:
-
AbortPolicy(默认的拒绝策略):当线程池中的任务队列已满,且线程池中的线程数已达到最大线程数时,新提交的任务将会被拒绝,并抛出RejectedExecutionException异常。
-
CallerRunsPolicy:当线程池中的任务队列已满,且线程池中的线程数已达到最大线程数时,在提交该任务的线程中直接执行该任务,而不是交给线程池中的其他线程来执行。
-
DiscardOldestPolicy:当线程池中的任务队列已满,且线程池中的线程数已达到最大线程数时,将队列中最老的任务从执行队列中删除,并尝试重新将该任务提交给线程池。
-
DiscardPolicy:当线程池中的任务队列已满,且线程池中的线程数已达到最大线程数时,直接丢弃新提交的任务,不做任何处理。
除了以上四种默认的拒绝策略,JUC线程池还支持自定义拒绝策略,通过实现RejectedExecutionHandler接口并重写其rejectedExecution()方法来实现。
实际生产使用哪一个线程池?-都是自定义ThreadPoolExecutor
单一、可变、定长都不用!原因就是FixedThreadPool
和SingleThreadExecutor
底层都是用LinkedBlockingQueue
实现的,这个队列最大长度为Integer.MAX_VALUE
,显然会导致OOM。所以实际生产一般自己通过ThreadPoolExecutor
的7个参数,自定义线程池。
ExecutorService threadPool=new ThreadPoolExecutor(2,5,
1L,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
自定义线程池参数选择
对于CPU密集型任务(加密计算,HASH计算),最大线程数是CPU线程数的一到两倍,每个线程分配一个CPU。
对于IO密集型任务,尽量多配点,可以是CPU线程数*10。
钩子方法 Hook methods
在线程池的生命周期中,可以被扩展或重写的一些方法,可以在这些方法中添加自定义的逻辑,以满足特定的需求。
Java中的ThreadPoolExecutor类提供了几个可以扩展或重写的钩子方法,包括:
-
beforeExecute(Thread t, Runnable r):在线程执行任务之前,会先调用该方法。可以在该方法中进行一些准备工作,比如记录日志、统计任务等。
-
afterExecute(Runnable r, Throwable t):在线程执行任务之后,会调用该方法。可以在该方法中进行一些清理工作,比如释放资源、记录任务结束时间等。如果任务执行期间抛出了异常,则将异常作为参数传递给该方法。
-
terminated():在线程池被关闭后,会调用该方法。可以在该方法中进行一些资源释放等收尾工作。
通过继承ThreadPoolExecutor类并重写这些钩子方法,我们可以针对具体业务场景做一些自定义的处理,比如在beforeExecute方法中根据任务类型来设置不同的线程优先级,在afterExecute方法中记录任务执行情况并做出统计等。
举个栗子
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class PauseableThreadPool extends ThreadPoolExecutor {
private final ReentrantLock lock = new ReentrantLock();
//ReentrantLock 是 Java 中的一个可重入锁,它允许同一个线程多次获得同一把锁,
// 并在最后释放锁时完全释放。在多线程编程中,使用可重入锁可以避免死锁等问题,
// 确保线程安全。
private Condition unpaused = lock.newCondition();
// 表示线程不处于暂停状态的条件对象。
private boolean isPause;
//注意要重写构造方法哈,!!!!!
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPause) {
//标记为想要暂停了,那我就等待不是暂停的信号
unpaused.await();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
private void pause(){
lock.lock();
try{
isPause=true;
}finally {
lock.unlock();
}
}
public void resume(){
lock.lock();
try {
isPause=false;
unpaused.signalAll();
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
PauseableThreadPool pool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("running");
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
for (int i = 0; i < 10000; i++) {
pool.execute(runnable);
}
try {
Thread.sleep(1000);
pool.pause();
System.out.println("暂停了");
Thread.sleep(1000);
pool.resume();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
ReentrantLock是Java中的一种锁机制,它允许线程在获取到锁之后再次获取同一个锁,而不会被阻塞。这种锁机制的主要作用是保护共享资源,避免多个线程同时对共享资源进行修改而导致数据不一致的情况。
下面是一个使用ReentrantLock的示例代码:
import java.util.concurrent.locks.ReentrantLock; public class Example { private ReentrantLock lock = new ReentrantLock(); public void accessResource() { lock.lock(); try { // 访问共享资源的代码 } finally { lock.unlock(); } } }
在这个示例中,我们创建了一个名为Example的类,并在其中定义了一个ReentrantLock对象。然后,我们编写了一个accessResource()方法,在该方法中使用了lock()方法获得了锁对象,然后访问了共享资源。最后使用unlock()方法释放了锁。
需要注意的是,在访问共享资源的代码块中可能会出现异常,如果不正确处理异常,将会导致锁无法释放,因此我们使用了try...finally语句块,确保在任何情况下锁都能够被正确地释放。
此外,ReentrantLock还有其他高级特性,例如可以设置公平或非公平锁、支持可重入性、支持中断等,可以根据具体需求进行选择和配置。
标签:JAVA,队列,lock,池中,任务,线程,new,随笔 From: https://www.cnblogs.com/indullged/p/17386390.html假设我们要实现一个银行账户类,该类有两个方法:一个方法用于存款,另一个方法用于取款。为了确保在多线程环境下对该账户对象进行访问时的安全性,我们需要使用锁来同步这两个方法。
首先,在银行账户类中声明一个ReentrantLock对象:
public class BankAccount { private ReentrantLock lock = new ReentrantLock(); private double balance; public BankAccount(double balance) { this.balance = balance; } public void deposit(double amount) { lock.lock(); // 获取锁 try { balance += amount; } finally { lock.unlock(); // 释放锁 } } public void withdraw(double amount) { lock.lock(); // 获取锁 try { if (balance >= amount) { balance -= amount; } else { System.out.println("Insufficient balance."); } } finally { lock.unlock(); // 释放锁 } } }
在BankAccount类中,deposit()和withdraw()方法都使用了lock()方法获取锁,并且在执行完相应的操作后再使用unlock()方法释放锁。这样可以确保同一时间只有一个线程能够执行这两个方法中的任意一个。
接下来,我们创建两个线程,并启动它们:
public class Test { public static void main(String[] args) { BankAccount account = new BankAccount(1000.0); Runnable depositTask = new Runnable() { @Override public void run() { account.deposit(500.0); } }; Thread t1 = new Thread(depositTask); Thread t2 = new Thread(() -> account.withdraw(1500.0)); t1.start(); t2.start(); } }
在这段代码中,我们创建了一个BankAccount对象,并分别将deposit()和withdraw()方法封装到两个lambda表达式中,并作为参数传递给Thread的构造方法。接着,我们启动这两个线程。
当t1线程执行完deposit()方法后,账户余额会增加500元,变成1500元。当t2线程执行完withdraw()方法后,由于账户余额不足,所以会输出"Insufficient balance."。
在整个过程中,由于使用了ReentrantLock来对deposit()和withdraw()方法进行同步,所以可以确保在多线程环境下对BankAccount对象进行访问时的安全性。