1. 概述
- 生产者-消费者问题(Producer-Consumer Problem)也称有限缓冲问题(Bounded-BufferProblem),是一个
多线程同步问题
的经典案例。 - 生产者一消费者问题描述了两类访问共享缓冲区的线程(即所谓的生产者和消费者)在实际运行时会发生的问题。
- 生产者线程的主要功能是生成一定量的数据放到缓冲区中,然后重复此过程。
- 消费者线程的主要功能是从缓冲区提取(或消耗)数据。
- 生产者一消费者问题关键是:
- 保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中为空时消耗数据。
- 保证在生产者加入过程、消费者消耗过程中,不会产生错误的数据和行为。
- 生产者一消费者问题不仅仅是一个多线程同步问题的经典案例,而且业内已经将解决该问题的方案,抽象成为了一种设计模式
生产者一消费者模式
。生产者一消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。
1.1 生产者一消费者模式
- 在生产者一消费者模式中,通常由两类线程,即生产者线程(若干个)和消费者线程(若干个)。生产者线程向数据缓冲区(DataBuffer)加入数据,消费者线程则从DataBuffer消耗数据。生产者和消费者、内存缓冲区之间的关系如图。
- 生产者一消费者模式中,至少有以下关键点:
- 生产者与生产者之间、消费者与消费者之间,对数据缓冲区的操作是并发进行的。
- 数据缓冲区是有容量上限的。数据缓冲区满后,生产者不能再加入数据:DataBufer空时消费者不能再取出数据。
- 数据缓冲区是线程安全的。在并发操作数据区的过程中,不能出现数据不一致情况;或者在多个线程并发更改共享数据后,不会造成出现脏数据的情况。
- 生产者或者消费者线程在空闲时,需要尽可能阻塞而不是执行无效的空操作,尽量节约CPU资源。
1.2 一个线程不安全的实现版本
public class NotSafeDataBuffer<T> {
public static final int MAX_AMOUNT = 10;
private List<T> dataList = new LinkedList<>();
//保存数量
private AtomicInteger amount = new AtomicInteger(0);
/**
* 向数据区增加一个元素
*/
public void add(T element) throws Exception {
if (amount.get() > MAX_AMOUNT) {
Print.tcfo("队列已经满了!");
return;
}
dataList.add(element);
System.out.println(element + "");
amount.incrementAndGet();
//如果数据不一致,抛出异常
if (amount.get() != dataList.size()) {
throw new Exception(amount + "!=" + dataList.size());
}
}
/**
* 从数据区取出一个元素
*/
public T fetch() throws Exception {
if (amount.get() <= 0) {
System.out.println("队列已经空了!");
return null;
}
T element = dataList.remove(0);
System.out.println(element + "");
amount.decrementAndGet();
//如果数据不一致,抛出异常
if (amount.get() != dataList.size()) {
throw new Exception(amount + "!=" + dataList.size());
}
return element;
}
}
DataBuffer类型的实例属性dataList保存具体数据元素,实例属性amount保存元素的数量。DataBuffer类型有两个实例方法,实例方法add()用于向数据区增加元素,实例方法fetch()用于从数据区消耗元素。
在add()实例方法中,加入元素之前首先会对amount是否达到上限进行判断,如果数据区满了则不能加入数据;在fetch()实例方法中,消耗元素前首先会对amount是否大于零进行判断,如果数据区空了,就不能取出数据。
1.3 生产者、消费者的逻辑与动作解耦
- 生产者一消费者模式有多个不同版本的实现,这些版本的区别在于数据缓冲区(DataBuffer)类以及相应的生产、消费动作(Action)不同,而生产者类、消费者类的执行逻辑是相同的。
- 分离变与不变是软件设计的一个基本原则。现在将生产者类、消费者类与具体的生产、消费Action解耦,从而使得生产者类、消费者类的代码在后续可以复用。生产者、消费者逻辑与对应Action解耦后的类结构图。
分离变与不变原则的背后蕴藏着丰富的软件工程思想,例如信息的分装与隐藏、系统的模块化、使用分层构架等。其中,变是指易变的代码或者模块,不变就是指系统中不易变化的部分。
在解耦后的生产者一消费者模式:
- 不变的部分为生产者(Producer)类、消费者(Consumer)类,后续可以直接复用,不需要修改代码;
- 变化的部分为数据缓冲区(DataBufer)类以及相应的生产和消费动作,
2. 通用的 Producer 类实现
public class Producer implements Runnable {
// 生产的时间间隔,产一次等待的时间,默认为200ms
public static final int PRODUCE_GAP = 200;
//总次数
static final AtomicInteger TURN = new AtomicInteger(0);
//生产者对象编号
static final AtomicInteger PRODUCER_NO = new AtomicInteger(1);
//生产者名称
String name = null;
//生产的动作
Callable action = null;
int gap = PRODUCE_GAP;
public Producer(Callable action, int gap) {
this.action = action;
this.gap = gap;
if (this.gap <= 0) {
this.gap = PRODUCE_GAP;
}
name = "生产者-" + PRODUCER_NO.incrementAndGet();
}
public Producer(Callable action) {
this.action = action;
this.gap = PRODUCE_GAP;
name = "生产者-" + PRODUCER_NO.incrementAndGet();
}
@Override
public void run() {
while (true) {
try {
//执行生产动作
Object out = action.call();
//输出生产的结果
if (null != out) {
System.out.println("第" + TURN.get() + "轮生产:" + out);
}
//每一轮生产之后,稍微等待一下
sleepMilliSeconds(gap);
//增加生产轮次
TURN.incrementAndGet();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
3. 通用的 Consumer 类实现
public class Consumer implements Runnable {
//消费的时间间隔,默认等待100毫秒
public static final int CONSUME_GAP = 100;
//消费总次数
static final AtomicInteger TURN = new AtomicInteger(0);
//消费者对象编号
static final AtomicInteger CONSUMER_NO = new AtomicInteger(1);
//消费者名称
String name;
//消费的动作
Callable action = null;
//消费一次等待的时间,默认为1000ms
int gap = CONSUME_GAP;
public Consumer(Callable action, int gap) {
this.action = action;
this.gap = gap;
name = "消费者-" + CONSUMER_NO.incrementAndGet();
}
public Consumer(Callable action) {
this.action = action;
this.gap = gap;
this.gap = CONSUME_GAP;
name = "消费者-" + CONSUMER_NO.incrementAndGet();
}
@Override
public void run() {
while (true) {
//增加消费次数
TURN.incrementAndGet();
try {
//执行消费动作
Object out = action.call();
if (null != out) {
System.out.println("第" + TURN.get() + "轮消费:" + out);
}
//每一轮消费之后,稍微等待一下
sleepMilliSeconds(gap);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
4. 数据缓冲区实例、生产动作、消费动作的定义
public class NotSafePetStore {
//共享数据区,实例对象
private static NotSafeDataBuffer<IGoods> notSafeDataBuffer = new NotSafeDataBuffer();
//生产者执行的动作
static Callable<IGoods> produceAction = () ->
{
//首先生成一个随机的商品
IGoods goods = Goods.produceOne();
//将商品加上共享数据区
try {
notSafeDataBuffer.add(goods);
} catch (Exception e) {
e.printStackTrace();
}
return goods;
};
//消费者执行的动作
static Callable<IGoods> consumerAction = () ->
{
// 从PetStore获取商品
IGoods goods = null;
try {
goods = notSafeDataBuffer.fetch();
} catch (Exception e) {
e.printStackTrace();
}
return goods;
};
}
5. 生产者一消费者简单实现版本(不安全)
public static void main(String[] args) throws InterruptedException {
System.setErr(System.out);
// 同时并发执行的线程数
final int THREAD_TOTAL = 20;
//线程池,用于多线程模拟测试
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
for (int i = 0; i < 5; i++) {
//生产者线程每生产一个商品,间隔500ms
threadPool.submit(new Producer(produceAction, 500));
//消费者线程每消费一个商品,间隔1500ms
threadPool.submit(new Consumer(consumerAction, 1500));
}
}
从以上异常可以看出,在向数据缓冲区进行元素的增加或者提取时,多个线程在并发执行对amount、dataList两个成员操作时次序已经混乱,导致了数据不一致和线程安全问题。
6. 生产者一消费者简单实现版本(安全)
public class SafeDataBuffer<T> {
public static final int MAX_AMOUNT = 10;
private List<T> dataList = new LinkedList<>();
//保存数量
private AtomicInteger amount = new AtomicInteger(0);
/**
* 向数据区增加一个元素
*/
public synchronized void add(T element) throws Exception {
if (amount.get() > MAX_AMOUNT) {
System.out.println("队列已经满了!");
return;
}
dataList.add(element);
System.out.println(element + "");
amount.incrementAndGet();
//如果数据不一致,抛出异常
if (amount.get() != dataList.size()) {
throw new Exception(amount + "!=" + dataList.size());
}
}
/**
* 从数据区取出一个元素
*/
public synchronized T fetch() throws Exception {
if (amount.get() <= 0) {
Print.tcfo("队列已经空了!");
return null;
}
T element = dataList.remove(0);
System.out.println(element + "");
amount.decrementAndGet();
//如果数据不一致,抛出异常
if (amount.get() != dataList.size()) {
throw new Exception(amount + "!=" + dataList.size());
}
return element;
}
}
public class SafePetStore {
//共享数据区,实例对象
private static SafeDataBuffer<IGoods> notSafeDataBuffer = new SafeDataBuffer();
//生产者执行的动作
static Callable<IGoods> produceAction = () ->
{
//首先生成一个随机的商品
IGoods goods = Goods.produceOne();
//将商品加上共享数据区
try {
notSafeDataBuffer.add(goods);
} catch (Exception e) {
e.printStackTrace();
}
return goods;
};
//消费者执行的动作
static Callable<IGoods> consumerAction = () ->
{
// 从PetStore获取商品
IGoods goods = null;
try {
goods = notSafeDataBuffer.fetch();
} catch (Exception e) {
e.printStackTrace();
}
return goods;
};
public static void main(String[] args) throws InterruptedException {
System.setErr(System.out);
// 同时并发执行的线程数
final int THREAD_TOTAL = 20;
//线程池,用于多线程模拟测试
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_TOTAL);
for (int i = 0; i < 5; i++) {
//生产者线程每生产一个商品,间隔500ms
threadPool.submit(new Producer(produceAction, 500));
//消费者线程每消费一个商品,间隔1500ms
threadPool.submit(new Consumer(consumerAction, 1500));
}
}
}
标签:消费者,生产者,问题,amount,线程,new,public From: https://www.cnblogs.com/ccblblog/p/17988873虽然线程安全问题顺利解决,但是以上的解决方式使用了SafeDataBuffer的实例的对象锁作为同步锁,这样一来,所有的生产、消费动作在执行过程中都需要抢占同一个同步锁,最终的结果是所有的生产、消费动作都被串行化了。
高效率的生产者一消费者模式,生产、消费动作是肯定不能串行执行,而是需要并行执行的,而且并行化程度越高越好。