首页 > 其他分享 >JUC学习-线程池部分

JUC学习-线程池部分

时间:2023-02-21 21:55:49浏览次数:43  
标签:JUC task System 学习 线程 println pool out

自定义线程池

package com.appletree24;  
  
import java.util.ArrayDeque;  
import java.util.Deque;  
import java.util.HashSet;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.ReentrantLock;  
  
class Main {  
    public static void main(String[] args) throws ExecutionException, InterruptedException {  
        ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> {  
            //带超时等待  
//            queue.offer(task,500,TimeUnit.MILLISECONDS);  
        });  
        for (int i = 0; i < 10; i++) {  
            int j = i;  
            threadPool.execute(() -> {  
                try {  
                    Thread.sleep(1000L);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
                System.out.println(j);  
            });  
        }  
    }  
}  
  
//策略模式接口 此处使用策略模式是因为在实现拒绝策略时,有许多种拒绝的方式,这些方式如果不使用恰当的模式,就需要大量的if..else来编写  
//且方式数量大于4个,会造成类膨胀的问题,推荐使用混合模式  
//https://www.runoob.com/design-pattern/strategy-pattern.html  
@FunctionalInterface  
interface RejectPolicy<T> {  
    void reject(BlockingQueue<T> queue, T task);  
}  
  
class ThreadPool {  
    //任务队列  
    private BlockingQueue<Runnable> taskQueue;  
    //线程集合  
    private HashSet<Worker> workers = new HashSet<>();  
  
    //线程数  
    private int coreSize;  
  
    //超时时间  
    private long timeout;  
  
    private TimeUnit timeUnit;  
    private RejectPolicy<Runnable> rejectPolicy;  
  
    //执行任务  
    public void execute(Runnable task) {  
        //当任务数未超过核心线程数时,直接交给Worker对象执行  
        //如果超过,则加入阻塞任务队列,暂存起来  
        synchronized (workers) {  
            if (workers.size() < coreSize) {  
                Worker worker = new Worker(task);  
                workers.add(worker);  
                worker.start();  
            } else {  
                //第一种选择死等  
//                taskQueue.put(task);  
                //第二种为超时等待  
                //第三种为消费者放弃任务执行  
                //第四种为主线程抛出异常  
                //第五种让调用者自行执行任务  
                taskQueue.tryPut(rejectPolicy, task);  
            }  
        }  
    }  
  
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {  
        this.coreSize = coreSize;  
        this.timeout = timeout;  
        this.timeUnit = timeUnit;  
        this.taskQueue = new BlockingQueue<>(queueCapcity);  
        this.rejectPolicy = rejectPolicy;  
    }  
  
    class Worker extends Thread {  
        private Runnable task;  
  
        public Worker(Runnable task) {  
            this.task = task;  
        }  
  
        @Override  
        public void run() {  
            //执行任务  
            //1.当传递过来的task不为空,执行任务  
            //2.当task执行完毕,再接着取下一个任务并执行  
            while (task != null || (task = taskQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {  
                try {  
                    task.run();  
                } catch (Exception e) {  
                    e.printStackTrace();  
                } finally {  
                    task = null;  
                }  
            }  
            synchronized (workers) {  
                workers.remove(this);  
            }  
        }  
    }  
}  
  
class BlockingQueue<T> {  
    //1. 任务队列  
    private final Deque<T> queue = new ArrayDeque<>();  
  
    //2. 锁  
    private final ReentrantLock lock = new ReentrantLock();  
  
    //3. 生产者条件变量  
    private final Condition fullWaitSet = lock.newCondition();  
  
    //4. 消费者条件变量  
    private final Condition emptyWaitSet = lock.newCondition();  
  
    //5. 容量上限  
    private int capcity;  
  
    public BlockingQueue(int capcity) {  
        this.capcity = capcity;  
    }  
  
    //带超时的等待获取  
    public T poll(long timeout, TimeUnit unit) {  
        lock.lock();  
        long nanos = unit.toNanos(timeout);  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    if (nanos <= 0) {  
                        return null;  
                    }  
                    nanos = emptyWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //消费者拿取任务的方法  
    public T take() {  
        lock.lock();  
        try {  
            while (queue.isEmpty()) {  
                try {  
                    emptyWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            T t = queue.removeFirst();  
            fullWaitSet.signal();  
            return t;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //阻塞添加  
    public void put(T task) {  
        lock.lock();  
        try {  
            while (queue.size() == capcity) {  
                try {  
                    fullWaitSet.await();  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完后唤醒消费者等待  
            emptyWaitSet.signal();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    //带超时时间的阻塞添加  
    public boolean offer(T task, long timeout, TimeUnit unit) {  
        lock.lock();  
        try {  
            long nanos = unit.toNanos(timeout);  
            while (queue.size() == capcity) {  
                try {  
                    if (nanos <= 0) return false;  
                    nanos = fullWaitSet.awaitNanos(nanos);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
            queue.offerLast(task);  
            //添加完后唤醒消费者等待  
            emptyWaitSet.signal();  
            return true;  
        } finally {  
            lock.unlock();  
        }  
    }  
  
  
    //获取当前阻塞队列大小  
    public int size() {  
        lock.lock();  
        try {  
            return queue.size();  
        } finally {  
            lock.unlock();  
        }  
    }  
  
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {  
        lock.lock();  
        try {  
            //判断队列是否已满  
            if (queue.size() == capcity) {  
                rejectPolicy.reject(this, task);  
            } else {  
                queue.addLast(task);  
                emptyWaitSet.signal();  
            }  
        } finally {  
            lock.unlock();  
        }  
    }  
}

上述的自定义线程池虽然能够执行完毕主线程给予的任务,但任务全部执行结束后,开辟的线程池内核心线程仍然在运行,并没有结束,这是因为目前线程池中的take方法仍然为不会有超时等待的take方法,造成了死等,需要为其加入超时停止的功能。也就是替代take()的poll()

JDK自带线程池

介绍

image.png
ThreadPoolExecutor使用int的高三位表示线程池状态,低29位表示线程数量
image.png
image.png
在ThreadPoolExecutor中,同样也存在拒绝策略。其图结构如下:
image.png
其中接口就对应着在自定义线程池中实现的策略模式接口,下面的四个实现类就对应着四种不同的拒绝方式:
image.png

利用工具类创建固定大小线程池

image.png

利用工具类创建带缓冲的线程池

image.png
从源码可以看出,带缓冲的线程池中缓冲队列的使用的是一个名为SynchronousQueue的队列,这个队列的特点如下:队列不具有容量,当没有线程来取时,是无法对其内部放入数据的,例如队列内部已有一个数字1,但此时没有线程取走,则线此队列目前并不能继续存入数据,直到1被取走

利用工具类创建单线程线程池

image.png
从源码可以看出,单线程线程池中核心线程数与最大线程数相等,即不存在应急线程。只能解决一个任务
那么这个线程池和我自己创建一个线程的线程池有什么区别呢?区别如下:
image.png

ThreadPoolExecutor-submit method

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    Future<String> result = pool.submit(() -> {  
        System.out.println("running");  
        Thread.sleep(1000);  
        return "ok";  
    });  
    System.out.println(result.get());  
}

submit方法可以传入Runnable和Callable类型的参数,并且将线程内部所执行任务的结果返回,用Future包装

ThreadPoolExecutor-invokeAll

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    results.forEach(f -> {  
        try {  
            System.out.printf(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

invokeAll方法可以传入任务的集合,同样的任务的返回值也会以列表形式返回

ThreadPoolExecutor-invokeAny

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    String result = pool.invokeAny(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.awaitTermination(1000, TimeUnit.MILLISECONDS);  
    System.out.println(result);  
}

invokeAny方法同样可以传入任务的集合,只不过最后返回的结果并不是任务的结果集合,而是最早完成的那个任务的结果。

ThreadPoolExecutor-shutdown

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    pool.shutdown();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdown方法会将线程池的状态变为SHUTDOWN

  • 不会接受新任务
  • 但已提交的任务会执行完
  • 此方法不会阻塞调用线程的执行

ThreadPoolExecutor-shutdownNow

public static void main(String[] args) throws ExecutionException, InterruptedException {  
    ExecutorService pool = Executors.newFixedThreadPool(2);  
    List<Future<String>> results = pool.invokeAll(Arrays.asList(() -> {  
                System.out.println("begin");  
                Thread.sleep(1000);  
                return "1";  
            },  
            () -> {  
                System.out.println("begin");  
                Thread.sleep(500);  
                return "2";  
            }));  
    List<Runnable> runnables = pool.shutdownNow();  
    results.forEach(f -> {  
        try {  
            System.out.println(f.get());  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    });  
}

shutdownNow方法会将线程池状态变为STOP

  • 不会接受新任务
  • 会将队列中现有的任务返回
  • 并且用interrupt方法中断正在执行的任务

标签:JUC,task,System,学习,线程,println,pool,out
From: https://www.cnblogs.com/appletree24/p/17142610.html

相关文章

  • python学习——【第十一弹】
    前言上一篇文章 ​​python学习——【第十弹】​​中介绍了python中类的相关属性和方法,这篇文章接着学习python中的浅拷贝,下一篇文章为大家介绍深拷贝。简单了解浅拷贝......
  • 【多线程】定位线程死锁
    定位线程死锁的方式jstackpid使用arthas写一个死锁的小例子importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.CountDownLatch;imp......
  • Linux学习-DAY11
    7.2LVM逻辑卷管理器LVM是Linux系统用于对硬盘分区进行管理的一种机制,理论性较强,其创建初衷是为了解决硬盘设备在创建分区后不易修改分区大小的缺陷。尽管对传统的硬盘分区进......
  • Django学习笔记记录(整理了B站武老师的讲课课件,供大家学习)
    day1、初识DjangoPython知识点:函数、面向对象。前端开发:HTML、CSS、JavaScript、jQuery、BootStrap。MySQL数据库。Python的Web框架:Flask,自身短小精悍+第三方组......
  • Johnson 全源最短路 学习笔记
    我居然不会这玩意,过来学一下。算法简介Johnson全源最短路用于求一个带负权的图的任意两点之间的最短路,时间复杂度为\(\Theta(nm\logm)\)。算法流程考虑到\(n\)次......
  • 对比学习简记
    目录Self-SupervisedLearning的核心思想两大主流方法实践应用ContrastiveRepresentationLearning对比学习目标函数ContrastiveLossTripletLossN-pairLossNCEInfoNCE......
  • docker学习
    1.背景打算装虚拟机,嫌麻烦,想到docker也可以实现,所以在本地部署docker2.docker和虚拟机的区别linux环境安装docker和制作镜像win11环境安装docker和制作镜像......
  • Java多线程技能-线程的启动
    java多线程技能技术点:线程的启动如何使线程暂停如何使线程停止线程的优先级线程安全相关的问题进程和线程的定义及多线程的优点进程:进程是受操作系统管理的基本......
  • 学习进度
    今天完成了对android入门的学习,用了2小时的时间。我知道了什么是安卓开发,并安装了Androidstudio编译器,知道了什么是线性布局,需要把原有的布局删掉,重新写一个布局,并且了解了......
  • 2023.2.21 我的第一篇博客——软件工程学习心得体会
    今天是我第一次在博客园写博客,本人目前是上海海洋大学软件工程系大二在读,第一篇博客就聊聊我这一年半对软件工程学习的感想吧。编程语言方面,大一学习了C和C++,大二上学期学......