使用场景
- 异步化
- 并发化
- 削峰填谷
Thread
Thread t = new Thread(() -> sout("hello from :") + Thread.currentThread().getName());
t.start();
- 不建议使用, 数量无法控制, 假如runnable逻辑很慢, 造成线程堆积, 内存很快填满, CPU时间片调度消耗大
- 线程数量规定:
- CPU密集型, CPU核数 + 1, 比如图片处理, 视频转码...
- IO密集型, 可以多一些, 比如操作数据库, RPC...
- 两者皆有, 二分法性能测试取最优
基于阻塞队列的Executor
关注runnable逻辑和管理线程壳的消耗
SingleThreadExecutor
- 可执行线程数为1, 多个提交排队, 执行顺序等于提交顺序
- 队列大小无上限, 可能排队过长OOM
FixedThreadPool
- 可执行线程数为设定值, 多个提交排队, 执行顺序不保证等于提交顺序
- 队列大小无上限, 可能排队过长OOM
- 由于线程壳不会销毁, 初始化可执行数过大容易leak
CachedThreadPool
- 线程壳定期清理, 不容易leak, 但也导致一段空闲后可用壳过少, 再面对突发洪峰创建壳消耗大
- 队列大小无上限, 可能排队过长OOM
- 执行顺序不保证等于提交顺序
ScheduledThreadPool
- 定时, 延时提交
ScheduledExecutorService thread = Executors.newScheduledThreadPool(1);
// 包含子线程执行时间的执行, 以下实际周期为1.5s
thread.scheduldWithFixedDelay(() -> {
sleep(500);
sout("hello" + System.currentTimeMillis());}
(起始延时)1, (定时周期)1, TimeUnit.SECONDS);
// 不包含子线程执行时间的执行, 以下实际周期为1s
thread.scheduldAtFixedRate(() -> {
sleep(500);
sout("hello" + System.currentTimeMillis());}
(起始延时)1, (定时周期)1, TimeUnit.SECONDS);
ThreadPoolExecutor
- 留存一定的预备线程壳, 定期清理也不会减少到此数目以下corePoolSize(参考值200)
- 最大可执行线程数, 防止OOM maximumPoolSize(参考值400)
- 创建线程壳工厂 threadFactory
- 可以指定线程名
- 可以设置setUncaughtExceptionHandler, 同一处理子线程的未处理异常, 比如记录到日志, 注意默认子线程异常不影响父线程, 不设置会导致捕获不到
- 排队队列满了处理策略 rejectExecutionHandler
- 默认Abort, 可以抛出InvocationException, 返回状态码429
Callable
- 在线程池里submit Thread没有返回值
- 提交Callable可以有返回值, 返回类型为
Future<T>
- Future类还用于响应式编程
基于Map-Reduce的forkjoin线程池
- 为了减少管理线程壳的overhead, 可以将大量待处理线程切分为少量块, 然后用单线程分别处理每个块, overhead也能减少
- 效果不会比基于阻塞队列的线程池好, 极端情况下切分到线程单元, 等价于一个很大的阻塞队列线程池
基于Map-Reduce的并发流
-
相比forkjoin, 无需手写切分动作
-
并且除了切分好块后创建的线程壳, 调用者(主线程壳)也会加入算力, 所以默认切分块数为CPU核数 - 1
-
改变默认块数
ForkJoinPool = pool = new ForkJoinPool(123); pool.submit(() -> list.parallelStream().forEach().get());