Submit Tasks in Batch using ExecutorService

Published: 2023-09-01 11:36:30

http://www.javabyexamples.com/submit-tasks-in-batch-using-executorservice

1. Overview

In this tutorial, we're going to look at how we can submit tasks in batch using the ExecutorService implementations.

2. Sample Application

Let's start with the sample application.

We'll work with the SleepAndReturn task:

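import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class SleepAndReturn implements Callable<String> {

    // How long the task sleeps before returning, in milliseconds.
    private final int millis;

    public SleepAndReturn(int millis) {
        this.millis = millis;
    }

    @Override
    public String call() throws Exception {
        TimeUnit.MILLISECONDS.sleep(millis);
        return "Done at " + millis;
    }
}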

SleepAndReturn sleeps for the given amount of time and then returns a String.

3. Submit Tasks with ExecutorService.invokeAll()

Firstly, we'll use the invokeAll method of ExecutorService.

When we provide a list of Callable tasks, invokeAll runs them all, blocks until every task has completed, and then returns a list of Futures:

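import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitTasksInBatch {

    private final int threadCount = 5;

    public void batchWithInvokeAll(List<Callable<String>> tasks) throws InterruptedException {
        final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
        // Blocks until every task has completed.
        final List<Future<String>> futures = threadPool.invokeAll(tasks);

        for (Future<String> future : futures) {
            try {
                final String result = future.get();
                System.out.println(result);
            } catch (ExecutionException e) {
                // The task itself threw an exception.
                System.out.println("Error occurred.");
            }
        }

        threadPool.shutdown();
    }
}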

Here, we have the batchWithInvokeAll method that accepts a list of Callable tasks. Firstly, we're creating a thread pool to run the tasks. Then we're invoking the invokeAll method passing the given tasks.
Note that the return value is a list of Futures whose order is the same as the submission order. We're then accessing the actual String result by invoking Future.get.

Next, let's look at the caller:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.batchWithInvokeAll(tasks);
}

Here, we're creating three SleepAndReturn tasks.

A sample run outputs:

Done at 900
Done at 700
Done at 300

Note that the output has the same order as the submission although the first submitted task - 900 - completes last.
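To check both properties of invokeAll ourselves, we can use a minimal sketch like the following. The InvokeAllOrderDemo class and its sleepAndReturn and runAll helpers are hypothetical names introduced only for this illustration, and the sleep times are shortened so that it runs quickly:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code.
class InvokeAllOrderDemo {

    // Builds a task that behaves like SleepAndReturn.
    static Callable<String> sleepAndReturn(int millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return "Done at " + millis;
        };
    }

    // Runs the tasks with invokeAll and collects the results in list order.
    static List<String> runAll(List<Callable<String>> tasks)
            throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = threadPool.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                // invokeAll returns only after every task has finished.
                if (!future.isDone()) {
                    throw new IllegalStateException("Unexpected unfinished task");
                }
                results.add(future.get());
            }
            return results;
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = Arrays.asList(
                sleepAndReturn(90), sleepAndReturn(70), sleepAndReturn(30));
        System.out.println(runAll(tasks));
    }
}
```

Every Future reports isDone() as true by the time invokeAll returns, so the get calls in the loop don't block, and the collected results follow the order of the task list rather than the completion order.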

4. Submit Tasks with Timed ExecutorService.invokeAll()

Now we'll use invokeAll with a timeout value. When the given time passes, invokeAll cancels the remaining tasks and returns the results as a list of Futures:

public void batchWithInvokeAllWithTimeout(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
    final List<Future<String>> futures = threadPool.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);

    for (Future<String> future : futures) {
        try {
            final String result = future.get();
            System.out.println(result);
        } catch (CancellationException e) {
            System.out.println("Cancelled.");
        } catch (ExecutionException e) {
            System.out.println("Error occurred.");
        }
    }

    threadPool.shutdown();
}

In this method, we're calling invokeAll with a timeout of 500 milliseconds. If any tasks haven't completed when the timeout expires, invokeAll cancels them. Be aware that when a task is canceled, Future.get throws a CancellationException. Since it is a runtime exception, the compiler won't require you to handle it. But because a timed invocation can cancel tasks, we're catching it explicitly.

A sample run prints:

Cancelled.
Cancelled.
Done at 300

As we can see, the thread pool cancels the first two tasks.
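As an alternative to catching CancellationException, we can ask each Future whether it was canceled before calling get. The following is a minimal sketch of that idea; TimedInvokeAllDemo, sleepAndReturn, and runWithTimeout are hypothetical names used only for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code.
class TimedInvokeAllDemo {

    // Builds a task that behaves like SleepAndReturn.
    static Callable<String> sleepAndReturn(int millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return "Done at " + millis;
        };
    }

    // Returns one outcome per task, in submission order.
    static List<String> runWithTimeout(List<Callable<String>> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            List<Future<String>> futures =
                    threadPool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> outcomes = new ArrayList<>();
            for (Future<String> future : futures) {
                if (future.isCancelled()) {
                    // The task did not finish before the timeout expired.
                    outcomes.add("Cancelled.");
                    continue;
                }
                try {
                    outcomes.add(future.get());
                } catch (ExecutionException e) {
                    outcomes.add("Error occurred.");
                }
            }
            return outcomes;
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = Arrays.asList(
                sleepAndReturn(900), sleepAndReturn(700), sleepAndReturn(300));
        System.out.println(runWithTimeout(tasks, 500));
    }
}
```

Checking isCancelled keeps the happy path free of exception handling for the expected timeout case, while ExecutionException still covers tasks that failed on their own.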

5. Submit Tasks Manually

Next, we'll implement a basic task submission method similar to invokeAll. Given a list of Callable tasks, we'll submit them one by one to a thread pool and keep the returned Futures in a list. Then we'll wait for each task to complete and print its result:

public void submitInBatchManually(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
    final List<Future<String>> futures = new ArrayList<>();
    for (Callable<String> task : tasks) {
        futures.add(threadPool.submit(task));
    }

    for (Future<String> future : futures) {
        try {
            final String result = future.get();
            System.out.println(result);
        } catch (ExecutionException e) {
            System.out.println("Error occurred.");
        }
    }
    threadPool.shutdown();
}

This example is very similar to the invokeAll example. Instead of invokeAll, we're calling the submit method to run our tasks. After getting the result by invoking Future.get, we're printing the result. Alternatively, we can return the collected Future values to the caller, as invokeAll does. Either way, iterating over the Futures and calling get has the effect of waiting for all tasks to complete.

When we call this method:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.submitInBatchManually(tasks);
}

It prints:

Done at 900
Done at 700
Done at 300

The results are again printed in the submission order, not in the completion order.
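The alternative mentioned above, returning the Futures to the caller like invokeAll does, could be sketched roughly as follows. ManualBatchDemo, sleepAndReturn, and submitAll are hypothetical names, and the sketch is simplified: unlike the real invokeAll, it doesn't cancel the remaining tasks when the waiting thread is interrupted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code.
class ManualBatchDemo {

    // Builds a task that behaves like SleepAndReturn.
    static Callable<String> sleepAndReturn(int millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return "Done at " + millis;
        };
    }

    // Submits every task, waits for all of them, and returns the Futures
    // in submission order.
    static <T> List<Future<T>> submitAll(ExecutorService threadPool, List<Callable<T>> tasks)
            throws InterruptedException {
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(threadPool.submit(task));
        }
        for (Future<T> future : futures) {
            try {
                future.get();
            } catch (ExecutionException | CancellationException e) {
                // Ignored here: the outcome stays stored in the Future for the caller.
            }
        }
        return futures;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<String>> tasks = Arrays.asList(sleepAndReturn(90), sleepAndReturn(30));
            for (Future<String> future : submitAll(threadPool, tasks)) {
                System.out.println(future.isDone() + " " + future.get());
            }
        } finally {
            threadPool.shutdown();
        }
    }
}
```

Here the caller owns the thread pool and decides what to do with each Future, which keeps the helper reusable across different kinds of tasks.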

6. Submit Tasks with CompletionService

Until now, when we submitted tasks, we waited for the completion of all tasks, in other words, for the completion of the longest-running task. Assume that the first submitted task completes in ten seconds and the second one completes in three seconds. Although the result of the second task is ready, we can't access it until the first task finishes. To remedy this problem, we'll use the CompletionService interface and its ExecutorCompletionService implementation.

CompletionService allows us to submit tasks similar to a thread pool, but in addition to that we can get the task results as soon as they're ready:

public void batchWithCompletionService(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(threadPool);

    for (Callable<String> task : tasks) {
        completionService.submit(task);
    }

    for (int i = 0; i < tasks.size(); i++) {
        // If no task has completed yet, take() blocks.
        // Once any task completes, take() returns its Future and the loop continues.
        final Future<String> future = completionService.take();
        try {
            final String result = future.get();
            System.out.println(result);
        } catch (ExecutionException e) {
            System.out.println("Error occurred.");
        }
    }
    threadPool.shutdown();
}

In this example, after creating a thread pool we're initializing an instance of ExecutorCompletionService. When we submit tasks to the CompletionService instance, it delegates the execution to the wrapped thread pool. To acquire the results, we're calling the CompletionService's take method. This method blocks until a task completes. As long as we know the number of submitted and completed tasks, it is easy to work with a CompletionService.
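When we don't want to block indefinitely, CompletionService also offers poll with a timeout, which returns null if no task completes in time. The following is a minimal sketch of that variant; CompletionServicePollDemo, sleepAndReturn, and collect are hypothetical names, and stopping at the first timeout is just one possible policy:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code.
class CompletionServicePollDemo {

    // Builds a task that behaves like SleepAndReturn.
    static Callable<String> sleepAndReturn(int millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return "Done at " + millis;
        };
    }

    // Collects results in completion order, waiting at most waitMillis for each one.
    static List<String> collect(List<Callable<String>> tasks, long waitMillis)
            throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        CompletionService<String> completionService = new ExecutorCompletionService<>(threadPool);
        List<String> results = new ArrayList<>();
        try {
            for (Callable<String> task : tasks) {
                completionService.submit(task);
            }
            for (int i = 0; i < tasks.size(); i++) {
                Future<String> future = completionService.poll(waitMillis, TimeUnit.MILLISECONDS);
                if (future == null) {
                    // Nothing completed within the wait time, so we stop collecting.
                    break;
                }
                try {
                    results.add(future.get());
                } catch (ExecutionException e) {
                    results.add("Error occurred.");
                }
            }
            return results;
        } finally {
            // Interrupts any tasks that are still running.
            threadPool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = Arrays.asList(
                sleepAndReturn(900), sleepAndReturn(700), sleepAndReturn(300));
        System.out.println(collect(tasks, 500));
    }
}
```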

Although we've created a thread pool exclusively for the CompletionService, we can also use an existing thread pool. This way, for some set of tasks, we can get the results in the order they complete. And for the others, we can have the default behavior. (The same thread pool instance can accept tasks both directly and through a CompletionService, so each set of tasks can use the approach that suits it.)

Next, we'll call our method:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.batchWithCompletionService(tasks);
}

It prints:

Done at 300
Done at 700
Done at 900

Unlike the previous examples, the output shows the completion order.
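To illustrate sharing one thread pool between both styles, here is a minimal sketch. SharedPoolDemo, sleepAndReturn, and run are hypothetical names, and the labels in the output are only there to tell the two groups apart:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code.
class SharedPoolDemo {

    // Builds a task that behaves like SleepAndReturn.
    static Callable<String> sleepAndReturn(int millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return "Done at " + millis;
        };
    }

    static List<String> run() throws InterruptedException, ExecutionException {
        ExecutorService sharedPool = Executors.newFixedThreadPool(4);
        try {
            // Group 1: submitted through the CompletionService, read in completion order.
            CompletionService<String> completionService =
                    new ExecutorCompletionService<>(sharedPool);
            completionService.submit(sleepAndReturn(120));
            completionService.submit(sleepAndReturn(20));

            // Group 2: submitted directly to the same pool, read in submission order.
            List<Future<String>> ordered =
                    sharedPool.invokeAll(Arrays.asList(sleepAndReturn(80), sleepAndReturn(10)));

            List<String> output = new ArrayList<>();
            for (Future<String> future : ordered) {
                output.add("submission order: " + future.get());
            }
            // take() only sees the tasks submitted through the CompletionService.
            for (int i = 0; i < 2; i++) {
                output.add("completion order: " + completionService.take().get());
            }
            return output;
        } finally {
            sharedPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The CompletionService queues only the tasks submitted through it, so the directly submitted tasks never show up in take, even though all four tasks run on the same threads.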

7. Submit Tasks with ExecutorService.invokeAny()

Lastly, we'll submit multiple tasks and get the result of the first one that completes successfully. For this purpose, we'll use the invokeAny method:

public void batchwithInvokeAny(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

    try {
        final String result = threadPool.invokeAny(tasks);
        System.out.println(result);
    } catch (ExecutionException e) {
        System.out.println("No tasks successfully completed.");
    }

    threadPool.shutdown();
}

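In this example, we're submitting the given tasks to the thread pool by calling invokeAny. It returns the result itself, not a Future, as soon as any task completes successfully, and it cancels the tasks that haven't completed yet. If no task completes successfully, it throws an ExecutionException.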

When we provide some tasks:

public static void main(String[] args) throws InterruptedException {
    final Callable<String> task1 = new SleepAndReturn(900);
    final Callable<String> task2 = new SleepAndReturn(700);
    final Callable<String> task3 = new SleepAndReturn(300);
    final List<Callable<String>> tasks = Arrays.asList(task1, task2, task3);

    final SubmitTasksInBatch submitTasks = new SubmitTasksInBatch();
    submitTasks.batchwithInvokeAny(tasks);
}

The output shows:

Done at 300
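To see how invokeAny treats failing tasks, we can try a minimal sketch like the one below. InvokeAnyFailureDemo and firstSuccess are hypothetical names, and the failing task is made up for the illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code.
class InvokeAnyFailureDemo {

    // Returns the first successful result, or a message if every task failed.
    static String firstSuccess(List<Callable<String>> tasks) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            return threadPool.invokeAny(tasks);
        } catch (ExecutionException e) {
            return "No tasks successfully completed.";
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Fails immediately, so it can never supply the result.
        Callable<String> failing = () -> {
            throw new IllegalStateException("Simulated failure");
        };
        // Succeeds, but only after a short delay.
        Callable<String> slow = () -> {
            TimeUnit.MILLISECONDS.sleep(100);
            return "Done at 100";
        };

        System.out.println(firstSuccess(Arrays.asList(failing, slow)));
        System.out.println(firstSuccess(Arrays.asList(failing, failing)));
    }
}
```

The first call prints the result of the slow task even though the failing task finishes earlier, because invokeAny skips tasks that throw. The second call has no successful task, so invokeAny throws an ExecutionException and we print the fallback message.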

8. Submit Tasks with Timed ExecutorService.invokeAny()

Similar to invokeAll, invokeAny also has a timed variant. If no task completes successfully within the given time, invokeAny throws a TimeoutException.

public void batchWithInvokeAnyWithTimeout(List<Callable<String>> tasks) throws InterruptedException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);

    try {
        final String result = threadPool.invokeAny(tasks, 200, TimeUnit.MILLISECONDS);
        System.out.println(result);
    } catch (TimeoutException e) {
        System.out.println("No successful result until timeout.");
    } catch (ExecutionException e) {
        System.out.println("No tasks successfully completed.");
    }

    threadPool.shutdown();
}

Here, we're defining the timeout as 200 milliseconds.

Given the same tasks as the previous example, a sample run prints:

No successful result until timeout.
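To compare a timeout that is too short with one that is long enough, we can parameterize the timeout, as in this minimal sketch. TimedInvokeAnyDemo, sleepAndReturn, and firstResultWithin are hypothetical names introduced for the illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the tutorial's source code.
class TimedInvokeAnyDemo {

    // Builds a task that behaves like SleepAndReturn.
    static Callable<String> sleepAndReturn(int millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return "Done at " + millis;
        };
    }

    // Returns the first successful result, or a message describing why there is none.
    static String firstResultWithin(List<Callable<String>> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        try {
            return threadPool.invokeAny(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "No successful result until timeout.";
        } catch (ExecutionException e) {
            return "No tasks successfully completed.";
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = Arrays.asList(
                sleepAndReturn(900), sleepAndReturn(700), sleepAndReturn(300));
        // Too short for even the fastest task.
        System.out.println(firstResultWithin(tasks, 200));
        // Long enough for the 300 ms task to finish first.
        System.out.println(firstResultWithin(tasks, 500));
    }
}
```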

9. Summary

In this tutorial, we've looked at how we can submit multiple tasks to an ExecutorService instance.

First, we've looked at the invokeAll method to get all results in the submission order.

Then we've created a custom method similar to invokeAll. We learned that CompletionService allows us to get the task results as they become ready.

We then investigated invokeAny to acquire the first result among similar tasks.

Lastly, check out the source code for all examples in this tutorial on GitHub.

From: https://www.cnblogs.com/JaxYoun/p/17671399.html