首页 > 编程语言 >Java并发编程——ExecutorCompletionService原理详解

Java并发编程——ExecutorCompletionService原理详解

时间:2023-02-01 16:33:42浏览次数:62  
标签:task Java 任务 详解 线程 executor completionQueue ExecutorCompletionService

一、简介

在JDK并发包中有这么一个类ExecutorCompletionService,提交任务后,可以按任务返回结果的先后顺序来获取各任务执行后的结果。

 

该类实现了接口CompletionService:

public interface CompletionService<V> {
    
    Future<V> submit(Callable<V> task);
 
    Future<V> submit(Runnable task, V result);
 
    Future<V> take() throws InterruptedException;
 
    Future<V> poll();
 
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
 
}

该接口定义了一系列方法:提交实现了Callable或Runnable接口的任务,并获取这些任务的结果。

 

CompletionService接口定义了一组任务管理接口:

  • submit() - 提交任务

  • take() - 获取任务结果

  • poll() - 获取任务结果

  • ExecutorCompletionService类是CompletionService接口的实现

  • ExecutorCompletionService内部管理者一个已完成任务的阻塞队列

  • ExecutorCompletionService引用了一个Executor,用来执行任务

  • submit()方法最终会委托给内部的executor去执行任务

  • take/poll方法的工作都委托给内部的已完成任务阻塞队列

  • 如果阻塞队列中有已完成的任务,take方法就返回任务的结果,否则阻塞等待任务完成

  • poll与take方法不同,poll有两个版本:

    • 无参的poll方法 --- 如果完成队列中有数据就返回, 否则返回null
    • 有参数的poll方法 --- 如果完成队列中有数据就直接返回,否则等待指定的时间,到时间后如果还是没有数据就返回null
    • ExecutorCompletionService主要用与管理异步任务 (有结果的任务,任务完成后要处理结果)

关于CompletionService和ExecutorCompletionService的类图如下:

ExecutorCompletionService实现了CompletionService,内部通过Executor以及BlockingQueue来实现接口提出的规范。其中,Executor由调用者传递进来,而Blocking可以使用默认的LinkedBlockingQueue,也可以由调用者传递。另外,该类还会将提交的任务封装成QueueingFuture,这样就可以实现FutureTask.done()方法,以便于在任务执行完毕后,将结果放入阻塞队列中。

 

QueueingFuture为内部类:

private static class QueueingFuture<V> extends FutureTask<Void> {
	QueueingFuture(RunnableFuture<V> task,
				   BlockingQueue<Future<V>> completionQueue) {
		super(task, null);
		this.task = task;
		this.completionQueue = completionQueue;
	}
	private final Future<V> task;
	private final BlockingQueue<Future<V>> completionQueue;
	protected void done() { completionQueue.add(task); }
}

其中,done()方法就是在任务执行完毕后,将任务放入队列中。

 

在提交任务时,将任务封装成QueueingFuture:

public Future<V> submit(Callable<V> task) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<V> f = newTaskFor(task);
	executor.execute(new QueueingFuture<V>(f, completionQueue));
	return f;
}

public Future<V> submit(Runnable task, V result) {
	if (task == null) throw new NullPointerException();
	RunnableFuture<V> f = newTaskFor(task, result);
	executor.execute(new QueueingFuture<V>(f, completionQueue));
	return f;
}

在调用take()、poll()方法时,会从阻塞队列中获取Future对象,以取得任务执行的结果。

二、原理

当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。

 

ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。

 

ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能,这个类采用了装饰器模式,需要用户提供一个自定义的线程池,在ExecutorCompletionService内部持有该线程池进行线程执行,在原有的线程池功能基础上装饰额外的功能。

 

下面是ExecutorCompletionService的原理图:

  • 1、在使用ExecutorCompletionService时需要提供一个自定义的线程池Executor,构造ExecutorCompletionService。同时,也可以指定一个自定义的队列作为线程执行结果的容器,当线程执行完成时,通过重写FutureTask#done()将结果压入队列中。

  • 2、当用户把所有的任务都提交了以后,可通过ExecutorCompletionService#poll方法来弹出已完成的结果,这样做的好处是可以节省获取完成结果的时间。

下面是使用队列和不使用队列的流程对比,从图中我们可以看出,在使用队列的场景下,我们可以优先获取到完成的线程,当我们要汇总所有的执行结果时,这无疑会缩减我们的汇总时间。

 

而不使用队列时,我们需要对FutureTask进行遍历,因为我们不知道哪个线程先执行完了,只能挨个去获取结果,这样已经完成的线程会因为前面未完成的线程的耗时而无法提前进行汇总。

如果算上汇总结果的耗时时间:

在使用队列的场景下,我们可以在其他任务线程执行的过程中汇总已完成的结果,节省汇总时间。不使用队列的场景下,只用等到当前的线程执行完成才能汇总。

代码演示

public class ExecutorCompletionServiceTest {
    public static void main(String[] args) throws Exception {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        ExecutorCompletionService ecs = new ExecutorCompletionService(fixedThreadPool);
        Future future = ecs.submit(new Callable() {
            @Override
            public Integer call() throws Exception {
                return ThreadLocalRandom.current().nextInt(100);
            }
        });
        System.out.println("future:" + future.get());

        //用于取出最新的线程执行结果,注意这里是阻塞的
        //Future future1 = ecs.take();
        //System.out.println("future1:" + future1.get());

        //用于取出最新的线程执行结果,是非阻塞的,如果没有结果就返回null
        Future future2 = ecs.poll();
        if (future2.isDone()) {
            System.out.println(future2.get());
        }
    }
}


//多个线程,先执行完的进阻塞队列,然后可以按执行顺序获取结果
public class ExecutorCompletionServiceDemo {

    public static void main(String[] args) {

        //这里只是为了方便,真正项目中不要这样创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

        completionService.submit(() -> {
            System.out.println("执行任务1开始");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务1结束");
            return "任务1执行成功";
        });

        completionService.submit(() -> {
            System.out.println("执行任务2开始");
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务2结束");
            return "任务2执行成功";
        });

        completionService.submit(() -> {
            System.out.println("执行任务3开始");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行任务3结束");
            return "任务3执行成功";
        });

        for (int i = 0; i < 3; i++) {
            try {
                String result = completionService.take().get();
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }
        executorService.shutdown();
    }
}

源码

public class ExecutorCompletionService<V> implements CompletionService<V> {

	//执行任务的线程池
    private final Executor executor;
	
    //用于调用AbstractExecutorService的newTaskFor方法,来实例化一个实现了RunnableFuture接口的对象
    //如果executor继承了AbstractExecutorService ,则直接调用executor的newTaskFor方法
    //否则直接创建一个FutureTask对象
    private final AbstractExecutorService aes;
	
	//任务完成后放入该阻塞队列中
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * 用于放入执行完成的任务
     */
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
		//重写了FutureTask的done方法,任务完成后,将任务放入阻塞队列中
        protected void done() { completionQueue.add(task); }
    }

	//将传入的Callable包装为RunnableFuture
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
	
	//将传入的Callable包装为RunnableFuture
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
		//completionQueue默认为LinkedBlockingQueue
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        {@code Queue.add} operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are {@code null}
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    /**
	 * 提交任务,任务被包装为QueueingFuture对象,主要重写FutureTask的done方法,
	 * 使得任务执行完毕后被执行任务的线程放入到阻塞队列中
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }
	
	//从阻塞队列中获取任务
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
	
	//如果完成队列中有数据就返回, 否则返回null
    public Future<V> poll() {
        return completionQueue.poll();
    }

	//如果完成队列中有数据就直接返回, 否则等待指定的时间, 到时间后如果还是没有数据就返回null
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

参考: https://blog.csdn.net/ado1986/article/details/37758105

https://www.cnblogs.com/xfeiyun/p/16373387.html

https://blog.csdn.net/weixin_42103620/article/details/119080939

http://t.zoukankan.com/zouhong-p-14089982.html

标签:task,Java,任务,详解,线程,executor,completionQueue,ExecutorCompletionService
From: https://blog.51cto.com/u_14014612/6031668

相关文章

  • SMSUtils阿里云短信验证码java-api
    <!--阿里云短信服务--><dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.16</version></dependency><depe......
  • Java基础学习09
    今天简单做小系统,之前也做过的类似的系统,想重新复习一次逻辑业务(2023-02-01-16:10:49)这次学到有了一个小的函数//获取本地时间并将时间格式化,调用sdf.format(date)输出......
  • JavaScript之异步编程
    什么是异步异步:Asynchronous,async是与同步synchronous,sync相对的概念。传统单线程编程中,程序的运行是同步的,指程序运行在一个控制流之中运行。而异步的概念就是不保证同......
  • Java并发编程——ReentrantReadWriteLock原理
    一、读写锁有这样一种场景:1、如果对一个共享资源的写操作没有读操作那么频繁,这个时候可以允许多个线程同时读取共享资源;2、但是如果有一个线程想去写这些共享资源,那么其......
  • Java并发编程——StampedLock
    一、StampedLock类简介StampedLock类,在JDK1.8时引入,是对读写锁ReentrantReadWriteLock的增强,该类提供了一些功能,优化了读锁、写锁的访问,同时使读写锁之间可以互相转换,更细......
  • Java并发编程——CountDownLatch
    一、闭锁CountDownLatch一个同步工具类,允许一个或者多个线程一直等待,直到其他线程的操作都执行完成之后再继续往下执行。 使用场景:在一些应用场合中,需要等待某个条件达......
  • Java并发编程——CyclicBarrier
    一、CyclicBarrier循环栅栏CyclicBarrier是java.util.concurrent包下面的一个工具类,字面意思是可循环使用(Cyclic)的屏障(Barrier),通过它可以实现让一组线程到达一个屏障(也可......
  • Java并发编程——Semaphore
    一、SemaphoreSemaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。S......
  • Java并发编程——并发包中锁的AQS通用实现
    一、包结构介绍我们查看下java.util.concurrent.locks包下面,发现主要包含如下类:可以发现ReentrantLock和ReentrantReadWriteLock都是AbstractQueueSynchronizer类。我们......
  • Java并发编程——ArrayBlockingQueue
    一、阻塞队列BlockingQueue在java.util.concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速......