首页 > 编程语言 >JavaEE 初阶(11)——多线程9之“阻塞队列”

JavaEE 初阶(11)——多线程9之“阻塞队列”

时间:2024-07-29 22:00:02浏览次数:17  
标签:11 初阶 队列 阻塞 queue 线程 put ArrayBlockingQueue 多线程

目录

一. 什么是“阻塞队列”

二. 生产者消费者模型

2.1 概念

2.2 组件

 2.3 实际应用

2.4 优点 

a. 实现“解耦合” 

b. 流量控制——“削峰填谷”

2.5 代价

a. 更多的机器

b. 通信时间延长

三. 阻塞队列的实现 

3.1 简述 

 3.2 ArrayBlockingQueue的使用

3.3 实现MyArrayBlockingQueue 


一. 什么是“阻塞队列”

“阻塞队列” 是在普通队列(先进先出)的基础上,做出了一些扩充。以下是阻塞队列的一些主要特点:

  1. 阻塞插入:当队列为满时,如果再往队列里插入元素,队列会阻塞插入操作的线程(wait);一直阻塞到其他线程从队列取走元素为止(被notify唤醒)
  2. 阻塞移除:当队列为空时,如果再从队列里移除元素,队列会阻塞移除操作的线程(wait);一直阻塞到其他线程往队列里添加元素为止(被notify唤醒)
  3. 线程安全:阻塞队列内部实现了线程同步,因此可以在多线程环境中安全地使用。(标准库中原有的队列Queue和其子类,默认都是线程不安全的)

基于阻塞队列,最大的应用场景,就是实现 “生产者消费者模型” ——日常开发中,常见的编程手法。

二. 生产者消费者模型

2.1 概念

生产者消费者模型 是一个经典的并发编程模型,它描述了一组生产者线程和一组消费者线程如何通过共享的数据缓冲区进行交互。这个模型的核心思想是解耦生产数据的线程(生产者)与消费数据的线程(消费者),使得它们可以并发执行,提高系统的整体效率。

2.2 组件

1.生产者

  • 负责生成数据并将其放入缓冲区。
  • 生产者在缓冲区满时会等待,直到有空间可用。

2.消费者

  • 负责从缓冲区取出数据并处理
  • 消费者在缓冲区为空时会等待,直到有数据可以消费

3.缓冲区

  • 是一个共享的数据结构,用于存储生产者产生的数据,等待消费者来消费。
  • 缓冲区的大小通常是有限的,因此需要适当的同步机制来控制生产者和消费者的访问。
 2.3 实际应用

   上述生产者消费者模型,在后端开发中,经常会涉及到~~当下后端开发常见的结构——“分布式系统”,不是一台服务器解决所有问题,而是分成了多个服务器,服务器之间相互调用,进行通信。

下面是一个搜狗浏览器的例子:

 

2.4 优点 

   在上述服务器之间的通信过程中,使用生产者消费者模型,是非常常见的做法。使用生产者消费者模型,主要有两方面的好处~~

a. 实现“解耦合” 

生产者和消费者不需要知道对方的实现细节,只需要知道如何与缓冲区交互,从而降低了模块之间的 关联/影响程度。

   如果是“直接调用” 关系,编写 A代码中,就会出现很多B服务器相关代码;编写B代码中,也会出现很多A服务器相关的代码。并且,如果B服务器出现故障,A服务器也会受到影响。

    并且,如果后续想增加一个 C服务器,此时对A代码的改动就会很大。

   但是,当引入了生产者消费者模型,结构就成了下列样子~~ 

   A只和队列通信,B也只和队列通信。 A不知道B的存在,代码中更没有B的影子了;B也不知道A的存在,代码中也没有A的影子。这样就实现了A与B之间的“解耦合”~~

通常谈到的“阻塞队列”是代码中的一个数据结构,但是由于这个东西太好用了,以致于会把这样的数据结构,单独封装成一个服务器程序,并且在单独的服务器机器上进行部署~~

此时,这样的阻塞队列,有了一个新的名字,“消息队列”(Message Queue,MQ)


消息队列:一种进程间通信或分布式系统中常用的中间件技术,用于在消息的发送者和接收者之间传递消息。它通常被用于异步处理、系统解耦、流量削峰和消息的可靠传递。

应用场景:

  • 应用解耦:降低系统间的耦合度,使系统更加模块化。
  • 异步处理:处理耗时任务,如邮件发送,文件上传等。
  • 消息广播:将消息广播给多个订阅者。
  • 分布式系统:跨网络和跨服务的数据传递。
  • 高并发处理:在高并发情况下,消息队列可以起到削峰填谷的作用

  * 看起来,A和B之间是解耦合了,但是 A和队列,B和队列,难道不是引入了新的耦合吗?

 首先,我们需要明确为什么怕“耦合”?—— 因为耦合的代码在后续变更过程中,比较复杂,容易产生bug!

“消息队列”是成熟稳定的产品,代码不会频繁修改;并且A和队列、B和队列之间的交互,逻辑基本写一次就固定下来

b. 流量控制——“削峰填谷”

缓冲区的大小可以用来控制生产者和消费者的工作节奏,避免生产过快或消费过慢导致的问题 。

下面,解释一下什么是“削峰填谷”。

 长江上游的三峡水库:


   服务器端,如果A的请求量突然激增,A往队列中写入数据变快了,但是由于阻塞队列的原因,B仍然可以按照原有的速度来消费数据。

    如果是直接调用,没有阻塞队列,那么A收到多少请求,B也会收到多少,很可能就直接把B给搞挂了.....

1)为啥一个服务器,收到的请求激增,就可能会崩溃??? 

     一台服务器,就是一台“电脑”,上面就提供了一些硬件资源(包括不限于CPU,内存,硬盘,网络带宽....)就算你这个机器 配置再好,硬件资源也是有限的~~

    服务器每次收到一个请求,处理这个请求的过程,都需要执行一系列的代码。在执行这些代码过程中,就会需要消耗一定的硬件资源(CPU,内存,硬盘,网络带宽....)

    这些请求消耗的总的硬件资源的量,超过了机器能提供的上限,那么此时机器就会出现问题(卡死,程序直接崩溃.....)

2)在请求激增的时候,A 为啥不会挂?队列为啥不会挂?反而是B更容易挂呢??

    A的角色是一个“网关服务器”,收到客户端的请求,再把请求转发给其他的服务器。这样,服务器里面的代码,做的工作比较简单(单纯的数据转发),消耗的硬件资源通常更少。处理一个请求,消耗的资源更少,同样的配置下,就能支持更多的请求处理~~

    同理,队列 其实也是比较简单的程序,单位请求消耗的硬件资源也是比较少的~~

    B这个服务器,才是真正干活的服务器,要真正完成一系列的业务逻辑~~(这一系列的工作,代码量非常庞大,消耗的时间很多,消耗的系统硬件资源也是更多的)

类似的,MySQL这样的数据库,处理请求做的工作就是比较多的,消耗的硬件资源也是比较多的,因此MySQL也是后端系统中,容易挂的部分~~

对应的,像Redis这种内存数据库,处理请求做的工作,远远少于MySQL做的工作,消耗的资源更少。因此,Redis就比MySQL皮实很多,不容易挂~~

2.5 代价
a. 更多的机器
  •  需要更多的机器,来部署这样的消息队列 
b. 通信时间延长
  • A和B直接通信的延时会变长(对于 A和B 之间的调用,如果要求时间比较短,就不合适了)  

 比如,现在很火的“微服务”

“微服务”本质上就是把分布式系统的服务拆的更细了,每个服务都很小,只做一项功能(比较适合大公司,部门分的很细)。但是这样做的代价就是需要更多的机器,处理请求的响应时间更长,更复杂的后端结构运维成本变高.....

三. 阻塞队列的实现 

3.1 简述 

在Java的标准库中,提供了现成的封装——BlockingQueue

BlockingQueue继承了Queue,因此Queue中的一些操作——offer,poll(不能阻塞)等在BlockingQueue中同样也能使用。

BlockingQueue接口的实现类:

数据结构堆:本质上是一个 “完全二叉树”(即树中的每一层都是满的,除了可能最后一层,最后一层的节点从左向右连续排列),要求父节点和子节点之间的值,存在大小关系 (不是左右子树的大小关系)

堆是实现优先队列的一种高效方式,可以快速地插入元素和删除具有最高(或最低)优先级的元素。

   BlockingQueue 提供了另外两个专属方法——put:入队列    take:出队列 (能阻塞)。但是阻塞队列没有提供“阻塞版本”的获取队首元素的操作

   由于 put 和 take 可能产生阻塞,因此,这样的阻塞又会被 interrupt 唤醒。


 3.2 ArrayBlockingQueue的使用
import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue1 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.put("111");
        System.out.println("put 成功");
        queue.put("111");
        System.out.println("put 成功");
        //第三次put已经超过了队列的容量
        queue.put("111");
        System.out.println("put 成功");
    }
}

运行结果:

如果队列为满,put操作后,不会抛出异常,而是处于WAITING等待状态。


import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockingQueue2 {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.put("111");
        System.out.println("put 成功");
        queue.take();
        System.out.println("take 成功");
        //队列中已经没有元素,take后会阻塞
        queue.take();
        System.out.println("take 成功");

    }
}

 运行结果:

 

 如果队列为空,take操作后,不会抛出异常,而是处于WAITING等待状态。


import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue3 {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        //生产者
        Thread t1 = new Thread(()->{
            int i = 1;
            while(true){
                try {
                    queue.put(i);
                    System.out.println("产生元素"+i);
                    i++;
                    //给生产者加上sleep操作,生产慢点,消费快点
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费者
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    int i = queue.take();
                    System.out.println("消费元素"+i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        t2.start();
    }
}

 运行结果:

 

import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueue4 {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        //生产者
        Thread t1 = new Thread(()->{
            int i = 1;
            while(true){
                try {
                    queue.put(i);
                    System.out.print("生产元素"+i+" ");
                    i++;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        //消费者
        Thread t2 = new Thread(()->{
            while(true){
                try {
                    int i = queue.take();
                    System.out.println("消费元素"+i+" ");
                    //给消费者加上sleep,消费慢点,生产快点
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
        t2.start();
    }
}

运行结果: 


3.3 实现MyArrayBlockingQueue 

public class MyBlockingQueue {
    private String[] array = null;
    public MyBlockingQueue(int capacity){
        array = new String[capacity];
    }
    private int head = 0;
    private int tail = 0;
    private int size = 0;
    private final Object locker = new Object();
    public void put(String i) throws InterruptedException {
        //整个逻辑都要加锁,防止线程切换更改共享变量
        synchronized (locker){
            if(size == array.length){
                //队列满了,阻塞等待
                locker.wait();
            }
            array[tail] = i;
            tail++;
            if(tail >= array.length){
                tail = 0;
            }
            size++;
            //唤醒队列为空时候的等待
            locker.notify();
        }

    }
    public String take() throws InterruptedException{
        String ret = "";
        synchronized (locker){
            if(size == 0){
                //队列空了,阻塞等待
                locker.wait();
            }
            ret = array[head];
            head++;
            if(head >= array.length){
                head = 0;
            }
            size--;
            //唤醒队列满了时候的等待
            locker.notify();
        }
        return ret;
    }
}

  

但是不推荐第二种%余数的方法:1)代码可读性比较低   2)执行效率比较低(%对计算机来说是除法,比较慢)                        


Java官方文档建议 wait 使用的时候,要结合 while,而不是 if。 

                            

此处即使不改成while,代码问题也是不大~~因为一旦出现提前唤醒的情况(interrupt),代码会直接结束(throws异常)。


如果 wait 是直接包裹在 try-catch 中,此时使用 if 还是 while 的差别就很大了.....

    wait中断异常被抓住后——如果是 if 判断,wait 被唤醒后,代码会继续向后执行,此时size值可能依旧为0,那么接下来的数据就会出现问题;如果是 while 循环,wait 被唤醒后,代码会再次确认条件,看是否能继续执行。

    因此,最好使用 while,可以确保结果的正确性,更加稳妥!

   由此告诫我们,日常开发中,有些bug,比如,报错/抛出异常,这种问题都好办,程序员能第一时间发现。但是有些bug,没有任何提示,但是会得到一个错误的结果(很可能看起来和正确的值没啥差别,但是后续造成的影响,可能是非常严重的)。因此,要选择最为稳妥的方法解决问题。

试想一下,假设使用这个阻塞队列,实现一个“充值”逻辑~~
某个线程在阻塞等待队列里玩家的充值数据。一旦玩家的充值数据到账,就把对应的道具发放给玩家。
玩家可能正要充值,还没冲呢,却不小心interrupt了,导致队列里读取出一个“错误值”......

标签:11,初阶,队列,阻塞,queue,线程,put,ArrayBlockingQueue,多线程
From: https://blog.csdn.net/2301_80243321/article/details/140773483

相关文章

  • 算法笔记|Day11二叉树
    算法笔记|Day11二叉树☆☆☆☆☆leetcode144.二叉树的前序遍历题目分析代码☆☆☆☆☆leetcode94.二叉树的中序遍历题目分析代码☆☆☆☆☆leetcode145.二叉树的后序遍历题目分析代码☆☆☆☆☆leetcode102.二叉树的层序遍历题目分析代码☆☆☆☆☆leetcode107.......
  • 11. 2 用Python开发一个简单的Web服务器
    用Python开发一个简单的Web服务器11.2用Python开发一个简单的Web服务器11.2.1需求分析11.2.2系统设计11.2.3详细设计11.2.4实现11.2.5测试11.2.6部署和维护11.2.7文档和帮助文档11.2.8用户反馈11.2用Python开发一个简单的Web服务......
  • 11.1 用Python开发一个计算器程序
    用Python开发一个计算器程序11.1用Python开发一个计算器程序11.1.1设计思路11.1.2编写代码11.1.3运行与测试11.1用Python开发一个计算器程序在编程的世界里,创建简单的工具如计算器是初学者学习编程语言的一个好方法。Python,由于其简洁的语法......
  • Windows11安装MySQL8.4.2版本详细过程记录
    下载地址:https://dev.mysql.com/downloads/mysql/8.0.html我选择下载zip版本:点击下载需要登录:于是我登录:接着点下载:被迅雷拦截了,直接使用迅雷下载:下载好了:复制到C盘的dev目录:安装解压:这个看上去需要一些基础命令才能操作:于是我重新下载了这个msi版本......
  • 暑假集训csp提高模拟11
    赛时rank9,T1100,T2100,T30,T40update:赛后T1被hack了,成了99,死因没有记忆化挂成了rank10我又要挂分了。T3暴力挂了?话说jjdw和k8啥时候回来。T1Fate签到题,最短路板子。考虑一个点\(a\),其最短路前驱节点为\(b\),前驱结点组成的集合为\(B\),其次短路的前驱节点为......
  • 暑假集训CSP提高模拟11
    A.Fate求次短路方案数.这题有点小水了,好像之前做过.具体的方案显然是DP,考虑枚举当前每一个路径长度,假如比最短路更优则覆盖最短路,之前的最短路用来覆盖次短路.否则如果比次短路更优,则直接覆盖次短路.方案数的话考虑一样的方法维护,只是在遇到相等的路径长时使方案数加一即可.......
  • 暑假集训CSP提高模拟11
    暑假集训CSP提高模拟11组题人:@KafuuChinocpp\(T1\)P152.Fate\(24pts\)强化版:HDU1688Sightseeing设\(dis_{i,0/1}\)表示从\(s\)到\(i\)的最短路/次短路长度,\(f_{i,0/1}\)表示从\(s\)到\(i\)的最短路/次短路条数。\(dijkstra\)过程中按照路径长度与......
  • java多线程(超详细!)
    Java的多线程是一种允许在一个程序中同时运行多个线程的技术。每个线程是独立的执行路径,可以并发执行,从而提高程序的效率和响应能力。1.线程基础Java中的线程可以通过继承Thread类或实现Runnable接口来创建和管理。1.1继承Thread类classMyThreadextendsThr......
  • java多线程(超详细!)
    Java的多线程是一种允许在一个程序中同时运行多个线程的技术。每个线程是独立的执行路径,可以并发执行,从而提高程序的效率和响应能力。1.线程基础Java中的线程可以通过继承Thread类或实现Runnable接口来创建和管理。1.1继承Thread类classMyThreadextendsThread{......
  • 『模拟赛』暑假集训CSP提高模拟11
    Rank昨天积攒的rp生效了!A.Fate签到题。次短路计数,感觉做过类似的,不过这里次短路定义为最短路长度+1,赛时把自环想成环了,原谅我犯唐。还是用dijkstra,我们这次开两个dis数组存最短路和次短路长度,然后两个cnt数组存二者的个数,稍微想想就能想到一共的四种可能:更新后......