生产者消费者问题
其实这个问题在一开始阶段只存在两个问题,但随着多线程的情况下,同步的执行顺序和临界资源的安全性也必须得以保障,之前在信号量(缓冲区槽位和计数器)和互斥锁中有单独地分开去解决生产者消费者问题,现在来去真正的解决一下这个问题:
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
private static final int BUFFER_SIZE = 10;
private static int[] buffer = new int[BUFFER_SIZE];
private static int count = 0;
private static Lock lock = new ReentrantLock();
private static Condition notFull = lock.newCondition();
private static Condition notEmpty = lock.newCondition();
private static Semaphore mutex = new Semaphore(1);
private static Semaphore emptySlots = new Semaphore(BUFFER_SIZE);
private static Semaphore filledSlots = new Semaphore(0);
public static void main(String[] args) {
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
}
private static Runnable producer = () -> {
try {
int producedValue = 1;
while (true) {
emptySlots.acquire(); // 等待缓冲区中的空槽
lock.lock();
try {
buffer[count] = producedValue;
count++;
System.out.println("Producer produced: " + producedValue);
producedValue++;
notEmpty.signal(); // 通知消费者有可用的值
} finally {
lock.unlock();
}
filledSlots.release(); // 增加已填充的槽数
Thread.sleep(1000); // 休眠一段时间,模拟生产者
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
private static Runnable consumer = () -> {
try {
while (true) {
filledSlots.acquire(); // 等待缓冲区中的空槽
lock.lock();
try {
int consumedValue = buffer[count - 1];
count--;
System.out.println("Consumer consumed: " + consumedValue);
notFull.signal(); // 通知生产者有一个空槽可用
} finally {
lock.unlock();
}
emptySlots.release(); // 增加空槽数
Thread.sleep(2000); // 模拟消费者所做的一些工作
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
}
老规矩,来说明一下每一个变量的作用及其真正含义:
BUFFER_SIZE
:这是一个常量,表示缓冲区的大小,即可以存储的产品数量。buffer
:这是一个整型数组,用于存储生产者生产的产品。count
:这是一个整数变量,表示当前缓冲区中已经存储的产品数量。lock
:这是一个互斥锁(Lock
接口的实例),用于实现对临界区的互斥访问。notFull
:这是一个条件变量(Condition
接口的实例),用于在缓冲区满时阻塞生产者线程。notEmpty
:这是一个条件变量(Condition
接口的实例),用于在缓冲区空时阻塞消费者线程。mutex
:这是一个信号量(Semaphore
类的实例),用于实现对临界区的互斥访问,相当于互斥锁的概念。/** 这里实际没有用到 ,在之前信号量的代码中通过这样实现的 **/emptySlots
:这是一个信号量(Semaphore
类的实例),表示缓冲区中可用的空槽数量。filledSlots
:这是一个信号量(Semaphore
类的实例),表示缓冲区中已经被生产者填充的槽数量。producerThread
和consumerThread
:这是用于执行生产者和消费者任务的线程对象。producer
和consumer
:这是两个Runnable
接口的实例,分别表示生产者和消费者的任务。通过Lambda表达式的方式定义了它们的具体实现。
- 用简短的语言称述上述操作就是:
- 缓冲区的大小为10,计数器count = 0,每次放入数组,意味着生产资料+1,计数器就会+1,同时信号量+1。而每次从数组拿出,意味着生产资料 -1,计数器也会-1,同时信号量 -1。
- 为了保证临界资源型问题,即多个线程去
竞争冒险
同一个资源区或资源而导致数据的覆盖问题,加入了互斥锁,这里是通过Lock实现,之前在进程中有通过wait和synchronized去实现锁的机制.即在进入缓冲区最多只有一个线程.
不是有计数器了,为什么还需要信号量?信号量的作用好像和计数器没什么区别.
因为这段代码的给出是一个简单的情况,仅仅只有一个生产者和消费者的情况,现实场景中往往通过信号量去表示其中的资源数,且单独分开表示更加规范,而且信号量具备阻塞的作用,计数器仅仅只能当作参考去评判,不能有下一步的动作和行为.
因此输出结果会是这样的:
Producer produced: 1
Consumer consumed: 1
....................
这个过程会持续不断的继续下去,因为缓冲区的大小是10,而生产者消费者一直是一进一出的状态,导致只用了一个槽位,而数值就是具体添加的资源.
-
如果想加入多个线程也可以:
Thread producerThread2 = new Thread(producer);
这样就会出现以下这种情况,在缓冲区未被加满之前,消费者的消费速度明显更不上生产者,每次生产者都是生产一个资源,但因为两个生产者的缘故,且放置的位置永远不会冲突,导致消费者其实一直在不停的清理货舱
Producer produced: 1 Producer produced: 1 Consumer consumed: 1 Producer produced: 2 Producer produced: 2 Consumer consumed: 2 Producer produced: 3 Producer produced: 3 Producer produced: 4 Producer produced: 4 Consumer consumed: 4 Producer produced: 5 Producer produced: 5 Producer produced: 6 Producer produced: 6 Consumer consumed: 6 Producer produced: 7 Producer produced: 7 Consumer consumed: 7 ---------在此处其实缓冲区就已经满了 Producer produced: 8 Consumer consumed: 8 Producer produced: 8 Consumer consumed: 8 Producer produced: 9 Consumer consumed: 9 Producer produced: 9 Consumer consumed: 9 Producer produced: 10 Consumer consumed: 10
而在此之后便是生产者消费者一进一出的缘故了,无论是哪一个生产者去生产,都不可能出现,他们连续生产的情况,这是因为缓冲区已经被用完所以导致了这种情况
总结:来聊一下互斥锁,信号量,条件变量三者的关系和不同
-
互斥锁与信号量有相似之处,当信号量的初值设置为1,且只允许其值在0和1之间变化时(在信号量时,便是通过这种方式实现).wait和signal的操作其实就是互斥锁中的lock和unlock操作类似,因此我们称这种信号量为二元信号量.他们二者之间的差别就在于:面向对象的不同,拥有锁的人被称为拥有者,而信号量更是一种同步机制,他不单独属于谁,也不绝对可能属于某一进程.因此两者在机制上有所不同.
-
信号量其实是一种抽象的封装,无论是底层的C语言还是java中的Semaphore,他们都基于计数器,他与条件变量的不同是,他们都提供了类似wait和signal的操作,而信号量是一种更高级的封装,他是由条件变量.互斥锁以及计数器共同实现的,因此我们查看Semaphore这个类时会发现有很多抽象的封装
-
条件变量:
为此,学习编程其实是一个自下向上的过程,如果能够理解整体的架构思想,在学习的路上就会方便和高效许多.
标签:信号量,消费者,Producer,生产者,问题,static,produced,Semaphore,缓冲区 From: https://www.cnblogs.com/looktheworld/p/17468927.html