首页 > 其他分享 >线程池在工作中的几种使用姿势

线程池在工作中的几种使用姿势

时间:2023-08-31 17:22:28浏览次数:41  
标签:姿势 Executors int 池在 private 线程 使用 ThreadPoolExecutor

线程池在开发中一定会用到,如果能像golang一样,java语言也有协程,也许java程序员就少了一种包袱。

回归正题,我们聊下到底有哪些线程池的使用方式,总结有以下几种。

  1. JDK 内置线程池
  2. Spring线程池
  3. 自己魔改封装

1、JDK 内置线程池

常用的有:

我们看下最全的线程池参数,探究为什么阿里规约不建议使用Executors创建默认个数的线程池。

/**
参数【7个】
corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut
maximumPoolSize - 池中允许的最大线程数
keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。
unit - keepAliveTime参数的时间单位
workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。
threadFactory - 执行程序创建新线程时使用的工厂
handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量

四个预定义的处理程序策略
在默认ThreadPoolExecutor.AbortPolicy ,处理程序会引发运行RejectedExecutionException后排斥反应。
在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行任务。 这提供了一个简单的反馈控制机制,将降低新任务提交的速度。
在ThreadPoolExecutor.DiscardPolicy中 ,简单地删除无法执行的任务。
在ThreadPoolExecutor.DiscardOldestPolicy中 ,如果执行程序没有关闭,则工作队列头部的任务被删除,然后重试执行(可能会再次失败,导致重复)。

**/

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1.1 阿里规约原文

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors返回的线程池对象的弊端如下:

  • 1)FixedThreadPool和SingleThreadPool:

    允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

  • 2)CachedThreadPool:

    允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

1.2 阿里规约是否是多余的?不然jdk为啥提供这个api?

答案可以看下ThreadPoolExecutor类的上的注释,已翻译成中文:

为了在广泛的上下文中有用,此类提供了许多可调参数和可扩展性钩子。 然而,程序员被敦促使用更方便的Executors工厂方法Executors.newCachedThreadPool() (无限线程池,具有自动线程回收), Executors.newFixedThreadPool(int) (固定大小的线程池)和Executors.newSingleThreadExecutor() (单个后台线程),可以预先配置最常用的使用场景设置。 否则,手动配置和调优此类时,请使用以下指南...

其实这个类的作者"Doug Lea",已经说明Executors.newCachedThreadPool(),Executors.newFixedThreadPool(int),应该是在什么情况下使用?
只是一般程序员是不会去仔细看类说明文档,都是跟着百度复制过来的代码就干起活来了,
说起来也惭愧之前在2010年培训的时候,其实是会看这个jdk api文档的,但那个时候似乎很懵,也不会去详尽的看文档背后的一些知识点。
所以说看第一手材料多么重要,作者已经把该注意的点都写到类说明文档注释里了,吓得我赶紧用到一个类都去细看下文档注释,解决不了再找度娘。

1.3 线程池不为人知的几个冷门知识点

从几个面试题来一一道来。

1.核心和最大线程池是否都可以动态修改?

可以使用setCorePoolSize(int)和setMaximumPoolSize(int)进行动态 更改

2.核心线程最初创建并且只有在新任务到达时才启动?

可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态地覆盖。如果您使用非空队列构建池,则可能需要预先提供线程。

3.核心线程是否一定不终止?

不一定,报错或者allowCoreThreadTimeOut(boolean)也可以用于将这个超时策略应用于核心线程,只要keepAliveTime值不为零

4.如何给线程池添加一个启动/暂停的功能?

class PausableThreadPoolExecutor extends ThreadPoolExecutor {              
  private boolean isPaused;                                                
  private ReentrantLock pauseLock = new ReentrantLock();                   
  private Condition unpaused = pauseLock.newCondition();                   
                                                                           
  public PausableThreadPoolExecutor(...) { super(...); }                   
                                                                           
  protected void beforeExecute(Thread t, Runnable r) {                     
    super.beforeExecute(t, r);                                             
    pauseLock.lock();                                                      
    try {                                                                  
      while (isPaused) unpaused.await();                                   
    } catch (InterruptedException ie) {                                    
      t.interrupt();                                                       
    } finally {                                                            
      pauseLock.unlock();                                                  
    }                                                                      
  }                                                                        
                                                                           
  public void pause() {                                                    
    pauseLock.lock();                                                      
    try {                                                                  
      isPaused = true;                                                     
    } finally {                                                            
      pauseLock.unlock();                                                  
    }                                                                      
  }                                                                        
                                                                           
  public void resume() {                                                   
    pauseLock.lock();                                                      
    try {                                                                  
      isPaused = false;                                                    
      unpaused.signalAll();                                                
    } finally {                                                            
      pauseLock.unlock();                                                  
    }                                                                      
  }                                                                        
}}                                                             

2、Spring线程池

官方已经实现的全部7个TaskExecuter。Spring宣称对于任何场景,这些TaskExecuter完全够用了:

名字特点
SimpleAsyncTaskExecutor 每次请求新开线程,没有最大线程数设置.不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。限流是通过 ConcurrencyThrottleSupport类的monitor的wait(),notify() 实现的
SyncTaskExecutor 不是异步的线程.同步可以用SyncTaskExecutor,但这个可以说不算一个线程池,因为还在原线程执行。这个类没有实现异步调用,只是一个同步操作。
ConcurrentTaskExecutor Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。
SimpleThreadPoolTaskExecutor 监听Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。
ThreadPoolTaskExecutor 最常用。要求jdk版本大于等于5。可以在程序而不是xml里修改线程池的配置.其实质是对java.util.concurrent.ThreadPoolExecutor的包装。
TimerTaskExecutor  
WorkManagerTaskExecutor  
  • 使用ThreadPoolExecutorFactoryBean
package org.springframework.scheduling.concurrent;

public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
    private int corePoolSize = 1;
    private int maxPoolSize = 2147483647;
    private int keepAliveSeconds = 60;
    private boolean allowCoreThreadTimeOut = false;
    private int queueCapacity = 2147483647;
    private boolean exposeUnconfigurableExecutor = false;
    @Nullable
    private ExecutorService exposedExecutor;

    public ThreadPoolExecutorFactoryBean() {
    }
    ...
    protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize, (long)keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
    }
  • 使用ScheduledExecutorFactoryBean
package org.springframework.scheduling.concurrent;

public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport implements FactoryBean<ScheduledExecutorService> {
    private int poolSize = 1;
    @Nullable
    private ScheduledExecutorTask[] scheduledExecutorTasks;
    private boolean removeOnCancelPolicy = false;
    private boolean continueScheduledExecutionAfterException = false;
    private boolean exposeUnconfigurableExecutor = false;
    @Nullable
    private ScheduledExecutorService exposedExecutor;

    public ScheduledExecutorFactoryBean() {
    }
  • 使用ThreadPoolTaskExecutor
    这个类可以设置回调方法

  • 使用ThreadPoolTaskScheduler
    这个类可以设置回调方法,包装错误处理类,错误不会影响执行下一个任务 @Scheduled就是使用这个类包装的,注意核心默认一个线程,会阻塞任务

	@Override
	public void run() {
		try {
			this.delegate.run();
		}
		catch (UndeclaredThrowableException ex) {
			this.errorHandler.handleError(ex.getUndeclaredThrowable());
		}
		catch (Throwable ex) {
			this.errorHandler.handleError(ex);
		}
	}

 

标签:姿势,Executors,int,池在,private,线程,使用,ThreadPoolExecutor
From: https://www.cnblogs.com/jiaodaoniujava/p/17670026.html

相关文章

  • 多线程与单线程执行的对比
    对比技术点:单线程:普通循环多线程框架:CompletableFuture多线程框架;ForkJoin50次对比实验 源码: packagecom.example.demo;importorg.apache.commons.lang3.time.StopWatch;importjava.util.ArrayList;importjava.util.List;importjava.util.concurre......
  • java 使用多线程的注意事项
    线程安全:确保共享资源的正确访问和修改,避免并发冲突。可以通过同步机制(如锁、互斥量、信号量等)或使用线程安全的数据结构来实现线程安全。案例:银行账户并发访问importjava.util.concurrent.locks.Lock;importjava.util.concurrent.locks.ReentrantLock;classBankAccount{......
  • 多线程执行工具方法
    publicstatic<P,T>List<CompletableFuture<T>>multiThreadRun(Function<P,T>run,Collection<P>list,intthreadSize,Executorexecutor,booleanwaitRunFinal){List<CompletableFuture<T>>cf=newArr......
  • MySQL 使用Navicat delete/insert into/update 大量数据表锁死,kill的线程后线程处于ki
      MySQL使用delete/insertinto/update大量数据表锁死,kill的线程后线程处于killed状态问题解决实际生产环境问题描述:使用Navicat备份BigData数据表时不小心点到了取消按钮,导致数据表被锁。  查看MySQL线程队列,找到刚刚执行的SQL看是属于什么状态。showprocessli......
  • 线程同步 读写锁
    目录读写锁特点函数初始化和析构读锁写锁解锁读写锁使用读写锁一把锁,并不是读锁和写锁称之为读写锁,因为他既可以锁定读操作,也可以锁定写操作pthread_rwlock_trwlock;锁中记录了锁的状态打开关闭锁定的操作锁读锁写哪个线程持有钥匙使用方式和互斥锁相同:找共享资......
  • 线程篇--线程的特点
    1.线程是轻量级进程(light-weightprocess),也有PCB,创建线程使⽤的底层函数和进程⼀样,都是clone;2.从内核⾥看进程和线程是⼀样的,都有各⾃不同的PCB;3.进程可以蜕变成线程;4.在linux下,线程最是⼩的执⾏单位;进程是最⼩的分配资源单位。实际上,⽆论是创建进程的fork,还是创建线......
  • 线程同步 死锁
    目录加锁后忘记解锁重复加锁,造成死锁死锁成环如何避免加锁后忘记解锁//场景1voidfunc(){for(inti=0;i<6;++i){//当前线程A加锁成功,当前循环完毕没有解锁,在下一轮循环的时候自己被阻塞了//其余的线程也被阻塞 pthread_mutex_lock(......
  • python多线程
    python多线程多线程threading,利用CPU和IO可以同时执行的原理多进程multiprocessing,利用多核CPU的能力,真正的并行执行任务异步IOasyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行使用Lock对资源加锁,防止冲突访问使用Queue实现不同线程/进程之间的数据通信,实现生......
  • leetcode & c++多线程刷题日志
    1.按序打印按序打印解法互斥锁classFoo{mutexmtx1,mtx2;public:Foo(){mtx1.lock(),mtx2.lock();}voidfirst(function<void()>printFirst){printFirst();mtx1.unlock();}voidsecond(function<voi......
  • 多线程|volatile的使用
    一、内存可见性问题先来看如下代码classMyCounter{publicintflag=0;}publicclassThreadDemo22{publicstaticvoidmain(String[]args){MyCountermyCounter=newMyCounter();Threadt1=newThread(()->{while(myCounter.f......