首页 > 其他分享 >自定义线程池

自定义线程池

时间:2024-08-05 11:38:55浏览次数:19  
标签:return 自定义 int private 任务 线程 public

自定义线程池

   概要

   自JDK1.5起,util包提供了ExecutorService线程池的实现,主要目的是为了重复利用线程,提高系统效率。我们知道Thread是一个重量级的资源,创建、启动以及销毁都是比较耗费系统资源的,因此对线程的重复利用一种是非常好的程序设计习惯,加之系统中可创建的线程数量是有限的,线程数量和系统性能是一种抛物线的关系,也就是说当线程数量达到某个数值的时侯,性能反倒会降低很多,因此对线程的管理,尤其是数量的控制更能直接决定程序的性能。

   我们从原理入手,设计一个线程池,其目的并不是重复地发明轮子,而是为了弄清楚一个线程池应该具备哪些功能,线程池的实现需要注意哪些细节。然后再讲解一下
ExecutorService的使用。

   一、线程池原理

   所谓线程池,通俗的理解就是有一个池子,里面存放着已经创建好的线程,当有任务提交给线程池执行时,池子中的某个线程会主动执行该任务。如果池子中的线程数量不够应付数量众多的任务时,则需要自动扩充新的线程到池子中,但是该数量是有限的,就好比池塘的水界线一样。当任务比较少的时候,池子中的线程能够自动回收,释放资源。为了能够异步地提交任务和缓存未被处理的任务,需要有一个任务队列。

   如下图所示: 

  

   通过上面的描述可知,一个完整的线程池应该具备如下要素。

   1)任务队列:用于缓存提交的任务。

   2)线程数量管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数来实现,比如创建线程池时初始的线程数量init;线程池自动扩充时最大的线程数量max;在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core。有了这三个参数,就能够很好地控制线程池中的线程数量,将其维护在一个合理的范围之内,三者之间的关系是init<=core<=max。

   3)任务拒绝策略:如果线程数量已达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者。

   4)线程工厂:主要用于个性化定制线程,比如将线程设置为守护线程以及设置线程名称等。

   5)QueueSize: 任务队列主要存放提交的Runnable,但是为了防止内存溢出,需要有 limit 数量对其进行控制。

   6)KeepAlive时间:该时间主要决定线程各个重要参数自动维护的时间间隔。

   7)Internal Task

   InternalTask 是Runnable的一个实现,主要用于线程池内部,该类会使用到 RunnableQueue,然后不断地从queue中取出某个Runnable,并运行Runnable的run 方法,除此之外,代码还对该类增加了一个开关方法stop,主要用于停止当前线程,一般在线程池销毁和线程数量维护的时候会使用到。

  二、线程池的应用

  下面将写一个简单的程序分别测试线程池的任务提交、线程池线程数量的动态扩展,以及线程池的挡毁功能。

   先整体梳理下:

   1)ThreadPool接口

   2)RunnableQueue接口

   3)ThreadFactory接口

   4)DenyPolicy接口和RunnableDenyException自定义异常类

   5)InternalTask类

   6)LinkedRunnableQueue实现类

   7)BasicThreadPool 线程池实现类

   BasicThreadPool实现类分析

   BasicThreadPool实现类_属性和构造器的编写

   BasicThreadPool实现类_初始化线程池和提交任务

   BasicThreadPool实现类_线程池自动维护

   BasicThreadPool实现类_线程池销毁和其他方法

   线程池中线程数量的维护主要由run负责,这也是为什么BasicThreadPool继承自Thread了,不过不推荐使用直接继承的方式。

   下面是代码示例:

   1.  ThreadPool 线程池接口:

 1 /**
 2  * 线程池接口
 3  */
 4 
 5 public interface ThreadPool {
 6 
 7     /**
 8      * 提交任务到线程池
 9      *
10      * @param runnable
11      */
12     void execute(Runnable runnable);
13 
14     /**
15      * 关闭线程池
16      */
17     void shutdown();
18 
19     /**
20      * 查看线程池是否已经被shutdown
21      *
22      * @return
23      */
24     boolean isShutdown();
25 
26     /**
27      * 获取线程池的初始化大小
28      *
29      * @return
30      */
31     int getInitSize();
32 
33     /**
34      * 获取线程池最大的线程数
35      *
36      * @return
37      */
38     int getMaxSize();
39 
40     /**
41      * 获取线程池的核心线程数量
42      *
43      * @return
44      */
45     int getCoreSize();
46 
47     /**
48      * 获取线程池用于缓存任务队列的大小
49      *
50      * @return
51      */
52     int getQueueSize();
53 
54     /**
55      * 获取线程池中活跃线程的数量
56      *
57      * @return
58      */
59     int getActiveCount();
60 }

   2. RunnableQueue 任务队列:

 1 /**
 2  * 任务队列,主要用于缓存提交到线程池中的任务
 3  */
 4 
 5 public interface RunnableQueue {
 6 
 7     /**
 8      * 当有新的任务进来时首先会offer到队列中
 9      */
10     void offer(Runnable runnable);
11 
12 
13     /**
14      * 工作线程通过take方法获取Runnable
15      * @return
16      */
17     Runnable take();
18 
19     /**
20      * 获取任务队列中任务的数量
21      * @return
22      */
23     int size();
24 }

   3. ThreadFactory 线程工厂

1 @FunctionalInterface
2 public interface ThreadFactory {
5     Thread createThread(Runnable runnable);
8 }

   4. DenyPolicy 拒绝策略

 1 /**
 2  * 拒绝策略
 3  */
 4 @FunctionalInterface
 5 public interface DenyPolicy {
 6     void reject(Runnable runnable, ThreadPool threadPool);
 7 
 8     /**
 9      * 该拒绝策略会直接将任务丢弃
10      */
11     class DiscardDenyPolicy implements DenyPolicy {
12         @Override
13         public void reject(Runnable runnable, ThreadPool threadPool) {
14             // ...
15             System.out.println(runnable + "任务已被丢弃");
16         }
17     }
18 
19     /**
20      * 该拒绝策略会向任务提交者抛出异常
21      */
22     class AbortDenyPolicy implements DenyPolicy {
23         @Override
24         public void reject(Runnable runnable, ThreadPool threadPool) {
25             throw new RunnableDenyException("任务" + runnable + " 将被终止。");
26         }
27     }
28 
29 
30     /**
31      * 该拒绝策略会使任务在提交者所在的线程中执行任务
32      */
33     class RunnerDenyPolicy implements DenyPolicy {
34         @Override
35         public void reject(Runnable runnable, ThreadPool threadPool) {
36             if (!threadPool.isShutdown()) {
37                 runnable.run();
38             }
39         }
40     }
41 }

   自定义拒绝策略异常类:

1 /**
2  * 自定义拒绝策略异常类
3  */
4 
5 public class RunnableDenyException extends RuntimeException {
6     public RunnableDenyException(String message) {
7         super(message);
8     }
9 }

   5. InternalTask 任务类

 1 public class InternalTask implements Runnable {
 2 
 3     private final RunnableQueue runnableQueue;
 4     private volatile boolean running = true;
 5 
 6     public InternalTask(RunnableQueue runnableQueue) {
 7         this.runnableQueue = runnableQueue;
 8 
 9     }
10 
11     @Override
12     public void run() {
13         Runnable task;
14 
15         // 如果当前任务为running并且没有被中断,则不断的从 queue 中获取runnable然后执行run方法
16         while (running && !Thread.currentThread().isInterrupted()) {
17             task = runnableQueue.take();
18             task.run();
19         }
20     }
21 
22     /**
23      * 停止当前任务,主要会在线程池的 shutdown 方法中使用
24      */
25     public void stop() {
26         this.running = false;
27     }
28 }

   6. LinkedRunnableQueue 任务队列实现类

 1 public class LinkedRunnableQueue implements RunnableQueue {
 2 
 3     /**
 4      * 任务队列的最大容量
 5      */
 6     private final int limit;
 7 
 8     /**
 9      * 若任务队列中的任务已经满了,则需要执行拒绝策略
10      */
11     private final DenyPolicy denyPolicy;
12 
13     /**
14      * 存放任务的队列
15      */
16     private final LinkedList<Runnable> runnableList = new LinkedList<>();
17     private final ThreadPool threadPool;
18 
19     public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
20         this.limit = limit;
21         this.denyPolicy = denyPolicy;
22         this.threadPool = threadPool;
23     }
24 
25     /**
26      * 当有新的任务进来时首先会offer到队列中
27      */
28     @Override
29     public void offer(Runnable runnable) {
30         synchronized (runnableList) {
31             if (runnableList.size() >= limit) {
32                 //无法容纳新的任务时执行拒绝策略
33                 denyPolicy.reject(runnable, threadPool);
34             } else {
35                 // 将任务加入到队尾,并且唤醒阻塞中的线程
36                 runnableList.addLast(runnable);
37                 runnableList.notifyAll();
38             }
39         }
40     }
41 
42     /**
43      * 工作线程通过take方法获取Runnable
44      * @return
45      */
46     @Override
47     public Runnable take() {
48         synchronized (runnableList) {
49            while (runnableList.isEmpty()){
50                try {
51                    // 如果任务队列中没有可执行的任务,则当前线程将会挂起
52                    runnableList.wait();
53                } catch (InterruptedException e) {
54                    e.printStackTrace();
55                }
56            }
57 
58            // 从任务队列头部移除一个任务
59            return runnableList.removeFirst();
60         }
61     }
62 
63     /**
64      * 获取任务队列中任务的数量
65      * @return
66      */
67     @Override
68     public int size() {
69         synchronized (runnableList) {
70             //返回当前任务队列中的任务数
71           return  runnableList.size();
72         }
73     }
74 }

    7.  BasicThreadPool 线程池实现类

  1 public class BasicThreadPool extends Thread implements ThreadPool {
  2     /**
  3      * 初始化线程数量
  4      */
  5     private final int initSize;
  6     /**
  7      * 线程池最大线程数量
  8      */
  9     private final int maxSize;
 10     /**
 11      * 线程池核心线程数量
 12      */
 13     private final int coreSize;
 14     /**
 15      * 当前活跃的线程数量
 16      */
 17     private int activeCount;
 18 
 19     /**
 20      * 线程池是否被 shutdown 的标记
 21      */
 22     private volatile boolean isShutdown = false;
 23     /**
 24      * 线程存活时间
 25      */
 26     private final long keepAliveTime;
 27     /**
 28      * 时间单位
 29      */
 30     private final TimeUnit timeUnit;
 31     /**
 32      * 创建线程所需的工厂
 33      */
 34     private final ThreadFactory threadFactory;
 35     private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();
 36 
 37 
 38     /**
 39      * 工厂实现类(内部类)
 40      */
 41     private static class DefaultThreadFactory implements ThreadFactory {
 42 
 43         private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(0);
 44         private static final ThreadGroup GROUP = new ThreadGroup("MyThreadGroupPool-" + GROUP_COUNTER.getAndIncrement());
 45         private static final AtomicInteger COUNTER = new AtomicInteger(0);
 46 
 47         @Override
 48         public Thread createThread(Runnable runnable) {
 49             return new Thread(GROUP, runnable, "thread-pool-" + COUNTER.getAndIncrement());
 50         }
 51     }
 52 
 53     /**
 54      * 任务队列
 55      */
 56     private final RunnableQueue runnableQueue;
 57 
 58     /**
 59      * 工作线程队列
 60      */
 61     private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();
 62 
 63 
 64     private static class ThreadTask {
 65         Thread thread;
 66         InternalTask internalTask;
 67 
 68         public ThreadTask(Thread thread, InternalTask internalTask) {
 69             this.thread = thread;
 70             this.internalTask = internalTask;
 71         }
 72     }
 73 
 74     /**
 75      * 拒绝策略
 76      */
 77     private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
 78 
 79     /**
 80      * 构造器
 81      *
 82      * @param initSize
 83      * @param maxSize
 84      * @param coreSize
 85      * @param queueSize
 86      */
 87     public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {
 88         this(initSize, maxSize, coreSize, queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS, DEFAULT_THREAD_FACTORY);
 89     }
 90 
 91     public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit, ThreadFactory threadFactory) {
 92         this.initSize = initSize;
 93         this.maxSize = maxSize;
 94         this.coreSize = coreSize;
 95         this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
 96         this.keepAliveTime = keepAliveTime;
 97         this.timeUnit = timeUnit;
 98         this.threadFactory = threadFactory;
 99         // 初始化线程池
100         init();
101     }
102 
103     /**
104      * 初始化线程池
105      */
106     private void init() {
107 
108         //启动线程池
109         this.start();
110 
111         //创建任务线程
112         System.out.println("初始化线程池数量>>>" + initSize);
113         for (int i = 0; i < initSize; i++) {
114             newThread();
115         }
116     }
117 
118     /**
119      * 创建任务线程并启动
120      */
121     private void newThread() {
122         InternalTask internalTask = new InternalTask(runnableQueue);
123         Thread thread = this.threadFactory.createThread(internalTask);
124 
125         //线程和任务类关联在一起
126         ThreadTask threadTask = new ThreadTask(thread, internalTask);
127         threadQueue.offer(threadTask);
128         this.activeCount++;
129 
130         //启动任务线程
131         thread.start();
132     }
133 
134     /**
135      * 从线程池中移除某个线程
136      */
137     private void removeThread() {
138         ThreadTask threadTask = threadQueue.remove();
139         threadTask.internalTask.stop();
140         this.activeCount--;
141     }
142 
143     /**
144      * 提交任务到线程池
145      * @param runnable
146      */
147     @Override
148     public void execute(Runnable runnable) {
149         if (this.isShutdown) {
150             throw new IllegalStateException("线程池已销毁。");
151         }
152 
153         //提交任务
154         this.runnableQueue.offer(runnable);
155     }
156 
157     /**
158      * 线程池自动维护(相当于线程池的扩容)
159      */
160     @Override
161     public void run() {
162 
163         //线程池没有被shutdown也没有中断的标
164         while (!isShutdown && !isInterrupted()) {
165             try {
166                 timeUnit.sleep(keepAliveTime);
167             } catch (InterruptedException e) {
168                 e.printStackTrace();
169             }
170 
171             synchronized (this) {
172                 if (isShutdown) {
173                     break;
174                 }
175 
176                 // 当前的队列中有任务尚未处理,并且activeCount < coreSize则继续扩容
177                 System.out.println("activeCount>>>" + activeCount);
178                 if (runnableQueue.size() > 0 && activeCount < coreSize) {
179                     System.out.println("线程池扩容至coreSize>>>" + coreSize);
180                     for (int i = initSize; i < coreSize; i++) {
181                         newThread();
182                     }
183 
184                     continue;
185                 }
186 
187                 // 当前的队列中有任务尚未处理,并且activeCount < maxSize则继续扩容
188                 if (runnableQueue.size() > 0 && activeCount < maxSize) {
189                     System.out.println("线程池扩容至maxSize>>>" + maxSize);
190                     for (int i = coreSize; i < maxSize; i++) {
191                         newThread();
192                     }
193                 }
194 
195                 //当前的队列中没有任务,则需要回收,回收至coreSize即可
196                 if (runnableQueue.size() == 0 && activeCount > coreSize) {
197                     System.out.println("当前队列中没有任务,回收线程至>>>" + coreSize);
198                     for (int i = coreSize; i < activeCount; i++) {
199                         removeThread();
200                     }
201                 }
202             }
203         }
204     }
205 
206 
207     /**
208      * 线程池销毁
209      */
210     @Override
211     public void shutdown() {
212         synchronized (this) {
213             if (isShutdown) {
214                 return;
215             }
216 
217             isShutdown = true;
218             threadQueue.forEach(threadTask -> {
219                 threadTask.internalTask.stop();
220 
221                 //打上中断标记
222                 threadTask.thread.interrupt();
223             });
224 
225             this.interrupt();
226         }
227     }
228 
229     @Override
230     public boolean isShutdown() {
231         if (this.isShutdown) {
232             throw new IllegalStateException("线程池已销毁。");
233         }
234         return this.isShutdown;
235     }
236 
237     @Override
238     public int getInitSize() {
239         if (this.isShutdown) {
240             throw new IllegalStateException("线程池已销毁。");
241         }
242         return this.initSize;
243     }
244 
245     @Override
246     public int getMaxSize() {
247         if (this.isShutdown) {
248             throw new IllegalStateException("线程池已销毁。");
249         }
250         return this.maxSize;
251     }
252 
253     /**
254      * 获取线程池的核心线程数量
255      *
256      * @return
257      */
258     @Override
259     public int getCoreSize() {
260         if (this.isShutdown) {
261             throw new IllegalStateException("线程池已销毁。");
262         }
263         return this.coreSize;
264     }
265 
266     /**
267      * 获取线程池用于缓存任务队列的大小
268      *
269      * @return
270      */
271     @Override
272     public int getQueueSize() {
273         if (this.isShutdown) {
274             throw new IllegalStateException("线程池已销毁。");
275         }
276         return this.runnableQueue.size();
277     }
278 
279     /**
280      * 获取线程池中活跃线程的数量
281      *
282      * @return
283      */
284     @Override
285     public int getActiveCount() {
286         if (this.isShutdown) {
287             throw new IllegalStateException("线程池已销毁。");
288         }
289         return this.activeCount;
290     }
291 }

 

   

标签:return,自定义,int,private,任务,线程,public
From: https://www.cnblogs.com/hld123/p/18342922

相关文章

  • echarts自定义x轴和tooltip数据格式
    echarts自定义x轴和tooltip数据格式x轴和y轴数据格式如下x:[0,1,2,3,4,5,6,.....,23],y:[2.5,3.1,3.2,2.2,2.3,3.1,3.1,null,null,null,....,null]//接口返回0-23点的数据,每一个小时一个间隔,没有的话则为null 修改后xy轴数据格式如下//每五分钟一......
  • vue + quill2.0+ 工具栏自定义行高
    在网上查了好多,基本都是基于1.0+版本的,拿过来都用不了,官方又没有文档,只能参考各位前辈的经验+解析源码查找问题。目前已经解决,下面是实现过程。实现代码  先看效果图我用的是原生quill库,正常引入quill,注册行高插件importQuillfrom'quill'import'quill/dist/quill.......
  • Delphi 线程
    不是原创,只是看到好的内容复制了保存下来,留着学习。 CreadteThred参考,同步参考,WaitForSingleObject参考,互斥参考, 一、在Delphi中使用多线程有两种方法:调用API、使用TThread类;使用API的代码更简单.1、调用API:CreateThread()functionCreateThread( lpThr......
  • 使用TaskDecorator装饰器实现再线程隔离下的数据复制
    自定装饰器importorg.slf4j.MDC;importorg.springframework.core.task.TaskDecorator;importjava.util.Map;publicclassComTaskDecoratorimplementsTaskDecorator{@OverridepublicRunnabledecorate(Runnablerunnable){//主线程可执行的代......
  • 打造Perl中的词法分析器:深入自定义文本处理
    打造Perl中的词法分析器:深入自定义文本处理Perl作为一种强大的文本处理语言,提供了丰富的工具来实现词法分析器(Lexer)。词法分析是编译原理中将源代码分解成一系列词素(Tokens)的过程,是构建编译器或解释器的第一步。本文将详细探讨如何在Perl中实现一个自定义的词法分析器,包括......
  • 【Redis】全局命令/内部编码/浅谈单线程模型
    目录前言两个核心命令GET和SET全局命令KEYSEXISTS DELEXPIRETTLTYPE 数据结构的内部编码Redis的5中数据类型Redis数据结构和内部编码单线程架构前言Redis提供了5种数据结构,理解每种数据结构的特点对于Redis开发运维⾮常重要,同时掌握每种数据结构的常......
  • 守护数据堡垒:SQL Server数据库自定义备份审计实现指南
    标题:守护数据堡垒:SQLServer数据库自定义备份审计实现指南引言数据库备份是确保数据安全和业务连续性的关键措施。SQLServer提供了多种备份策略,但有时候,为了满足特定的合规性要求或业务需求,我们需要实现更细粒度的自定义数据备份审计。本文将详细介绍如何在SQLServer中......
  • 自定义导航栏兼容ios和android
    <template>  <view class="content">    <!--距离顶部的距离刚好留出状态栏即可即statusBarHeight-->    <view class="topNav" :style="{height:navHeight+'px',paddingTop:statusBarHeight+'px'}">      <......
  • 数据安全堡垒:构建SQL Server自定义数据安全策略
    数据安全堡垒:构建SQLServer自定义数据安全策略在数字化时代,数据安全是企业的生命线。SQLServer作为企业级数据库解决方案,提供了丰富的安全特性来保护数据。然而,面对复杂的业务需求和不断演变的安全威胁,自定义数据安全策略显得尤为重要。本文将详细探讨如何在SQLServer中......
  • 多线程-进阶2
     博主主页: 码农派大星.  数据结构专栏:Java数据结构 数据库专栏:MySQL数据库JavaEE专栏:JavaEE关注博主带你了解更多数据结构知识1.CAS1.1CAS全称:Compareandswap比较内存和cpu寄存器中的内容,如果发现相同,就进行交换(交换的是内存和另一个寄存器的内容)......