首页 > 其他分享 >线程池学习

线程池学习

时间:2023-07-28 23:24:55浏览次数:43  
标签:ijk int 学习 队列 任务 线程 null

具体文章见: Java线程池实现原理及其在美团业务中的实践

线程池好处

  • 降低资源消耗
  • 提高响应速度
  • 提高线程的可管理性
  • 提供更多更强大的功能

线程池解决的问题

  • 频繁申请、销毁资源和调度资源,将地阿莱额外的消耗,可能会非常巨大。
  • 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  • 系统无法合理管理内部的资源分布,降低稳定性。

线程池核心类 

ThreadPoolExecutor

/**
* 一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务
*/
public class ThreadPoolExecutor extends AbstractExecutorService{} 

/**
*将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可
*/ public abstract class AbstractExecutorService implements ExecutorService {}

//(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
//(2)提供了管控线程池的方法,比如停止线程池的运行。 public interface ExecutorService extends Executor {}

//Executor提供了一种思想:将任务提交和任务执行进行解耦。 public interface Executor {}

 

运行流程

  • 任务部分
    • 任务分配
      • 直接执行
      • 缓冲执行,放到阻塞队列里
      • 任务拒绝
  • 线程池部分
    • 核心线程数
    • 非核心线程数
    • 线程回收
  • 执行任务
    • 线程去任务缓冲取任务或直接执行被分配的任务
    • 执行具体任务

源码大致梳理

顺序有点乱。中间execute是入口方法,下面是创建线程的方法。

  • 线程数量
  • 运行状态
 1 /**
 2  * ThreadPoolExecutor内的ctl
 3  * 高3位保存runState,低29位保存workerCount,两个变量之间互不干扰
 4  * 
 5  */
 6 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 7 
 8 private static final int COUNT_BITS = Integer.SIZE - 3;
 9 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
10 
11 
12 //当前运行状态 取前三位状态
13 private static int runStateOf(int c)     { return c & ~CAPACITY; }
14 //当前线程数量  取三位之后的状态
15 private static int workerCountOf(int c)  { return c & CAPACITY; }
16 //生成ctl 根据状态和线程数
17 private static int ctlOf(int rs, int wc) { return rs | wc; }

 初始化时 rs 传 -536870912  wc 传 0  ctl 就是 -536870912

Integer.SIZE - 3 = 29;

CAPACITY = 1 左移 29 位 , 为 536870912 , 二进制是 1000000000 0000000000 0000000000

~CAPACITY的作用 对所有位取非 ;               二进制是110 1111111111 1111111111 111111111

& 之后  就可以得到对应位的状态。

 ThreadPoolExecutor五种状态

  1. RUNNING 能接受新任务,能处理阻塞队列任务
  2. SHUTDOWN 关闭状态,不在接受新任务,能处理阻塞队列任务
    1. shutdown()方法执行之后是这个状态
  3. STOP 不接受新任务
    1. shutdownNow()执行之后是这个
  4. TIDYING 所有的线程已经终止,workerCount为0
  5. TERMINATED 在terminated方法执行之后进入这个状态
    1. TIDYING状态执行terminated()是这个
 1 //状态
 2 private static final int RUNNING    = -1 << COUNT_BITS;
 3 
 4 private static final int SHUTDOWN   =  0 << COUNT_BITS;
 5 
 6 private static final int STOP       =  1 << COUNT_BITS;
 7 
 8 private static final int TIDYING    =  2 << COUNT_BITS;
 9 
10 private static final int TERMINATED =  3 << COUNT_BITS;
11 
12 
13 //判断状态的方法
14 int c = ctl.get();
15 private static boolean isRunning(int c) {
16     return c < SHUTDOWN;
17 }
18 
19 
20 final boolean isRunningOrShutdown(boolean shutdownOK) {
21     int rs = runStateOf(ctl.get());
22     return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
23 }

增加线程的方法

// core true 核心线程 false  非核心线程
private boolean addWorker(Runnable firstTask, boolean core){}

retry: 用法

for (int i = 0 ; i < 3; i++){
    a:
    for (int j = 0 ; j < 3; j++){

        for (int k = 0 ; k < 3; k++){

            System.out.println("ijk = " + i+":"+j+":"+k);
            if(k == 2){
                break a;
            }
        }
    }
}
//输出
ijk = 0:0:0
ijk = 0:0:1
ijk = 0:0:2
ijk = 1:0:0
ijk = 1:0:1
ijk = 1:0:2
ijk = 2:0:0
ijk = 2:0:1
ijk = 2:0:2
a:
for (int i = 0 ; i < 3; i++){
    for (int j = 0 ; j < 3; j++){

        for (int k = 0 ; k < 3; k++){

            System.out.println("ijk = " + i+":"+j+":"+k);
            if(k == 2){
                break a;
            }
        }
    }
}
//输出
ijk = 0:0:0
ijk = 0:0:1
ijk = 0:0:2

用来跳出多层for循环好。

 

threadPoolExecutor.execute(new ThreadPool());

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。

/**
1.如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务来启动一个新线程。
对addWorker的调用原子性地检查runState和workerCount,从而通过返回false来防止错误警报,这些错误警报会在不应该添加线程的情况下添加线程。 2.如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为现有线程自上次检查以来已经死亡),
或者池自进入该方法以来已经关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。(非核心线程) 3.如果我们不能对任务进行排队,那么我们尝试添加一个新线程。如果它失败了,我们就知道我们被关闭或饱和了 **/ public void execute(Runnable command) { if (command == null){ throw new NullPointerException(); } int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)){ return; } c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)){ reject(command); }else if (workerCountOf(recheck) == 0){ addWorker(null, false); } }else if (!addWorker(command, false)){ reject(command); } }

 

方法/处理方式抛出异常返回特殊值一直阻塞超时退出
插入方式 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time.unit)
检查方法 element() peek() 不可用 不可用

 

 

 

 

 

 一直阻塞 插入成功返回true,取不到元素则返回null

/**
  检查是否可以根据当前池状态和给定绑定(核心或最大值)添加新的辅助进程。
  如果是,工作人员计数会相应地进行调整,如果可能的话,会创建并启动一个新的工作人员,将firstTask作为其第一个任务运行。
  如果池已停止或有资格关闭,此方法将返回false。如果线程工厂在被请求时未能创建线程,它也会返回false。
  如果线程创建失败,无论是由于线程工厂返回null,还是由于异常(通常是thread.start()中的OutOfMemoryError),我们都会干净地回滚。   @param firstTask 新线程应该首先运行的任务(如果没有,则为null)。
  当线程少于corePoolSize时(在这种情况下,我们总是启动一个线程),
  或者当队列已满时(在那种情况下,必须绕过队列),会创建一个初始的第一个任务(在方法execute()中)来绕过排队。   最初的空闲线程通常是通过prestartCoreThread创建的,或者用来替换其他垂死的工作线程。   @param core如果为true,则使用corePoolSize作为绑定,否则   maximumPoolSize。 **/ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 仅在必要时检查队列是否为空. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

 总结:(面试也可!常考)

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。

  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。

  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。

  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。

  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

workQueue.offer(command) 加到该阻塞队列
用于容纳任务并将任务移交给工作线程的队列。我们不要求workQueue.poll()返回null就一定意味着workQueue.isEmpty(),所以只依赖isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN转换为TIDYING时,我们必须这样做)。这适用于特殊用途的队列,如DelayQueues,允许poll()返回null,即使它稍后在延迟到期时可能返回非null。
private final BlockingQueue<Runnable> workQueue;

 Worker线程管理

ThreadPoolExecutor里面有个内部类worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
View Code

worker

实现了Runnable 里面的run方法 调用了runWorker() (这个runworker里 有线程自动回收的方法 非核心线程。 通过JVM回收

try {
  while (task != null || (task = getTask()) != null) {
    //执行任务
  }
} finally {
  processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}

 

继承了AQS 

 

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。

  2. 如果正在执行任务,则不应该中断线程。

  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

Worker线程增加

 addWorker()

Worker线程回收

线程池中线程的销毁依赖JVM自动的回收

 

 

线程池在实际业务中的实践 的学习

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:ijk,int,学习,队列,任务,线程,null
From: https://www.cnblogs.com/jiangym/p/17587279.html

相关文章

  • 实现多线程多任务的步骤
    步骤 1.导入线程模块importthreading2.创建子线程并指定执行的任务sub_thread=threading.Thread(target=任务名3.启动线程执行任务sub_thread.start() 示例1'''2在Python中,实现多线程多任务可以通过使用threading模块来创建和管理线程34最佳实践:......
  • SpringBoot学习---第五篇:动态数据源(多数据源自动切换)
    目录一、应用场景二、准备工作2.1 创建数据表2.2添加依赖2.3生成bean、dao、mapper三、动态数据源3.1 配置文件application.properties3.2动态数据源核心代码3.3 启动类添加注解四、使用方法4.1Controller4.2Service五、测试六、Springboot2.0动态多数据源切换一、应用......
  • 程序员进阶必备,这份Android架构师进阶学习资料全家桶助你提升无忧
    走技术这条路的程序员进阶需要具备什么条件呢?大概总结起来有两点:1.扎实的基础底层功底(四大组件、布局使用、多线程、动画…)2.技术的深度和广度(自定义View、性能优化、Flutter、热修复、插件化…)3.同时,了解和学习常用的开源库也十分重要,常用的开源库主要包括图片加载、网络请求、......
  • 基于LSTM深度学习网络的时间序列预测matlab仿真
    1.算法理论概述       时间序列预测是一类重要的预测问题,在很多领域都有着广泛的应用,如金融、交通、气象等。然而,由于时间序列数据本身具有时序性和相关性,因此预测难度较大。传统的时间序列预测方法大多采用统计学方法,如ARIMA模型、指数平滑法等,但这些方法在处理非线性、......
  • docker aspnetcore学习笔记
    在终端窗口cmd:  示例应用程序对于示例应用程序,让我们使用.NET从模板创建一个简单的应用程序。在本地计算机中创建一个名为的目录。打开终端并切换到该目录。运行以下命令,使用ASP.NET核心Web应用模板创建C#应用。$mkdirdotnet-docker$cddotnet-docker$dotne......
  • JavaScript学习 -- HMAC算法基本原理
    HMAC(Hash-basedMessageAuthenticationCode)算法是一种基于哈希算法的消息认证码算法。它可以用于验证和保护数据在传输过程中的完整性和真实性。在JavaScript中,我们可以使用HMAC算法来保证数据的安全性。本篇文章将介绍HMAC算法的基本原理和相关技术,并提供一些实例来演示如何在Ja......
  • RHCE7 认证之学习笔记
    -------------------------------------------------------------------------------------------初始化:两台服务器设置yum源-------------------------------------------------------------------------------------------[root@system1~]#vim/etc/yum.repos.d/yum.repo[root@......
  • 【算法】哈希学习笔记
    1.哈希(hash)简介1.1前言又来写算法总结了qwq。今天是2023/7/8,期末考试已经考完了。初二下注定是一个煎熬的学期,所以我在这一学期并没有学什么新算法,OI也没什么长进。但倒是深造了几个算法,比如:dp,hash,线段树。之前一直想写一篇hash的学习笔记,但由于种种原因,并没有写成。于......
  • 学习vue又一天
    学习vue又一天,终于把样式、表单、等全部看完了,真的是很简单,都不知道怎么形容,可能是有一定基础的学习比较快吧,然后看了axios,感觉与Ajax的差别不是太大,估计是没有太深入了解的原因,这块我先简单看了一下,然后等后面项目开始的时候再看混合式api看了几次,说实话没看太明白,计划先略过......
  • FedR——攻击代码的学习
    攻击客户机1这段代码是用于进行攻击的部分。它试图通过使用客户端0的信息(实体嵌入和关系嵌入)来破解客户端1的信息(部分实体和关系的嵌入)。攻击的过程包括以下步骤:加载训练得到的模型参数:通过torch.load()函数加载之前训练得到的模型参数,其中ent_embed和rel_embed分别表示实体嵌......