首页 > 其他分享 >学习线程池原理从手写一个线程池开始

学习线程池原理从手写一个线程池开始

时间:2022-10-19 13:31:59浏览次数:58  
标签:task 队列 lock 任务 线程 原理 手写 public

概述

线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。

线程池框架设计

我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。

因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:

  • 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  • 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
  • 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。

为了实现线程池功能,需要考虑下面几个设计要点:

  1. 线程池可以接口外部提交的任务执行
  2. 线程池有工作线程的数量,有任务执行,没有任务也空闲在那,等待任务过来,这样既避免线程频繁创建销毁带来的开销,同时也可以避免线程池无限制的创建线程
  3. 如果线程池接受提交的任务超过工作线程的数量了,该怎么办?可以用一个队列把任务存下来,等工作线程完成任务后去队列中获取任务,执行
  4. 那如果任务实在是太多太多了,达到了我们认为的队列最大值,怎么办,我们可以设计一种任务太多的策略,可以进行切换,比如直接丢弃任务、报错等等

看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型。

学习线程池原理从手写一个线程池开始_线程池

  • 阻塞队列存储执行任务,比如外部main函数作为生产者向队列生产任务。
  • 线程池中的工作线程作为消费者获取任务执行。

现在我们将我们的设计思路转换为代码。

代码实现

阻塞队列的实现

  • 阻塞队列主要存放任务,有容量限制
  • 阻塞队列提供添加和删除任务的API, 如果超过容量,阻塞不能添加任务,如果没有任务,阻塞无法获取任务。
/**
* <p>自定义任务队列, 用来存放任务 </p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 10:15
* @version: 1.0.0
*/
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
// 容量
private int capcity;
// 双端任务队列容器
private Deque<T> deque = new ArrayDeque<>();
// 重入锁
private ReentrantLock lock = new ReentrantLock();
// 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 生产者条件变量
private Condition emptyWaitSet = lock.newCondition();

public BlockingQueue(int capcity) {
this.capcity = capcity;
}

// 阻塞的方式添加任务
public void put(T task) {
lock.lock();
try {
// 通过while的方式
while (deque.size() >= capcity) {
log.debug("wait to add queue");
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
deque.offer(task);
log.debug("task add successfully");
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}

// 阻塞获取任务
public T take() {
lock.lock();
try {
// 通过while的方式
while (deque.isEmpty()) {
try {
log.debug("wait to take task");
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
T task = deque.poll();
log.debug("take task successfully");
// 从队列中获取元素
return task;
} finally {
lock.unlock();
}
}
}
  • put()方法是向阻塞队列中添加任务
  • take()方法是向阻塞队列中获取任务

线程池消费端实现

  1. 定义执行器接口
/**
* <p>定义一个执行器的接口:</p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 12:31
* @version: 1.0.0
*/
public interface Executor {

/**
* 提交任务执行
* @param task 任务
*/
void execute(Runnable task);
}
  1. 定义线程池类实现该接口
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {

/**
* 任务队列
*/
private BlockingQueue<Runnable> taskQueue;

/**
* 核心工作线程数
*/
private int coreSize;

/**
* 工作线程集合
*/
private Set<Worker> workers = new HashSet<>();

/**
* 创建线程池
* @param coreSize 工作线程数量
* @param capcity 阻塞队列容量
*/
public ThreadPool(int coreSize, int capcity) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(capcity);
}

/**
* 提交任务执行
*/
@Override
public void execute(Runnable task) {
synchronized (workers) {
// 如果工作线程数小于阈值,直接开始任务执行
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 如果超过了阈值,加入到队列中
taskQueue.put(task);
}
}
}

/**
* 工作线程,对执行的任务做了一层包装处理
*/
class Worker extends Thread {
private Runnable task;

public Worker(Runnable task) {
this.task = task;
}

@Override
public void run() {
// 如果任务不为空,或者可以从队列中获取任务
while (task != null || (task = taskQueue.take()) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 执行完后,设置任务为空
task = null;
}
}

// 移除工作线程
synchronized (workers){
log.debug("remove worker successfully");
workers.remove(this);
}
}
}
}
  • Worker类是工作线程类,包装了执行任务,里面实现了从队列获取任务,然后执行任务。
  • execute方法的实现中,如果工作线程数量小于阈值的话,直接创建新的工作线程,否则将任务添加到队列中。
  1. 演示
@Test
public void testThreadPool1() throws InterruptedException {
Executor executor = new ThreadPool(2, 4);
// 提交任务
for (int i = 0; i < 6; i++) {
final int j = i;
executor.execute(() -> {
try {
Thread.sleep(10);
log.info("run task {}", j);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread.sleep(10);
}

Thread.sleep(10000);
}

运行结果:

学习线程池原理从手写一个线程池开始_工作线程_02

获取任务超时设计

目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
// 容量
private int capcity;
// 双端任务队列容器
private Deque<T> deque = new ArrayDeque<>();
// 重入锁
private ReentrantLock lock = new ReentrantLock();
// 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 生产者条件变量
private Condition emptyWaitSet = lock.newCondition();

public TimeoutBlockingQueue(int capcity) {
this.capcity = capcity;
}

// 带超时时间的获取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (deque.isEmpty()){
try {
if (nanos<=0){
return null;
}
// 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
return deque.getFirst();
}finally {
lock.unlock();
}
}

// 带超时时间的增加
public boolean offer(T task , long timeout , TimeUnit unit){
lock.lock();
try{
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (deque.size() == capcity){
try {
if (nanos<=0){
return false;
}
// 更新剩余需要等待的时间
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
deque.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
}
  • 新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。

拒绝策略设计

目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。

  1. 定义策略模式的函数式接口
/**
* <p>拒绝策略的函数式接口:</p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 13:15
* @version: 1.0.0
*/
@FunctionalInterface
public interface RejectPolicy<T> {

/**
* 拒绝策略的接口
* @param queue
* @param task
*/
void reject(BlockingQueue<T> queue, T task);
}
  1. 添加函数式接口的调用入口

我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
........

/**
* 尝试添加任务
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try{
// 如果队列超过容量
if (deque.size()> capcity){
log.debug("task too much, do reject");
rejectPolicy.reject(this, task);
}else {
deque.offer(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
  1. 修改ThreadPool类
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
.....

/**
* 拒绝策略
*/
private RejectPolicy rejectPolicy;

// 通过构造方法传入执行的拒绝策略
public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(capcity);
this.rejectPolicy = rejectPolicy;
}

/**
* 提交任务执行
*/
@Override
public void execute(Runnable task) {
synchronized (workers) {
// 如果工作线程数小于阈值,直接开始任务执行
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 如果超过了阈值,加入到队列中
//taskQueue.put(task);

// 调用tryPut的方式
taskQueue.tryPut(rejectPolicy, task);
}
}
}

....
}
  • 通过构造方法的方式传入要执行的拒绝策略
  • 调用tryPut方法添加任务
  1. 演示

学习线程池原理从手写一个线程池开始_工作线程_03

总结

jdk中的线程池实现远比这里手写的要复杂,里面还涉及救急线程、各种内置的拒绝策略以及不同的队列容器等等,但是他们的思想基本一致的,通过这个练习,在后面阅读线程池源码的时候会有很大的帮助。

标签:task,队列,lock,任务,线程,原理,手写,public
From: https://blog.51cto.com/u_15773567/5769686

相关文章

  • 深度理解Redux原理并实现一个redux
    Redux的作用是什么Redux的作用在于实现状态传递、状态管理。在这里你可能会说了,如果是状态传递,那我props的传递不也是可以达到这样的效果吗?context上下文方案不也是可以达......
  • 进程与线程的区别
    1)地址空间:线程共享本进程的地址空间,而进程之间是独立的地址空间。2)资源:线程共享本进程的资源如内存、I/O、cpu等,不利于资源的管理和保护,而进程之间的资源是独立的,能很好......
  • GRU原理及其实现
    title:GRU原理及其实现date:2022-10-0309:31:44mathjax:truetags:GRUGRU原理及其实现https://www.bilibili.com/video/BV1jm4y1Q7uh/?spm_id_from=333.788&v......
  • 多线程 学习
    ​​http://www.vchome.net/dotnet/dotnetdocs/dotnet1.htm​​作者:沐雪文章均系作者原创或翻译,如有错误不妥之处,欢迎各位批评指正。本文版权归作者有,如需......
  • JavaWeb对于C3P0链接池的CURD实例原理详解
    一.java对于C3P0链接池的详解1.1C3P0是什么?c3p0是一个开源的JDBC连接池,它实现了数据源和JNDI绑定,支持JDBC3规范和JDBC2的标准扩展。1.2C3P0工作原理开源JDBC连接池......
  • 深入剖析Redis系列: Redis集群模式搭建与原理详解
    前言在Redis3.0之前,使用 哨兵(sentinel)机制来监控各个节点之间的状态。RedisCluster是Redis的 分布式解决方案,在3.0版本正式推出,有效地解决了Redis在 分布式 ......
  • Kubernetes快速实战与核心原理剖析
    K8S概览1.1K8S是什么?K8S官网文档:​​https://kubernetes.io/zh/docs/home/​​K8S是Kubernetes的全称,源于希腊语,意为“舵手”或“飞行员”,官方称其是:用于自动部署、扩展......
  • [原创]一款基于Reactor线程模型的java网络爬虫框架
    AJSpridergithub:​​https://github.com/zhuchangwu/AJSpider​​概述AJSprider是笔者基于Reactor线程模式+Jsoup+HttpClient封装的一款轻量级java多线程网络爬虫框架,简......
  • 03-Go的执行原理及Go的常用命令
    go的源码文件分为三类:命令源码文件,库源码文件,测试源码文件命令源码文件:后缀.go的文件,一个目录下,只能有一个main的入口,否则build或install会报错。库源码文件:普通的源码......
  • RANSAC的基本原理(最小二乘法拟合的改进版)
    转载:https://zhuanlan.zhihu.com/p/62238520  RANSAC简介RANSAC(RAndom SAmple Consensus,随机采样一致)算法是从一组含有“外点”(outliers)的数据中正确估计数......