首页 > 编程语言 >解读 DelayQueue 源码:探究其精妙的设计架构与实现细节

解读 DelayQueue 源码:探究其精妙的设计架构与实现细节

时间:2024-11-17 16:15:55浏览次数:3  
标签:精妙 队列 lock 元素 任务 源码 线程 DelayQueue

一、简介

DelayQueue 是 JUC 包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单 15 分钟未支付直接取消。它是 BlockingQueue 的一种,底层是一个基于 PriorityQueue 实现的一个无界队列,是线程安全的

BlockingQueue 的实现类

DelayQueue 中存放的元素必须实现 Delayed 接口,并且需要重写 getDelay()方法(计算是否到期)。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

默认情况下, DelayQueue 会按照到期时间升序编排任务。只有当元素过期时(getDelay()方法返回值小于等于 0),才能从队列中取出。

二、DelayQueue 发展史

  • DelayQueue 最早是在 Java 5 中引入的,作为 java.util.concurrent 包中的一部分,用于支持基于时间的任务调度和缓存过期删除等场景,该版本仅仅支持延迟功能的实现,还未解决线程安全问题。

  • 在 Java 6 中,DelayQueue 的实现进行了优化,通过使用 ReentrantLockCondition 解决线程安全及线程间交互的效率,提高了其性能和可靠性。

  • 在 Java 7 中,DelayQueue 的实现进行了进一步的优化,通过使用 CAS 操作实现元素的添加和移除操作,提高了其并发操作性能。

  • 在 Java 8 中,DelayQueue 的实现没有进行重大变化,但是在 java.time 包中引入了新的时间类,如 DurationInstant,使得使用 DelayQueue 进行基于时间的调度更加方便和灵活。

  • 在 Java 9 中,DelayQueue 的实现进行了一些微小的改进,主要是对代码进行了一些优化和精简。

总的来说,DelayQueue 的发展史主要是通过优化其实现方式和提高其性能和可靠性,使其更加适用于基于时间的调度和缓存过期删除等场景。


三、DelayQueue 常见使用场景示例

我们这里希望任务可以按照我们预期的时间执行,例如提交 3 个任务,分别要求 1s、2s、3s 后执行,即使是乱序添加,1s 后要求 1s 执行的任务会准时执行。

延迟任务

对此我们可以使用 DelayQueue 来实现,所以我们首先需要继承 Delayed 实现 DelayedTask,实现 getDelay 方法以及优先级比较 compareTo

/**
 * 延迟任务
 */
public class DelayedTask implements Delayed {
    /**
     * 任务到期时间
     */
    private long executeTime;
    /**
     * 任务
     */
    private Runnable task;

    public DelayedTask(long delay, Runnable task) {
        this.executeTime = System.currentTimeMillis() + delay;
        this.task = task;
    }

    /**
     * 查看当前任务还有多久到期
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 延迟队列需要到期时间升序入队,所以我们需要实现compareTo进行到期时间比较
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
    }

    public void execute() {
        task.run();
    }
}

完成任务的封装之后,使用就很简单了,设置好多久到期然后将任务提交到延迟队列中即可。

// 创建延迟队列,并添加任务
DelayQueue < DelayedTask > delayQueue = new DelayQueue < > ();

//分别添加1s、2s、3s到期的任务
delayQueue.add(new DelayedTask(2000, () -> System.out.println("Task 2")));
delayQueue.add(new DelayedTask(1000, () -> System.out.println("Task 1")));
delayQueue.add(new DelayedTask(3000, () -> System.out.println("Task 3")));

// 取出任务并执行
while (!delayQueue.isEmpty()) {
  //阻塞获取最先到期的任务
  DelayedTask task = delayQueue.take();
  if (task != null) {
    task.execute();
  }
}

从输出结果可以看出,即使笔者先提到 2s 到期的任务,1s 到期的任务 Task1 还是优先执行的。

Task 1

Task 2

Task 3


四、DelayQueue 源码解析

这里以 JDK1.8 为例,分析一下 DelayQueue 的底层核心源码。

DelayQueue 的类定义如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>{
  //...
}

DelayQueue 继承了 AbstractQueue 类,实现了 BlockingQueue 接口。

DelayQueue类图 

1、核心成员变量

DelayQueue 的 4 个核心成员变量如下:

//可重入锁,实现线程安全的关键
private final transient ReentrantLock lock = new ReentrantLock();
//延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
private final PriorityQueue<E> q = new PriorityQueue<E>();

//指向准备执行优先级最高的线程
private Thread leader = null;
//实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();
  • lock : 我们都知道 DelayQueue 存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而 DelayQueue 就是基于 ReentrantLock 独占锁确保存取操作的线程安全。

  • q : 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以 DelayQueue 底层元素的存取都是通过这个优先队列 PriorityQueue 的成员变量 q 来管理的。

  • leader : 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用 leader 来管理延迟任务,只有 leader 所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到 leader 线程执行完手头的延迟任务后唤醒它。

  • available : 上文讲述 leader 线程时提到的等待唤醒操作的交互就是通过 available 实现的,假如线程 1 尝试在空的 DelayQueue 获取任务时,available 就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过 availablesignal 方法将其唤醒。

2、构造方法

相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。还有一个传入 Collection 对象的构造方法,它会将调用 addAll()方法将集合元素存到优先队列 q 中。

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

3、添加元素

DelayQueue 添加元素的方法无论是 addput 还是 offer,本质上就是调用一下 offer ,所以了解延迟队列的添加逻辑我们只需阅读 offer 方法即可。

offer 方法的整体逻辑为:

  1. 尝试获取 lock

  2. 如果上锁成功,则调 qoffer 方法将元素存放到优先队列中。

  3. 调用 peek 方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将 leader 设置为空,通知因为队列为空时调用 take 等方法导致阻塞的线程来争抢元素。

  4. 上述步骤执行完成,释放 lock

  5. 返回 true。

源码如下,笔者已详细注释,读者可自行参阅:

public boolean offer(E e) {
    //尝试获取lock
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果上锁成功,则调q的offer方法将元素存放到优先队列中
        q.offer(e);
        //调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
        if (q.peek() == e) {
            //将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //上述步骤执行完成,释放lock
        lock.unlock();
    }
}

4、查看元素

上文获取元素时都会调用到 peek 方法,peek 顾名思义仅仅窥探一下队列中的元素,它的步骤就 4 步:

  1. 上锁。

  2. 调用优先队列 q 的 peek 方法查看索引 0 位置的元素。

  3. 释放锁。

  4. 将元素返回出去。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.peek();
    } finally {
        lock.unlock();
    }
}

5、获取元素

DelayQueue 中获取元素的方式分为阻塞式和非阻塞式,先来看看逻辑比较复杂的阻塞式获取元素方法 take,为了让读者可以更直观的了解阻塞式获取元素的全流程,笔者将以 3 个线程并发获取元素为例讲述 take 的工作流程。

1、首先, 3 个线程会尝试获取可重入锁 lock,假设我们现在有 3 个线程分别是 t1、t2、t3,随后 t1 得到了锁,而 t2、t3 没有抢到锁,故将这两个线程存入等待队列中。

2、紧接着 t1 开始进行元素获取的逻辑。

3、线程 t1 首先会查看 DelayQueue 队列首元素是否为空。

4、如果元素为空,则说明当前队列没有任何元素,故 t1 就会被阻塞存到 conditionWaiter 这个队列中。

注意,调用 await 之后 t1 就会释放 lcok 锁,假如 DelayQueue 持续为空,那么 t2、t3 也会像 t1 一样执行相同的逻辑并进入 conditionWaiter 队列中。

如果元素不为空,则判断当前任务是否到期,如果元素到期,则直接返回出去。如果元素未到期,则判断当前 leader 线程(DelayQueue 中唯一一个可以等待并获取元素的线程引用)是否为空,若不为空,则说明当前 leader 正在等待执行一个优先级比当前元素还高的元素到期,故当前线程 t1 只能调用 await 进入无限期等待,等到 leader 取得元素后唤醒。反之,若 leader 线程为空,则将当前线程设置为 leader 并进入有限期等待,到期后取出元素并返回。

自此我们阻塞式获取元素的逻辑都已完成后,源码如下,读者可自行参阅:

public E take() throws InterruptedException {
    // 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            //查看队列第一个元素
            E first = q.peek();
            //若为空,则将当前线程放入ConditionObject的等待队列中,并将底层AQS的state设置为0,表示释放锁并进入无限期等待
            if (first == null)
                available.await();
            else {
                //若元素不为空,则查看当前元素多久到期
                long delay = first.getDelay(NANOSECONDS);
                //如果小于0则说明已到期直接返回出去
                if (delay <= 0)
                    return q.poll();
                //如果大于0则说明任务还没到期,首先需要释放对这个元素的引用
                first = null; // don't retain ref while waiting
                //判断leader是否为空,如果不为空,则说明正有线程作为leader并等待一个任务到期,则当前线程进入无限期等待
                if (leader != null)
                    available.await();
                else {
                    //反之将我们的线程成为leader
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //并进入有限期等待
                        available.awaitNanos(delay);
                    } finally {
                        //等待任务到期时,释放leader引用,进入下一次循环将任务return出去
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 收尾逻辑:当leader为null,并且队列中有任务时,唤醒等待的获取元素的线程。
        if (leader == null && q.peek() != null)
            available.signal();
        //释放锁
        lock.unlock();
    }
}

我们再来看看非阻塞的获取元素方法 poll ,逻辑比较简单,整体步骤如下:

  1. 尝试获取可重入锁。

  2. 查看队列第一个元素,判断元素是否为空。

  3. 若元素为空,或者元素未到期,则直接返回空。

  4. 若元素不为空且到期了,直接调用 poll 返回出去。

  5. 释放可重入锁 lock

源码如下,读者可自行参阅源码及注释:

public E poll() {
    //尝试获取可重入锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //查看队列第一个元素,判断元素是否为空
        E first = q.peek();

        //若元素为空,或者元素未到期,则直接返回空
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            //若元素不为空且到期了,直接调用poll返回出去
            return q.poll();
    } finally {
        //释放可重入锁lock
        lock.unlock();
    }
}

五、DelayQueue 常见面试题

1、DelayQueue 的实现原理是什么?

DelayQueue 底层是使用优先队列 PriorityQueue 来存储元素,而 PriorityQueue 采用二叉小顶堆的思想确保值小的元素排在最前面,这就使得 DelayQueue 对于延迟任务优先级的管理就变得十分方便了。同时 DelayQueue 为了保证线程安全还用到了可重入锁 ReentrantLock,确保单位时间内只有一个线程可以操作延迟队列。最后,为了实现多线程之间等待和唤醒的交互效率,DelayQueue 还用到了 Condition,通过 Conditionawaitsignal 方法完成多线程之间的等待唤醒。

2、DelayQueue 的实现是否线程安全?

DelayQueue 的实现是线程安全的,它通过 ReentrantLock 实现了互斥访问和 Condition 实现了线程间的等待和唤醒操作,可以保证多线程环境下的安全性和可靠性。

3、DelayQueue 的使用场景有哪些?

DelayQueue 通常用于实现定时任务调度和缓存过期删除等场景。

  1. 在定时任务调度:需要将需要执行的任务封装成延迟任务对象,并将其添加到 DelayQueue 中,DelayQueue 会自动按照剩余延迟时间进行升序排序(默认情况),以保证任务能够按照时间先后顺序执行。        

  2. 对于缓存过期:在数据被缓存到内存之后,我们可以将缓存的 key 封装成一个延迟的删除任务,并将其添加到 DelayQueue 中,当数据过期时,拿到这个任务的 key,将这个 key 从内存中移除。

4、DelayQueue 中 Delayed 接口的作用是什么?

Delayed 接口定义了元素的剩余延迟时间(getDelay)和元素之间的比较规则(该接口继承了 Comparable 接口)。若希望元素能够存放到 DelayQueue 中,就必须实现 Delayed 接口的 getDelay() 方法和 compareTo() 方法,否则 DelayQueue 无法得知当前任务剩余时长和任务优先级的比较。

5、DelayQueue 和 Timer/TimerTask 的区别是什么?

  1. DelayQueueTimer/TimerTask 都可以用于实现定时任务调度,但是它们的实现方式不同。

  2. DelayQueue 是基于优先级队列和堆排序算法实现的,可以实现多个任务按照时间先后顺序执行

  3. Timer/TimerTask 是基于单线程实现的,只能按照任务的执行顺序依次执行,如果某个任务执行时间过长,会影响其他任务的执行。

  4. 另外,DelayQueue 还支持动态添加和移除任务,而 Timer/TimerTask 只能在创建时指定任务。

6、DelayQueue有哪些核心方法?它们的作用是什么?

  1. offer(E e):将指定的元素插入到此队列中,在成功时返回true。如果插入的元素是队列中延迟时间最小的(即最先到期的),则会唤醒所有等待的线程。

  2. poll():获取并移除此队列的头,如果此队列为空,或者队列头部的元素尚未到期,则返回null。

  3. take():获取并移除此队列的头部,在元素到期时返回该元素。如果队列为空,则当前线程会被阻塞,直到有元素到期并被移除。

  4. peek():获取队列头部的元素,但不移除它。如果队列为空,则返回null。

标签:精妙,队列,lock,元素,任务,源码,线程,DelayQueue
From: https://blog.csdn.net/Yaml4/article/details/143758418

相关文章

  • flask火车购票系统(毕设源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于火车购票系统的研究,现有研究多侧重于传统购票方式的改进以及购票系统的基本功能实现。在国内外,随着信息技术的发展,购票系统已逐渐......
  • flask基于SpringBoot的私人物品管理平台(毕设源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景随着人们生活水平的提高,私人物品的数量和种类不断增加,如何有效地管理私人物品成为一个重要问题。关于私人物品管理平台的研究,现有研究......
  • Golang的GMP调度模型与源码解析
    0、引言我们知道,这当代操作系统中,多线程和多进程模型被广泛的使用以提高系统的并发效率。随着互联网不断的发展,面对如今的高并发场景,为每个任务都创建一个线程是不现实的,使用线程则需要系统不断的在用户态和内核态之间不断的切换,引起不必要的损耗,于是引入了协程。协程存在于用户......
  • 基于SpringBoot+Vue实现剧本杀服务平台【源码+LW+PPT+部署】
    作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待与各位高校教师、企业......
  • 基于微信小程序的学生在线投票系统小程序app项目(源码+lw+部署文档+讲解等)
    项目整体介绍基于微信小程序的学生在线投票系统小程序为校园投票活动提供了便捷的解决方案。它具有简洁易用的界面,方便学生快速上手。活动组织者可以轻松创建投票项目,设定投票主题、选项、投票规则,如是否允许多选、投票起止时间等。在投票过程中,系统能实时统计票数,通过直......
  • flask基于SpringBoot的模具管理(毕设源码+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于模具管理的研究,现有研究主要以模具的生产技术和工艺改进为主,如在模具质量控制方面对检测方法和加工工艺的研究等1。专门针对模具......
  • 云财务 财务软件源码 SaaS版
    云财务是一款基于云技术的财务管理软件,通过SaaS模式提供,您可以轻松地在任何时间、任何地点访问到您的财务信息。拥有精准的记账功能,您可以随时记录和跟踪您的收入和支出,有效控制财务状况。云财务具有多项强大的功能和特点,包括但不限于:完善的财务分析:云财务提供多种财务报表和......
  • 【风云毕业设计推荐】基于Spring Boot的企业员工管理的设计与实现 【附源码+数据库+部
    ✍✍计算机编程指导师⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流!⚡⚡Java实战|SpringBoot/SSMPython实战项目|Django微信小程......
  • 【计算机毕业设计选题推荐】基于spring boot的交通旅游订票系统的设计与实现 【附源码
    ✍✍计算机编程指导师⭐⭐个人介绍:自己非常喜欢研究技术问题!专业做Java、Python、小程序、安卓、大数据、爬虫、Golang、大屏等实战项目。⛽⛽实战项目:有源码或者技术上的问题欢迎在评论区一起讨论交流!⚡⚡Java实战|SpringBoot/SSMPython实战项目|Django微信小程......
  • (赠源码)基于Java Web+springboot+MySQL的Unishare闲置物品共享系统研究22945-计算机毕
    摘 要随着科技的快速迭代和人们环保意识的提高,共享经济逐渐成为社会发展的重要趋势。然而,在现实生活中,人们的消费水平大大提高,存在很多闲置物品无人问津,造成了一定的资源浪费和环境污染。与此同时,许多人需要使用这些闲置物品,却不知道如何获取。这种情况下,基于JavaWeb的Uni......