首页 > 其他分享 >延迟队列-处理偶然性延迟任务的延迟队列

延迟队列-处理偶然性延迟任务的延迟队列

时间:2024-02-04 16:33:05浏览次数:21  
标签:队列 任务 Assert manager 线程 偶然性 执行 延迟

目标:实现一个处理偶然事件的延迟队列。

  1. 偶然事件发生的概率不高
  2. 偶然事件一旦发生,事件的量多少不一
  3. 期望偶然事件处理完成之后,能回收处理偶然事件的所有资源(因为偶然事件发生的概率低,所有分配的资源大部分时候都处于闲置状态)

 

思路:回收闲置资源

  1. 发生偶然事件时,自动分配用于处理偶然事件的资源(线程、队列等资源)
  2. 偶然事件处理完成后,回收分配的资源
  3. 使用一个线程用于延迟任务的获取与分发,使用一个线程池并发处理分发后的延迟任务

 

实现:基于延迟队列实现方案

  1. 提交任务时,检查并初始化延迟队列
  2. 如果初始化延迟队列失败,则直接添加延迟任务到延迟队列(说明延迟队列已被初始化)
  3. 如果初始化延迟队列成功,则创建分发线程和任务处理线城池。使用分发线程获取延迟队列任务并将任务分发给线程池处理
  4. 在分发线程池分发完延迟队列任务之后,关闭任务线程池(待任务全部执行完成之后关闭),重置延迟队列的源自引用,回收分发线程
  1 import org.slf4j.Logger;
  2 import org.slf4j.LoggerFactory;
  3 
  4 import java.util.concurrent.*;
  5 import java.util.concurrent.atomic.AtomicReference;
  6 
  7 /**
  8  * @author chris
  9  * @version ID: DelayQueueManager.java, v 0.1 Exp $
 10  * @since 2023-12-22 11:15:04 Fri
 11  */
 12 public class DelayQueueManager {
 13 
 14     private static final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class);
 15 
 16     private final int      minThreads;
 17     private final int      maxThreads;
 18     private final long     keepAliveTime;
 19     private final TimeUnit unit;
 20 
 21     /** 延迟队列管理器执行线程 */
 22     private Thread dispatchThread;
 23     /** 延迟队列管理器执行线程初始化计数 */
 24     private int    initDispatchThreadCount;
 25 
 26     private final AtomicReference<DelayQueue<DelayTask<Runnable>>> queueRef;
 27 
 28     private static final DelayQueueManager manager = new DelayQueueManager();
 29 
 30     private DelayQueueManager() {
 31         this(1, 1, 100, TimeUnit.MILLISECONDS);
 32     }
 33 
 34     private DelayQueueManager(int minThreads, int maxThreads, long keepAliveTime, TimeUnit unit) {
 35         this.minThreads = minThreads;
 36         this.maxThreads = maxThreads;
 37         this.keepAliveTime = keepAliveTime;
 38         this.unit = unit;
 39 
 40         this.initDispatchThreadCount = 0;
 41         this.queueRef = new AtomicReference<>();
 42     }
 43 
 44     public static DelayQueueManager getInstance() {
 45         return manager;
 46     }
 47 
 48     /**
 49      * 提交延迟任务<br />
 50      *
 51      * @param task     任务
 52      * @param time     延迟时间
 53      * @param timeUnit 时间单位
 54      */
 55     public void submitTask(Runnable task, long time, TimeUnit timeUnit) {
 56         long timeout = TimeUnit.MILLISECONDS.convert(time, timeUnit);
 57         DelayTask<Runnable> delayTask = new DelayTask<>(task, timeout);
 58 
 59         // 如果已经启动有执行线程,则不再启动新的执行线程
 60         boolean res = this.queueRef.compareAndSet(null, new DelayQueue<>());
 61 
 62         DelayQueue<DelayTask<Runnable>> delayQueue = this.queueRef.get();
 63         delayQueue.put(delayTask);
 64 
 65         if (!res) {
 66             return;
 67         }
 68 
 69         ExecutorService executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, unit, new LinkedBlockingQueue<>());
 70         dispatchThread = new Thread(() -> {
 71             try {
 72                 execute(executor);
 73             } finally {
 74                 executor.shutdown();
 75                 this.queueRef.compareAndSet(delayQueue, null);
 76             }
 77         });
 78         dispatchThread.start();
 79         initDispatchThreadCount++;
 80         logger.info("延迟队列管理器执行线程第 {} 次启动成功, daemonThread={}", initDispatchThreadCount, dispatchThread);
 81     }
 82 
 83     private void execute(ExecutorService executor) {
 84         DelayQueue<DelayTask<Runnable>> delayQueue;
 85         while (!(delayQueue = this.queueRef.get()).isEmpty()) {
 86             logger.info("延迟队列执行任务, queue={}", delayQueue.size());
 87             try {
 88                 DelayTask<Runnable> delayTask = delayQueue.take();
 89                 Runnable task = delayTask.getTask();
 90                 if (task == null) {
 91                     continue;
 92                 }
 93                 executor.execute(task);
 94             } catch (Exception e) {
 95                 logger.error("延迟队列执行任务异常", e);
 96             }
 97         }
 98     }
 99 
100     /**
101      * 延迟任务
102      *
103      * @param <T>
104      */
105     private static class DelayTask<T extends Runnable> implements Delayed {
106 
107         private final T    task;
108         private final long expire;
109 
110         private DelayTask(T task, long time) {
111             this.task = task;
112             this.expire = System.currentTimeMillis() + time;
113         }
114 
115         @Override
116         public long getDelay(TimeUnit unit) {
117             return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
118         }
119 
120         public T getTask() {
121             return task;
122         }
123 
124         @Override
125         public int compareTo(Delayed o) {
126             DelayTask<?> other = (DelayTask<?>) o;
127             long diff = this.expire - other.expire;
128             if (diff < 0) {
129                 return -1;
130             } else if (diff > 0) {
131                 return 1;
132             } else {
133                 return 0;
134             }
135         }
136     }
137 
138     public Thread getDispatchThread() {
139         return dispatchThread;
140     }
141 
142     public int getInitDispatchThreadCount() {
143         return initDispatchThreadCount;
144     }
145 }

 

测试

  1. 单线程提交任务之后,任务正常执行完成,资源正常回收。重新提交任务之后,重新分配资源处理任务(前后两次任务提交相隔较久,前一批次任务申请资源已经回收)
  2. 批量提交任务(在同一个偶然事件周期内)不会多次重复分配资源
  3. 多线程批量提交任务(同一个偶然事件周期内的并发场景)不会多次重复分配资源
  1 import org.junit.Assert;
  2 import org.junit.Before;
  3 import org.junit.Test;
  4 
  5 import java.lang.reflect.Constructor;
  6 import java.util.concurrent.TimeUnit;
  7 import java.util.concurrent.atomic.AtomicInteger;
  8 
  9 /**
 10  * @author chris
 11  * @version ID: TestDelayQueueManager.java, v 0.1 Exp $
 12  * @since 2023-12-22 16:21:48 Fri
 13  */
 14 public class TestDelayQueueManager {
 15 
 16     private static DelayQueueManager manager;
 17 
 18     @Before
 19     public void setUp() throws Exception {
 20         Constructor<DelayQueueManager> constructor = DelayQueueManager.class.getDeclaredConstructor();
 21         constructor.setAccessible(true);
 22         manager = constructor.newInstance();
 23     }
 24 
 25     /**
 26      * 1. 连续提交任务,只启动一个执行线程
 27      * 2. 任务执行完毕后,执行线程退出
 28      * 3. 任务执行完毕后,再次提交任务,启动新的执行线程
 29      */
 30     @Test
 31     public void testExecutorThreadStart() throws InterruptedException {
 32         Assert.assertNull("执行线程未初始化", manager.getDispatchThread());
 33         Assert.assertEquals("执行线程计数器", 0, manager.getInitDispatchThreadCount());
 34 
 35         AtomicInteger counter = new AtomicInteger(0);
 36         MyTask task = new MyTask(counter);
 37 
 38         // 提交任务,创建新的线程
 39         manager.submitTask(task, 1, TimeUnit.SECONDS);
 40         // 任务执行1s之内,执行线程不退出
 41         Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive());
 42         Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount());
 43         Assert.assertEquals("任务计数器", 0, counter.get());
 44         // 任务执行2s之后,执行线程退出
 45         Thread.sleep(2000);
 46         Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive());
 47         Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount());
 48         Assert.assertEquals("任务计数器", 1, counter.get());
 49 
 50         // 提交任务,创建新的线程
 51         manager.submitTask(task, 1, TimeUnit.SECONDS);
 52         // 任务执行1s之内,执行线程不退出
 53         Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive());
 54         Assert.assertEquals("执行线程计数器", 2, manager.getInitDispatchThreadCount());
 55         Assert.assertEquals("任务计数器", 1, counter.get());
 56         // 任务执行2s之后,执行线程退出
 57         Thread.sleep(2000);
 58         Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive());
 59         Assert.assertEquals("执行线程计数器", 2, manager.getInitDispatchThreadCount());
 60         Assert.assertEquals("任务计数器", 2, counter.get());
 61     }
 62 
 63     /**
 64      * 1. 单线程批量提交多个任务,只启动一个执行线程
 65      * 2. 任务执行完毕后,执行线程退出
 66      */
 67     @Test
 68     public void testSingleThreadPutTask() throws InterruptedException {
 69         AtomicInteger counter = new AtomicInteger(0);
 70         MyTask task = new MyTask(counter);
 71         // 单线程提交单个任务
 72         Assert.assertEquals("执行线程计数器", 0, manager.getInitDispatchThreadCount());
 73         for (int i = 0; i < 10; i++) {
 74             manager.submitTask(task, 1, TimeUnit.SECONDS);
 75         }
 76 
 77         // 任务执行1s之内,执行线程不退出
 78         Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive());
 79         Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount());
 80         Assert.assertEquals("任务计数器", 0, counter.get());
 81         // 任务执行2s之后,执行线程退出
 82         Thread.sleep(2000);
 83         Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive());
 84         Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount());
 85         Assert.assertEquals("任务计数器", 10, counter.get());
 86     }
 87 
 88     /**
 89      * 1. 多线程批量提交多个任务,只启动一个执行线程
 90      * 2. 任务执行完毕后,执行线程退出
 91      */
 92     @Test
 93     public void testMultiThreadPutTask() throws InterruptedException {
 94         AtomicInteger counter = new AtomicInteger(0);
 95         MyTask task = new MyTask(counter);
 96 
 97         // 单线程提交单个任务
 98         Assert.assertEquals("执行线程计数器", 0, manager.getInitDispatchThreadCount());
 99         for (int i = 0; i < 10; i++) {
100             new Thread(() -> {
101                 for (int j = 0; j < 10; j++) {
102                     manager.submitTask(task, 1, TimeUnit.SECONDS);
103                 }
104             }).start();
105         }
106         Thread.sleep(100);
107         // 任务执行100ms之后,线程退出
108         Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive());
109         Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount());
110         Assert.assertEquals("任务计数器", 0, counter.get());
111 
112         // 任务执行3000ms之后,执行线程退出
113         Thread.sleep(3000);
114         Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive());
115         Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount());
116         Assert.assertEquals("任务计数器", 10 * 10, counter.get());
117     }
118 
119     private record MyTask(AtomicInteger counter) implements Runnable {
120 
121         @Override
122         public void run() {
123             this.counter.incrementAndGet();
124         }
125     }

 

标签:队列,任务,Assert,manager,线程,偶然性,执行,延迟
From: https://www.cnblogs.com/benthal/p/18006467

相关文章

  • JUC【1.原子类、2.锁Lock、3.阻塞队列、4.并发集合容器、5.并发工具类、6.线程池】、
    (JUC简介)转自极客时间1.JUC简介从JDK1.5起,JavaAPI中提供了java.util.concurrent(简称JUC)包,在此包中定义了并发编程中很常用的工具,比如:线程池、阻塞队列、同步器、原子类等等。JUC是JSR166标准规范的一个实现,JSR166以及JUC包的作者是同一个人DougLea。2.原......
  • Queue(队列)
    特性先进先出,允许再表的一端进行删除另一端进行插入运算。STL方式头文件#include<queue>定义queue<int>q;//建立一个队列q,其内部元素类型是int;函数q,push(a);//将元素a插入到队列q的末尾/q.pop();//删除队列q的队首元素。q.front();//查询q的队首元素。q.ba......
  • (10/60)用栈实现队列、用队列实现栈
    用栈实现队列实现思路用两个栈实现。入队用输入栈stIn,出队用输出栈stOut。实现pop()时,要注意pop只删除,不返回值。复杂度分析略注意点stack的pop只能弹出,不返回值;弹出并获取值分成:用top()记录栈顶值、用pop()弹出(删除)栈顶值。class方法调用要用->。代码实现classMyQu......
  • 堆(优先队列)
    堆是一种树形结构,树的根是堆顶,堆顶始终保持为所有元素的最优值。有大根堆和小根堆,大根堆的根节点是最大值,小根堆的根节点是最小值。堆一般用二叉树实现,称为二叉堆。堆的存储方式堆的操作empty返回堆是否为空top直接返回根节点的值,时间复杂度\(O(1)\)push将新元素添加在......
  • kafka系列(一)【消息队列、Kafka的基本概念、Kafka的工作机制、Kafka可满足的需求、Kafk
    (kafka系列一)转自《Kafka并不难学!入门、进阶、商业实战》一、消息队列1.消息队列的来源在高并发的应用场景中,由于来不及同步处理请求,接收到的请求往往会发生阻塞。例如,大量的插入、更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多而触发“连接数过多的......
  • EasyCVR智能视频监控平台云台降低延迟小tips
    关于EasyCVR的使用tips很多,今天小编就分享一下在使用EasyCVR云台控制时出现延迟的解决方法。很多用户在将设备通过国标协议或者sdk以及onvif的方法接入到EasyCVR后,使用云台控制的时会出现转动的延迟。其实这个延迟和当前选择的播放协议是有关联。由于默认的是选择flv的播放格式,一般......
  • EasyCVR智能视频监控平台云台降低延迟小tips
    TSINGSEE青犀视频监控汇聚平台EasyCVR可拓展性强、视频能力灵活、部署轻快,可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等,以及支持厂家私有协议与SDK接入,包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安防视频监控的能力,也具备接入AI智能分析的能力,包括对人、车、......
  • Poj 3414 Pots (BFS+回溯+队列)
    这道题需要输出最后结果的执行过程,可以通过结构体,在结构体中定义一个数组s,s中存储了每一步的执行过程,实现了回溯。并且在运行中可以适当剪枝,减少枚举次数。 #include<iostream>#include<queue>#include<cstring>usingnamespacestd;constintN=110;intaa,bb,cc,vis[N......
  • 延迟Promise/单项目多npm版本/webpack好处/webpack5构建速度
    《Promise.withResolvers延迟Promise》https://sorrycc.com/promise-with-resolvers/promise当被reject之后,再次resolve,都是会返回reject的消息一个npm包的多个版本webpack好处需要通过模块化的方式来开发使用一些高级的特性来加快我们的开发效率或者安全性,比如通过ES6......
  • Poj 3278 Catch That Cow (BFS+队列)
    #include<iostream>#include<queue>#include<cstring>usingnamespacestd;constintN=1e5+10;intn,k,line[N],way;structnode{intloc,step;};queue<node>q;voidBFS(intn,intk){while(!q.empty())q.pop();nodestart,next......