首页 > 其他分享 >面试官:如果不允许线程池丢弃任务,应该选择哪个拒绝策略?

面试官:如果不允许线程池丢弃任务,应该选择哪个拒绝策略?

时间:2024-05-30 15:34:40浏览次数:12  
标签:CallerRunsPolicy 面试官 策略 丢弃 队列 任务 线程 ThreadPoolExecutor

线程池的拒绝策略有哪些?

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果你的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

举个例子:Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 拒绝策略来配置线程池的时候,默认使用的是 AbortPolicy。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor 将抛出 RejectedExecutionException 异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicyCallerRunsPolicy 和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 直接主线程执行,而不是线程池中的线程执行
                r.run();
            }
        }
    }

如果不允许丢弃任务任务,应该选择哪个拒绝策略?

根据上面对线程池拒绝策略的介绍,相信大家很容易能够得出答案是:CallerRunsPolicy

这里我们再来结合CallerRunsPolicy 的源码来看看:

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }


        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //只要当前程序没有关闭,就用执行execute方法的线程执行该任务
            if (!e.isShutdown()) {

                r.run();
            }
        }
    }

从源码可以看出,只要当前程序不关闭就会使用执行execute方法的线程执行该任务。

CallerRunsPolicy 拒绝策略有什么风险?如何解决?

我们上面也提到了:如果想要保证任何一个任务请求都要被执行的话,那选择 CallerRunsPolicy 拒绝策略更合适一些。

不过,如果走到CallerRunsPolicy的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。

这里简单举一个例子,该线程池限定了最大线程数为 2,阻塞队列大小为 1(这意味着第 4 个任务就会走到拒绝策略),ThreadUtil为 Hutool 提供的工具类:

public class ThreadPoolTest {

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);

    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为1,最大线程数为2
        // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
        // 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                2,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 提交第一个任务,由核心线程执行
        threadPoolExecutor.execute(() -> {
            log.info("核心线程执行第一个任务");
            ThreadUtil.sleep(1, TimeUnit.MINUTES);
        });

        // 提交第二个任务,由于核心线程被占用,任务将进入队列等待
        threadPoolExecutor.execute(() -> {
            log.info("非核心线程处理入队的第二个任务");
            ThreadUtil.sleep(1, TimeUnit.MINUTES);
        });

        // 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
        threadPoolExecutor.execute(() -> {
            log.info("非核心线程处理第三个任务");
            ThreadUtil.sleep(1, TimeUnit.MINUTES);
        });

        // 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
        threadPoolExecutor.execute(() -> {
            log.info("主线程处理第四个任务");
            ThreadUtil.sleep(2, TimeUnit.MINUTES);
        });

        // 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
        threadPoolExecutor.execute(() -> {
            log.info("核心线程执行第五个任务");
        });

        // 关闭线程池
        threadPoolExecutor.shutdown();
    }
}

输出:

18:19:48.203 INFO  [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
18:19:48.203 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
18:19:48.203 INFO  [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
18:20:48.212 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
18:21:48.219 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务

从输出结果可以看出,因为CallerRunsPolicy这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致 OOM。

我们从问题的本质入手,调用者采用CallerRunsPolicy是希望所有的任务都能够被执行,暂时无法处理的任务又被保存在阻塞队列BlockingQueue中。这样的话,在内存允许的情况下,我们可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。

为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue的任务过多导致内存用完。

调整阻塞队列大小和最大线程数

如果服务器资源以达到可利用的极限,这就意味我们要在设计策略上改变线程池的调度了,我们都知道,导致主线程卡死的本质就是因为我们不希望任何一个任务被丢弃。换个思路,有没有办法既能保证任务不被丢弃且在服务器有余力时及时处理呢?

这里提供的一种任务持久化的思路,这里所谓的任务持久化,包括但不限于:

  1. 设计一张任务表间任务存储到 MySQL 数据库中。
  2. Redis 缓存任务。
  3. 将任务提交到消息队列中。

这里以方案一为例,简单介绍一下实现逻辑:

  1. 实现RejectedExecutionHandler接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。
  2. 继承BlockingQueue实现一个混合式阻塞队列,该队列包含 JDK 自带的ArrayBlockingQueue。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ArrayBlockingQueue中去取任务。

将一部分任务保存到MySQL中

整个实现逻辑还是比较简单的,核心在于自定义拒绝策略和阻塞队列。如此一来,一旦我们的线程池中线程以达到满载时,我们就可以通过拒绝策略将最新任务持久化到 MySQL 数据库中,等到线程池有了有余力处理所有任务时,让其优先处理数据库中的任务以避免"饥饿"问题。

当然,对于这个问题,我们也可以参考其他主流框架的做法,以 Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控:

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    NewThreadRunsPolicy() {
        super();
    }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            //创建一个临时线程处理任务
            final Thread t = new Thread(r, "Temporary task executor");
            t.start();
        } catch (Throwable e) {
            throw new RejectedExecutionException(
                    "Failed to start a new thread", e);
        }
    }
}

ActiveMQ 则是尝试在指定的时效内尽可能的争取将任务入队,以保证最大交付:

new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        //限时阻塞等待,实现尽可能交付
                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }
                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
                }
            });

标签:CallerRunsPolicy,面试官,策略,丢弃,队列,任务,线程,ThreadPoolExecutor
From: https://www.cnblogs.com/javaguide/p/18222455

相关文章

  • 面试官:说说Netty的核心组件?
    Netty核心组件是指Netty在执行过程中所涉及到的重要概念,这些核心组件共同组成了Netty框架,使Netty框架能够正常的运行。Netty核心组件包含以下内容:启动器Bootstrap/ServerBootstrap事件循环器EventLoopGroup/EventLoop通道Channel通道处理器ChannelHandler通道......
  • 操作系统复习:进程和线程的理解串记
    进程和线程        我是一个ikun,我坐着不动(静态)就是ikun程序,我开始执行任务唱、跳、rap和打篮球(动态)就是ikun进程。在ikun进程中,我们的孩子就是线程。        为了ikun进程们能公平地抢坤坤哥哥下的蛋(临界资源),坤坤哥哥(CPU)会安排(进程调度)时间片给每个ikun进程......
  • 线程简述:协程、抢占式、sleep、wait、interrupt,优雅中断线程,线程通信等
    思维导图在此:java线程简述-CSDN博客1、线程与协程协程-->线程-->进程,协程最小协程:用户态,go语言线程:用户态、内核态都有。cpu调度的最小单位。是工人,从进程获取资源,多个线程共享进程的资源。进程:内核态。操作系统调度资源的最小单位。是资源管家。2、调度机制协同式。......
  • 在 Rust 多线程应用程序中锁定 Mutex 时发生死锁
    我正在开发一个Rust应用程序,其中有一个与PacketManager交互的BusDevice。在多线程环境中尝试锁定一个Mutex时,我遇到了死锁。应用程序被卡在锁定Mutex的那一行,再也无法继续。详细描述:在我的Rust应用程序中,我有一个使用PacketManager发送确认数据包的BusDevice。BusD......
  • 给师妹写的《Java并发编程之线程池十八问》被表扬啦!
    写在开头  之前给一个大四正在找工作的学妹发了自己总结的关于Java并发中线程池的面试题集,总共18题,将之取名为《Java并发编程之线程池十八问》,今天聊天时受了学妹的夸赞,心里很开心,毕竟自己整理的东西对别人起到了一点帮助,记录一下!Java并发编程之线程池十八问  经过之前......
  • 十四.吊打面试官系列-JVM优化-JVM垃圾回算法详解
    前言说到JVM不可避免的会聊到垃圾回收器,(GarbageCollection,简称GC)。它负责跟踪哪些对象仍然在使用,哪些对象已经不再被引用,并释放那些不再被引用的对象所占用的内存空间。这一过程涉及到对象的标记、清除、压缩等多个阶段,每个阶段都有其特定的算法和策略。随着Java技术的不......
  • 面试官:前端加密怎么做?这,这,这不是后端的活儿吗?
    目录 一、Base64编码二、哈希算法三、对称加密(AES/DES) 四、非对称加密(RSA) 五、加盐六、WebCryptographtAPI七、总结    随着信息和数据安全重要性的日益凸显,如何保证信息数据在传输的过程中的安全成为开发者重点关注的内容。前端加密通常是指在浏览器中......
  • 【Rust】——使用消息在线程之间传递数据
    ......
  • C++ 跨线程 传递指针
    目录在C++中跨线程传递指针时,需要注意线程安全和生命周期管理的问题。以下是一些常见的方法,用于在C++中安全地跨线程传递指针:使用智能指针和线程安全队列结合使用std::shared_ptr和线程安全的队列(如std::queue配合互斥锁)是一种常见的方法。#include<iostream>#include<t......
  • C++ 线程同步condition_variable的使用
    一,condition_variable使用步骤创建一个scondition_variable对象。创建一个互斥量对象mutex。创建两个线程:等待线程和唤醒线程。在等待线程中,使用unique_lock锁定互斥量,并调用wait函数进入等待状态。在唤醒线程lock_guard锁定互斥量,并调用notify_one或notify_all函数唤醒等......