java 线程池默认提供了几种拒绝策略:
这几个策略都实现了RejectedExecutionHandler,拿DiscardOldestPolicy来说,查看源码:
核心代码只有2行:
- e.getQueue().poll() 从列表里弹出1个(最早的)任务,以便让队列空出1个位置
- e.execute(r) 新任务放入队列执行
从这段代码来看,如果有任务被丢弃(即:从队列里弹出了),不会有任何报错,也没有日志可查,实际使用中不太方便监控这种情况。
我们可以参考这段源码,自定义策略:
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class CustomDiscardPolicy implements RejectedExecutionHandler { //额外传入1个名称,方便打日志或埋点监控时,定位问题 private String factoryName = ""; public CustomDiscardPolicy(String factoryName) { this.factoryName = factoryName; } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { Runnable poll = e.getQueue().poll(); //这里可以加一些自己的处理(比如:埋点监控) System.err.println("[" + this.factoryName + "]task will be discard:" + poll); e.execute(r); } } }
当然,这里出于演示目的,只打了一行错误信息,实际应用中大家可以埋点发到kafka之类(以便后续做实时监控预警)。
测试一下:
@Test public void testThreadPool() throws InterruptedException { final ThreadFactory DEMO_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("demo-POOL-%d").build(); final ExecutorService DEMO_POOL = new ThreadPoolExecutor(1, 2, 300L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(5), DEMO_THREAD_FACTORY, new CustomDiscardPolicy("demo-POOL")); for (int i = 0; i < 10; i++) { DEMO_POOL.submit(() -> { try { System.out.println(Thread.currentThread().getId() + " ready!"); //假设线程干活,需要一段时间 Thread.sleep(500); System.out.println("\t" + Thread.currentThread().getId() + " done!"); } catch (Exception e) { } }); } //等一会儿,让线程池都跑完,再结束main Thread.sleep(10000); }
提交了10个任务,线程池必然饱和(10>2+5),会丢弃一些早期任务,输出如下:
从输出看,丢了3个任务,符合预期。
标签:java,自定义,Thread,线程,new,factoryName,public From: https://www.cnblogs.com/yjmyzz/p/how_to_customize_thread_pool_reject_policy.html