目录
1. 阻塞队列
在数据结构中, 我们学习了简单的普通队列, 也学习了较为复杂一些的优先级队列等等....
而阻塞队列, 是一个更为复杂的队列.
阻塞队列在具有普通队列特性的基础上, 还具有以下特性:
- 线程安全(之前学习的Queue, PriorityQueue 都是线程不安全的)
- 阻塞特性
线程安全我们都很好理解, 就是这个类通过加锁使得代码在多线程的环境下不会出 bug.
而阻塞特性是什么呢? 其实也不难理解:
- 队列为空时, 尝试出队列, 出队列操作就会发生阻塞. 阻塞到其他线程添加元素为止.
- 队列为满时, 尝试进队列, 进队列操作就会发生阻塞. 阻塞到其他线程取走元素为止.
阻塞队列, 最主要的应用场景, 就是实现 "生产者消费者模型" .
2. 生产者消费者模型
2.1 场景举例
"生产者消费者模型" 是多线程编程中, 一种典型的编码技巧, 也是设计模式的一种.
对于 生产者消费者模型 的场景, 这里为大家举个例子:
相信大家都喜欢吃饺子. 过年时, 一家人都会坐在一块一起包饺子, 包饺子有两种方法:
- 1. 我, 我爸, 我妈 都各包各的 : 和面, 擀饺子皮, 包饺子.... 都一个人独自完成
假设家里只有一个擀面杖, 如果以这种方式包饺子, 那么当我擀饺子皮的时候, 我妈也需要擀饺子皮, 我爸也需要擀饺子皮, 那么此时他们俩就会触发阻塞等待, 直到我擀完皮后, 他们俩的其中一个才能擀. 也就是说, 我们三个人必定会因为竞争擀面杖而触发阻塞等待.
- 2. 我专门负责擀饺子皮, 我爸妈负责包饺子
这种方式, 每个人都有明确的分工, 就不会因为竞争擀面杖而触发阻塞等待.
在这种情况下, 我就属于 "生产者", 我爸妈就属于 "消费者", 他俩都会"消费"我生产的饺子皮. 而我会把擀好的皮放到盖帘上, 他俩也会把包好的饺子放在盖帘上, 那盖帘就属于生产消费的 "交易场所", 而 交易场所 就是一个 阻塞队列.
阻塞队列, 虽然会在 生产者 消费者 速度不协调的极端情况下发生阻塞:
- 可能会因为我擀饺子皮太慢, 而导致他俩阻塞等待, 等待我擀饺子皮.
- 也可能会因为我擀的太快, 他俩包的太慢, 导致盖帘上的饺子皮已经放不下了, 导致我不能再擀皮, 而只能等待他俩包饺子.
但这只是极端情况, 我们可以通过调节生产者消费者的速度进而避免阻塞等待的情况发生.
2.2 重要优势
生产者消费者模型, 具有以下两个重要优势:
- 解耦合
- 削峰填谷
2.2.1 解耦合
解耦合, 不一定是两个线程之间, 也可以是两个服务器之间.
举个例子:
有 A 和 B 两个服务器. A 服务器向 B 服务器发送请求, B 服务器向 A 服务器返回相应.
- 如果是 A 直接向 B 发送请求, B 直接向 A 返回相应, 那么A B 两者间耦合性就高.
因为编写代码时, A 的代码中肯定多多少少会有一些和 B 有关的逻辑. 同时, B 的代码中肯定也会有和 A 相关的逻辑. 所以两个服务器间的耦合性高, 后续代码的修改会"牵一发而动全身", 很麻烦~
- 如果在 A B 间添加一个阻塞队列, 那么 A 就可以通过队列来间接和 B 交互, B 也可以通过队列来间接和 A 交互, 这样 A B 两者间就实现了解耦合.
由于这样的阻塞队列太重要的, 也会把队列单独部署成一个服务器, 独立的阻塞队列的服务器, 称为"消息队列".(消息队列服务器中, 不只有一个队列, 可以有多个队列.)
有的同学会说, 虽然 A 中没 B 了, B 中没 A 了, 但是他俩中和队列的联系不就大了吗???
---- 要知道降低耦合, 就是为了在后续修改代码的时候成本低~ 但是阻塞队列的功能是固定的, 也就入队列出队列, 后续也不会不涉及到队列代码的修改.
2.2.2 削峰填谷
大家先来看以下的流量(访问量)随时间变化的曲线图:
这里的"峰"就只流量最大的时候, 而"谷"就是指流量最小的时候.
而 "削峰填谷" 就是将大量的处理不过来的流量转移到流量少的时期, 避免服务器在高流量访问时资源不足而崩溃.
相信大家都遇到过自己学校网站崩溃的情况吧~ 每逢开学抢课, 网站都打不开, 这就是因为服务器崩了~~
为啥网站会挂呢? 这就是短时间内大量的流量访问导致的.
服务器每处理一个请求的时候, 都是会消耗一定的硬件资源的, 比如:cpu资源, 内存资源, 硬盘资源, 网络带宽资源 ....
若同时有 N 个请求, 那消耗量就会 * N
一旦消耗的总量, 超出机器硬件资源的上限(消耗的量 > 提供的量), 此时, 对应的进程就可能会崩溃, 或者造成系统卡顿.
- 若是以上文的 A B 两个服务器不通过消息队列直接进行交互时为例, 当 A 服务器遇到一波流量激增时, 此时 B 服务器就会收到来自 A 服务器大量的请求, 那么 B 服务器就很可能会挂.
为什么 B 会挂, 而 A 不会挂呢???
因为一般来说, 像 A 这样的上游服务器, 干的活更简单, 单个请求消耗的资源数少, 就不容易挂.
而像 B 这样的下游服务器, 会承担更重的任务量, 进行复杂的计算/存储 工作, 单个请求消耗的资源数更多, 也就更容易挂.
所以在日常开发中, B 也是会被分配更好的机器, 但是当访问量剧增时, B 还是会顶不住的~
- 但是当引入阻塞队列的生产者消费者模型后, 即使"峰值"时有大量的流量进入, A 中的请求不会直接发送到 B 中, 而是先存到队列中(队列服务器针对单个请求, 消耗的资源也少, 可以抗很高的请求量), 让 B 按照自己的节奏消费数据, 而不会崩溃.
也就是说, 生产者消费者模型, 可以让 B 这边可以不关心队列中数据量的多少, 按照自己的节奏慢慢处理队列中的请求数据(A 即使波涛汹涌, B 依旧波澜不惊), 当波峰过去后, 利用波谷的时间, 处理之前波峰时期积压的数据.
2.3 付出的代价
虽然引入阻塞队列的生产者消费者模型具有解耦合, 削峰填谷的优势, 但是同时也付出了一定的代价:
- 1. 引入队列后, 整体的结构变得更加复杂
此时, 就需要更多的机器进行部署. 生产环境的结构会更复杂, 管理起来也会更麻烦.
- 2. 效率也会收到影响
原先是 A 服务器直接和 B 服务器进行交互, 而进入队列后, 两个服务器通过队列间接进行交互, 使得效率被拉低.
3. BlockingQueue
在 Java 标准库中, 也提供了现成的阻塞队列, BlockingQueue.
BlockingQueue 是一个接口, 继承自 Queue, 也就是说, 我们要使用 BlockingQueue, 需要创建实现该接口的类.
在 BlockingQueue 中, 使用 put 方法入队列, 使用 take 方法出队列.
(offer 和 poll 也可以使用, 但是不具有阻塞功能)
由于 put 和 take 带有阻塞功能, 所以可以被Interrupt强制唤醒而抛出异常, 所以我们要处理一下异常.
我们可以在队列空时进行出队列操作, 观察一下 BlockingQueue 的阻塞效果 :
在上述的代码中, 我们 new 队列对象时, 是没有指定capacity(空间)的, 那么默认申请的空间是 Integer.MAX_VALUE(21亿) 个空间.
若泛型参数是int, 一个 int 是 4byte 那么 21亿 个 4byte 将是大约 80亿 byte => 8G 空间.
Thousand 千 --> KB
Million 百万 --> MB
Billion 十亿 --> G
如果是其他更大空间的泛型类型, 那么队列可能会耗费大量的内存空间, 所以在实际开发中, 一般要给队列设置指定的容量值, 防止抛出 内存超出范围的异常.
虽然队列最多能存储1000个数据, 但是只要协调好生产者和消费者的速度(两者速度相当), 也不会出现阻塞的请情况.
接下来, 我将使用代码为大家演示生产者消费者模型:
可以发现, 通过协调生产者和消费者的速度, 两者就不会出现阻塞的情况, 效率也会大幅度提升~
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1000);
Thread producer =new Thread(() -> {
int n = 0;
while (true) {
try {
queue.put(n);
System.out.println("生产元素 " + n);
n++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "producer");
Thread consumer = new Thread(() -> {
while (true) {
try {
Integer n = queue.take();
System.out.println("消费元素 " + n);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, "consumer");
producer.start();
consumer.start();
}
4. 模拟实现阻塞队列
在模拟实现 put 和 take 方法时, 当队列为满 和 队列为空时, 我们要对线程进行阻塞操作, 那么就需要使用 wait 和 notify :
- 当队列满时, 一个线程进行 put, 则该线程阻塞, 直到另一个线程进行 take.
- 当队列空时, 一个线程进行 take, 则该线程阻塞, 直到另一个线程进行 put.
4.1 wait的注意事项
wait 的阻塞存在被 Interrupt 唤醒的风险, 当 wait 被中断休眠后, 就会继续向下执行, 而此时的队列是仍旧为空或为满的, 往下执行就可能会出现 bug.
所以在进行 wait 阻塞的条件判定时, 不能使用 if , 我们要使用 while 来进行条件的二次判定, 当 wait 被唤醒后, 确定是被 notify 唤醒的, 而不是被 Interrupt 中途唤醒的(若是被 Interrupt 唤醒, 则线程继续进行阻塞).
4.2 代码实现
class MyBlockingQueue {
private String[] data = null;
private int head = 0;
private int tail = 0;
private int size = 0;
public MyBlockingQueue() {
}
public MyBlockingQueue(int capacity) {
data = new String[capacity];
}
public void put(String elem) throws InterruptedException {
synchronized (this) {
while (size >= data.length) {
// 阻塞 ...
this.wait();
}
data[tail] = elem;
tail++;
if (tail >= data.length) {
tail = 0;
}
size++;
synchronized (this) {
this.notify();
}
}
}
public String take() throws InterruptedException {
synchronized (this) {
while (size == 0) {
// 阻塞 ...
this.wait();
}
String ret = data[head];
head++;
if (head >= data.length) {
head = 0;
}
size--;
synchronized (this) {
this.notify();
}
return ret;
}
}
}
public class Demo26 {
public static void main(String[] args) {
MyBlockingQueue myBlockingQueue = new MyBlockingQueue(1000);
Thread producer = new Thread(() -> {
int n = 0;
while (true) {
try {
myBlockingQueue.put(n + "");
System.out.println("生产元素 " + n);
n++;
// Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread consumer = new Thread(() -> {
while (true) {
try {
String n = myBlockingQueue.take();
System.out.println("消费元素 " + n);
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
producer.start();
consumer.start();
}
}
END
标签:队列,阻塞,生产者,线程,new,服务器,多线程 From: https://blog.csdn.net/2401_83595513/article/details/143066620