首页 > 其他分享 >多线程篇(阻塞队列- DelayQueue)(持续更新迭代)

多线程篇(阻塞队列- DelayQueue)(持续更新迭代)

时间:2024-09-09 12:25:52浏览次数:10  
标签:Task 迭代 work delay start 线程 time DelayQueue 多线程

目录

一、简介

二、基本原理

四、代码示例

简单定时调度任务

多消费者定时调度任务

得出结论

四、应用场景


一、简介

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在

其到期时才能从队列中取走。

这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

方法

抛出异常

返回值

一直阻塞

超时退出

插入方法

add

offer

put

Offer(time)

移除方法

remove

poll

take

Poll(time)

检查方法

element

peek

N/A

N/A

二、基本原理

DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。当生产者线程

调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺

序是按到期时间排序的,而非它们进入队列的顺序。

排在队列头部的元素是最早到期的,越往后到期时间越晚。

消费者线程查看队列头部的元素,注意是查看不是取出。然后调用元素的getDelay方法,如果此方法返回的值小

0或者等于0,则消费者线程会从队列中取出此元素,并进行处理。如果getDelay方法返回的值大于0,则消费者

线程wait返回的时间值后,再从队列头部取出元素,此时元素应该已经到期。

DelayQueue是Leader-Followr模式的变种,消费者线程处于等待状态时,总是等待最先到期的元素,而不是长

时间的等待。

消费者线程尽量把时间花在处理任务上,最小化空等的时间,以提高线程的利用效率。

以下通过队列及消费者线程状态变化大致说明一下DelayQueue的运行过程。

初始状态

因为队列是没有边界的,向队列中添加元素的线程不会阻塞,添加操作相对简单,所以此图不考虑向队列添加元素

的生产者线程。

假设现在共有三个消费者线程。

队列中的元素按到期时间排序,队列头部的元素2s以后到期。消费者线程1查看了头部元素以后,发现还需要2s

才到期,于是它进入等待状态,2s以后醒来,等待头部元素到期的线程称为Leader线程。

消费者线程2与消费者线程3处于待命状态,它们不等待队列中的非头部元素。当消费者线程1拿到对象5以后,会

向它们发送signal。

这个时候两个中的一个会结束待命状态而进入等待状态。

2S以后

消费者线程1已经拿到了对象5,从等待状态进入处理状态,处理它取到的对象5,同时向消费者线程2与消费者

线程3发送signal。

消费者线程2与消费者线程3会争抢领导权,这里是消费者线程2进入等待状态,成为Leader线程,等待2s以后对

象4到期。

而消费者线程3则继续处于待命状态。

此时队列中加入了一个新元素对象6,它10s后到期,排在队尾。

又2S以后

先看线程1,如果它已经结束了对象5的处理,则进入待命状态。如果还没有结束,则它继续处理对象5。

消费线程2取到对象4以后,也进入处理状态,同时给处于待命状态的消费线程3发送信号,消费线程3进入等待

状态,成为新的Leader。现在头部元素是新插入的对象7,因为它1s以后就过期,要早于其它所有元素,所以排到

了队列头部。

又1S后

一种不好的结果:

消费线程3一定正在处理对象7。消费线程1与消费线程2还没有处理完它们各自取得的对象,无法进入待命状

态,也更加进入不了等待状态。此时对象3马上要到期,那么如果它到期时没有消费者线程空下来,则它的处理一

定会延期。

可以想见,如果元素进入队列的速度很快,元素之间的到期时间相对集中,而处理每个到期元素的速度又比较慢的

话,则队列会越来越大,队列后边的元素延期处理的时间会越来越长。

另外一种好的结果:

消费线程1与消费线程2很快的完成对取出对象的处理,及时返回重新等待队列中的到期元素。一个处于等待状态

(Leader),对象3一到期就立刻处理。另一个则处于待命状态。这样,每一个对象都能在到期时被及时处理,不会

发生明显的延期。

所以,消费者线程的数量要够,处理任务的速度要快。否则,队列中的到期元素无法被及时取出并处理,造成任务

延期、队列元素堆积等情况。

四、代码示例

简单定时调度任务

package com.jht.scala.delayTask;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueTest {

    // 可以用来执行定时任务
    static BlockingQueue<MyTask> task = new DelayQueue<>();

    static class MyTask implements Delayed {

        String name;
        long runningTime;

        MyTask(String name,long rt) {
            this.name = name;
            this.runningTime= rt;
        }
        @Override
        public int compareTo(Delayed other) {
            long td = this.getDelay(TimeUnit.MILLISECONDS);
            long od = other.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(td,od);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public String toString() {
            return name + "-" + runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask("t1",now + 1000);
        MyTask t2 = new MyTask("t2",now + 2000);
        MyTask t3 = new MyTask("t3",now + 1500);
        MyTask t4 = new MyTask("t4",now + 3000);
        MyTask t5 = new MyTask("t5",now + 500);
        MyTask t6 = new MyTask("t6",now + 2500);

        task.put(t1);
        task.put(t2);
        task.put(t3);
        task.put(t4);
        task.put(t5);
        task.put(t6);

        for (int i = 0; i < 6; i++) {
            System.out.println(task.take());
        }
    }
}

多消费者定时调度任务

本例中先让主线程向DelayQueue添加 10 个任务,任务之间的启动间隔在1~2s之间,每个任务的执行时间固定为

2s,代码如下:

package com.jht.scala.delayTask;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class DelayTask implements Delayed {
    private static long currentTime = System.currentTimeMillis();
    protected final String taskName;
    protected final int timeCost;
    protected final long scheduleTime;

    protected static final AtomicInteger taskCount = new AtomicInteger(0);

    // 定时任务之间的启动时间间隔在1~2s之间,timeCost表示处理此任务需要的时间,本示例中为2s
    public DelayTask(String taskName, int timeCost) {
        this.taskName = taskName;
        this.timeCost = timeCost;
        taskCount.incrementAndGet();
        currentTime += 1000 + (long) (Math.random() * 1000);
        scheduleTime = currentTime;
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.scheduleTime - ((DelayTask) o).scheduleTime);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long expirationTime = scheduleTime - System.currentTimeMillis();
        return unit.convert(expirationTime, TimeUnit.MILLISECONDS);
    }

    public void execTask() {
        long startTime = System.currentTimeMillis();
        System.out.println("Task " + taskName + ": schedule_start_time=" + scheduleTime + ",real start time="
                + startTime + ",delay=" + (startTime - scheduleTime));
        try {
            Thread.sleep(timeCost);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class DelayTaskComsumer extends Thread {
    private final BlockingQueue<DelayTask> queue;

    public DelayTaskComsumer(BlockingQueue<DelayTask> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        DelayTask task = null;
        try {
            while (true) {
                task = queue.take();
                task.execTask();
                DelayTask.taskCount.decrementAndGet();
            }
        } catch (InterruptedException e) {
            System.out.println(getName() + " finished");
        }
    }
}

public class DelayQueueTest1 {

    public static void main(String[] args) {

        BlockingQueue<DelayTask> queue = new DelayQueue<DelayTask>();

        for (int i = 0; i < 10; i++) {
            try {
                queue.put(new DelayTask("work " + i, 2000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        ThreadGroup g = new ThreadGroup("Consumers");

        for (int i = 0; i < 1; i++) {
            new Thread(g, new DelayTaskComsumer(queue)).start();
        }

        while (DelayTask.taskCount.get() > 0) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        g.interrupt();
        System.out.println("Main thread finished");
    }
}

首先启动一个消费者线程。因为消费者线程处单个任务的时间为2s,而任务的调度间隔为1~2s。这种情况下,每

当消费者线程处理完一个任务,回头再从队列中新取任务时,新任务肯定延期了,无法按给定的时间调度任务。而

且越往后情况越严重。

运行代码看一下输出:

Task work 0: schedule_start_time=1554203579096,real start time=1554203579100,delay=4
Task work 1: schedule_start_time=1554203580931,real start time=1554203581101,delay=170
Task work 2: schedule_start_time=1554203582884,real start time=1554203583101,delay=217
Task work 3: schedule_start_time=1554203584660,real start time=1554203585101,delay=441
Task work 4: schedule_start_time=1554203586075,real start time=1554203587101,delay=1026
Task work 5: schedule_start_time=1554203587956,real start time=1554203589102,delay=1146
Task work 6: schedule_start_time=1554203589041,real start time=1554203591102,delay=2061
Task work 7: schedule_start_time=1554203590127,real start time=1554203593102,delay=2975
Task work 8: schedule_start_time=1554203591903,real start time=1554203595102,delay=3199
Task work 9: schedule_start_time=1554203593577,real start time=1554203597102,delay=3525
Main thread finished
Thread-0 finished

最后一个任务的延迟时间已经超过3.5s了。

再作一次测试,将消费者线程的个数调整为2,这时任务应该能按时启动,延迟应该很小,运行程序看一下结果:

Task work 0: schedule_start_time=1554204395427,real start time=1554204395430,delay=3
Task work 1: schedule_start_time=1554204396849,real start time=1554204396850,delay=1
Task work 2: schedule_start_time=1554204398050,real start time=1554204398051,delay=1
Task work 3: schedule_start_time=1554204399590,real start time=1554204399590,delay=0
Task work 4: schedule_start_time=1554204401289,real start time=1554204401289,delay=0
Task work 5: schedule_start_time=1554204402883,real start time=1554204402883,delay=0
Task work 6: schedule_start_time=1554204404663,real start time=1554204404664,delay=1
Task work 7: schedule_start_time=1554204406154,real start time=1554204406154,delay=0
Task work 8: schedule_start_time=1554204407991,real start time=1554204407991,delay=0
Task work 9: schedule_start_time=1554204409540,real start time=1554204409540,delay=0
Main thread finished
Thread-0 finished
Thread-2 finished

基本上按时启动,最大延迟为3毫秒,大部分都是0毫秒。

将消费者线程个数调整成3个,运行看一下结果:

Task work 0: schedule_start_time=1554204499695,real start time=1554204499698,delay=3
Task work 1: schedule_start_time=1554204501375,real start time=1554204501376,delay=1
Task work 2: schedule_start_time=1554204503370,real start time=1554204503371,delay=1
Task work 3: schedule_start_time=1554204504860,real start time=1554204504861,delay=1
Task work 4: schedule_start_time=1554204506419,real start time=1554204506420,delay=1
Task work 5: schedule_start_time=1554204508191,real start time=1554204508192,delay=1
Task work 6: schedule_start_time=1554204509495,real start time=1554204509496,delay=1
Task work 7: schedule_start_time=1554204510663,real start time=1554204510664,delay=1
Task work 8: schedule_start_time=1554204512598,real start time=1554204512598,delay=0
Task work 9: schedule_start_time=1554204514276,real start time=1554204514277,delay=1
Main thread finished
Thread-0 finished
Thread-2 finished
Thread-4 finished

大部分延迟时间变成1毫秒,情况好像还不如2个线程的情况。

将消费者线程数调整成5,运行看一下结果:

Task work 0: schedule_start_time=1554204635015,real start time=1554204635019,delay=4
Task work 1: schedule_start_time=1554204636856,real start time=1554204636857,delay=1
Task work 2: schedule_start_time=1554204637968,real start time=1554204637970,delay=2
Task work 3: schedule_start_time=1554204639758,real start time=1554204639759,delay=1
Task work 4: schedule_start_time=1554204641089,real start time=1554204641090,delay=1
Task work 5: schedule_start_time=1554204642879,real start time=1554204642880,delay=1
Task work 6: schedule_start_time=1554204643941,real start time=1554204643942,delay=1
Task work 7: schedule_start_time=1554204645006,real start time=1554204645007,delay=1
Task work 8: schedule_start_time=1554204646309,real start time=1554204646310,delay=1
Task work 9: schedule_start_time=1554204647537,real start time=1554204647538,delay=1
Thread-2 finished
Thread-0 finished
Main thread finished
Thread-8 finished
Thread-4 finished
Thread-6 finished

与3个消费者线程的情况差不多。

得出结论

最优的消费者线程的个数与任务启动的时间间隔好像存在这样的关系:

单个任务处理时间的最大值 / 相邻任务的启动时间最小间隔 = 最优线程数,

如果最优线程数是小数,则取整数后加1,比如1.3的话,那么最优线程数应该是2。

本例中,单个任务处理时间的最大值固定为2s。

相邻任务的启动时间最小间隔为1s。

则消费者线程数为2/1=2。

如果消费者线程数小于此值,则来不及处理到期的任务。

如果大于此值,线程太多,在调度、同步上花更多的时间,无益改善性能。

四、应用场景

  1. 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
  2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
  3. 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
  4. 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
  5. 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求等。

标签:Task,迭代,work,delay,start,线程,time,DelayQueue,多线程
From: https://blog.csdn.net/qq_51226710/article/details/142047062

相关文章

  • 多线程篇(阻塞队列- PriorityBlockingQueue)(持续更新迭代)
    目录一、简介二、类图三、源码解析1.字段讲解2.构造方法3.入队方法put浮调整比较器方法的实现入队图解4.出队方法takedequeue下沉调整比较器方法的实现出队图解四、总结一、简介PriorityBlockingQueue队列是JDK1.5的时候出来的一个阻塞队列。但是该队......
  • 如何实现标准库般强大的 C++ Vector?:从动态扩容到移动语义到迭代器全覆盖
    在C++中,std::vector是最常用的动态数组容器。它具有高效的内存管理、动态扩容、随机访问等特点。在这篇博客中,我们将从零开始,实现一个功能强大、灵活、并且具有高性能的Vector类,具备std::vector的大部分功能,包括动态扩容、迭代器、模板支持、随机访问等,尽可能模仿C+......
  • 【Java学习】配置文件&日志&多线程
    一、配置文件1、概述在企业开发过程中,我们习惯把一些需要灵活配置的数据放在一些文本文件中,而不是在Java代码写死。我们把这种存放程序配置信息的文件,统称为配置文件。配置文件一般要求有明确的格式,以方便读写操作。2、PropertiesProperties是一个Map集合(键值对集合),但是一......
  • python装饰器\迭代器\生成器
    1.迭代器\生成器#斐波那契数列deffib(n):"""生成斐波那契数列的前n个数字。参数:n--要生成的斐波那契数列的长度返回:生成器,产出斐波那契数列的前n个数字。"""a,b=0,1#初始化斐波那契数列的前两个数字content=......
  • 多线程轮流打印字符
    要求:使用多个线程轮流打印字符方法1。无锁自旋,一般在多核机器并且临界区耗时很短的话可以尝试自旋publicclassprintABC{staticLoggerlog=newLogger(Logger.LogLevel.DEBUG,printABC.class);staticvolatileintcur=0;publicstaticvoidmain(St......
  • 9-迭代器
    迭代器ArrayList<String>list=newArrayList<>();list.add("aa");list.add("bb");list.add("cc");list.add("dd");list.add("ee");......
  • C# 多线程的学习大纲
    C#多线程编程是开发高效并发应用的核心技术之一。以下是一个详细的学习大纲,涵盖了C#多线程编程的各个方面,从基础概念到高级主题。学习大纲1.多线程基础知识1.1什么是线程?定义线程及其在操作系统中的角色进程与线程的区别1.2C#中的多线程基础Thread类的基本使......
  • 多线程篇(阻塞队列- BlockingQueue)(持续更新迭代)
    目录一、了解什么是阻塞队列之前,需要先知道队列1.Queue(接口)二、阻塞队列1.前言2.什么是阻塞队列3.Java里面常见的阻塞队列三、BlockingQueue(接口)1.前言2.简介3.特性3.1.队列类型3.2.队列数据结构2.简介4.核心功能入队(放入数据)出队(取出数据)总结四......
  • [Redis]Redis到底是单线程还是多线程程序?
    概述这里我们先给出问题的全面回答:Redis到底是多线程还是单线程程序要看是针对哪个功能而言,对于核心业务功能部分(命令操作处理数据),Redis是单线程的,主要是指Redis的网络IO和键值对读写是由一个线程来完成的,这也是Redis对外提供键值存储服务的主要流程,所以一般我们认为Red......
  • Java多线程中常见死锁问题及解决方案
    在编写Java多线程代码的时候,很难避免会出现线程安全问题,在线程安全问题中也有一个很常见的现象就是死锁现象。今天我们就来聊一聊Java中的死锁问题,以及如何避免死锁问题。本次知识点讲解建立在大家已经知道“锁”......