首页 > 其他分享 >线程池的运行逻辑与你想象的不一样,它是池族中的异类

线程池的运行逻辑与你想象的不一样,它是池族中的异类

时间:2024-04-22 14:46:44浏览次数:23  
标签:Runnable 池族 任务 task 线程 Executor 执行 异类

只要是 web 项目,程序都会直接或间接使用到线程池,它的使用是如此频繁,以至于像空气一样,大多数时候被我们无视了。但有时候,我们会相当然地认为线程池与其它对象池(如:数据库连接池)一样,要用的时候向池子索取,用完后归还给它即可。然后事实上,线程池独树一帜、鹤立鸡群,它与普通的对象池就是不同。本文本将先阐述这种差异,接着用最简单的代码实现一个线程池,最后再对 JDK 中与线程池相关的 Executor 体系做一个全面介绍。

线程池与普通资源池的差异

提到 pool 这个设计思想,第一反映是这样的:从一个资源容器中获取空闲的资源对象。如果容器中有空闲的,就直接从空闲资源中取出一个返回,如果容器中没有空闲资源,且容器空间未用尽,就新创建一个资源对象,然后再返回给调用方。这个容器就是资源池,它看起来就像这样:

pool-illustration-via-workman

图中的工人队伍里,有3人是空闲的,工头(资源池的管理者)可以任选两人来提供劳务服务。同时,队队伍尚未饱和,还可以容纳一名工人。如果雇主要求一次性提供4名劳工服务,则工头需要再招纳一名工人加入队伍,然后再向雇主提供服务。此时,这个团队(资源池)已达到饱和,不能再对外提供劳务服务了,除非某些工人完成了工作。

以上是一个典型资源池的基本特点,那么线程池是否也同样如此呢。至少第一感觉是没问题的,大概应该也是这样吧,毕竟拿从池中取出一个线程,再让它执行对应的代码,这听上去很科学嘛。等等,总感觉哪里不对呢,线程这东西能像普通方法调用那样,让我们在主程序里随意支配吗?没错,问题就在这里,线程一旦运行起来,就完全闭关锁国了,除了按照运行前约定好的方式进行数据通信外,再也不能去打扰它老人家了。因此,线程池有点像发动机,池中的各个线程就对应发动机的各个汽缸。整个发动机一旦启动(线程池激活),各个汽缸中的活塞便按照预定的设计,不停地来回运动,永远也不停止,直到燃油耗尽,或人为地关闭油门。在此期间,我们是不能控制单个汽缸的活动方向的。就如同我们不能控制正在运行的线程,让其停止正在执行的代码,转而去执行其它代码一样(利用 Thread.interrpt() 方法也达不到此目的,而 Thread.stop() 更是直接终止了线程)

four-stroke-engine-illustration

既然不能直接给线程池里的单个线程明确指派任务,那线程池的意义何在呢?意义就在于,虽然不能一对一精确指派任务,但可以给整个线程池提交任务,至于这些任务由池中的哪个线程来执行,则是不可控的。此时,可以把线程池看作是生产流水线上的单个工序。这里以给「老干妈香辣酱」的玻璃瓶加盖子为例,给瓶子加盖就是要执行的任务,最初该工序上只设置了一个机械臂,加盖子也顺序操作的。但单个机械臂忙不过来,后来又加了一个机械臂,这样效率就提高了。瓶子被加盖的顺序也是不确定的,但最终所有瓶子都会被加盖。

手动编写一个简易的线程池

如上小节所述,线程池与其它池类组件不一样,调用方不可能直接从池中取出一个线程,然后让它执行一段任务代码。因为线程一旦启动起来,就会在自己的频轨道内独立运行,不受外部控制。要让这些线程执行外部提交的任务,需要提供一个数据通道,将任务打包成一个数据结构传递过去。而这些运行起来的线程,他们都执行一个相同的循环操作:读取任务 → 执行任务 → 读取任务 → ......

      ┌──────────┐    ┌──────────────┐
  ┌─→ │Take Task │ -→ │ Execute Task │ ─┐
  │   └──────────┘    └──────────────┘  │
  └─────────────────────────────────────┘

这个读取任务的数据通道就是队列,池中的所有线程都不断地执行 ② 处的循环逻辑,这便是线程池运行的基本原理。

相对于线程池这个叫法,实际上「执行器 Executor」这个术语在实践中使用得要更多些。因为在 jdk 的 java.util.concurrent 包下,有一个 Executor 接口,它只有一个方法:

public interface Executor {
    void execute(Runnable command);
}

这便是执行器接口,顾名思义,它接受一个 Runnable 对象,并能够执行它。至于如何执行,交由具体的实现类负责,目前至少有以下四种执行方式

  • 在当前线程中同步执行
  • 总是新开线程来异步执行
  • 只使用一个线程来异步串行执行
  • 使用多个线程来并发执行

本小节将以一个简易的线程池方式来实现 Executor。

编写只有一个线程的线程池

这是线程池的最简形式,实现代码也非常简单,如下所示

public class SingleThreadPoolExecutor implements Executor {
    // 任务队列
    private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();

    // 直接将任务添加到队列中
    @Override
    public void execute(Runnable task) {
        tasks.offer(task);
    }

    public SingleThreadPoolExecutor() {
        // 在构造函数中,直接创建一个线程,作为为线程池的唯一任务执行线程
        // 它将在被创建后立即执行,执行逻辑为:
        // 1. 从队列中获取任务
        // 2. 如果获取到任务,则执行它,执行完后,返回第1步
        // 3. 如果未获取到任务,则简短休息,继续第1步
        Thread taskRunner = new Thread(() -> {
            Runnable task;
            while (true) {
                task = tasks.poll();
                if (task != null) {
                    task.run();
                    continue;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        });
        taskRunner.start();
    }
}

上述的单线程执行器实现中,执行任务的线程是永远不会停止的,获取到任务时,就执行它,没有获取到,就一直不断的获取。下面是这个执行器的测试代码:

public class SingleThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        SingleThreadPoolExecutorstp stp= new SingleThreadPoolExecutor();
        // 连续添加 5 个任务
        for (int i = 1; i <= 5; i++) {
            stp.execute(new SpeakNameTask("Coding Change The World " + i));
        }
        System.out.println("主线程已结束");
    }

    // 一个模拟的任务:简单地输出名称
    static class SpeakNameTask implements Runnable {
        private String name;

        public SpeakNameTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            Random random = new Random();
            int milliseconds = 500 + random.nextInt(1000);
            try {
                TimeUnit.MILLISECONDS.sleep(milliseconds);
                System.out.println("["+Thread.currentThread().getName()+"]: I believe " + name);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

下面是输出结果:

主线程已结束
[Thread-0]: I believe Coding Change The World 1
[Thread-0]: I believe Coding Change The World 2
[Thread-0]: I believe Coding Change The World 3
[Thread-0]: I believe Coding Change The World 4
[Thread-0]: I believe Coding Change The World 5

可以看到:作为测试程序的主线程,已经先执行结束了,而线程池还在顺序地执行主线程添加的任务。并且线程池在执行完所有任务后,并没有退出,jvm 进程会一直存在。

改进为拥有多个线程的线程池

多线程版本的线程池执任务执行器,只是在单线程版本上,增加了执行线程的数量,其它的变化不是很大。但为了更好的组织代码,需要将任务执行线程的逻辑单独抽取出来。另外,为了模拟得更像一个池,本示例代码还增加了以下特性

  • 支持核心线程数功能
    核心线程数在执行器创建时,一起创建,并永不结束

  • 支持最大线程数功能
    当核心线程执行任务效率变慢时,增加执行线程

  • 支持空闲线程移除功能
    当非核心线程空闲时长超过限定值时,结束该线程,并从池中移除

主要代码如下:

MultiThreadPoolExecutor.java (点击查看代码)
public class MultiThreadPoolExecutor implements Executor {

    // 线程池
    private final Set<TaskRunner> runnerPool = new HashSet<>();

    // 任务队列
    private final Queue<Runnable> tasks = new LinkedBlockingDeque<>();

    // 单个线程最大空闲毫秒数
    private int maxIdleMilliSeconds = 3000;

    // 核心线程数
    private int coreThreadCount = 1;

    // 最大线程数
    private int maxThreadCount = 3;

    public MultiThreadPoolExecutor() {
        // 初始化核心线程
        for (int i = 0; i < coreThreadCount; i++) {
            addRunner(true);
        }
    }

    private void addRunner(boolean isCoreRunner) {
        TaskRunner runner = new TaskRunner(isCoreRunner);
        runnerPool.add(runner);
        runner.start();
    }

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
        addRunnerIfRequired();
    }

    // 视情况增加线程数,这里简化为当任务数超过线程数的两倍时,就增加线程
    private void addRunnerIfRequired() {
        if (tasks.size() <= 2 * runnerPool.size()) {
            return;
        }
        // 未达到最大线程数时,可增加执行线程
        if (runnerPool.size() < maxThreadCount) {
            synchronized (this) {
                if (runnerPool.size() < maxThreadCount) {
                    addRunner(false);
                }
            }
        }
    }

    class TaskRunner extends Thread {
        // 是否为核心线程
        private final boolean coreRunner;

        // 已空闲的毫秒数
        private long idleMilliseconds = 0;

        TaskRunner(boolean coreRunner) {
            this.coreRunner = coreRunner;
        }

        @Override
        public void run() {
            Runnable task;
            while (true) {
                task = tasks.poll();
                if (task != null) {
                    task.run();
                    continue;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(10);
                    idleMilliseconds += 10;
                    if(coreRunner) {
                        continue;
                    }
                    if (idleMilliseconds > maxIdleMilliSeconds) {
                        // 超过最大空间时间,线程结束,并从池中移徐本线程
                        runnerPool.remove(this);
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        }
    }
}

完整代码已上传至 thread-pool-sample

其实多线程版本的主要难点,是判定增加新线程来执行任务的算法,即如何确定当前需要添加新线程,而不是保持当前的线程数量来执行任务,以保证最高的效率。以这个粗糙的原始版本为基准,不断丰富细节和增强健壮性,就可以慢慢演进出 Jdk 中的 Executor 体系。

JDK 线程池任务执行器浅析

Executor 体系类结构

Executor 接口是任务执行器的顶级接口,它仅定义了一个方法,但并未限制如何执行传递过来的任务。正如第③处所述,「线程池执行」也只是多种方式中的一种,也是用得最多的一种。由于 Executor 接口定义的功能过于单一,于是在 JDK 的并发包下,又对它进行了扩展,这个扩展就是 ExecutorService,如下所示:

public interface ExecutorService extends Executor {
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Callable<T> task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
            throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;

    void shutdown();
    List<Runnable> shutdownNow();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    boolean isShutdown();
    boolean isTerminated();
}

这些扩展方法共分为三组,分别是:任务提交类、状态控制类、状态检查类。从分类上可以看出,ExecutorService 增加了「提交任务」的概念(相对于 Executor 的「执行任务」)。另外,还有「关闭」操作,以及检测执行器当前的状态,这些都是 Exector 不具备的。下面这个分类列表更加清晰:

  • 任务提交

    方法 异步提交 批量提交 超时等待
    submit(Runnable task)
    submit(Callable task)
    invokeAll(Collection<? extends Callable> tasks)
    invokeAll(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)
    invokeAll(Collection<? extends Callable> tasks)
    invokeAny(Collection<? extends Callable> tasks,long timeout, TimeUnit unit)
  • 状态控制

    • shutdown()
    • shutdownNow()
    • awaitTermination(long timeout, TimeUnit unit)
  • 状态检查

    • isShutdown()
    • isTerminated()

除了增加了新的方法外,还新增加了一种任务类型,即:java.util.concurrent.Callable,而 Executor 接口定义的任务接口是 java.lang.Runnable。二者的区别是,Callable#call() 方法有返回值,而后者没有。一般而言,任务提交给执行器后,通常都会异步执行。提交任务的线程是拿不到这个 call() 方法执行完毕后的返回值的,既然这样,那定义这个有返回值的方法还有什么意义呢?

为了拿到返回值,引入了 java.util.concurrent.Future 接口,它定义了获取单个异步任务执行结果的方法,不仅如此,它还定义了其它一些访问和控制单个任务的方法,见下表:

方法 解释
get() 阻塞调用线程,直到所关联的任务执行结束,拿到返回值,或任务执行结束(取消操作和发生异常均会导致结)
get(long timeout, TimeUnit unit) 同上,但会有一个最大等待时长,若超过该时长后,任务依然未执行结束,则结束等待,并抛出 TimeoutException
cancel(boolean mayInterruptIfRunning) 尝试取消关联的任务,只是尝试,遇到以下情况,均无法取消
· 任务已经取消
· 任务已完成
· 其它原因

通常任务一旦开始执行,就无法取消,
除非是极其特定的任务,这类任务的代码本身会与外界通信,判断是否应该取消自己的执行。
因此本方法提供了一个 mayInterruptIfRunning 参数,用来做这种信息传达,
但也仅仅是一个信息传达,表达了期望已运行的任务能自我终止,
但能否真的终止,取决于任务本身的代码逻辑
isCancelled() 检测关联的任务是否已「取消」
isDone() 检测关联的任务是否已「结束」,任务正常执行完毕、遭遇异常和被取消均视为任务已「结束」

标签:Runnable,池族,任务,task,线程,Executor,执行,异类
From: https://www.cnblogs.com/guzb/p/18108245/difference-of-thread-pool-implementation

相关文章

  • Java 线程安全思路
    线程安全1、先来了解一下:为什么多线程并发是不安全的?****在操作系统中,线程是不拥有资源的,进程是拥有资源的。而线程是由进程创建的,一个进程可以创建多个线程,这些线程共享着进程中的资源。所以,当线程一起并发运行时,同时对一个数据进行修改,就可能会造成数据的不一致性,看下面的例......
  • 多线程第一章
    线程的等待与唤醒线程的join需要在几个线程执行完毕之后再执行,例如加载资源等,join方法可以让线程顺序执行例如publicclassExample_1{publicstaticvoidmain(String[]args)throwsInterruptedException{ThreadthreadOne=newThread(()->{......
  • 【Java 线程】SpringBoot 启动后都有哪些线程呢?
    1 前言现在流行搞微服务,基本也都是SpringBoot打底的,那么你可否知道一个基本的SpringBoot启动后,都开辟了哪些线程呢?这节我们就来看看。为什么要看呢?这个主要是增加对服务的了解,比如你管的支付中心或者订单中心,你都有哪些线程,各个线程都是干什么的,你不了解这些你怎么调优,你......
  • JavaSE【9】-Java多线程
    JavaSE【9】-Java多线程synchronized修饰符(方法)------表示这个方法被同步了,就是基于线程安全的;集合容器----有一些集合容器是基于线程同步的(集合的内部使用的方法是基于synchronized来修饰的);一、线程相关概念进程和线程的概念:◆进程就是正在执行的程序,一个进程通常就是一个......
  • Java的六种线程状态及代码示例
    Java的线程有6个状态,分别是NEW           新建状态。刚new出来的thread,还没有调用start,就是这个状态RUNNABLE     运行状态(分为运行中和就绪)。正在运行,或者等待CPU调度,调用完start,就是这个状态BLOCKED       阻塞状态。还未竞争......
  • 基元线程同步构造
    一、基元(一)概述基元指的是在代码中可以使用的最简单的构造。基元是指编程中最基本、最简单的构造或元素,可以直接在代码中使用。基元通常是编程语言中的原始数据类型或基本操作符,用于构建更复杂的数据结构和算法。举例来说,对于C#编程语言,基元可以包括整型(int)、浮点型(float)......
  • pytest多线程运行控制台日志输出异常
    开启多线程后控制台日志显示错误,但是日志文件输出正确百度了一个晚上也没有解决,AI也问不出来解决办法,希望有大佬看到。开启多线程运行用例单独运行只有一个线程【gw1】输出日志信息。【gw2,gw0,gw3】都不能输出日志信息通过main()方式运行,控制台日志信息乱码......
  • 线程池
    线程池目录线程池线程java线程的状态线程的基本方法线程池(ThreadPoolExecutor)常见面试题线程线程是操作系统能够进行运算调度的最小单位,它包含在进程之中,是进程中的实际运作单位。一个线程是由线程的ID当前指令指针PC寄存器集合和堆栈组成每个线程都是系统调度和分派的......
  • JTCR-多线程-09
    基于进程和线程的多任务,其最小的调度单位分别是进程和线程。在基于线程的环境中,单个进程可以同时处理不同的任务,每个线程共享地址空间。基于线程的多任务和基于进程的相比,开销小。相互间的通信和上下文切换开销不同。Java的线程模型Java的运行时系统使用多线程,当某一个线程......
  • java多线程 读取list--动态读取list
    java多线程读取list--动态读取list的案例 本次介绍,我使用的是synchronized同步代码块的关键字来读取list,在写java多线程时,一定要注意synchronized关键字的有效范围。ps:如果synchronized关键字的代码块范围太大,可能会导致优先获取到cpu资源的第一个线程在满足条件的情......