文章目录
引言
在Java并发编程中,了解线程池的任务提交过程对于正确使用线程池至关重要。本文将详细介绍任务从提交到执行的完整流程,包括任务提交方式、执行流程以及相关的异常处理机制。
一、任务提交方式
1.1 execute方法
execute方法是ThreadPoolExecutor提供的最基本的任务提交方式,用于提交不需要返回值的任务。它只接受Runnable类型的任务,不会返回任务的执行结果。
public class ExecuteExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2)
);
executor.execute(() -> {
try {
Thread.sleep(1000);
System.out.println("Task executed by " +
Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
1.2 submit方法
submit方法是通过ExecutorService接口提供的任务提交方式,可以提交有返回值的任务。它接受Callable或Runnable类型的任务,并返回Future对象。
public class SubmitExample {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2)
);
// 提交Callable任务
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "Task result";
});
// 获取任务结果
String result = future.get();
System.out.println("Task result: " + result);
}
}
二、任务执行流程
2.1 核心流程分析
当任务提交到线程池后,线程池会根据当前的运行状态和配置来决定如何处理任务。一般会经过以下判断流程:核心线程数判断、任务队列判断、最大线程数判断。
public class TaskExecutionFlow {
private ThreadPoolExecutor createThreadPool() {
return new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, TimeUnit.SECONDS, // 线程存活时间
new ArrayBlockingQueue<>(2), // 任务队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
}
public void demonstrateExecutionFlow() {
ThreadPoolExecutor executor = createThreadPool();
// 提交多个任务演示执行流程
for (int i = 0; i < 6; i++) {
final int taskId = i;
try {
executor.execute(() -> {
System.out.printf("Task %d is running on %s%n",
taskId, Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} catch (RejectedExecutionException e) {
System.out.println("Task " + taskId + " was rejected");
}
}
}
}
2.2 任务状态转换
任务在提交到执行的过程中会经历多个状态:等待执行、正在执行、执行完成或异常终止。通过Future可以监控任务的执行状态。
public class TaskStateMonitor {
public static void monitorTaskState() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
Future<String> future = executor.submit(() -> {
System.out.println("Task started");
Thread.sleep(2000);
return "Task completed";
});
// 监控任务状态
while (!future.isDone()) {
System.out.println("Task is still running...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
String result = future.get();
System.out.println("Final result: " + result);
} catch (Exception e) {
System.out.println("Task execution failed: " + e.getMessage());
}
}
}
三、任务队列处理
3.1 队列类型选择
不同的队列类型会影响任务的提交和执行方式。常用的队列类型包括ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。
public class QueueTypeDemo {
// 有界队列示例
public ThreadPoolExecutor createWithArrayQueue() {
return new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
}
// 无界队列示例
public ThreadPoolExecutor createWithLinkedQueue() {
return new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()
);
}
// 同步队列示例
public ThreadPoolExecutor createWithSynchronousQueue() {
return new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>()
);
}
}
3.2 队列满时的处理
当任务队列满时,线程池会根据配置的拒绝策略来处理新提交的任务。可以实现自定义的拒绝策略来满足特定需求。
public class QueueFullHandling {
public static class LoggingRejectedHandler
implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
System.out.println("Task rejected at: " +
System.currentTimeMillis());
System.out.println("Current pool size: " +
executor.getPoolSize());
System.out.println("Queue size: " +
executor.getQueue().size());
// 可以将任务信息保存到日志或数据库
saveRejectedTask(r);
}
private void saveRejectedTask(Runnable task) {
// 保存任务信息的具体实现
}
}
public static void demonstrateRejection() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 2,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
new LoggingRejectedHandler()
);
// 提交超过线程池处理能力的任务
for (int i = 0; i < 4; i++) {
final int taskId = i;
executor.execute(() -> {
try {
Thread.sleep(1000);
System.out.println("Task " + taskId + " executed");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
}
}
四、异常处理
4.1 提交时异常处理
在任务提交过程中可能出现的异常包括RejectedExecutionException等,需要妥善处理这些异常。
public class ExceptionHandling {
public void submitTaskWithExceptionHandling() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1)
);
try {
executor.execute(() -> {
// 任务逻辑
System.out.println("Task executed");
});
} catch (RejectedExecutionException e) {
// 处理任务拒绝异常
System.out.println("Task rejected: " + e.getMessage());
// 可以选择重试或其他处理方式
handleRejectedTask();
} catch (Exception e) {
// 处理其他可能的异常
System.out.println("Task submission failed: " + e.getMessage());
}
}
private void handleRejectedTask() {
// 实现任务拒绝后的处理逻辑
}
}
4.2 执行时异常处理
任务执行过程中的异常需要通过定制线程池或使用Future来捕获和处理。
public class ExecutionExceptionHandling {
public static class CustomThreadPool extends ThreadPoolExecutor {
public CustomThreadPool(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime,
unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t != null) {
System.out.println("Task failed with exception: " +
t.getMessage());
// 记录异常信息
logException(t);
}
}
private void logException(Throwable t) {
// 实现异常日志记录
}
}
public void demonstrateExceptionHandling() {
CustomThreadPool executor = new CustomThreadPool(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2)
);
Future<?> future = executor.submit(() -> {
throw new RuntimeException("Task failed");
});
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("Task execution failed: " +
e.getCause().getMessage());
}
}
}
总结
线程池的任务提交是一个复杂的过程,涉及多个环节和多种情况的处理。通过合理使用不同的提交方式、选择适当的队列类型、实现完善的异常处理机制,可以确保任务能够被正确且高效地处理。在实际应用中,要根据具体场景选择合适的配置,并做好异常处理和监控。
标签:程池,Java,System,98,任务,Task,new,public,ThreadPoolExecutor From: https://blog.csdn.net/weixin_55344375/article/details/144667548