首页 > 其他分享 >生产者消费者模型精讲

生产者消费者模型精讲

时间:2024-08-09 17:58:34浏览次数:17  
标签:生产者 精讲 信号量 队列 mutex pthread 线程 sem 模型

初识生产者消费者模型

前置知识:线程创建,实现线程互斥的锁,实现线程同步的条件变量

为了了解生产消费模型,我们先来引入一个现实中的例子。虽然这种操作看起来有点偷换概念,但是这无疑是一种便于对生产消费模型有一个初步印象的好方法。
已知学校中有一个小卖部,有一个或多个学生从小卖部买东西,还有一个或多个厂家向小卖部供应商品,具体关系如下图所示有了上面的例子,我们把所有厂家想象成一堆线程,即生产者线程,所有学生想象成一堆线程,即消费者线程,最后再把小卖部想象成一个缓冲区。这样就构成了生产消费模型的主体。

除此之外,我们还要满足三种关系。想象一下,由于小卖部中的一个位置上只有一个物品,为避免多个学生同时买到一个物品,需要维持学生和学生之间的互斥关系,也就是某一位置某一时间,只能有一个学生访问。同理,向小卖部生产商品的厂家,在某一时间某一位置也只能有一个访问,所以厂家和厂家之间也要维持互斥关系。又因为,如果厂家在放置一个商品的中途,一个学生来买,那么学生只会"买到物品的一半",(可以想象成,生产者写入,消费者读取,生产者刚写入了一半的数据,消费者就读取了,并认为这一半的数据就是要处理的数据,并进行了处理)。所以要维持学生和厂家之间的互斥。再想象另外一种情况,如果一个学生来买某一个商品,这个商品刚好缺货,这种情况下该学生有两种做法,第一种是隔一段时间来小卖部问一下,自己要购买的商品有没有货,第二种做法是,留一个电话给老板,等厂家送到货之后,给该同学打电话,该同学再来拿货。第二种做法较第一种做法显然更优,这就是我们要维持的生产者和消费者之间的同步关系。显然这段例子过于抽象,不过我们先对生产消费模型需要我们实现的部分进行总结,待实现后根据具体情况就能知其所以然。

生产消费模型321原则

3种关系:生产者和生产者之间互斥,消费者和消费者之间互斥,生产者和消费者之间互斥与同步。
2种角色:生产者和消费者
1个场所:一段特定结构的缓冲区
对生产消费模型的实现,就是围绕上述321原则来进行的

阻塞队列的实现

class BlockQueue{
public:
BlockQueue(){
    //对一个锁和两个条件变量进行初始化
    pthread_mutex_init(&_mutex,nullptr);
    pthread_cond_init(&_pcond,nullptr);
    pthread_cond_init(&_ccond,nullptr);
}
void push(int x){
    pthread_mutex_lock(&_mutex);//加锁
    while(_q.size()>=5)//假设队列最大容量为5个,队列已经满了
    pthread_cond_wait(&_pcond,&_mutex);//将push操作所在线程,放入_pcond条件变量的队列进行等待
    _q.push(x);//向队列中放入数据
    pthread_cond_signal(&_ccond);//提醒出_ccond条件变量队列
    pthread_mutex_unlock(&_mutex);//归还锁
}

void pop(int& x){
    pthread_mutex_lock(&_mutex);//加锁
    while(_q.size()==0)//如果队列为空
    pthread_cond_wait(&_ccond,&_mutex);//将pop操作所在线程,放入_ccond条件变量的队列
    x=_q.front();//从队列取数据
    _q.pop();
    pthread_cond_signal(&_pcond);//提醒出_pcond条件变量队列
    pthread_mutex_unlock(&_mutex);//归还锁
}

~BlockQueue()
{
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_pcond);
    pthread_cond_destroy(&_ccond);   
}
private:

    queue<int> _q;
    size_t _capacity=5;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;
    pthread_cond_t _ccond;
};

为了实现消费生产模型,我们先实现一个阻塞队列,阻塞队列的成员包括,一个queue用来存取数据,_capacity决定队列能存取的数据最多有几个。外加一个锁,和一个消费整条件变量,一个生产者条件变量。实现两个外部接口push和pop。
push操作整个过程都要加锁,加锁后判断,如果队列已经满了,就进入生产者条件变量进行等待。待pop操作消费数据后,唤醒生产者条件变量。后向queue中存入数据。因为队列中肯定会有了数据,所以再唤醒消费者条件变量。
pop操作整个过程也要加锁,加锁后判断,如果队列为空,则进入消费者条件变量进行等待。待push操作生产数据后,唤醒消费者条件变量,后从queue中取数据。此时队列中必有空余位置,则唤醒生产者条件变量。

生产者消费者模型的初步实现

//生产者线程
void* productor(void* b) {
	BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(b);//装换类型
	while (true) {
		sleep(1);
		bq->push(1);
		cout << "生产者放入成功" << endl;
	}
}
//消费者线程
void* consumer(void* b) {
	BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(b);
	int t;
	while (true) {
		bq->pop(&t);
		cout << "消费者消费成功" << endl;
	}
}
int main() {
	BlockQueue<int> bq;
	pthread_t p1, p2;
	pthread_create(&p1, nullptr, productor,&bq);//创建生产者线程
	pthread_create(&p1, nullptr, consumer, &bq);//创建消费者线程
	return 0;
}

在阻塞队列的基础上,我们分别实现两个线程用来表示生产者和消费者,生产者线程每隔一秒,向阻塞队列中push数据,消费线程从阻塞队列中pop数据。、

信号量的引入

发现之前的不足

此时我们完成了对生产消费模型的初步实现,可是,我们对push和pop操作采用的是一个锁,执行这两个操作之前都对锁进行了申请,那就导致了,push和pop操作同一时间只能有一个在执行。也就是我们把整个缓冲区看成了一整份资源对他进行了整体访问,但是实际情况是,在缓冲区不空或者不满的时候push和pop操作可以同时访问缓冲区的不同位置。这样利于线程的并发执行,进而提高运行速度。

是什么?怎么做?

a.信号量的本质是一个计数器,用来表示临界资源中资源数量多少。所以,拥有了信号量就代表拥有了临界资源的一部分。申请信号量的本质其实就是资源的预定机制。
举一个简单的例子:比如现在有一个信号量计数器为sem=10,表示资源一共有10份,申请资源,就表示sem--;释放资源,就表示sem++。申请和释放资源的操作分别叫做P操作和V操作。他们必须保证原子性。

b.

#include<semaphore.h>
//初始化信号量
int sem_init(sem_t* sem, int pshared, unsigned int value);
//pshared:0表示线程间共享,非零表示进程间共享
// value:信号量初始值
//销毁信号量
int sem_destory(sem_t* sem);
//等待信号量
int sem_wait(sem_t* sem);//P操作
//发布信号量
int sem_post(sem_t* sem);//V操作

 使用之前进行初始化,初始化完成后即可执行PV操作,最后销毁信号量

生产者消费者模型的实现

#pragma once
#include<vector>
#include<semaphore.h>
#include<assert.h>
using namespace std;

//这是实现了一个循环的队列,将队列看成首尾相接
template<class T>
class RingQueue
{
public:
RingQueue(int cap=5):queue_(cap),cap_(cap)
{
    sem_init(&space_,0,cap_);//对表示空间资源的信号量进行初始化,最初为cap_
    sem_init(&data_,0,0);//对表示数据资源的信号量进行初始化,最初为0
    pthread_mutex_init(&con_mutex_,nullptr);//对消费者之间的锁进行初始化
    pthread_mutex_init(&pro_mutex_,nullptr);//对生产者之间的锁进行初始化
    con_step_=0;
    pro_step_=0;
}
//申请信号量的操作
void P(sem_t*sem)
{
    int n=sem_wait(sem);
    assert(n==0);
    (void)n;
}
//释放信号量的操作
void V(sem_t*sem)
{
    int n=sem_post(sem);
    assert(n==0);
    (void)n;
}
//向队列中放数据
void push(T x)
{
    P(&space_);//申请空间信号量
    pthread_mutex_lock(&pro_mutex_);//加消费者的锁
    queue_[pro_step_++]=x;//放置数据
    pro_step_%=cap_;//确保循环
    pthread_mutex_unlock(&pro_mutex_);//释放锁
    V(&data_);//释放数据信号量
}
//从队列取数据
void pop(T*out)
{
 P(&data_);//申请数据信号量
pthread_mutex_lock(&con_mutex_);//加生产者的锁
*out=queue_[con_step_++];//取数据
con_step_%=cap_;//确保循环
pthread_mutex_unlock(&con_mutex_);//释放锁
 V(&space_);//释放空间信号量
}
~RingQueue()
{
sem_destroy(&space_);
sem_destroy(&data_);
pthread_mutex_destroy(&con_mutex_);
pthread_mutex_destroy(&pro_mutex_);
}
private:
vector<T> queue_;
int cap_;//队列的容量
sem_t space_;//表示空间资源的信号量,刚开始是cap_
sem_t data_;//表示已有数据资源的信号量,刚开始是0
int con_step_;//消费者要获取数据的位置
int pro_step_;//生产者要放置数据的位置
pthread_mutex_t con_mutex_;
pthread_mutex_t pro_mutex_;
};

由原来的队列,改成了现在的循环队列的同时。将原来的一个锁两个条件变量,改成了现在的两个锁两个信号两,两个锁分别用来确保消费者之间和生产者之间的原子性;两个信号量,一个是表示空间资源多少的空间信号量,最开始没有数据在队列中时,大小为_cap,每要放入一个数据之前,都要先申请此空间量,对他进行P操作后,再放置数据,直到队列中数据放满了,空间信号量的大小也就变成了0,再进行P操作的时候就会申请失败,直到pop操作取数据后,队列中用了空间,才对空间信号量进行释放。另一个信号量表示的是数据资源的多少,最开始因为没有数据所以为0,其刚好跟空间信号量是对称的关系。

//生产者线程
void* productor(void* b) {
	RingQueue<int>* bq = static_cast<RingQueue<int>*>(b);
	while (true) {
		sleep(1);
		bq->push(1);
		cout << "生产者放入成功" << endl;
	}
}
//消费者线程
void* consumer(void* b) {
	RingQueue<int>* bq = static_cast<RingQueue<int>*>(b);
	int t;
	while (true) {
		bq->pop(&t);
		cout << "消费者消费成功" << endl;
	}
}
int main() {
	RingQueue<int> bq;
	pthread_t p1, p2;
	pthread_create(&p1, nullptr, productor,&bq);
	pthread_create(&p1, nullptr, consumer, &bq);
	return 0;
}

再把之前实现的,两个线程分别进行push和pop操作的代码进行修改,将原来的BlockQueue修改为RingQueue。即可实现简单的一个生产者一个消费者的生产消费模型。

挖掘生产者消费者模型的特点

这么麻烦来实现生产者消费者模型,到底有什么优点呢?
①生产线程和消费线程进行了解耦
没有缓冲区存在之前,生产者产生数据,保存到一个局部变量中,消费者再这个局部变量进行处理。这线程就势必要求,生产者和消费者的执行动作是线性的。而有了缓冲区存在之后,生产者产生完一个数据后,不必等待消费者处理,就可直接去产生新的数据,所以实现了二者的解耦。
②支持生产者和消费者一段时间的忙闲不均
也是由于缓冲区存在,比如某段时间,消费者线程由于在处理其他任务,没有办法处理数据,生产者依然可以产生数据放到缓冲区,直到缓冲区没有空间之前,都不受消费者线程的影响。
③提高了效率
在实际情况中,不只需要生产和消费,在生产之前还需要进行一定的整合数据,在消费之后还需要对数据进行复杂的处理,生产消费模型的存在,可以让生产前和消费后让线程并行,大大提高了执行效率。

标签:生产者,精讲,信号量,队列,mutex,pthread,线程,sem,模型
From: https://blog.csdn.net/qq_55008871/article/details/140845557

相关文章

  • 璞公英与恩施三中试点签约,AI大模型批阅+精准教学赋能教育升级
    在国家大力推动教育数字化战略行动的浪潮中,通过科技创新赋能教育,促进教育公平与质量双提升的重要签约仪式,于恩施市第三高级中学隆重举行。一直以来,恩施市第三高级中学积极寻求创新与突破,引入璞公英的先进教育技术,旨在提升教学质量,为学生创造更优质的学习环境。此次签约仪式标志着......
  • 隐马尔可夫模型与隐半马尔可夫模型:状态持续时间的影响与应用场景对比
    目录前言隐马尔可夫模型(HMM)与隐半马尔可夫模型(HSMM)的区别隐马尔可夫模型(HMM)隐半马尔可夫模型(HSMM)HSMM的特点应用场景实现复杂度隐马尔可夫模型(HMM)与隐半马尔可夫模型(HSMM)对比表结论前言   在序列数据分析领域,隐马尔可夫模型(Hidd......
  • 【数学建模导论】Task05 多模数据与智能模型
    前言在多模数据中,涉及以下领域数字图像处理与计算机视觉计算语言学与自然语言处理数字信号处理与智能感知多模态学习——融合不同的数据类型(如图像和文本):是机器学习的一个重要分支。是处理复杂数据分析问题的关键❤️❤️❤️❤️❤️系列文章导航【数学建模导论】Ta......
  • 数学建模——线性规划模型
    前言:当学习完线性规划模型,我感觉到了数学建模的“细腻”之处,也可以从中感觉到他“细腻”的美感,为此想记录一下我学习数学建模的一些笔记跟心得。线性规划模型一般是求解最大值最小值问题,如果目标函数f(x)和约束条件均是决策变量的线性表达式,(即没有平方项和乘积项),那么此时的数......
  • 扩散模型(Diffusion Model)——生成模型
    一、扩散模型介绍    扩散模型(DiffusionModel)是一种生成模型,最近在图像生成、视频生成、语音合成等领域取得了显著的进展。与传统的生成对抗网络(GAN)和变分自编码器(VAE)不同,扩散模型通过逐步将噪声添加到数据并反转这一过程来生成新样本二、扩散模型的基本原理扩散模......
  • 预训练语言模型公平性-公平性度量、去偏方法
    一、内在偏见与外在偏见1、内在偏见:训练前数据集中存在的刻板印象;2、外在偏见:用来衡量偏差如何在下游任务中传播。通常包括微调,然后评估其关于性别和种族等敏感属性的表现;3、许多NLP应用程序对现有的语言模型进行了微调,这些模型将外在偏见和内在偏见交织在一起。......
  • 多进程系列:不同的模型处理不同的数据
    多进程系列:不同的模型处理不同的数据代码示例importmultiprocessingimporttime#假设以下是五个分类模型函数defclassify_model_1(data):#模拟分类操作time.sleep(1)print("classify_model_1")returnf"模型1分类结果:{data}"defcl......
  • 什么是大模型?一文速通了解什么才是真正的大模型
    在这个充满变革的时代里,人工智能领域的几个关键词——ChatGPT、OpenAI、大模型、提示词工程以及“幻觉”频繁出现在我们的视野中,它们如同一股不可忽视的力量,冲击并重塑着我们的认知。这些术语不仅代表了技术的前沿动态,也引发了社会各界的广泛讨论与关注。什么是大模型当......
  • 炸裂!人人需要一份AI大模型学习路线!
    23年AI大模型技术狂飙一年后,24年AI大模型的应用已经在爆发,因此掌握好AI大模型的应用开发技术就变成如此重要,那么如何才能更好地掌握呢?一份AI大模型详细的学习路线就变得非常重要!由于AI大模型应用技术比较新,业界也没什么参照标准,打造AI大模型技术的学习路线并非......