首页 > 其他分享 >ThreadPoolExecutor 线程池的使用

ThreadPoolExecutor 线程池的使用

时间:2022-08-31 16:12:38浏览次数:50  
标签:workQueue 队列 任务 线程 使用 new ThreadPoolExecutor

ThreadPoolExecutor

这个类是JDK中的线程池类,继承自Executor,里面有一个execute()方法,用来执行线程,线程池主要提供一个线程队列,队列中保存着所有等待状态的线程。

package com.tan.test.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @author tlj
 * @date 2022/08/10 11:42:27
 */
public class PoolTest {
    public static void main (String[] args) throws Exception {
        testThreadPoolExecutor();
    }
    public static void testThreadPoolExecutor() throws Exception {
        ThreadFactory factory = r -> new Thread(r, "线程名");
        threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                workQueue,
                factory,
                rejected);//默认线程池,可控制参数比较多

        //执行无返回值线程
        for(int i=0;i<10;i++) {
            threadPool.execute(new TaskRunnable());
        }

        //执行有返回值线程
        List<Future<String>> futres = new ArrayList<>();
        for(int i=0;i<10;i++) {
            Future<String> future = threadPool.submit(new TaskCallable(i));
            futres.add(future);
        }
        for(int i=0;i<futres.size();i++){
            String result = futres.get(i).get();
            System.out.println(i+" result = "+result);
        }
    }
    /**
     * 无返回值的线程,使用 threadpool.execut() 执行
     */
    public static class TaskRunnable implements Runnable{
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " runnable result!");
        }
    }
    /**
     * 有返回值的线程,使用 threadpool.submit() 执行
     */
    public static class TaskCallable implements Callable<String> {
        public TaskCallable(int index){
            this.i=index;
        }
        private int i;
        @Override
        public String call() throws Exception {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //System.out.println("callable result!");
            return Thread.currentThread().getName()+" callable index="+i +",sleep="+r;
        }
    }
}

  相关参数配置:

//基础参数
int corePoolSize=2;//最小活跃线程数
int maximumPoolSize=5;//最大活跃线程数
int keepAliveTime=5;//指定线程池中线程空闲超过 5s 后将被回收
TimeUnit unit = TimeUnit.SECONDS;//keepAliveTime 单位
//阻塞队列
BlockingQueue<Runnable> workQueue = null;
//1 有界队列
workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出(FIFO)队列,支持公平锁和非公平锁,有界
workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出(FIFO)队列,默认长度为 Integer.MaxValue 有OOM危险,有界
workQueue = new LinkedBlockingDeque(); //一个由链表结构组成的,双向阻塞队列,有界
//2 无界队列
workQueue = new PriorityBlockingQueue(); //支持优先级排序的无限队列,默认自然排序,可以实现 compareTo()方法指定排序规则,不能保证同优先级元素的顺序,无界。
workQueue = new DelayQueue(); //一个使用优先级队列(PriorityQueue)实现的无界延时队列,在创建时可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
workQueue = new LinkedTransferQueue(); //一个由链表结构组成的,无界阻塞队列
//3 同步移交队列
workQueue = new SynchronousQueue<>();//无缓冲的等待队列,队列不存元素,每个put操作必须等待take操作,否则无法添加元素,支持公平非公平锁,无界


//拒绝策略
RejectedExecutionHandler rejected = null;
rejected = new ThreadPoolExecutor.AbortPolicy();//默认,队列满了丢任务抛出异常
rejected = new ThreadPoolExecutor.DiscardPolicy();//队列满了丢任务不异常
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();//将最早进入队列的任务删,之后再尝试加入队列
rejected = new ThreadPoolExecutor.CallerRunsPolicy();//如果添加到线程池失败,那么主线程会自己去执行该任务


//使用的线程池
ExecutorService threadPool = null;
threadPool = Executors.newCachedThreadPool();//有缓冲的线程池,线程数 JVM 控制
threadPool = Executors.newFixedThreadPool(3);//固定大小的线程池
threadPool = Executors.newScheduledThreadPool(2);
threadPool = Executors.newSingleThreadExecutor();//单线程的线程池,只有一个线程在工作

合理配置线程池
要想合理地配置线程池,首先要分析任务特性

1 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
2 任务的优先级:高、中和低。
3 任务的执行时间:长、中和短。
4 任务的依赖性:是否依赖其他系统资源,如数据库连接。


性质不同的任务可以用不同规模的线程池分开处理。

CPU密集型任务应该配置尽可能少的线程,如配置N+1个线程,N位CPU的个数。

IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N。

混合型任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量

优先级不同的任务可以交给优先级队列PriorityBlcokingQueue来处理。

执行时间不同的任务可以交给不同规模的线程池来处理。

依赖数据库的任务,因此线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间越长,那么线程数应该设置的越大,这样能更好滴利用CPU。





标签:workQueue,队列,任务,线程,使用,new,ThreadPoolExecutor
From: https://www.cnblogs.com/dragon-lan/p/16643435.html

相关文章