首页 > 其他分享 >ThreadPoolExecutor

ThreadPoolExecutor

时间:2023-07-26 15:22:24浏览次数:42  
标签:task java util concurrent Runnable public ThreadPoolExecutor

任务类型

1、java.lang.Runnable

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

2、java.util.concurrent.Callable

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

  

任务提交

方式

1、java.util.concurrent.ThreadPoolExecutor#execute

  public void execute(Runnable command) {}

2、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

3、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)  

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

4、java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)

 public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

  

任务提交拒绝策略

1、java.util.concurrent.ThreadPoolExecutor#execute

  

// command the task to execute
// @throws RejectedExecutionException at discretion of {@code 
// RejectedExecutionHandler}, if the task cannot be accepted for execution

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);                                         -- 拒绝
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);                                            -- 拒绝
    }

  

2、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);  -- 将Runnable包装
        execute(ftask);  -- java.util.concurrent.ThreadPoolExecutor#execute
        return ftask;
    }


protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

  

3、java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result); // 将Runnable包装
        execute(ftask);    // java.util.concurrent.ThreadPoolExecutor#execute
        return ftask;
    }


 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

  

4、java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>)

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);  // 将Callable包装
        execute(ftask);  //java.util.concurrent.ThreadPoolExecutor#execute
        return ftask;
    }


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

  

  【execute、submit时,可能会有异常,需要手动try...catch...】

 

任务线程异常

java.util.concurrent.ThreadPoolExecutor#execute

public class ThreadPoolExecutor extends AbstractExecutorService {

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{

        public void run() {
            runWorker(this);
        }

    }


     final void runWorker(Worker w) {

        ...    
        beforeExecute(wt, task);
        run...
        throw...
        ...
        afterExecute(task, thrown);
        ...

    }
}

  

  任务线程执行异常,将由 java.lang.Thread.UncaughtExceptionHandler 进行处理

 

// When a thread is about to terminate due to an uncaught exception
// the Java Virtual Machine will query the thread for its
// UncaughtExceptionHandler using getUncaughtExceptionHandler 
// and will invoke the handler's uncaughtException method, 
// passing the thread and the exception as arguments.

@FunctionalInterface
    public interface UncaughtExceptionHandler {
        /**
         * Method invoked when the given thread terminates due to the
         * given uncaught exception.
         * <p>Any exception thrown by this method will be ignored by the
         * Java Virtual Machine.
         * @param t the thread
         * @param e the exception
         */
        void uncaughtException(Thread t, Throwable e);
    }

  

/**
     * Dispatch an uncaught exception to the handler. This method is
     * intended to be called only by the JVM.
     */
    private void dispatchUncaughtException(Throwable e) {
        getUncaughtExceptionHandler().uncaughtException(this, e);
    }

  当任务线程异常,JVM会调用dispatchUncaughtException处理异常

 

  用户可以自定义UncaughtExceptionHandler,实现UncaughtExceptionHandler,通过java.lang.Thread#setUncaughtExceptionHandler自定义线程异常处理器

public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
        checkAccess();
        uncaughtExceptionHandler = eh;
    }

  

java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable)/submit(java.lang.Runnable, T)/submit(java.util.concurrent.Callable<T>)

  submit方式,将Runnable、Callble包装成java.util.concurrent.FutureTask,任务执行体就是FutureTask的run方法

public interface RunnableFuture<V> extends Runnable, Future<V> {

     void run();

}


public class FutureTask<V> implements RunnableFuture<V> {

    // The result to return or exception to throw from get() 
    private Object outcome;

    public void run() {

        ...
        try {
                    result = c.call();   // 任务执行
                    ...
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);      // 任务异常处理
                }
        ....

    }


    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;   // 将异常信息存储到outcome
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }


    


}

  

  当调用java.util.concurrent.FutureTask#get(),任务线程的异常就会获取到

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);    // 获取异常信息
    }


/**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;     // 从outcome获取异常信息
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

  

  

  

  

标签:task,java,util,concurrent,Runnable,public,ThreadPoolExecutor
From: https://www.cnblogs.com/anpeiyong/p/17582560.html

相关文章

  • ThreadPoolExecutor线程池用法简介
    ThreadPoolExecutor 是Java中用于管理线程池的类,它提供了一种方便的方式来执行多线程任务。通过使用线程池,我们可以有效地管理和复用线程,提高程序的性能和资源利用率。下面是 ThreadPoolExecutor 线程池的详细用法介绍:创建线程池对象:ThreadPoolExecutorexecutor=ne......
  • ScheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(bo
    MethodSummary voidexecute(Runnable          Executecommandwithzerorequireddelay. booleangetContinueExistingPeriodicTasksAfterShutdownPolicy()          Getthepolicyonwhethertocontinueexecutingexistingperiodictaskseven......
  • ScheduledThreadPoolExecutor模仿学习
     publicinterfaceCBlockingQueue<E>{booleanadd(Ee);Etake();} importjava.util.concurrent.Delayed;importjava.util.concurrent.FutureTask;importjava.util.concurrent.RunnableScheduledFuture;importjava.util.concurrent.TimeUnit;......
  • ThreadPoolTaskExecutor与ThreadPoolExecutor的区别及优缺点
    ThreadPoolTaskExecutor和ThreadPoolExecutor都是线程池的实现,但它们有以下几点区别:1.ThreadPoolTaskExecutor是Spring框架中编写的,它对ThreadPoolExecutor进行了封装,提供了更加丰富的功能,更易于在Spring中使用。而ThreadPoolExecutor是JDK中的实现。2.ThreadPoolTaskExe......
  • ThreadPoolExecutor的使用
    线程池的基类是concurrent.futures模块中的Executor,Executor提供了两个子类,即ThreadPoolExecutor和ProcessPoolExecutor,其中ThreadPoolExecutor用于创建线程池,而ProcessPoolExecutor用于创建进程池。如果使用线程池/进程池来管理并发编程,那么只要将相应的task函数提......
  • ThreadPoolExecutor获取线程池中已经运行完的任务结果
    方法一:使用as_completed函数fromconcurrent.futuresimportThreadPoolExecutor,as_completedfromtimeimportsleepdefmethod(times):sleep(times)print('sleep{}secondes'.format(times))returntimespool=ThreadPoolExecutor(max_wor......
  • java如何使用线程池 new threadPoolExecutor()
    //使用线程池不返回结果脚本中使用的ClassB{privatestaticfinalExecutorServiceexecutor=newThreadPoolExecutor(4,10,3000L,TimeUnit.MILLISECONDS,newArrayBlockingQueue<>(500),newThreadFactoryBuilder().setNameFormat("publish-pool-%d").build(),(......
  • Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例
    场景Java中创建线程的方式有三种1、通过继承Thread类来创建线程定义一个线程类使其继承Thread类,并重写其中的run方法,run方法内部就是线程要完成的任务,因此run方法也被称为执行体,使用start方法来启动线程。2、通过实现Runanle接口来创建线程首先定义Runnable接口,并重写Runnable接口......
  • ThreadPoolTaskExecutor和ThreadPoolExecutor区别
    ThreadPoolExecutor是Java原生的线程池类,而ThreadPoolTaskExecutor是Spring推出的线程池工具  一、从核心参数看两者关系 ThreadPoolExecutor(java.util.concurrent) publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,......
  • 以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池
    importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;publicclassThreadPoolExecutorDemo{privatestaticfinalintCORE_POOL_SIZE=5;privatestaticfinalintMAX......