生产者-消费者模式是多线程并发编程中一个非常经典的模式,它通过解耦生产者和消费者的关系,使得两者可以独立工作,从而提高系统的并发性和可扩展性。本文将详细介绍生产者-消费者模式的概念、实现方式以及应用场景。
1 生产者-消费者模式概述
生产者-消费者模式包含两类线程:
- 生产者线程:负责生产数据,并将数据放入共享数据区。
- 消费者线程:负责从共享数据区中取出数据并进行消费。
为了解耦生产者和消费者的关系,通常会使用一个共享的数据区域(如队列)作为缓冲区。生产者将数据放入缓冲区,消费者从缓冲区中取出数据,两者之间不需要直接通信。
共享数据区域需要具备以下功能:
- 当缓冲区已满时,阻塞生产者线程,防止其继续生产数据。
- 当缓冲区为空时,阻塞消费者线程,防止其继续消费数据。
2 wait/notify
的消息通知机制
wait/notify
是 Java 中用于线程间通信的经典机制,通过 Object
类提供的 wait
和 notify/notifyAll
方法,可以实现线程的等待和唤醒。下面将详细介绍 wait/notify
机制的使用方法、注意事项以及常见问题的解决方案。
2.1 wait
方法
wait
方法用于将当前线程置入休眠状态,直到其他线程调用 notify
或 notifyAll
方法唤醒它。
- 调用条件:
wait
方法必须在同步方法或同步块中调用,即线程必须持有对象的监视器锁(monitor lock)。 - 释放锁:调用
wait
方法后,当前线程会释放锁,并进入等待状态。 - 异常:如果线程在调用
wait
方法时没有持有锁,则会抛出IllegalMonitorStateException
异常。
示例代码:
synchronized (lockObject) {
try {
lockObject.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2.2 notify
方法
notify
方法用于唤醒一个正在等待该对象监视器锁的线程。
- 调用条件:
notify
方法也必须在同步方法或同步块中调用,线程必须持有对象的监视器锁。 - 唤醒线程:
notify
方法会从等待队列中随机选择一个线程进行唤醒,使其从wait
方法处退出,并进入同步队列,等待获取锁。 - 释放锁:调用
notify
后,当前线程不会立即释放锁,而是在退出同步块后才会释放锁。
示例代码:
synchronized (lockObject) {
lockObject.notify();
}
2.3 notifyAll
方法
notifyAll
方法与 notify
方法类似,但它会唤醒所有正在等待该对象监视器锁的线程。
- 唤醒所有线程:
notifyAll
方法会将所有等待队列中的线程移入同步队列,等待获取锁。
示例代码:
synchronized (lockObject) {
lockObject.notifyAll();
}
2.4 wait/notify
机制的常见问题及解决方案
2.4.1 notify
早期通知
问题描述:如果 notify
方法在 wait
方法之前被调用,可能会导致通知遗漏,使得等待的线程一直处于阻塞状态。
示例代码:
public class EarlyNotify {
private static String lockObject = "";
public static void main(String[] args) {
WaitThread waitThread = new WaitThread(lockObject);
NotifyThread notifyThread = new NotifyThread(lockObject);
notifyThread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
waitThread.start();
}
static class WaitThread extends Thread {
private String lock;
public WaitThread(String lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
try {
System.out.println(Thread.currentThread().getName() + " 进去代码块");
System.out.println(Thread.currentThread().getName() + " 开始wait");
lock.wait();
System.out.println(Thread.currentThread().getName() + " 结束wait");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class NotifyThread extends Thread {
private String lock;
public NotifyThread(String lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + " 进去代码块");
System.out.println(Thread.currentThread().getName() + " 开始notify");
lock.notify();
System.out.println(Thread.currentThread().getName() + " 结束开始notify");
}
}
}
}
解决方案:添加一个状态标志,在 wait
方法调用前判断状态是否已经改变。
优化后的代码:
public class EarlyNotify {
private static String lockObject = "";
private static boolean isWait = true;
public static void main(String[] args) {
WaitThread waitThread = new WaitThread(lockObject);
NotifyThread notifyThread = new NotifyThread(lockObject);
notifyThread.start();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
waitThread.start();
}
static class WaitThread extends Thread {
private String lock;
public WaitThread(String lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
try {
while (isWait) {
System.out.println(Thread.currentThread().getName() + " 进去代码块");
System.out.println(Thread.currentThread().getName() + " 开始wait");
lock.wait();
System.out.println(Thread.currentThread().getName() + " 结束wait");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class NotifyThread extends Thread {
private String lock;
public NotifyThread(String lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + " 进去代码块");
System.out.println(Thread.currentThread().getName() + " 开始notify");
lock.notifyAll();
isWait = false;
System.out.println(Thread.currentThread().getName() + " 结束开始notify");
}
}
}
}
2.4.2 等待条件发生变化
问题描述:如果线程在等待时接收到通知,但之后等待的条件发生了变化,可能会导致程序出错。
示例代码:
public class ConditionChange {
private static List<String> lockObject = new ArrayList();
public static void main(String[] args) {
Consumer consumer1 = new Consumer(lockObject);
Consumer consumer2 = new Consumer(lockObject);
Productor productor = new Productor(lockObject);
consumer1.start();
consumer2.start();
productor.start();
}
static class Consumer extends Thread {
private List<String> lock;
public Consumer(List lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
try {
if (lock.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " list为空");
System.out.println(Thread.currentThread().getName() + " 调用wait方法");
lock.wait();
System.out.println(Thread.currentThread().getName() + " wait方法结束");
}
String element = lock.remove(0);
System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Productor extends Thread {
private List<String> lock;
public Productor(List lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + " 开始添加元素");
lock.add(Thread.currentThread().getName());
lock.notifyAll();
}
}
}
}
解决方案:在 wait
方法退出后再次检查等待条件。
优化后的代码:
public class ConditionChange {
private static List<String> lockObject = new ArrayList();
public static void main(String[] args) {
Consumer consumer1 = new Consumer(lockObject);
Consumer consumer2 = new Consumer(lockObject);
Productor productor = new Productor(lockObject);
consumer1.start();
consumer2.start();
productor.start();
}
static class Consumer extends Thread {
private List<String> lock;
public Consumer(List lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
try {
while (lock.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " list为空");
System.out.println(Thread.currentThread().getName() + " 调用wait方法");
lock.wait();
System.out.println(Thread.currentThread().getName() + " wait方法结束");
}
String element = lock.remove(0);
System.out.println(Thread.currentThread().getName() + " 取出第一个元素为:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Productor extends Thread {
private List<String> lock;
public Productor(List lock) {
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + " 开始添加元素");
lock.add(Thread.currentThread().getName());
lock.notifyAll();
}
}
}
}
2.4.3 “假死”状态
问题描述:在多消费者和多生产者的情况下,使用 notify
方法可能会导致“假死”状态,即所有线程都处于等待状态,无法被唤醒。
原因分析:如果多个生产者线程调用了 wait
方法阻塞等待,其中一个生产者线程获取到对象锁后使用 notify
方法唤醒其他线程,如果唤醒的仍然是生产者线程,就会导致所有生产者线程都处于等待状态。
解决方案:将 notify
方法替换成 notifyAll
方法。
总结:在使用 wait/notify
机制时,应遵循以下原则:
- 永远在
while
循环中对条件进行判断,而不是在if
语句中进行wait
条件的判断。 - 使用
notifyAll
而不是notify
。
基本的使用范式如下:
synchronized (sharedObject) {
while (condition) {
sharedObject.wait();
// (Releases lock, and reacquires on wakeup)
}
// do action based upon condition e.g. take or put into queue
}
3 实现生产者-消费者模式的三种方式
生产者-消费者模式可以通过以下三种方式实现:
3.1 使用 Object
的 wait/notify
机制
Object
类提供了 wait
和 notify/notifyAll
方法,用于线程间的通信。
wait
方法:使当前线程进入等待状态,直到其他线程调用notify
或notifyAll
方法唤醒它。调用wait
方法前,线程必须持有对象的监视器锁,否则会抛出IllegalMonitorStateException
异常。notify
方法:唤醒一个正在等待该对象监视器锁的线程。如果有多个线程在等待,则随机选择一个唤醒。notifyAll
方法:唤醒所有正在等待该对象监视器锁的线程。
示例代码:
public class ProductorConsumer {
private static LinkedList<Integer> list = new LinkedList<>();
private static final int MAX_SIZE = 10;
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Productor());
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer());
}
}
static class Productor implements Runnable {
@Override
public void run() {
while (true) {
synchronized (list) {
try {
while (list.size() == MAX_SIZE) {
System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait");
list.wait();
}
int data = new Random().nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + data);
list.add(data);
list.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
synchronized (list) {
try {
while (list.isEmpty()) {
System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait");
list.wait();
}
int data = list.removeFirst();
System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + data);
list.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
3.2 使用 Lock
和 Condition
的 await/signal
机制
Lock
和 Condition
提供了比 Object
的 wait/notify
更灵活的线程通信机制。Condition
对象可以通过 lock.newCondition()
创建,并提供了 await
和 signal/signalAll
方法。
await
方法:使当前线程进入等待状态,直到其他线程调用signal
或signalAll
方法唤醒它。signal
方法:唤醒一个正在等待该Condition
的线程。signalAll
方法:唤醒所有正在等待该Condition
的线程。
示例代码:
public class ProductorConsumer {
private static LinkedList<Integer> list = new LinkedList<>();
private static final int MAX_SIZE = 10;
private static ReentrantLock lock = new ReentrantLock();
private static Condition full = lock.newCondition();
private static Condition empty = lock.newCondition();
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Productor());
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer());
}
}
static class Productor implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
while (list.size() == MAX_SIZE) {
System.out.println("生产者" + Thread.currentThread().getName() + " list以达到最大容量,进行wait");
full.await();
}
int data = new Random().nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + data);
list.add(data);
empty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
lock.lock();
try {
while (list.isEmpty()) {
System.out.println("消费者" + Thread.currentThread().getName() + " list为空,进行wait");
empty.await();
}
int data = list.removeFirst();
System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + data);
full.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
3.3 使用 BlockingQueue
实现
BlockingQueue
是 Java 并发包中提供的一个接口,它提供了可阻塞的插入和移除操作。BlockingQueue
非常适合用来实现生产者-消费者模型,因为它可以自动处理线程的阻塞和唤醒。
put
方法:如果队列已满,则阻塞生产者线程,直到队列有空闲空间。take
方法:如果队列为空,则阻塞消费者线程,直到队列中有数据。
示例代码:
public class ProductorConsumer {
private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 5; i++) {
service.submit(new Productor());
}
for (int i = 0; i < 10; i++) {
service.submit(new Consumer());
}
}
static class Productor implements Runnable {
@Override
public void run() {
try {
while (true) {
int data = new Random().nextInt();
System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + data);
queue.put(data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
int data = queue.take();
System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
4 生产者-消费者模式的应用场景
生产者-消费者模式广泛应用于以下场景:
- 任务执行框架:如
Executor
框架,将任务的提交和执行解耦,提交任务的操作相当于生产者,执行任务的操作相当于消费者。 - 消息中间件:如 MQ(消息队列),用户下单相当于生产者,处理订单的线程相当于消费者。
- 任务处理时间较长:如上传附件并处理,用户上传附件相当于生产者,处理附件的线程相当于消费者。
5 生产者-消费者模式的优点
- 解耦:生产者和消费者之间通过缓冲区进行通信,彼此独立,简化了系统的复杂性。
- 复用:生产者和消费者可以独立复用和扩展,提高了代码的可维护性。
- 调整并发数:可以根据生产者和消费者的处理速度调整并发数,优化系统性能。
- 异步:生产者和消费者各司其职,生产者不需要等待消费者处理完数据,消费者也不需要等待生产者生产数据,提高了系统的响应速度。
- 支持分布式:生产者和消费者可以通过分布式队列进行通信,支持分布式系统的扩展。
6 小结
生产者-消费者模式是多线程并发编程中的经典模式,通过解耦生产者和消费者的关系,提高了系统的并发性和可扩展性。本文介绍了三种实现生产者-消费者模式的方式,并给出了相应的示例代码。通过理解这些实现方式,可以更好地应用生产者-消费者模式解决实际问题。