首页 > 其他分享 >线程池核心原理浅析

线程池核心原理浅析

时间:2024-05-06 22:22:05浏览次数:34  
标签:return 浅析 任务 线程 new 原理 public ThreadPoolExecutor

前言

由于系统资源是有限的,为了降低资源消耗,提高系统的性能和稳定性,引入了线程池对线程进行统一的管理和监控,本文将详细讲解线程池的使用、原理。


为什么使用线程池

池化思想

线程池主要用到了池化思想,池化思想在计算机领域十分常见,主要用于减少资源浪费、提高性能等。

池化思想主要包含以下几个方面:

一些常见的资源池包括线程池、数据库连接池、对象池、缓存池、连接池等。

池化思想可以提高系统的性能,因为它减少了资源的创建和销毁次数,避免了不必要的开销。通过池化,系统可以更好地应对高并发情况,降低资源竞争,提高响应速度。

什么是线程池

根据池化思想,在一个系统中,为了避免线程频繁的创建和销毁,让线程可以复用,引入了线程池的概念。线程池中,总有那么几个活跃线程。

当你需要使用线程时,可以从池子中随便拿一个空闲线程,当完成工作时,并不急着关闭线程,而是将这个线程退回到池子,方便其他人使用。

简单说就是,在使用线程池后,创建线程变成了从线程池中获得空闲线程,关闭线程编程了向池子里归还线程。

大致流程如下:
image

为什么使用线程池

Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。

在开发过程中,合理地使用线程池能够带来3个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

要做到合理利用线程池,必须对其实现原理了如指掌。

线程池的使用

image

ThreadPoolExecutor

ThreadPoolExecutor 的创建方法总体来说可分为 2 种:

  • 通过 ThreadPoolExecutor 构造函数
  • 通过 Executors 类创建

通过构造函数

1.1. 入参含义

这个也是推荐使用的方法,因为通过 Executors 类创建可能会导致 OOM,如下图阿里开发规范中的描述。

image

构造函数入参:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

构造函数入参含义:

1.2. 阻塞队列

workQueue 可选的 BlockingQueue:

1.3. 拒绝策略

如下图,上述拒绝策略均实现 RejectedExecutionHandler 接口,且为 ThreadPoolExecutor 的内部类。

若以上策略仍无法满足实际应用需要,完全可以自已扩展 RejectedExecutionHandler 接口。

public interface RejectedExecutionHandler {

    /**
     * @param r 当前请求执行的任务
     * @param executor 当前的线程池
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

示例:

public class RejectedExecutionDemo {
    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(new Date() + ":Thread ID is" + Thread.currentThread().getId());

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask myTask = new MyTask();
        ExecutorService executorService = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(10),
                Executors.defaultThreadFactory(),
                (r, executor) -> System.out.println(r.hashCode() + "is discard")
        );

        for (int i = 0; i < 100; i++) {
            executorService.submit(myTask);
            Thread.sleep(10);
        }
    }
}

上述示例中,mytask 执行需要花费100毫秒,因此,必然会导致一些任务被直接丢弃。在实际应用中,我们可以将更详细的信息记录到日志中,来分析任务丢失情况和系统负载。

通过 Executors

Executors 类扮演着线程池工厂的角色,通过该类可以取得一个拥有定功能的线程池。

该类可以创建三种类型的 ThreadPoolExecutor:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

2.1. FixedThreadPool

固定线程数的线程池,该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂时存在任务队列中,待有线程空闲时,在处理队列中的任务。

FixedThreadPool 使用的无界任务队列 LinkedBlockingQueue,可能造成内存泄露。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

2.2. SingleThreadExecutor

只有一个工作线程的线程池,当多于 1 个任务被提交时,会存到任务队列中。该线程池使用的无界任务队列 LinkedBlockingQueue,可能造成内存泄露。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

2.3. CachedThreadPool

根据实际情况调整线程数的线程池,线程池的线程数量不确定,若有空闲线程可复用,则会优先使用。若所有线程均在工作,此时新的任务则会创建新的线程优先处理。所有线程在任务执行完毕后,将返回线程池进行复用。

corePoolSize 被设置为0,maximumPoolSize 被设置为无界,存活时间设置为 60s,空闲线程超过60秒后将会被
终止。极端情况线程创建过多,会导致内存泄露。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

ScheduledThreadPoolExecutor

简介

如下图, ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor,它主要用来定期执行任务,功能与 Timer 类似且更加强大,可以在构造函数中指定多个对应的后台线程数。

使用

可通过 Executors 创建,源码如下:

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

这里的返回值是 ScheduledExecutorService,根据时间对线程进行调度。有三个主要方法:

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 给定时间对任务进行调度
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * 周期性对任务进行调度
     * 以第一个任务的开始时间 initialDelay + period 
     * 第一个任务在 initialDelay + period 执行
     * 第二个任务在 initialDelay + period * 2 执行
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 周期性对任务进行调度
     * 上一个任务结束后,再经过 period 时间开始执行
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

如果任务遇到异常,那么后续的所有子任务都会停止调度,因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。

ForkJoinPool

fork 是开启子进程,join 是等待,意思是分支子进程结束后才能得到结果,实际开发中,若频繁的 fork 开启线程可能严重影响系统性能,所以引入了 ForkJoinPool。

大致流程是,向 ForkJoinPool 线程池中提交一个 ForkJoinTask 任务,就是将任务分解成多个小任务,等任务全部完成后进行处理,这里采用了分治的思想,具体我将在后续单独展开,这里不多做赘述。

ForkJoin 可能出现两个问题:

  1. 子线程积累过多,可能导致系统性能严重下降;
  2. 调用层次过深,可能导致栈溢出。

线程池的任务提交

execute()

该方法用于提交不需要返回值的任务,且无法判断任务是否被线程池执行成功。

源码见下面的线程池原理章节。

submit()

该方法用于提交需要返回值的任务。线程池会返回 Future 对象,可以判断任务是否执行成功,还可以通过 Future 的get()方法来获取返回值。

get()方法会阻塞当前线程直到任务完成,还可以设置超时时间,到时立即返回,不过这时有可能任务没有执行完。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

线程池的关闭

可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。

它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt() 来中断线程,所以无法响应中断的任务可能永远无法终止。

两种方法存在一定的区别,shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,表示线程池关闭成功,这时调用isTerminaed方法会返回true。

至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

线程池执行原理

执行源码

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    
    // 如果当前工作线程数是否小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 添加核心线程去执行任务,成功则return
        if (addWorker(command, true))
            return;
        // 添加失败,ctl有变化,需重新获取
        c = ctl.get();
    }


    // 判断是否为RUNNING,此时核心线程数已满,需加入任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 检查若不是RUNNING则将任务从队列移除
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略
            reject(command);
            
        // 正常则添加一个非核心空线程,执行队列中的任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

    // 表示核心线程满了,队列也满了,创建非核心线程,执行任务
    else if (!addWorker(command, false))
        // 最大线程数也满了,走拒绝策略
        reject(command);
}

流程图


参考:
[1] 魏鹏. Java并发编程的艺术.
[2] 葛一鸣/郭超. 实战Java高并发程序设计.

标签:return,浅析,任务,线程,new,原理,public,ThreadPoolExecutor
From: https://www.cnblogs.com/fuxing/p/18176050

相关文章

  • Nftables漏洞原理分析(CVE-2022-32250)
    前言在nftales中存在着集合(sets),用于存储唯一值的集合。sets 提供了高效地检查一个元素是否存在于集合中的机制,它可以用于各种网络过滤和转发规则。而CVE-2022-32250漏洞则是由于nftables在处理set时存在uaf的漏洞。环境搭建ubuntu20+QEMU-4.2.1+Linux-5.15.config文件......
  • axios 拦截器实现原理
    Axios拦截器是Axios提供的一种强大功能,允许你在请求发送到服务器之前或响应返回客户端之前对其进行修改或处理。拦截器主要有两种:请求拦截器(requestinterceptors)和响应拦截器(responseinterceptors)。实现原理拦截器数组:Axios内部维护了两个数组,一个用于存储请求拦截器,另......
  • 《深度学习原理与Pytorch实战》(第二版)(三)11-15章
    第11章神经机器翻译器——端到端机器翻译神经机器翻译,google旗下的NMT编码-解码模型:用编码器和解码器组成一个翻译机,先用编码器将源信息编码为内部状态,再通过解码器将内部状态解码为目标语言。编码过程对应了阅读源语言句子的过程,解码过程对应了将其重组为目标语言的过程——......
  • 操作系统原理
    计算机硬件的五大核心组件控制器:计算机的指挥系统,负责控制所有硬件的运行运算器:负责数学运算和逻辑运算存储器:内存:比如内存条,基于电存取数据,断点数据全部丢失;用于临时保存数据,存取速度快;外存:用于永久保存数据,断电数据仍然存在;1.机械硬盘:存取数据慢,基于磁存取数据2.固态硬......
  • 火箭上天和飞机上天的原理有何不同?
    火箭靠向后喷射高温高压气体产生的反作用力飞行。飞机靠机翼上下气流的压差产生的升力飞行。参考:https://baijiahao.baidu.com/s?id=1641924575181476234&wfr=spider&for=pc>>空气动力学:空气动力学是力学的一个分支,研究飞行器或其他物体在同空气或其他气体作相对运动情况下的受......
  • Java线程基础 - 线程状态
    线程状态转换新建(New)创建后尚未启动,也就是初始状态可运行(Runnable)可能正在运行,也可能正在等待CPU时间片。包含了操作系统线程状态中的Running和Ready。阻塞(Blocking)阻塞状态是指线程因为等待某个条件(通常是I/O操作、获取锁、等待某些资源或者其他同步机制)而暂......
  • [转]IRIG-B码授时工作原理
    在授时设备中有一种是B码授时的,但是大部分人不太清楚何为B码授时?这种类型的授时工作原理是怎么样?首先我们要知道什么是B码,然后再介绍它的授时工作原理,B码是一种电力术语,它是IRIG-B码的通俗叫法,英文全称是inter-rangeinstrumentationgroup-B,是在2020年公布的电力词汇,B码起源于20......
  • FFmpeg开发笔记(十九)FFmpeg开启两个线程分别解码音视频
    ​同步播放音视频的时候,《FFmpeg开发实战:从零基础到短视频上线》一书第10章的示例程序playsync.c采取一边遍历一边播放的方式,在源文件的音频流和视频流交错读取的情况下,该方式可以很好地实现同步播放功能。但个别格式的音频流和视频流是分开存储的,前面一大段放了所有的音频帧,后......
  • 通过劫持线程arena实现任意地址分配 n1ctf2018_null
    通过劫持线程arena,当堆开了一个线程之后,如果没有做好保护随之的危险也悄然而至❗BUU上的n1ctf2018_null很好的说明了这个问题题目链接:BUUCTF在线评测(buuoj.cn)看一下保护:除了pie保护剩下的保护全开了,64位ida载入看一下上来是一个输入密码,密码是i'mreadyforchallenge......
  • 《深度学习原理与Pytorch实战》(第二版)(二)
    第6章手写数字加法器——迁移学习迁移学习允许训练集和测试集的数据有不同的分布、目标、领域;而一般的监督学习要求训练集和测试集上的数据有相同的分布特性一个有意思的想法:大公司运用大数据训练大模型,再将这些模型迁移到小公司擅长的特定垂直领域中,这样就可以将泛化的大模......