自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
第一个参数:核心线程池大小,默认创建后就不会销毁,需要设置allowCoreThreadTimeOut为true时会销毁
第二个参数:线程池最大大小
第三个参数:线程池最大空闲时间(线程空闲下来多久自动释放)
第四个参数:时间单位
第五个参数:线程阻塞队列
- 对上图的详解
1 提交任务到线程池:executor.execute(new Runable());
2 判断核心线程池大小是否已满,否,执行任务
3 是,判断队列是否已满,否,进行等待队列
4 是,判断最大线程池大小是否已满,否,执行任务
5 是,执行拒绝策略,对外就是抛出异常
提供的现成可使用的线程池
Executors.newCachedThreadPool(无界线程池,自动线程回收)
Executors.newFixedThreadPool(固定大小的线程池)
Executors.newSingleThreadExecutor(单一后台线程)
我做过的真实案例-> 线程池应用
-
场景:数据库大量数据(百万级)需要同步至Elasticsearch库,目前采用的单线程的方式,每次从数据库中读取5000条数据同步至Elasticsearch库,循环n次,直到数据库读取不到数据则同步完成。 缺点:此过程需要2小时时间。
-
改造:需要将同步时间缩短,同步不能使ES崩掉
-
方案:采用线程池的方式,配置如下:
核心线程池大小为2 最大线程池大小为2 线程池最大空闲时间 0秒 队列大小 1000, 队列大小的选择:1000 * 5000 = 500万 数据不超过5百万
-
实现代码:每同步完成一次需要返回具体插入了ES库的条数,然后实现累加可算出本次同步一共同步了多少条,JDK提供了Future对象
do { List<JSONObject> list = cilSettOracleMapper.findCilSettByDate(sysDate, page * SIZE, (page + 1) * SIZE); if(list != null && list.size() > 0){ Future<Long> future = ThreadManager.executorService.submit(new Callable<Long>() { @Override public Long call(){ log.debug("线程名:" + Thread.currentThread().getName() + "-->" + bulkRequestSize); return ESClient.getESClient().indexCilSettOracle(request, list, scanTransService, merInfoMapper, bulkRequestSize); } }); futrueList.add(future); }else{ break; } log.info("indexCilSett page:" + page + ",bulkRequestSize: " + bulkRequestSize); page++; } while (true); Long futureRequestSize = 0L; for(Future<Long> f : futrueList) { try { futureRequestSize += f.get(); } catch (InterruptedException | ExecutionException e) { log.error("执行Future的get方法发生异常", e); } }
-
上述注意部分:第一次接触Future类,当时直接在循环里调用future.get()方法获取数据,发现线程是一个一个按照顺序去执行,后来了解到get()方法会阻塞线程,于是有了上述的先将Future对象有list存起来,然后循环拿结果。
-
Future类讲解的比较好的:https://www.cnblogs.com/dolphin0520/p/3949310.html