首页 > 编程语言 >Java ThreadPoolExecutor

Java ThreadPoolExecutor

时间:2024-05-25 16:44:37浏览次数:27  
标签:Java executor 任务 线程 Executor ThreadPoolTaskExecutor ThreadPoolExecutor

ThreadPoolExecutor?

ThreadPoolExecutor是什么,先拆开来看,Thread Pool And Executor?那Thread Pool是什么?Executor又是什么?

Executor:任务执行者,只定义了一个execute方法,接收一个Runable参数。

public interface Executor {
void execute(Runnable command);
}

Thread Pool:可以缓存一定数量线程的池子。

List<WorkerThread> workers = new ArrayList<>();

那结合在一起的ThreadPoolExecutor是什么?

ThreadPoolExecutor就是一个Executor,用于执行任务。那为什么前面有ThreadPool呢?因为执行任务时,所使用的线程是缓存在一个ThreadPool中的,结合了ThreadPool和Executor的概念,所以叫ThreadPoolExecutor。

Executor为什么需要Thread Pool?

因为创建线程所需要的开销较大,进程需要向系统内核申请,然后由系统内核创建线程后分配给进程,这个过程所需的时间可能比实际任务执行的时间还要久。如果每次任务执行都创建一个新的线程,既影响系统吞吐量,也浪费了系统资源。创建一定数量的线程,将其缓存到线程池中是更好的使用方式。

Executor实例

以java.util.concurrent.ThreadPoolExecutor为例,其构造方法如下,方法中的参数就是一个Executor的核心参数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造参数

  • int corePoolSize: 线程池中核心线程数据数量
  • int maximumPoolSize: 在同步队列写满之后,线程池中最大可创建的线程数量
  • long keepAliveTime: 非核心线程的最长存活时间
  • TimeUnit unit: 时间单位
  • BlockingQueue workQueue: 保存待执行任务的同步队列
  • ThreadFactory threadFactory: 创建线程的工厂
  • RejectedExecutionHandler handler: 当同步队列满载之后,添加新的任务时执行的拒绝策略

核心配置

在配置时,需要重点关注的是corePoolSize和BlockingQueue

  • corePoolSize:针对不同的任务类型,核心线程数量的配置不同。当任务类型为CPU密集型时,可以配置为(CPU核心 - 1);当为IO密集型时,根据单个任务的执行完成时间来设置,执行时间愈长时,配置的核心线程数可以愈多,例如(CPU核心*2)。
  • BlockingQueue:在提交任务时,如果线程数小于corePoolSize配置,则新建一个线程执行任务。如果核心线程已经全部创建,新提交的任务将会被添加到同步队列中暂存,然后等待线程从队列中获取任务进行执行。这涉及到队列长度问题,队列过长或者过短都会影响使用。需要根据实际业务情况进行设置,例如Executor每秒的任务处理能力、每秒的任务提交数量和持续时间。

需要注意的是在一个ThreadPoolExecutor中,不管有多少个生产者(调用者),不管有多少个消费者(线程),其都共用了一个阻塞队列。而阻塞队列中都是加锁的,ArrayBlockingQueue不管是take和put操作,都使用了同一把锁;LinkedBlockingQueue则是对take和put操作分别设置了一把锁。

应用场景

最常使用的场景应该是多任务并行处理和异步处理的时候

并行处理

在使用ThreadPoolExecutor并行处理任务时,需要先计量一下单个任务的执行时间和线程间的切换时间。如果是CPU密集型,并且任务量不多的话,直接单线程就可以了。使用多线程未必有多大的提升,除非该处理步骤确实很耗时,在排除代码原因后可以尝试多线程处理。

如果是IO密集型,并且任务量较多时,应尽量使用多线程的方式。因为不管是磁盘IO还是网络IO,处理时间一般都远大于线程切换时间和同步队列的锁开销。

异步处理

在异步处理的时候,往往是主线程提交任务后立即返回,交由Executor异步执行。这种处理方式性能极佳,但可靠性不足,适用于在允许少量数据丢失的场景下,例如在logback中,启用异步日志记录。

使用实例

ThreadPoolTaskExecutor

在SpringBoot项目中,一般是创建一个ThreadPoolTaskExecutor。其是java中ThreadPoolExecutor的封装,提供了一些功能扩展和更便捷的配置和管理,使用更方便,功能更强大。

自定义ThreadPoolTaskExecutor

@Bean("taskExecutor") 
public Executor taskExecutor() {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(7);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(20000);
    executor.setKeepAliveSeconds(60); 
    executor.setThreadNamePrefix("taskExecutor-"); 

    // 添加一个任务装饰器
    executor.setTaskDecorator(new ContextDecorator());
    // 线程池对拒绝任务的处理策略
    executor.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy());
    
    executor.initialize();
    return executor;
}

TaskDecorator

ThreadPoolTaskExecutor中提供的一个任务装饰器,可以在任务执行前后进行一些自定义操作,适用于需要在线程间传递消息的场景。

例如,现在有上百个REST API请求任务,为了更快的获取,将这些任务放到ThreadPoolTaskExecutor中去执行。但是在Executor中执行时发现,每个请求任务都没有成功获取到数据,而使用主线程遍历获取是没问题的。

原因是在调用REST API接口时,需要用到当前线程中的RequestContext信息,主线程中存在RequestContext信息(Auth信息),但Executor中的线程是没有的。这时就需要实现一个TaskDecorator,将主线程中的RequestContext复制到执行的子线程中了。

实现一个TaskDecorator

package com.sugon.common.config;

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

public class ContextDecorator implements TaskDecorator {

    @Override
    public Runnable decorate(Runnable runnable) {

        RequestAttributes context;
        try{
            context = RequestContextHolder.currentRequestAttributes();
        } catch (IllegalStateException e) {
            // 不存在请求参数时会抛出异常,这里返回一个空对象
            // NonWebRequestAttributes为实现RequestAttributes接口的空对象
            context = new NonWebRequestAttributes();
        }

        RequestAttributes finalContext = context;
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(finalContext);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

将其设置到ThreadPoolTaskExecutor中

executor.setTaskDecorator(new ContextDecorator());

并行处理

使用java8中的CompletableFuture和Executor,进行多任务的并行处理

@Autowired
@Qualifier("taskExecutor")
Executor executor;

public void parallelRun(List<Runnable> runnables) {
    
    CompletableFuture[] futures = runnables.stream()
            .map(runnable -> CompletableFuture.runAsync(runnable, executor))
            .toArray(CompletableFuture[]::new);
    
    // 等待所有任务执行成功
    CompletableFuture.allOf(futures).join();
}

异步处理

提交任务后,不关心返回结果

@Autowired
@Qualifier("taskExecutor")
Executor executor;

public void asyncRun(Runnable runnable) {
    
    // 提交后立即返回
    CompletableFuture.runAsync(runnable, executor);
}

标签:Java,executor,任务,线程,Executor,ThreadPoolTaskExecutor,ThreadPoolExecutor
From: https://www.cnblogs.com/cd-along/p/18211113

相关文章

  • Java实现图书系统
    首先实现一个图书管理系统,我们要知道有哪些元素?1.用户分成为管理员和普通用户2.书:书架  书3.操作的是:书架目录第一步:建包第二步:搭建框架首先:完成book中的方法其次:完成BookList然后:完成管理员界面和普通用户界面最后:Main第三步:细分方法1.退出系统2......
  • 从零手写实现 nginx-01-为什么不能有 java 版本的 nginx?
    前言大家好,我是老马。很高兴遇到你。作为一个java开发者,工作中一直在使用nginx。却发现一直停留在使用层面,无法深入理解。有一天我在想,为什么不能有一个java版本的nginx呢?一者是理解nginx的设计灵魂,再者java开发者用java语言的服务器不是更加自然吗。于是动手开......
  • arthas:Java调试利器,线上Debug不是梦
    目录前言一、Arthas是什么?二、Arthas能解决啥问题?三、Arthas两种安装、启动方式1、jar包启动2、在线安装3、远程连接:四、Arthas命令使用1、Dashboard命令2、Thread(线程监控)3、JVM(jvm实时运行状态,内存使用情况等)4、trace(当前方法内部调用路径,路径上每个节......
  • 「终极收藏」前端开发必备:超全JavaScript公共方法大全
    目录引言1安装js-tool-big-box工具包1.1安装1.2截至目前的方法集合 2时间日期类 2.1更灵活的年月日时分秒2.2 日期时间转换2.3个性的时间组合 2.4 某个时间距离现在2.5 平年还是闰年2.6指定月份的天数 2.7属相2.8获取指定年份的法定节假日 3......
  • Java根据URL下载文件到本地的2种方式(大型文件与小型文件)
    1.小型文件推荐使用2.大型文件推荐使用总结 各位小伙伴是否有使用java,根据url下载文件到本地的需求,以下介绍两种方式1.小型文件推荐使用代码解析首先创建了一个URL对象website,用来表示远程文件的地址。然后创建了一个ReadableByteChannel对象rbc和一个FileOutputStr......
  • Java 登录错误次数限制,用户禁登1小时
    手机号验证码登录,验证码输入错误次数超5次封禁@OverridepublicbooleancheckCaptcha(StringphoneNum,Stringcaptcha){StringcodeNum=(String)redisTemplate.opsForValue().get(UserCacheNames.USER_CAPTCHA+phoneNum);if(codeNum==......
  • JAVA计算机毕业设计基于SpringBoot的在线古玩市场系统的设计与实现(附源码+springboot+
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着互联网的迅猛发展和电子商务的普及,传统行业纷纷寻求数字化转型以适应市场的新需求。古玩市场作为一个历史悠久、文化底蕴深厚的行业,在数字化浪潮......
  • 红黑树基于Java代码的剖析
    红黑树是一种自平衡的二叉查找树,通过添加颜色属性和旋转操作来保证树的平衡性,从而在最坏情况下仍能提供对数时间复杂度的插入、删除和查找操作。本文通过对一段红黑树的Java代码进行剖析,详细讲解其插入和删除操作是如何实现的,以及这些操作是如何利用红黑树的性质来维持平衡......
  • 深入理解Java的垃圾回收机制(GC)实现原理
    深入理解Java的垃圾回收机制(GC)实现原理Java的垃圾回收机制(GarbageCollection,GC)是其内存管理的核心功能之一。通过GC,Java自动管理对象的生命周期,回收不再使用的对象所占的内存空间。本文将详细探讨GC的实现原理、不同算法的细节以及其在JVM中的应用。1.垃圾回收的基本......
  • web前端课程设计——重庆旅游7页 HTML+CSS+JavaScript
    ......