初识生产者消费者模型
前置知识:线程创建,实现线程互斥的锁,实现线程同步的条件变量
为了了解生产消费模型,我们先来引入一个现实中的例子。虽然这种操作看起来有点偷换概念,但是这无疑是一种便于对生产消费模型有一个初步印象的好方法。
已知学校中有一个小卖部,有一个或多个学生从小卖部买东西,还有一个或多个厂家向小卖部供应商品,具体关系如下图所示有了上面的例子,我们把所有厂家想象成一堆线程,即生产者线程,所有学生想象成一堆线程,即消费者线程,最后再把小卖部想象成一个缓冲区。这样就构成了生产消费模型的主体。
除此之外,我们还要满足三种关系。想象一下,由于小卖部中的一个位置上只有一个物品,为避免多个学生同时买到一个物品,需要维持学生和学生之间的互斥关系,也就是某一位置某一时间,只能有一个学生访问。同理,向小卖部生产商品的厂家,在某一时间某一位置也只能有一个访问,所以厂家和厂家之间也要维持互斥关系。又因为,如果厂家在放置一个商品的中途,一个学生来买,那么学生只会"买到物品的一半",(可以想象成,生产者写入,消费者读取,生产者刚写入了一半的数据,消费者就读取了,并认为这一半的数据就是要处理的数据,并进行了处理)。所以要维持学生和厂家之间的互斥。再想象另外一种情况,如果一个学生来买某一个商品,这个商品刚好缺货,这种情况下该学生有两种做法,第一种是隔一段时间来小卖部问一下,自己要购买的商品有没有货,第二种做法是,留一个电话给老板,等厂家送到货之后,给该同学打电话,该同学再来拿货。第二种做法较第一种做法显然更优,这就是我们要维持的生产者和消费者之间的同步关系。显然这段例子过于抽象,不过我们先对生产消费模型需要我们实现的部分进行总结,待实现后根据具体情况就能知其所以然。
生产消费模型321原则
3种关系:生产者和生产者之间互斥,消费者和消费者之间互斥,生产者和消费者之间互斥与同步。
2种角色:生产者和消费者
1个场所:一段特定结构的缓冲区
对生产消费模型的实现,就是围绕上述321原则来进行的
阻塞队列的实现
class BlockQueue{
public:
BlockQueue(){
//对一个锁和两个条件变量进行初始化
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_pcond,nullptr);
pthread_cond_init(&_ccond,nullptr);
}
void push(int x){
pthread_mutex_lock(&_mutex);//加锁
while(_q.size()>=5)//假设队列最大容量为5个,队列已经满了
pthread_cond_wait(&_pcond,&_mutex);//将push操作所在线程,放入_pcond条件变量的队列进行等待
_q.push(x);//向队列中放入数据
pthread_cond_signal(&_ccond);//提醒出_ccond条件变量队列
pthread_mutex_unlock(&_mutex);//归还锁
}
void pop(int& x){
pthread_mutex_lock(&_mutex);//加锁
while(_q.size()==0)//如果队列为空
pthread_cond_wait(&_ccond,&_mutex);//将pop操作所在线程,放入_ccond条件变量的队列
x=_q.front();//从队列取数据
_q.pop();
pthread_cond_signal(&_pcond);//提醒出_pcond条件变量队列
pthread_mutex_unlock(&_mutex);//归还锁
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_pcond);
pthread_cond_destroy(&_ccond);
}
private:
queue<int> _q;
size_t _capacity=5;
pthread_mutex_t _mutex;
pthread_cond_t _pcond;
pthread_cond_t _ccond;
};
为了实现消费生产模型,我们先实现一个阻塞队列,阻塞队列的成员包括,一个queue用来存取数据,_capacity决定队列能存取的数据最多有几个。外加一个锁,和一个消费整条件变量,一个生产者条件变量。实现两个外部接口push和pop。
push操作整个过程都要加锁,加锁后判断,如果队列已经满了,就进入生产者条件变量进行等待。待pop操作消费数据后,唤醒生产者条件变量。后向queue中存入数据。因为队列中肯定会有了数据,所以再唤醒消费者条件变量。
pop操作整个过程也要加锁,加锁后判断,如果队列为空,则进入消费者条件变量进行等待。待push操作生产数据后,唤醒消费者条件变量,后从queue中取数据。此时队列中必有空余位置,则唤醒生产者条件变量。
生产者消费者模型的初步实现
//生产者线程
void* productor(void* b) {
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(b);//装换类型
while (true) {
sleep(1);
bq->push(1);
cout << "生产者放入成功" << endl;
}
}
//消费者线程
void* consumer(void* b) {
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(b);
int t;
while (true) {
bq->pop(&t);
cout << "消费者消费成功" << endl;
}
}
int main() {
BlockQueue<int> bq;
pthread_t p1, p2;
pthread_create(&p1, nullptr, productor,&bq);//创建生产者线程
pthread_create(&p1, nullptr, consumer, &bq);//创建消费者线程
return 0;
}
在阻塞队列的基础上,我们分别实现两个线程用来表示生产者和消费者,生产者线程每隔一秒,向阻塞队列中push数据,消费线程从阻塞队列中pop数据。、
信号量的引入
发现之前的不足
此时我们完成了对生产消费模型的初步实现,可是,我们对push和pop操作采用的是一个锁,执行这两个操作之前都对锁进行了申请,那就导致了,push和pop操作同一时间只能有一个在执行。也就是我们把整个缓冲区看成了一整份资源对他进行了整体访问,但是实际情况是,在缓冲区不空或者不满的时候push和pop操作可以同时访问缓冲区的不同位置。这样利于线程的并发执行,进而提高运行速度。
是什么?怎么做?
a.信号量的本质是一个计数器,用来表示临界资源中资源数量多少。所以,拥有了信号量就代表拥有了临界资源的一部分。申请信号量的本质其实就是资源的预定机制。
举一个简单的例子:比如现在有一个信号量计数器为sem=10,表示资源一共有10份,申请资源,就表示sem--;释放资源,就表示sem++。申请和释放资源的操作分别叫做P操作和V操作。他们必须保证原子性。
b.
#include<semaphore.h>
//初始化信号量
int sem_init(sem_t* sem, int pshared, unsigned int value);
//pshared:0表示线程间共享,非零表示进程间共享
// value:信号量初始值
//销毁信号量
int sem_destory(sem_t* sem);
//等待信号量
int sem_wait(sem_t* sem);//P操作
//发布信号量
int sem_post(sem_t* sem);//V操作
使用之前进行初始化,初始化完成后即可执行PV操作,最后销毁信号量
生产者消费者模型的实现
#pragma once
#include<vector>
#include<semaphore.h>
#include<assert.h>
using namespace std;
//这是实现了一个循环的队列,将队列看成首尾相接
template<class T>
class RingQueue
{
public:
RingQueue(int cap=5):queue_(cap),cap_(cap)
{
sem_init(&space_,0,cap_);//对表示空间资源的信号量进行初始化,最初为cap_
sem_init(&data_,0,0);//对表示数据资源的信号量进行初始化,最初为0
pthread_mutex_init(&con_mutex_,nullptr);//对消费者之间的锁进行初始化
pthread_mutex_init(&pro_mutex_,nullptr);//对生产者之间的锁进行初始化
con_step_=0;
pro_step_=0;
}
//申请信号量的操作
void P(sem_t*sem)
{
int n=sem_wait(sem);
assert(n==0);
(void)n;
}
//释放信号量的操作
void V(sem_t*sem)
{
int n=sem_post(sem);
assert(n==0);
(void)n;
}
//向队列中放数据
void push(T x)
{
P(&space_);//申请空间信号量
pthread_mutex_lock(&pro_mutex_);//加消费者的锁
queue_[pro_step_++]=x;//放置数据
pro_step_%=cap_;//确保循环
pthread_mutex_unlock(&pro_mutex_);//释放锁
V(&data_);//释放数据信号量
}
//从队列取数据
void pop(T*out)
{
P(&data_);//申请数据信号量
pthread_mutex_lock(&con_mutex_);//加生产者的锁
*out=queue_[con_step_++];//取数据
con_step_%=cap_;//确保循环
pthread_mutex_unlock(&con_mutex_);//释放锁
V(&space_);//释放空间信号量
}
~RingQueue()
{
sem_destroy(&space_);
sem_destroy(&data_);
pthread_mutex_destroy(&con_mutex_);
pthread_mutex_destroy(&pro_mutex_);
}
private:
vector<T> queue_;
int cap_;//队列的容量
sem_t space_;//表示空间资源的信号量,刚开始是cap_
sem_t data_;//表示已有数据资源的信号量,刚开始是0
int con_step_;//消费者要获取数据的位置
int pro_step_;//生产者要放置数据的位置
pthread_mutex_t con_mutex_;
pthread_mutex_t pro_mutex_;
};
由原来的队列,改成了现在的循环队列的同时。将原来的一个锁两个条件变量,改成了现在的两个锁两个信号两,两个锁分别用来确保消费者之间和生产者之间的原子性;两个信号量,一个是表示空间资源多少的空间信号量,最开始没有数据在队列中时,大小为_cap,每要放入一个数据之前,都要先申请此空间量,对他进行P操作后,再放置数据,直到队列中数据放满了,空间信号量的大小也就变成了0,再进行P操作的时候就会申请失败,直到pop操作取数据后,队列中用了空间,才对空间信号量进行释放。另一个信号量表示的是数据资源的多少,最开始因为没有数据所以为0,其刚好跟空间信号量是对称的关系。
//生产者线程
void* productor(void* b) {
RingQueue<int>* bq = static_cast<RingQueue<int>*>(b);
while (true) {
sleep(1);
bq->push(1);
cout << "生产者放入成功" << endl;
}
}
//消费者线程
void* consumer(void* b) {
RingQueue<int>* bq = static_cast<RingQueue<int>*>(b);
int t;
while (true) {
bq->pop(&t);
cout << "消费者消费成功" << endl;
}
}
int main() {
RingQueue<int> bq;
pthread_t p1, p2;
pthread_create(&p1, nullptr, productor,&bq);
pthread_create(&p1, nullptr, consumer, &bq);
return 0;
}
再把之前实现的,两个线程分别进行push和pop操作的代码进行修改,将原来的BlockQueue修改为RingQueue。即可实现简单的一个生产者一个消费者的生产消费模型。
挖掘生产者消费者模型的特点
这么麻烦来实现生产者消费者模型,到底有什么优点呢?
①生产线程和消费线程进行了解耦
没有缓冲区存在之前,生产者产生数据,保存到一个局部变量中,消费者再这个局部变量进行处理。这线程就势必要求,生产者和消费者的执行动作是线性的。而有了缓冲区存在之后,生产者产生完一个数据后,不必等待消费者处理,就可直接去产生新的数据,所以实现了二者的解耦。
②支持生产者和消费者一段时间的忙闲不均
也是由于缓冲区存在,比如某段时间,消费者线程由于在处理其他任务,没有办法处理数据,生产者依然可以产生数据放到缓冲区,直到缓冲区没有空间之前,都不受消费者线程的影响。
③提高了效率
在实际情况中,不只需要生产和消费,在生产之前还需要进行一定的整合数据,在消费之后还需要对数据进行复杂的处理,生产消费模型的存在,可以让生产前和消费后让线程并行,大大提高了执行效率。