首页 > 编程语言 >详细剖析Java动态线程池的扩容以及缩容操作

详细剖析Java动态线程池的扩容以及缩容操作

时间:2025-01-23 16:31:32浏览次数:1  
标签:缩容 tasks Java 修改 线程 es pool ThreadPoolExecutor

前言

在项目中,我们经常会使用到线程来处理加快我们的任务。但为了节约资源,大多数程序员都会把线程进行池化,使用线程池来更好的支持我们的业务。

Java线程池ThreadPoolExecutor有几个比较核心的参数,如corePoolSize、maximumPoolSize等等。无论是在工作中还是在面试中,都会被问到,如何正确的设置这几个参数。

线程池的参数并不好配置。一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识。项目IO密集型还是CPU密集型等等,总归很难确定一个完美的参数,此时就有了动态线程池的诞生。

动态线程池(DTP)原理

其实动态线程池并不是很高大上的技术,它底层依旧是依赖了ThreadPoolExecutor的一些核心接口方法。我们通过下面图片可以很清楚的看到,ThreadPoolExecutor本身就给我们提供了很多钩子方法,让我们去定制化。

那么其原理也非常简单了,我们在运行中假设有一个线程池叫做TaskExecutor

  1. 他的核心线程池默认假设是10,现在我发觉不够用了,此时我想把他的核心线程池调整为20
  2. 我可以写一个远程配置(可以阿波罗,zk,redis什么都可以)。然后监听到了这个配置变为了core.pool.size=20
  3. 然后我获取到了这个线程池TaskExecutor,并且调用setCorePoolSize(20),那么这个TaskExecutor核心线程数就变为了20

就是这么简单,拨开表面,探究原理,内部其实非常的简单。当时公司里面的线程池还有加一些友好的界面、监控告警、操作日志、权限校验、审核等等,但本质就是监听配置,然后调用setCorePoolSize方法去实现的,最大线程数类似。

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

动态线程池缩容

首先提出几个问题

  1. 核心线程数为5,现在有3个线程在执行,并且没有执行完毕,我修改核心线程数为4,是否修改成功
  2. 核心线程数为5,现在有3个线程在执行,并且没有执行完毕,我修改核心线程数为1,是否修改成功

让我们带着疑问去思考问题。

  1. 首先第一个问题,因为核心线程池数为5,仅有3个在执行,我修改为4,那么因为有2个空闲的线程,它只需要销毁1个空闲线程即可,因此是成功的
  2. 第二个问题,核心线程池数为5,仅有3个在执行,我修改为1。虽然有2个空闲线程,但是我需要销毁4个线程。因为有2个空闲线程,2个非空闲线程。我只能销毁2个空闲线程,另外2个执行的任务不能被打断,也就是执行后仍然为3个核心线程数。
  3. 那什么时候销毁剩下2个执行的线程呢,等到2个执行的任务完毕之后,就会销毁它了。假设这个任务是一个死循环,永远不会结束,那么核心线程数永远是3,永远不能设置为1

我们举一个代码的例子如下

ThreadPoolExecutor es = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
es.prestartAllCoreThreads();  // 预启动所有核心线程

// 启动三个任务,执行次数不一样
for (int i = 0; i < 3; i++) {
    int finalI = i;
    es.execute(() -> {
        int cnt = 0;
        while (true) {
            try {
                cnt++;
                TimeUnit.SECONDS.sleep(2);

                if (cnt > finalI + 1) {
                    log.info(Thread.currentThread().getName() + " 执行完毕");
                    break;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}
TimeUnit.SECONDS.sleep(1);  // 等待线程池中的线程执行
log.info("修改前 es = {}", es);  // 这里核心线程数必定是5

es.setCorePoolSize(1);  // 修改核心线程数为1,但是核心线程数为5,并且有3个线程在执行任务,

while (true) {
    TimeUnit.SECONDS.sleep(1); // 等待
    log.info("修改后 es = {}", es);
}

输出结果为如下

// 修改前核心线程数为5,运行线程数为3
[修改前 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 5, active threads = 3, queued tasks = 0, completed tasks = 0]]

// 因为有2个空闲线程,先把2个空闲线程给销毁了,剩下3个线程
[修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]]

// 等第1个任务执行完毕,剩下2个线程
[Main.lambda$d$0:38] [pool-2-thread-1 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 1]]

// 等第2个任务执行完毕,剩下1个线程
[Main.lambda$d$0:38] [pool-2-thread-2 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 2]]

// 等第3个任务执行完毕,剩下1个线程。因为我修改的就是1个核心线程
[Main.lambda$d$0:38] [pool-2-thread-3 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@72d818d1[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 3]]

有兴趣的读者可以拿这块带去自己去试试,输出结果里面的注释 我写的非常详细,大家可以详细品品这块输出结果。

动态线程池扩容

扩容我就不提问问题了,和缩容异曲同工,但我希望读者可以先看下以下代码,不要看答案,认为会输出什么结果,看下是否和自己想的是否一样,如果一样,那说明你已经完全懂了,如果不一样,是什么原因。

// 核心线程数1,最大线程数10
ThreadPoolExecutor es = new ThreadPoolExecutor(1, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
es.prestartAllCoreThreads();  // 预启动所有核心线程

for (int i = 0; i < 5; i++) {
    int finalI = i;
    es.execute(() -> {
        int cnt = 0;
        while (true) {
            try {
                cnt++;
                TimeUnit.SECONDS.sleep(2);

                if (cnt > finalI + 1) {
                    log.info(Thread.currentThread().getName() + " 执行完毕");
                    break;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

TimeUnit.SECONDS.sleep(1);  // 等待线程池中的线程执行
log.info("修改前 es = {}", es);  // 这里核心线程数必定是1, 队列里面有4个任务

es.setCorePoolSize(3);  // 修改核心线程数为3

while (true) {
    TimeUnit.SECONDS.sleep(1); // 等待
    log.info("修改后 es = {}", es);
}   

输出结果为如下 (注意观察输出queued tasks的变化!!!

[修改前 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]]

[Main.lambda$a$1:73] [pool-2-thread-1 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 1]]

[Main.lambda$a$1:73] [pool-2-thread-2 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 2]]

[Main.lambda$a$1:73] [pool-2-thread-3 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 2, queued tasks = 0, completed tasks = 3]]

[Main.lambda$a$1:73] [pool-2-thread-1 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 1, queued tasks = 0, completed tasks = 4]]

[Main.lambda$a$1:73] [pool-2-thread-2 执行完毕]
[修改后 es = java.util.concurrent.ThreadPoolExecutor@1e397ed7[Running, pool size = 3, active threads = 0, queued tasks = 0, completed tasks = 5]]

最后

在业务中,我们为了提高效率使用了线程,为了加快线程我们使用了线程池,而又为了更好的利用线程池的资源,我们又实现了动态化线程池。这也就是遇到问题、探索问题、解决问题的一套思路吧。

我们从底层原理分析,发现动态线程池的底层原理非常简单,希望大家不要恐惧,往往拨开外衣,发现里面最根本的原理,才能是我们更好的捋清楚其中的逻辑。希望本文提供的动态化线程池思路能对大家有帮助。

最终也极力希望读者朋友们,可以将上述两个例子详细分析一下原因,相信会有不小的进步,谢谢大家。

标签:缩容,tasks,Java,修改,线程,es,pool,ThreadPoolExecutor
From: https://www.cnblogs.com/wenbochang/p/18688107

相关文章

  • 探索JavaScript前端开发:开启交互之门的神奇钥匙(二)
     目录 引言四、事件处理4.1事件类型4.2事件监听器五、实战案例:打造简易待办事项列表5.1HTML结构搭建5.2JavaScript功能实现六、进阶拓展:异步编程与Ajax6.1异步编程概念6.2Ajax原理与使用七、前沿框架:Vue.js入门窥探7.1Vue.js简介7.2基础使用示......
  • Java 反射
    目录概述反射机制反射机制原理类加载概述类加载的时机:类加载各阶段完成的功能加载阶段连接阶段——验证连接阶段——准备连接阶段——解析初始化阶段Class类方法一:直接通过一个类class中的静态变量class获取:方法二:如果我们有一个类class的对象,可以通过该对象引用提供的get......
  • Java 泛型
    目录泛型概述使用泛型的好处泛型的定义与使用定义和使用含有泛型的类含有泛型的方法含有泛型的接口定义类时确定泛型的类型始终不确定泛型的类型,直到创建对象时,确定泛型的类型泛型通配符通配符基本使用通配符高级使用----受限泛型泛型概述在前面学习集合时,我们都知道集合中是可......
  • [新]Java8的新特性
    原文首发在我的博客:https://blog.liuzijian.com/post/86955c3b-9635-47a0-890c-f1219a27c269.html1.Lambda表达式lambda表达式是Java8的重要更新,lambda表达式可以用更简洁的代码来创建一个只有一个抽象方法的接口(函数式接口)的实例,从而更简单的创建匿名内部类的对象。语法和......
  • 基于Java的宠物医院管理系统 毕业设计源码14635
    目 录1绪论1.1选题背景1.2研究现状1.3论文结构与章节安排2 宠物医院管理系统系统分析2.1可行性分析2.1.1技术可行性分析2.1.2 操作可行性分析2.1.3 法律可行性分析2.2系统功能分析2.2.1功能性分析2.2.2非功能性分析2.3 系统用例分析......
  • JAVA 策略模式
    策略模式(strategypattern)的原始定义是:定义一系列算法,将每一个算法封装起来,并使它们可以相互替换。策略模式让算法可以独立于使用它的客户端而变化。不同国家发送短信验证码算法不同例如中国和哈萨克斯坦的短信模版、使用系统、签名不同策略模式实现策略模式的本质是通过Conte......
  • java基础Day6 java数组
    一、数组的定义二、数组的声明和创建dataType[]arrayRefVar;//首选方法dataTypearrayRefVar[];//效果相同,但不是首选方法int[]nums;//声明一个数组nums=newint[10];//创建一个数组//给数组元素赋值nums[0]=1;nums[1]=2;nums[2]=3;nums[3]=4;nums[4]=......
  • JAVA与数据结构-线性表
    目录一.线性表的概念二.线性表的关系及分类三.数组与顺序表四.链表1.静态链表(链表的的数组底层实现)2.循环链表3.双向链表五.栈1.栈的概念2.栈的底层实现3.共享空间栈4.逆波兰表达式(后缀表达式)5.栈与递归 六.队列1.队列概念2.队列的底层实现3.循环队列七.链......
  • java基于SSM框架的健康医疗体检管理系统
    Java基于SSM(Spring+SpringMVC+MyBatis)框架的健康医疗体检管理系统是一种专为医疗机构设计的信息化解决方案。一、系统背景与目的随着医疗行业的快速发展和人们对健康需求的日益增加,体检业务已成为医疗机构的重要组成部分。为了提高体检业务的管理效率和服务质量,基于Java和......
  • Java02-基础语法
    Java基础语法[任务列表]1.注释2.字面量3.变量4.关键字、标识符5.方法6.类型转换7.输入输出8.运算符9.其他知识点——————————————————————————————————————————————————1.注释注释:解释说明代码功能。单行注......