目标:实现一个处理偶然事件的延迟队列。
- 偶然事件发生的概率不高
- 偶然事件一旦发生,事件的量多少不一
- 期望偶然事件处理完成之后,能回收处理偶然事件的所有资源(因为偶然事件发生的概率低,所有分配的资源大部分时候都处于闲置状态)
思路:回收闲置资源
- 发生偶然事件时,自动分配用于处理偶然事件的资源(线程、队列等资源)
- 偶然事件处理完成后,回收分配的资源
- 使用一个线程用于延迟任务的获取与分发,使用一个线程池并发处理分发后的延迟任务
实现:基于延迟队列实现方案
- 提交任务时,检查并初始化延迟队列
- 如果初始化延迟队列失败,则直接添加延迟任务到延迟队列(说明延迟队列已被初始化)
- 如果初始化延迟队列成功,则创建分发线程和任务处理线城池。使用分发线程获取延迟队列任务并将任务分发给线程池处理
- 在分发线程池分发完延迟队列任务之后,关闭任务线程池(待任务全部执行完成之后关闭),重置延迟队列的源自引用,回收分发线程
1 import org.slf4j.Logger; 2 import org.slf4j.LoggerFactory; 3 4 import java.util.concurrent.*; 5 import java.util.concurrent.atomic.AtomicReference; 6 7 /** 8 * @author chris 9 * @version ID: DelayQueueManager.java, v 0.1 Exp $ 10 * @since 2023-12-22 11:15:04 Fri 11 */ 12 public class DelayQueueManager { 13 14 private static final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class); 15 16 private final int minThreads; 17 private final int maxThreads; 18 private final long keepAliveTime; 19 private final TimeUnit unit; 20 21 /** 延迟队列管理器执行线程 */ 22 private Thread dispatchThread; 23 /** 延迟队列管理器执行线程初始化计数 */ 24 private int initDispatchThreadCount; 25 26 private final AtomicReference<DelayQueue<DelayTask<Runnable>>> queueRef; 27 28 private static final DelayQueueManager manager = new DelayQueueManager(); 29 30 private DelayQueueManager() { 31 this(1, 1, 100, TimeUnit.MILLISECONDS); 32 } 33 34 private DelayQueueManager(int minThreads, int maxThreads, long keepAliveTime, TimeUnit unit) { 35 this.minThreads = minThreads; 36 this.maxThreads = maxThreads; 37 this.keepAliveTime = keepAliveTime; 38 this.unit = unit; 39 40 this.initDispatchThreadCount = 0; 41 this.queueRef = new AtomicReference<>(); 42 } 43 44 public static DelayQueueManager getInstance() { 45 return manager; 46 } 47 48 /** 49 * 提交延迟任务<br /> 50 * 51 * @param task 任务 52 * @param time 延迟时间 53 * @param timeUnit 时间单位 54 */ 55 public void submitTask(Runnable task, long time, TimeUnit timeUnit) { 56 long timeout = TimeUnit.MILLISECONDS.convert(time, timeUnit); 57 DelayTask<Runnable> delayTask = new DelayTask<>(task, timeout); 58 59 // 如果已经启动有执行线程,则不再启动新的执行线程 60 boolean res = this.queueRef.compareAndSet(null, new DelayQueue<>()); 61 62 DelayQueue<DelayTask<Runnable>> delayQueue = this.queueRef.get(); 63 delayQueue.put(delayTask); 64 65 if (!res) { 66 return; 67 } 68 69 ExecutorService executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, unit, new LinkedBlockingQueue<>()); 70 dispatchThread = new Thread(() -> { 71 try { 72 execute(executor); 73 } finally { 74 executor.shutdown(); 75 this.queueRef.compareAndSet(delayQueue, null); 76 } 77 }); 78 dispatchThread.start(); 79 initDispatchThreadCount++; 80 logger.info("延迟队列管理器执行线程第 {} 次启动成功, daemonThread={}", initDispatchThreadCount, dispatchThread); 81 } 82 83 private void execute(ExecutorService executor) { 84 DelayQueue<DelayTask<Runnable>> delayQueue; 85 while (!(delayQueue = this.queueRef.get()).isEmpty()) { 86 logger.info("延迟队列执行任务, queue={}", delayQueue.size()); 87 try { 88 DelayTask<Runnable> delayTask = delayQueue.take(); 89 Runnable task = delayTask.getTask(); 90 if (task == null) { 91 continue; 92 } 93 executor.execute(task); 94 } catch (Exception e) { 95 logger.error("延迟队列执行任务异常", e); 96 } 97 } 98 } 99 100 /** 101 * 延迟任务 102 * 103 * @param <T> 104 */ 105 private static class DelayTask<T extends Runnable> implements Delayed { 106 107 private final T task; 108 private final long expire; 109 110 private DelayTask(T task, long time) { 111 this.task = task; 112 this.expire = System.currentTimeMillis() + time; 113 } 114 115 @Override 116 public long getDelay(TimeUnit unit) { 117 return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); 118 } 119 120 public T getTask() { 121 return task; 122 } 123 124 @Override 125 public int compareTo(Delayed o) { 126 DelayTask<?> other = (DelayTask<?>) o; 127 long diff = this.expire - other.expire; 128 if (diff < 0) { 129 return -1; 130 } else if (diff > 0) { 131 return 1; 132 } else { 133 return 0; 134 } 135 } 136 } 137 138 public Thread getDispatchThread() { 139 return dispatchThread; 140 } 141 142 public int getInitDispatchThreadCount() { 143 return initDispatchThreadCount; 144 } 145 }
测试
- 单线程提交任务之后,任务正常执行完成,资源正常回收。重新提交任务之后,重新分配资源处理任务(前后两次任务提交相隔较久,前一批次任务申请资源已经回收)
- 批量提交任务(在同一个偶然事件周期内)不会多次重复分配资源
- 多线程批量提交任务(同一个偶然事件周期内的并发场景)不会多次重复分配资源
1 import org.junit.Assert; 2 import org.junit.Before; 3 import org.junit.Test; 4 5 import java.lang.reflect.Constructor; 6 import java.util.concurrent.TimeUnit; 7 import java.util.concurrent.atomic.AtomicInteger; 8 9 /** 10 * @author chris 11 * @version ID: TestDelayQueueManager.java, v 0.1 Exp $ 12 * @since 2023-12-22 16:21:48 Fri 13 */ 14 public class TestDelayQueueManager { 15 16 private static DelayQueueManager manager; 17 18 @Before 19 public void setUp() throws Exception { 20 Constructor<DelayQueueManager> constructor = DelayQueueManager.class.getDeclaredConstructor(); 21 constructor.setAccessible(true); 22 manager = constructor.newInstance(); 23 } 24 25 /** 26 * 1. 连续提交任务,只启动一个执行线程 27 * 2. 任务执行完毕后,执行线程退出 28 * 3. 任务执行完毕后,再次提交任务,启动新的执行线程 29 */ 30 @Test 31 public void testExecutorThreadStart() throws InterruptedException { 32 Assert.assertNull("执行线程未初始化", manager.getDispatchThread()); 33 Assert.assertEquals("执行线程计数器", 0, manager.getInitDispatchThreadCount()); 34 35 AtomicInteger counter = new AtomicInteger(0); 36 MyTask task = new MyTask(counter); 37 38 // 提交任务,创建新的线程 39 manager.submitTask(task, 1, TimeUnit.SECONDS); 40 // 任务执行1s之内,执行线程不退出 41 Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive()); 42 Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount()); 43 Assert.assertEquals("任务计数器", 0, counter.get()); 44 // 任务执行2s之后,执行线程退出 45 Thread.sleep(2000); 46 Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive()); 47 Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount()); 48 Assert.assertEquals("任务计数器", 1, counter.get()); 49 50 // 提交任务,创建新的线程 51 manager.submitTask(task, 1, TimeUnit.SECONDS); 52 // 任务执行1s之内,执行线程不退出 53 Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive()); 54 Assert.assertEquals("执行线程计数器", 2, manager.getInitDispatchThreadCount()); 55 Assert.assertEquals("任务计数器", 1, counter.get()); 56 // 任务执行2s之后,执行线程退出 57 Thread.sleep(2000); 58 Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive()); 59 Assert.assertEquals("执行线程计数器", 2, manager.getInitDispatchThreadCount()); 60 Assert.assertEquals("任务计数器", 2, counter.get()); 61 } 62 63 /** 64 * 1. 单线程批量提交多个任务,只启动一个执行线程 65 * 2. 任务执行完毕后,执行线程退出 66 */ 67 @Test 68 public void testSingleThreadPutTask() throws InterruptedException { 69 AtomicInteger counter = new AtomicInteger(0); 70 MyTask task = new MyTask(counter); 71 // 单线程提交单个任务 72 Assert.assertEquals("执行线程计数器", 0, manager.getInitDispatchThreadCount()); 73 for (int i = 0; i < 10; i++) { 74 manager.submitTask(task, 1, TimeUnit.SECONDS); 75 } 76 77 // 任务执行1s之内,执行线程不退出 78 Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive()); 79 Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount()); 80 Assert.assertEquals("任务计数器", 0, counter.get()); 81 // 任务执行2s之后,执行线程退出 82 Thread.sleep(2000); 83 Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive()); 84 Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount()); 85 Assert.assertEquals("任务计数器", 10, counter.get()); 86 } 87 88 /** 89 * 1. 多线程批量提交多个任务,只启动一个执行线程 90 * 2. 任务执行完毕后,执行线程退出 91 */ 92 @Test 93 public void testMultiThreadPutTask() throws InterruptedException { 94 AtomicInteger counter = new AtomicInteger(0); 95 MyTask task = new MyTask(counter); 96 97 // 单线程提交单个任务 98 Assert.assertEquals("执行线程计数器", 0, manager.getInitDispatchThreadCount()); 99 for (int i = 0; i < 10; i++) { 100 new Thread(() -> { 101 for (int j = 0; j < 10; j++) { 102 manager.submitTask(task, 1, TimeUnit.SECONDS); 103 } 104 }).start(); 105 } 106 Thread.sleep(100); 107 // 任务执行100ms之后,线程退出 108 Assert.assertTrue("执行线程启动", manager.getDispatchThread().isAlive()); 109 Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount()); 110 Assert.assertEquals("任务计数器", 0, counter.get()); 111 112 // 任务执行3000ms之后,执行线程退出 113 Thread.sleep(3000); 114 Assert.assertFalse("执行线程退出", manager.getDispatchThread().isAlive()); 115 Assert.assertEquals("执行线程计数器", 1, manager.getInitDispatchThreadCount()); 116 Assert.assertEquals("任务计数器", 10 * 10, counter.get()); 117 } 118 119 private record MyTask(AtomicInteger counter) implements Runnable { 120 121 @Override 122 public void run() { 123 this.counter.incrementAndGet(); 124 } 125 }
标签:队列,任务,Assert,manager,线程,偶然性,执行,延迟 From: https://www.cnblogs.com/benthal/p/18006467