首页 > 其他分享 >[阻塞队列]

[阻塞队列]

时间:2024-11-11 22:49:35浏览次数:3  
标签:队列 阻塞 queue take put data

目录

1. 阻塞队列

2. 阻塞队列的优点

(1) 实现服务器之间的"低耦合".

(2) 实现"削峰填谷"的功能.

3. 阻塞队列代码举例

4. 自己实现阻塞队列


1. 阻塞队列

我们知道, 标准库中原有的队列Queue及其子类, 都是线程不安全的, 所以java封装了一个名为"阻塞队列" (Blocking Queue) 的接口, "阻塞队列"在普通队列的基础上做了扩充, 它有如下特性:

(1) "阻塞队列"是线程安全的.

(2) 如果队列为空时, 进行出队操作, 此时就会出现阻塞, 一直阻塞到其他线程往队列里增加元素为止 (一直阻塞到队列不为空为止).

(3) 如果队列为满时, 进行入队操作, 此时就会出现阻塞, 一直阻塞到其他线程取走队列元素为止 (一直阻塞到队列不为满为止).

[注]: 阻塞队列主要应用于"生产者-消费者"模型.

2. 阻塞队列的优点

(1) 实现服务器之间的"低耦合".

如果说A, B两台服务器之间是直接调用的关系, 那么编写A的代码时, 就会出现很多B的代码, 编写B的代码时, 也会出现很多A的代码, 这样的话两台服务器之间的耦合程度就非常高. 如果一台服务器挂掉的话, 另一台服务器也会受到很大的影响.

如果我们引入阻塞队列 ( 在服务器领域, 我们给这样的阻塞队列起了一个新的名字: "消息队列"  (Message Queue  "MQ" ) ), 让A与阻塞队列建立直接的联系(A只与阻塞队列通信), B与阻塞队列建立直接的联系(B也只与阻塞队列通信). 那么此时A不知道B的存在, 不也不会知道A的存在, A, B之间的耦合程度就非常低了, 现在即使B挂掉了, 对A也几乎没有影响.

(2) 实现"削峰填谷"的功能.

如果某一时刻, 客户端的请求量突然激增, 那么如果没有消息队列, 下游的服务器 ("下游服务器指的是要对数据或请求作进一步处理的服务器, 它比上游服务器要做的操作更复杂, 消耗的资源更多")很可能被巨大的请求量冲垮, 导致系统崩溃. 但是如果有了这样一个消息队列, 那么在请求量激增的时候, 消息队列就会起到一个"缓冲"的作用, 把巨大的请求量"挡住", 仍然以原有的速度 (或者比原本速度快一点的速度) 把请求传给下游服务器, 这样下游服务器就不会被冲垮. 而到了请求量偏低的时候, 消息队列又会把积攒的请求传给下游服务器, 让它不空闲下来.  这样一个"削峰填谷"的作用类似于"三峡大坝". 在汛期蓄水, 控制流速; 在旱期防水, 不至于让水流干涸.

注意: 消息队列也有其缺点, 比如: (1) 需要更多的机器来部署这样的消息队列. (2) 由于增加了消息队列这样一个东西, A与B之间通信的延时会变长.

3. 阻塞队列代码举例

我们可以看到, BlockingQueue这个接口有很多实现类, 这里我们以ArrayBlockingQueue为例举例. 

public class Demo26 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 创建一个阻塞队列对象, 并指定其最大容量为3.
        queue.put("111");
        System.out.println("put成功");
        queue.put("111");
        System.out.println("put成功");
        queue.put("111");
        System.out.println("put成功");
        queue.put("111");
        System.out.println("put成功");
    }
}

运行上面这个程序, 我们可以看到, 前三个"111"都put成功, 但是执行到第四个put的时候, 发现队列已经满了, 那么这时候该线程就会进入阻塞状态. 直到其他线程取走队列元素(队列不满时), 才继续再进行put操作.

public class Demo27 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 创建一个阻塞队列对象, 并指定其最大容量为3.
        queue.put("111");
        System.out.println("put成功");
        queue.put("111");
        System.out.println("put成功");


        queue.take();
        System.out.println("take成功");
        queue.take();
        System.out.println("take成功");
        queue.take();
        System.out.println("take成功");
    }
}

运行上面这个程序, 我们可以看到: 两个put成功, 两个take成功. 当执行到第三个take的时候, 发现此时队列为空, 那么此时线程就会进入阻塞状态, 直到其他线程往队列里新增元素(队列不空时), 才继续再进行take操作.

public class Demo28 {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);

        // 生产者线程
        Thread t1 = new Thread(() -> {
            int i = 1;
            while (true) {
                try {
                    queue.put(i);
                    System.out.println("生产元素 " + i);
                    i++;

                    // 给生产操作, 加上 sleep, 生产慢点, 消费快点
                    Thread.sleep(1300);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        // 消费者线程
        Thread t2 = new Thread(() -> {
            while (true) {
                try {
                    Integer i = queue.take();
                    System.out.println("消费元素 " + i);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        t1.start();
        t2.start();
    }
}

如上述代码, 我们给生产者线程加了sleep操作. 让生产得慢一点, 那么队列中没有元素的时候, 消费者线程只能阻塞等待, 等到队列中新增元素元素的时候再take, 进行消费元素.  所以, 上述代码的执行效果如下:

4. 自己实现阻塞队列

class MyBlockingQueue {
    private String[] data = null;
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0;

    // private Object locker = new Object();

    public MyBlockingQueue(int capacity) {
        data = new String[capacity];
    }
    public void put(String s) throws InterruptedException {
        // 加锁的对象, 可以单独定义一个, 也可以直接就地使用 this.
        synchronized (this) {
            if (size == data.length) {
                // 队列满了
                // return;
                this.wait();
            }
            data[tail] = s;
            tail++;
            if (tail >= data.length) {
                tail = 0;
            }
            size++;

            this.notify();
        }
    }

    public String take() throws InterruptedException{
        String ret = "";
        synchronized (this) {
            if (size == 0) {
                // 队列为空
                // return null;
                this.wait();
            }
            ret = data[head];
            head++;
            if (head >= data.length) {
                head = 0;
            }
            size--;

            this.notify();
        }
        return ret;
    }
}

这里对变量加上volatile, 对操作加上synchronized, 保证线程安全. put方法中,当队列满的时候, 进入wait()等待,  take方法执行完take操作后会进行通知 notify() ;   take方法中,当队列空的时候, 进入wait()等待,  put方法执行完put操作后会进行通知 notify(); 相互通知, 保证了"阻塞"这个功能.

上述代码写成这个样子就比较完整了, 但是还存在一个小问题: 我们这里的wait()只有notify一种方式能唤醒吗? --> 显然不是, 那么, 比如说: 在 if (size == 0) 这里, 我们判定size == 0, 进入循环, 然后执行wait(), 进入等待状态. 如果此时wait()被提前唤醒了(eg: 被Interrupt唤醒), 而此时size还是0 (队列还没来得及新插入数据, wait()就被提前唤醒了), 那么唤醒之后继续往下执行程序必然会出现错误.  所以, 我们在这里可以做一个小小的改进: 将 if 替换成 wile . 这样做的目的就是为了能够"循环判定", 即使wait()被提前唤醒, 这里还会执行一次while的判定, 看 size 是否为0, 如果仍然是0, 那就继续wait(); 如果不为0了, 就执行后面的代码. 这样一来, 这个代码是不是就更加完善了呢? 不仅在这里, 日常开发中, 也有很多地方, 用 while 会比 if 好很多, 因为 while 是"循环判定", 而 if 只判定一次.

class MyBlockingQueue {
    private String[] data = null;
    private volatile int head = 0;
    private volatile int tail = 0;
    private volatile int size = 0;

    // private Object locker = new Object();

    public MyBlockingQueue(int capacity) {
        data = new String[capacity];
    }
    public void put(String s) throws InterruptedException {
        // 加锁的对象, 可以单独定义一个, 也可以直接就地使用 this.
        synchronized (this) {
            while (size == data.length) { //将if换成while
                // 队列满了
                // return;
                this.wait();
            }
            data[tail] = s;
            tail++;
            if (tail >= data.length) {
                tail = 0;
            }
            size++;

            this.notify();
        }
    }

    public String take() throws InterruptedException{
        String ret = "";
        synchronized (this) {
            while (size == 0) { //将if换成while
                // 队列为空
                // return null;
                this.wait();
            }
            ret = data[head];
            head++;
            if (head >= data.length) {
                head = 0;
            }
            size--;

            this.notify();
        }
        return ret;
    }
}

好了, 本篇文章就介绍到这里啦, 大家如果有疑问欢迎评论, 如果喜欢小编的文章, 记得点赞收藏~~

标签:队列,阻塞,queue,take,put,data
From: https://blog.csdn.net/2301_80313139/article/details/143609444

相关文章

  • 郝玩的数据结构1——单调栈,单调队列
    栈和队列是很郝咏的Stl,那么,我们手搓——用数组模拟故事背景:辣鸡老o在学单调栈&单调队列——我栈top为栈顶,易得出出栈即top--,入栈++top=(ovo)......——完全不会讲,那么上马:点击查看代码#include<bits/stdc++.h>usingnamespacestd;constintN=114514;intstk[N],top=0;......
  • 代码随想录算法训练营第十天 | 232.用栈实现队列+225. 用队列实现栈+20. 有效的括号+1
    加入训练营有点晚,没跟上任务就开始有些偷懒没有真的认真开始,从第十天开始下决心认真完成每天任务还有博客记录学习过程。栈与队列理论基础首先是学习了栈和队列的理论基础,队列 先进先出,栈 先进后出。栈 以底层容器完成其所有的工作,且对外接口统一,有.push,.pop等,不提供......
  • 10. 基于 Redis 实现消息队列
    消息队列在分布式系统中非常重要,能够有效解耦系统的各个模块,提供异步处理能力和缓冲能力。Redis作为一个高性能的内存数据库,除了缓存和持久化存储,它还能充当轻量级的消息队列。使用Redis处理消息队列有助于提高系统的吞吐量和可扩展性。一、使用场景消息队列的应用场景......
  • 单调队列笔记
    单调队列笔记双端队列deque维护一个严格单调变化的组,可以称为一个单调队列单调队列因为可以直接对组的两端进行操作,所以可以有效的降低时间复杂度用单调队列来解决问题,一般是需要得到的某个范围内的最小值或最大值这里以一道经典的单调队列的题目为例:题目描述有一个长为\(......
  • 栈和队列(原理、代码实现、例题)
    一、栈1.概念栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO(LastInFirstOut)的原则。压栈:栈的插入操作叫做进栈/压栈/入栈,入数据在栈顶。出栈:栈的删除操作叫做......
  • 面向大规模队列,百万并发的多优先级消费系统设计
    引言HTTP是一种常用的通信协议,除了常见网站访问、上传下载,HTTP协议还经常被用在消息推送场景上。设想你搭建了一个电商平台,有很多大型商家入驻了该电商平台并售卖各类商品,在消费者购买某个商品后,平台会通过HTTP协议将消费者购买商品的信息通知商家,商家则会在后台接收平台推......
  • 一篇文章带你了解如何测试消息队列
    是什么探讨消息队列的测试,那首先就要了解消息队列是什么。消息队列是一种应用程序之间传递消息的异步通信机制。这里传递的消息指的是需要传输的数据,可以是一些文本、字符串或对象等信息。简述一下消息队列工作模式:消息的生产者将数据存放到消息队列中,消费者进行接收处......
  • 队列详解
    目录队列队列的概念及结构队列的实现代码队列功能的实现队列的尾插voidQueuePush(Queue*pq,QDataTypex);结构体封装指针typedefstructQueue总结代码队列的头删voidQueuePop(Queue*pq)代码队列的初始化voidQueueInit(Queue*pq)代码队列的销毁voidQueueDest......
  • C# 队列的一些并发模拟
    usingSystem;usingSystem.Collections.Generic;usingSystem.ComponentModel;usingSystem.Data;usingSystem.Drawing;usingSystem.Linq;usingSystem.Text;usingSystem.Windows.Forms;usingSystem.Threading;usingSystem.Collections.Concurrent;namespace......
  • 队列的用法详解
    队列是一种常用的数据结构,具有先进先出(FIFO,First-In-First-Out)的特点。通常用来管理需要按顺序处理的任务,例如打印队列、任务调度、资源分配等。下面详细介绍队列的基本概念、常用操作、类型及其在C语言中的实现。队列的基本概念在队列中:入队(enqueue):将元素添加到队列的......