首页 > 其他分享 >多线程(八):阻塞队列 & 生产者消费者模型

多线程(八):阻塞队列 & 生产者消费者模型

时间:2024-10-22 08:49:30浏览次数:3  
标签:队列 阻塞 生产者 线程 new 服务器 多线程

目录

1. 阻塞队列

 2. 生产者消费者模型

2.1 场景举例

2.2 重要优势

2.2.1 解耦合

 2.2.2 削峰填谷

2.3 付出的代价

3. BlockingQueue

4. 模拟实现阻塞队列

4.1 wait的注意事项

4.2 代码实现 


1. 阻塞队列

在数据结构中, 我们学习了简单的普通队列, 也学习了较为复杂一些的优先级队列等等....

而阻塞队列, 是一个更为复杂的队列.

阻塞队列在具有普通队列特性的基础上, 还具有以下特性:

  1. 线程安全(之前学习的Queue, PriorityQueue 都是线程不安全的)
  2. 阻塞特性

线程安全我们都很好理解, 就是这个类通过加锁使得代码在多线程的环境下不会出 bug.

阻塞特性是什么呢? 其实也不难理解:

  1. 队列为空时, 尝试出队列, 出队列操作就会发生阻塞. 阻塞到其他线程添加元素为止.
  2. 队列为满时, 尝试进队列, 进队列操作就会发生阻塞. 阻塞到其他线程取走元素为止.

阻塞队列, 最主要的应用场景, 就是实现 "生产者消费者模型" .

 


 2. 生产者消费者模型

2.1 场景举例

"生产者消费者模型" 是多线程编程中, 一种典型的编码技巧, 也是设计模式的一种.

对于 生产者消费者模型 的场景, 这里为大家举个例子:

相信大家都喜欢吃饺子. 过年时, 一家人都会坐在一块一起包饺子, 包饺子有两种方法:

  • 1. 我, 我爸, 我妈 都各包各的 : 和面, 擀饺子皮, 包饺子.... 都一个人独自完成

假设家里只有一个擀面杖, 如果以这种方式包饺子, 那么当我擀饺子皮的时候, 我妈也需要擀饺子皮, 我爸也需要擀饺子皮, 那么此时他们俩就会触发阻塞等待, 直到我擀完皮后, 他们俩的其中一个才能擀. 也就是说, 我们三个人必定会因为竞争擀面杖而触发阻塞等待.

  • 2. 我专门负责擀饺子皮, 我爸妈负责包饺子

这种方式, 每个人都有明确的分工, 就不会因为竞争擀面杖而触发阻塞等待. 

在这种情况下, 我就属于 "生产者", 我爸妈就属于 "消费者", 他俩都会"消费"我生产的饺子皮. 而我会把擀好的皮放到盖帘上, 他俩也会把包好的饺子放在盖帘上, 那盖帘就属于生产消费的 "交易场所", 而 交易场所 就是一个 阻塞队列.

阻塞队列, 虽然会在 生产者 消费者 速度不协调的极端情况下发生阻塞:

  1. 可能会因为我擀饺子皮太慢, 而导致他俩阻塞等待, 等待我擀饺子皮.
  2. 也可能会因为我擀的太快, 他俩包的太慢, 导致盖帘上的饺子皮已经放不下了, 导致我不能再擀皮, 而只能等待他俩包饺子.

 但这只是极端情况, 我们可以通过调节生产者消费者的速度进而避免阻塞等待的情况发生.

2.2 重要优势

生产者消费者模型, 具有以下两个重要优势:

  1. 解耦合
  2. 削峰填谷

2.2.1 解耦合

解耦合, 不一定是两个线程之间, 也可以是两个服务器之间.

举个例子:

有 A 和 B 两个服务器. A 服务器向 B 服务器发送请求, B 服务器向 A 服务器返回相应.

  • 如果是 A 直接向 B 发送请求, B 直接向 A 返回相应, 那么A B 两者间耦合性就高.

因为编写代码时, A 的代码中肯定多多少少会有一些和 B 有关的逻辑. 同时, B 的代码中肯定也会有和 A 相关的逻辑. 所以两个服务器间的耦合性高, 后续代码的修改会"牵一发而动全身", 很麻烦~

  • 如果在 A B 间添加一个阻塞队列, 那么 A 就可以通过队列来间接和 B 交互, B 也可以通过队列来间接和 A 交互, 这样 A B 两者间就实现了解耦合.

由于这样的阻塞队列太重要的, 也会把队列单独部署成一个服务器, 独立的阻塞队列的服务器, 称为"消息队列".(消息队列服务器中, 不只有一个队列, 可以有多个队列.)

有的同学会说, 虽然 A 中没 B 了, B 中没 A 了, 但是他俩中和队列的联系不就大了吗???

---- 要知道降低耦合, 就是为了在后续修改代码的时候成本低~ 但是阻塞队列的功能是固定的, 也就入队列出队列, 后续也不会不涉及到队列代码的修改.

 2.2.2 削峰填谷

大家先来看以下的流量(访问量)随时间变化的曲线图:

这里的"峰"就只流量最大的时候, 而"谷"就是指流量最小的时候.

而 "削峰填谷" 就是将大量的处理不过来的流量转移到流量少的时期, 避免服务器在高流量访问时资源不足而崩溃.

相信大家都遇到过自己学校网站崩溃的情况吧~ 每逢开学抢课, 网站都打不开, 这就是因为服务器崩了~~

为啥网站会挂呢? 这就是短时间内大量的流量访问导致的.

服务器每处理一个请求的时候, 都是会消耗一定的硬件资源的, 比如:cpu资源, 内存资源, 硬盘资源, 网络带宽资源 ....

若同时有 N 个请求, 那消耗量就会 * N 

一旦消耗的总量, 超出机器硬件资源的上限(消耗的量 > 提供的量), 此时, 对应的进程就可能会崩溃, 或者造成系统卡顿.

  • 若是以上文的 A B 两个服务器不通过消息队列直接进行交互时为例, 当 A 服务器遇到一波流量激增时, 此时 B 服务器就会收到来自 A 服务器大量的请求, 那么 B 服务器就很可能会挂.

为什么 B 会挂, 而 A 不会挂呢???

因为一般来说, 像 A 这样的上游服务器, 干的活更简单, 单个请求消耗的资源数少, 就不容易挂.

而像 B 这样的下游服务器, 会承担更重的任务量, 进行复杂的计算/存储 工作, 单个请求消耗的资源数更多, 也就更容易挂.

所以在日常开发中, B 也是会被分配更好的机器, 但是当访问量剧增时, B 还是会顶不住的~ 

  • 但是当引入阻塞队列的生产者消费者模型后, 即使"峰值"时有大量的流量进入, A 中的请求不会直接发送到 B 中, 而是先存到队列中(队列服务器针对单个请求, 消耗的资源也少, 可以抗很高的请求量), 让 B 按照自己的节奏消费数据, 而不会崩溃.

也就是说, 生产者消费者模型, 可以让 B 这边可以不关心队列中数据量的多少, 按照自己的节奏慢慢处理队列中的请求数据(A 即使波涛汹涌, B 依旧波澜不惊), 当波峰过去后, 利用波谷的时间, 处理之前波峰时期积压的数据.


2.3 付出的代价

虽然引入阻塞队列的生产者消费者模型具有解耦合, 削峰填谷的优势, 但是同时也付出了一定的代价:

  • 1. 引入队列后, 整体的结构变得更加复杂 

此时, 就需要更多的机器进行部署. 生产环境的结构会更复杂, 管理起来也会更麻烦.

  • 2. 效率也会收到影响

原先是 A 服务器直接和 B 服务器进行交互, 而进入队列后, 两个服务器通过队列间接进行交互, 使得效率被拉低.


3. BlockingQueue

在 Java 标准库中, 也提供了现成的阻塞队列, BlockingQueue.

BlockingQueue 是一个接口, 继承自 Queue, 也就是说, 我们要使用 BlockingQueue, 需要创建实现该接口的类.

在 BlockingQueue 中, 使用 put 方法入队列, 使用 take 方法出队列.

(offer 和 poll 也可以使用, 但是不具有阻塞功能)

由于 put 和  take 带有阻塞功能, 所以可以被Interrupt强制唤醒而抛出异常, 所以我们要处理一下异常.

我们可以在队列空时进行出队列操作, 观察一下 BlockingQueue 的阻塞效果 :

在上述的代码中, 我们 new 队列对象时, 是没有指定capacity(空间)的, 那么默认申请的空间是 Integer.MAX_VALUE(21亿) 个空间.

若泛型参数是int, 一个 int 是 4byte 那么 21亿 个 4byte 将是大约 80亿 byte => 8G 空间.

Thousand         千 -->  KB

Million           百万 -->  MB

Billion            十亿 -->  G

如果是其他更大空间的泛型类型, 那么队列可能会耗费大量的内存空间, 所以在实际开发中, 一般要给队列设置指定的容量值, 防止抛出 内存超出范围的异常.

虽然队列最多能存储1000个数据, 但是只要协调好生产者和消费者的速度(两者速度相当), 也不会出现阻塞的请情况. 

接下来, 我将使用代码为大家演示生产者消费者模型:

可以发现, 通过协调生产者和消费者的速度, 两者就不会出现阻塞的情况, 效率也会大幅度提升~

public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1000);
        Thread producer =new Thread(() -> {
            int n = 0;
            while (true) {
                try {
                    queue.put(n);
                    System.out.println("生产元素 " + n);
                    n++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, "producer");
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    Integer n = queue.take();
                    System.out.println("消费元素 " + n);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, "consumer");
        producer.start();
        consumer.start();
    }

4. 模拟实现阻塞队列

在模拟实现 put 和 take 方法时, 当队列为满 和 队列为空时, 我们要对线程进行阻塞操作, 那么就需要使用 wait 和 notify :

  1. 当队列满时, 一个线程进行 put, 则该线程阻塞, 直到另一个线程进行 take.
  2. 当队列空时, 一个线程进行 take, 则该线程阻塞, 直到另一个线程进行 put.

4.1 wait的注意事项

wait 的阻塞存在被 Interrupt 唤醒的风险, 当 wait 被中断休眠后, 就会继续向下执行, 而此时的队列是仍旧为空或为满的, 往下执行就可能会出现 bug.

所以在进行 wait 阻塞的条件判定时, 不能使用 if , 我们要使用 while 来进行条件的二次判定, 当 wait 被唤醒后, 确定是被 notify 唤醒的, 而不是被 Interrupt 中途唤醒的(若是被 Interrupt 唤醒, 则线程继续进行阻塞).

4.2 代码实现 

class MyBlockingQueue {
    private String[] data = null;
    private int head = 0;
    private int tail = 0;
    private int size = 0;

    public MyBlockingQueue() {
    }

    public MyBlockingQueue(int capacity) {
        data = new String[capacity];
    }

    public void put(String elem) throws InterruptedException {
        synchronized (this) {
            while (size >= data.length) {
                // 阻塞 ...
                this.wait();
            }
            data[tail] = elem;
            tail++;
            if (tail >= data.length) {
                tail = 0;
            }
            size++;
            synchronized (this) {
                this.notify();
            }
        }
    }

    public String take() throws InterruptedException {
        synchronized (this) {
            while (size == 0) {
                // 阻塞 ...
                this.wait();
            }
            String ret = data[head];
            head++;
            if (head >= data.length) {
                head = 0;
            }
            size--;
            synchronized (this) {
                this.notify();
            }
            return ret;
        }
    }
}

public class Demo26 {
    public static void main(String[] args) {
        MyBlockingQueue myBlockingQueue = new MyBlockingQueue(1000);
        Thread producer = new Thread(() -> {
            int n = 0;
            while (true) {
                try {
                    myBlockingQueue.put(n + "");
                    System.out.println("生产元素 " + n);
                    n++;
                    // Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        Thread consumer = new Thread(() -> {
            while (true) {
                try {
                    String n = myBlockingQueue.take();
                    System.out.println("消费元素 " + n);
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        producer.start();
        consumer.start();
    }
}


END

标签:队列,阻塞,生产者,线程,new,服务器,多线程
From: https://blog.csdn.net/2401_83595513/article/details/143066620

相关文章

  • 多线程模块threading
    1.简单例子importthreadingimporttimedefrun(n):print("task",n)time.sleep(2)t1=threading.Thread(target=run,args=("t1",))t2=threading.Thread(target=run,args=("t2",))t1.start()t2.start()2.真使用时需要用类importthreadingcl......
  • 优先级队列(priority_queue)
     priority_queue简介   优先级队列的底层结构其实就是堆,就是我们在学习二叉树的时候学习的堆。它是一种逻辑结构,底层还是vector,我们把它想象成一个堆结构。    我们在学习堆的时候,知道了它的父亲和左右孩子的关系。它如果存在孩子,则一定存在这一种关系,leftchi......
  • 一,多线程
    多线程详解:从基础到实践在现代编程中,多线程是一种常见的并发执行技术,它允许程序同时执行多个任务。本文将详细介绍多线程的基本概念、实现方式、线程控制、线程同步、死锁、线程间通信以及线程池等高级主题。多线程概述进程与线程进程:是系统进行资源分配和调用的独立单位,每一......
  • Javaee---多线程(一)
    文章目录1.线程的概念2.休眠里面的异常处理3.实现runnable接口4.匿名内部类子类创建线程5.匿名内部类接口创建线程6.基于lambda表达式进行线程创建7.关于Thread的其他的使用方法7.1线程的名字7.2设置为前台线程7.3判断线程是否存活8.创建线程方法总结9.start方法10.终......
  • Java消息队列入门详解
    什么是消息队列?消息队列的产生主要是为了解决系统间的异步解耦与确保最终一致性。在实际应用场景中,往往存在一些主流程操作和辅助流程操作,其中主流程需要快速响应用户请求,而辅助流程可能涉及复杂的处理逻辑或者依赖于外部服务。通过将这些辅助流程的消息放入消息队列,使得它们可......
  • 数据结构——队列
    目录1>>导言2>>队列的结构3>>初始化4>>打印5>>入列6>>出列6.1>>判断是否为空7>>取队头和队尾数据and统计个数8>>队列销毁9>>三个文件代码queue.hqueue.ctest.c10>>总结1>>导言    在把栈学习完后,步入新的章节——队列,队列是一种特殊的线性表,队列是......
  • Java消息队列详解
    消息队列的作用及原理消息队列产生主要是为了解决系统间的异步解耦与确保数据最终一致性问题。通过将主流程与辅助流程分离,使得辅助任务可以并行处理,不仅提高了系统的响应速度,还增强了其可扩展性和稳定性。此外,消息队列机制保证了每条消息至少被消费一次,从而确保了业务逻辑的......
  • Java多线程技能
      2.创建多线程的方式,有几种?怎么创建继承Thread类(一般不单独用)实现Runnable接口+Thread对象实现Callable接口+FutureTask<>对象+Thread对象线程池+(实现Callable接口+FutureTask<>对象)或者(实现Runnable接口)3.Thread类的常见APIcurrentThread()获取当前......
  • 从零开始写多线程
    1.Java多线程的基本概念1.1线程与进程进程:是操作系统分配资源的基本单位,每个进程都有独立的内存空间。线程:是进程内的一个执行单元,同一进程中的线程共享进程的内存空间,线程间的通信更为高效。1.2线程的好处提高系统响应性:可以实现用户界面与后台处理的并发执行,使得程序......
  • Java语言快速实现简单MQ消息队列服务
    目录MQ基础回顾主要角色自定义协议流程顺序项目构建流程具体使用流程代码演示消息处理中心Broker消息处理中心服务BrokerServer客户端MqClient测试MQ小结 MQ基础回顾在上一篇消息通讯之关于消息队列MQ必须了解的相关概念中,我们尽可能地详细的了解......