首页 > 编程语言 >Java多线程 CompletionService和ExecutorCompletionService

Java多线程 CompletionService和ExecutorCompletionService

时间:2022-11-18 18:09:07浏览次数:40  
标签:Java Thread CompletionService Callable Future 线程 new 多线程 public

(目录)

一、说明

Future的不足

  • 当通过 .get() 方法获取线程的返回值时,会导致阻塞
  • 也就是和当前这个Future关联的计算任务真正执行完成的时候才返回结果
  • 新任务必须等待已完成任务的结果才能继续进行处理,会浪费很多时间,最好是谁最先执行完成谁最先返回

CompletionService的引入

  • 解决阻塞的问题
  • 以异步的方式一边处理新的线程任务,一边处理已完成任务的结果,将执行任务与处理任务分开进行处理

二、理解

CompletionService

  • java.util.concurrent包下CompletionService<V>接口,但并不继承Executor接口,仅有一个实现类ExecutorCompletionService用于管理线程对象
  • 更加有效地处理Future的返回值,避免阻塞,使用.submit()方法执行任务,使用.take()取得已完成的任务,并按照完成这些任务的时间顺序处理它们的结果
public interface CompletionService<V> {
    Future<V> submit(Callable<V> task);
    Future<V> submit(Runnable task, V result);
    Future<V> take() throws InterruptedException;
    Future<V> poll();
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
  • submit()方法用来执行线程任务
  • take()方法从队列中获取完成任务的Future对象,谁最先执行完成谁最先返回,获取到的对象再调用.get()方法获取结果
  • poll()方法获取并删除代表下一个已完成任务的 Future,如果不存在,则返回null,此无阻塞的效果
  • poll(long timeout, TimeUnit unti) timeout表示等待的最长时间,unit表示时间单位,在指定时间内还没获取到结果,则返回null

ExecutorCompletionService

  • java.util.concurrent包下ExecutorCompletionService<V>类实现CompletionService<V>接口,方法与接口相同
  • ExecutorService可以更精确和简便地完成异步任务的执行
  • executor执行任务,completionQueue保存异步任务执行的结果
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    ……
    Future<V> submit(Callable<V> task) 
    Future<V> submit(Runnable task, V result) 
    Future<V> take() throws InterruptedException
    Future<V> poll() 
    Future<V> poll(long timeout, TimeUnit unit)
    ……
}
  • completionQueue初始化了一个LinkedBlockingQueue类型的先进先出阻塞队列
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
  • submit()方法中QueueingFutureExecutorCompletionService中的内部类
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture<V>(f, completionQueue));
        return f;
    }
  • QueueingFutureRunnableFuture实例对象赋值给了task,内部的done()方法将task添加到已完成阻塞队列中,调用take()poll()方法获取已完成的Future
    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

三、实现

1.使用Future

创建CompletionServiceDemo类,创建好的线程对象,使用Executors工厂类来创建ExecutorService的实例(即线程池),通过ThreadPoolExecutor.submit()方法提交到线程池去执行,线程执行后,返回值Future可被拿到

public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 2.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        // 3.使用Future提交三个任务到线程池
        Future future_1 = executorService.submit(callable_1);
        Future future_2 = executorService.submit(callable_2);
        Future future_3 = executorService.submit(callable_3);

        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());
        System.out.println(future_1.get() + "" + getStringDate());
        System.out.println(future_2.get() + "" + getStringDate());
        System.out.println(future_3.get() + "" + getStringDate());
        System.out.println("结束 " + getStringDate());
        
        // 5.关闭线程池
        executorService.shutdown();
    }

    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}

future_1.get()会等待执行时间阻塞5秒再获取到结果,而在这5秒内future_2future_3的任务已完成,所以会立马得到结果 在这里插入图片描述

2.使用ExecutorCompletionService

创建一个ExecutorCompletionService放入线程池实现CompletionService接口,将创建好的线程对象通过CompletionService提交任务和获取结果

public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        
        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
        CompletionService completionService = new ExecutorCompletionService(executorService);
        
        // 3.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        // 3.使用CompletionService提交三个任务到线程池
        completionService.submit(callable_1);
        completionService.submit(callable_2);
        completionService.submit(callable_3);

        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());
        System.out.println(completionService.take().get() + "" + getStringDate());
        System.out.println(completionService.take().get() + "" + getStringDate());
        System.out.println(completionService.take().get() + "" + getStringDate());
        System.out.println("结束 " + getStringDate());

        // 5.关闭线程池
        executorService.shutdown();
    }

    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}

提交顺序是1-2-3,按照完成这些任务的时间顺序处理它们的结果,返回顺序是3-2-1 在这里插入图片描述

3.take()方法

take()方法从队列中获取完成任务的Future对象,会阻塞,一直等待线程池中返回一个结果,谁最先执行完成谁最先返回,获取到的对象再调用.get()方法获取结果

如果调用take()方法的次数大于任务数,会因为等不到有任务返回结果而阻塞,只有三个任务,第四次take等不到结果而阻塞 在这里插入图片描述

4.poll()方法

poll()方法不会去等结果造成阻塞,没有结果则返回null,接着程序继续往下运行

直接用completionService.poll().get()会引发 NullPointerException 在这里插入图片描述 创建一个循环,连续调用poll()方法,每次隔1秒调用,没有结果则返回null 在这里插入图片描述

public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
        CompletionService completionService = new ExecutorCompletionService(executorService);
        // 3.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        // 3.使用CompletionService提交三个任务到线程池
        completionService.submit(callable_1);
        completionService.submit(callable_2);
        completionService.submit(callable_3);

        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());

        // 5.创建一个循环,连续调用poll()方法,间隔1秒
        for (int i = 0; i < 8; i++) {
            Future future = completionService.poll();
            if (future!=null){
                System.out.println(future.get() + getStringDate());
            }else {
                System.out.println(future+" "+getStringDate());
            }
            Thread.sleep(1000);
        }  
        System.out.println("结束 " + getStringDate());

        // 6.关闭线程池
        executorService.shutdown();
    }

    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}

5.poll(long timeout, TimeUnit unit)方法

poll(long timeout, TimeUnit unit)方法设置了等待时间,等待超时还没有结果就返回null

不使用 Thread.sleep(1000),将等待时间设置成0.5秒,由于只有8次循环,也就是4秒执行时间,而callable_1需要执行5秒,获取不到结果则返回null 在这里插入图片描述

public class CompletionServiceDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // 2.创建一个ExecutorCompletionService放入线程池实现CompletionService接口
        CompletionService completionService = new ExecutorCompletionService(executorService);
        // 3.创建Callable子线程对象任务
        Callable callable_1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        Callable callable_3 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return ("我是Callable子线程 " +Thread.currentThread().getName()+ " 产生的结果 " );
            }
        };

        // 3.使用CompletionService提交三个任务到线程池
        completionService.submit(callable_1);
        completionService.submit(callable_2);
        completionService.submit(callable_3);

        // 4.获取返回值
        System.out.println("开始获取结果 " + getStringDate());

        // 5.创建一个循环,连续调用poll()方法,间隔1秒
        for (int i = 0; i < 8; i++) {
            Future future = completionService.poll(500, TimeUnit.MILLISECONDS);
            if (future!=null){
                System.out.println(future.get() + getStringDate());
            }else {
                System.out.println(future+" "+getStringDate());
            }
        }
        System.out.println("结束 " + getStringDate());

        // 6.关闭线程池
        executorService.shutdown();
    }

    // 获取时间函数
    public static String getStringDate() {
        Date currentTime = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
        String date = simpleDateFormat.format(currentTime);
        return date;
    }
}

标签:Java,Thread,CompletionService,Callable,Future,线程,new,多线程,public
From: https://blog.51cto.com/yeatsliao/5868868

相关文章

  • java_slenium_tess4j 简单图片识别
    1.导包2.下载语言库3.代码1》直接识别图片方法一://@parampath:"C:\\Users\\Administrator\\Desktop\\下载\\03.png"publicstaticStringIdentifyCode01(Str......
  • 8. 使Web工程依赖于Java工程
    #在pro02-maven-web工程的pom.xml文件下,添加对pro01-maven-java工程的依赖:(通过坐标) #添加测试代码:##在pro02-maven-web工程的src目录下,添加文件夹:test/java/com/at......
  • kmp算法(Java)
    详解参考:KMP算法讲解next数组求法方式1移动位数=已匹配的字符数-对应的部分匹配值已知空格与D不匹配时,前面六个字符"ABCDAB"是匹配的。查表可知,最后一个匹......
  • java构造方法的作用
    构造方法作用就是对类进行初始化。如果你没有定议任何构造方法的形式,程式会为你取一个不带任何参数的构造函数,那么你产生类的对像时只能用不带参数的方法,如:classa{}//没......
  • JavaScript代码是怎么在浏览器里面运行起来的?
    JavaScript代码是怎么在浏览器里面运行的?下面简单探索一下浏览器内核浏览器内核(RenderingEngine),常见的叫法如:排版引擎、解释引擎、渲染引擎,现在流行称为浏览器内核。......
  • JavaScript_对象_RegExp2与JavaScript_对象_RegExp3
    JavaScript_对象_RegExp2正则对象:1.创建 1.varreg  =new RegExp(“正则表达式”);2.var......
  • java 文件读写操作
    一、BufferedWriter写入文件+BufferedReader读取文件缓冲字符(BufferedWriter)是一个字符流类来处理字符数据。不同于字节流(数据转换成字节),你可以直接写字符串,数组或字符......
  • JavaScript_对象_Math与JavaScript_对象_RegExp1
    JavaScript_对象_MathMath:数学1.创建特点:Math对象不用创建,直接使用。Math.方法名();2.方法random()返回0~1之间的随机数含0不含......
  • Java 8 Stream基础操作汇总
    Java8Stream操作汇总目录Java8Stream操作汇总1.分组2.分组统计3.分组求和4.最大最小值5.排序前提条件://User实体类@DatapublicclassUser{/**......
  • 自定义IE表达式使用.tld文件减少jsp文件中的java代码时出现的错误(可运行)cvc-id.3
    tld类型的文件产生错误如下:但是不影响运行。只需要如下:将j--->J就行。原因知晓,若有大佬知晓,欢迎留言。本人看到后,必将改正 ......