首页 > 其他分享 >生产消费模型

生产消费模型

时间:2024-06-07 13:30:22浏览次数:18  
标签:消费 消费者 生产者 模型 cond bq pthread 生产 线程

一、生产消费者模型

1.1、例子引入

        我们在日常生活中,一般都是通过超市,集市等场所,来购买日常用品,而不会直接向生产商进行购买。超市则会统一向各个生产商批发商品,然后售卖给人们。

        如果我们直接去供货商那里买东西,那我们只会要很少的商品,供货商只能给你单独生产,对于供货商来说生产的成本就太大了,所以有了交易场所这个中间媒介的存在,其目的就是为了集中需求,分发产品。

        消费者与生产者之间通过了超市进行交易。当生产者不需要的时候,生产商还可以继续生产,提供给超市,当供货商不再生产的时候消费者还能从超市买得到!这样生产和消费就能进行解耦了。而我们把临时的存储商品的场所称为缓冲区。

1.2、生产者消费者模型 

        生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。

        生产者和消费者彼此之间不直接交易,而通过一个容器来进行交易(超市等交易场所),所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中,消费者也不用找生产者直接要数据,而是直接从这个容器里取数据,这个容器就相当于一个缓冲区。这个容器实际上就是用来给生产者和消费者解耦的。 

        而从编码的角度,每一个消费者我们可以看作一个消费者线程,每一个生产者我们也可以看作一个生产者线程。所有的消费者和生产者都要访问同一个交易场所,于是交易场所就是一个临界资源!

在编码的角度下,模型特点如下:

1、三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系和同步关系)。
2、两种角色: 生产者和消费者。(通常由线程承担)
3、一个交易场所: 通常指的是内存中的一段缓冲区。

编码思路:我们主要的目标其实就是维护上面的三个特点: 

        1、生产者和生产者之间,消费者和消费者之间,生产者和消费者之间,这三个关系它们都是存在互斥的:因为交易场所可能会被多个执行流同时访问,因此我们需要将该临界资源用互斥锁保护起来。其中,所有的生产者和消费者都会竞争式的申请锁。

        2、生产者和消费者之间还有同步关系:如果让生产者一直生产,那么当生产者生产的数据将容器塞满后,生产者就会停止生产,而如果让消费者一直消费,那么当容器当中的数据被消费完后,消费者就会停止消费。

        消费者和生产者会竞争同一把锁。而必定会有一个线程的竞争力特别强,比如是某一个生产者线程,它在生产满资源后,它仍然能够去申请锁,访问临界资源,然后释放锁,这样其他的生产者线程申请不到锁,无法生产,且消费者线程也无法消费了,这样就会引起饥饿问题,是非常低效的。所以,我们应该让生产者和消费者访问该容器时具有一定的顺序性,这就要使用我们之前所讲到的条件变量了,这就是让线程之间同步。

二、基于BlockQueue的生产者消费者模型

        阻塞队列(BlockQueue):常用于作为生产者消费者模型中交易场所的数据结构。

        队列为空时,消费者无法取数据,直到阻塞队列被放入数据。队列为满时,生产者不能生产

数据,直到有数据被取出。

2.1、单生产单消费

BlockQueue的实现:BlockQueue.hpp

#pragma once
 
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include<time.h>
 
using namespace std;
#define G_NUM 5
 
template <class T>
class BlockQueue
{
public:
    bool full()
    {
        return bq.size() == capacity;
    }
 
    bool empty()
    {
        return bq.size() == 0;
    }
 
    BlockQueue(const int &capacity_ = G_NUM)
        : capacity(capacity_)
    {
        pthread_mutex_init(&mtx_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
    }
 
    void push(const T &in)
    {
        pthread_mutex_lock(&mtx_);
        while (full())
            pthread_cond_wait(&p_cond_, &mtx_);
 
        bq.push(in);
        pthread_cond_signal(&c_cond_);
 
        pthread_mutex_unlock(&mtx_);
    }
 
    void pop(T *out)
    {
        pthread_mutex_lock(&mtx_);
        while (empty())
            pthread_cond_wait(&c_cond_, &mtx_);
 
        *out = bq.front();
        bq.pop();
        pthread_cond_signal(&p_cond_);
 
        pthread_mutex_unlock(&mtx_);
    }
 
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
 
private:
    queue<T> bq;            // 阻塞队列
    int capacity;           // 容量
    pthread_mutex_t mtx_;   // 锁
    pthread_cond_t c_cond_; // 消费者条件变量(消费者等待的队列)是否为空
    pthread_cond_t p_cond_; // 生产者条件变量(生产者等待的队列)是否为满
};

        在主函数中我们创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。ConProd.cc:

#include "BlockQueue.hpp"
 
void *consmer(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    while (true)
    {
        int a = 0;
        bq->pop(&a);
        cout << "消费者:" << a << endl;
        sleep(1);
    }
    return nullptr;
}
 
void *productor(void *arg)
{
    BlockQueue<int> *bq = (BlockQueue<int> *)arg;
    int a = 0;
    while (true)
    {
        bq->push(a);
        cout << "生产者:" << a << endl;
        a++;
    }
    return nullptr;
}
 
int main()
{
    BlockQueue<int> *bq = new BlockQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, consmer, (void *)bq);
    pthread_create(&p, nullptr, productor, (void *)bq);
 
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
 
    return 0;
}

运行结果:

说明:

        1、我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。

        当阻塞队列满了的时候,生产者线程就应该在p_cond_条件变量下进行等待,直到阻塞队列中有空间时再将其唤醒,继续生产。

        当阻塞队列为空的时候,消费者线程就应该在c_cond_条件变量下进行等待,直到阻塞队列中有新的数据时再将其唤醒,继续消费。

        2、当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在c_cond_条件变量下进行等待,因此当生产者生产完数据后需要唤醒在c_cond_条件变量下等待的消费者线程。
同样的,当消费者消费完一个数据后,意味着阻塞队列当中至少有一个空间,而此时可能有生产者线程正在p_cond_条件变量下进行等待,因此当消费者消费完数据后需要唤醒在p_cond_条件变量下等待的生产者线程。

        3、判断是否满足生产消费条件时不能用if,而应该用while:因为pthread_cond_wait函数有可能调用失败,意味着线程没有被成功阻塞,调用失败后该执行流就会继续往后执行。而此时如果队列为满(队列大小是一定的),在生产者函数中,你继续往后执行的话,就会越界访问。为了避免出现上述情况,我们就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件。

2.2、多生产多消费并随机派发任务

        由于我们将BlockQueue当中存储的数据进行了模板化,此时就可以让BlockQueue当中存储其他类型的数据。比如下面我们让其存储自定义类型的数据。

        task.hpp:我们自行设定一个加法计算任务,该任务由多个生产者派发,交给多个消费者去执行。

#pragma once
 
#include <iostream>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
 
class task
{
public:
    task()
    {
    }
    task(int x, int y)
        : x_(x), y_(y)
    {
    }
 
    int operator()()
    {
        return x_ + y_;
    }
 
private:
    int x_;
    int y_;
};

ConProd.cc:

#include "BlockQueue.hpp"
#include "task.hpp"
 
void *consmer(void *arg)
{
    BlockQueue<task> *bq = (BlockQueue<task> *)arg;
    while (true)
    {
        task k;
        bq->pop(&k);
        cout << "消费者: x+y=" << k() << endl;
        sleep(1);
    }
    return nullptr;
}
 
void *productor(void *arg)
{
    BlockQueue<task> *bq = (BlockQueue<task> *)arg;
    while (true)
    {
        int x = rand() % 100 + 1;
        int y = rand() % 100 + 1;
        task k(x, y);
        bq->push(k);
        cout << "生产者: x+y=?"
             << "  "
             << "x:" << x << "  "
             << "y:" << y << endl;
    }
    return nullptr;
}
 
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 12323);
    BlockQueue<task> *bq = new BlockQueue<task>();
    pthread_t c[2], p[3];
    for (int i = 0; i < 2; i++)
        pthread_create(c + i, nullptr, consmer, (void *)bq);
    for (int i = 0; i < 2; i++)
        pthread_create(p + i, nullptr, productor, (void *)bq);
 
    for (int i = 0; i < 2; i++)
        pthread_join(c[i], nullptr);
    for (int i = 0; i < 2; i++)
        pthread_join(p[i], nullptr);
 
    return 0;
}

三、总结

        生产者消费者模型是高效的。其高效体现在一个线程拿出来任务可能正在做处理,它在做处理的同时,其他线程可以继续从队列中拿任务,继续处理,所以其高效是我们可以让多个线程并发的同时处理多个任务! 生产者线程也可以不断地并发地派发任务。

标签:消费,消费者,生产者,模型,cond,bq,pthread,生产,线程
From: https://blog.csdn.net/m0_69323023/article/details/139524324

相关文章

  • BERT+P-Tuning文本分类模型
    基于BERT+P-Tuning方式文本分类模型搭建模型搭建本项目中完成BERT+P-Tuning模型搭建、训练及应用的步骤如下(注意:因为本项目中使用的是BERT预训练模型,所以直接加载即可,无需重复搭建模型架构):一、实现模型工具类函数二、实现模型训练函数,验证函数三、实现模型预测函......
  • CNN依旧能战:nnU-Net团队新研究揭示医学图像分割的验证误区,设定先进的验证标准与基线模
    这篇论文研究了在3D医学图像分割领近年引入了许多新的架构和方法,但大多数方法并没有超过2018年的原始nnU-Net基准。作者指出,许多关于新方法的优越性的声称在进行严格验证后并不成立,这揭示了当前在方法验证上存在的不严谨性。揭示验证短板:深入探讨了当前医学图像分割研究中存在的......
  • 大模型开发应用实战:真实项目实战对标各类大厂大模型算法岗技术
    大模型开发应用实战营:真实项目实战对标各类大厂大模型算法岗技术一、引言在人工智能领域,大模型已经成为推动技术进步和应用创新的重要力量。随着技术的不断发展,各大厂商纷纷投入大量资源研发大模型,并尝试将其应用于各种实际场景中。为了培养具备大模型开发与应用能力的高级技术......
  • Navicat生成ER关系图 逆向数据库到模型
    选中表格右击  点击右下角的按钮切换到ER视图:  那些表与表之间相连接的就是外键,这个应该没问题。然后,我想到我要给领导汇报一下我的工作情况呀,直接截图截不完整不说,还很模糊。所以我想,会不会有什么办法可以将得到的关系图导出成pdf或者图片格式的文件呢?找了一会儿,发现......
  • 大模型微调实战:精通、指令微调、开源大模型微调、对齐与垂直领域应用
    大模型微调实战:精通、指令微调、开源大模型微调、对齐与垂直领域应用一、引言随着人工智能技术的迅猛发展,大模型在多个领域展现出强大的能力。然而,如何有效地对大模型进行微调以适应特定任务和场景,成为了研究者和开发者关注的焦点。本文将深入探讨大模型微调实战营中的关键内容,......
  • AI大模型微调训练营,全面解析微调技术理论,掌握大模型微调核心技能
    AI大模型微调训练营:深度解析微调技术,掌握核心技能一、引言随着人工智能技术的飞速发展,大型预训练模型(如GPT、BERT、Transformer等)已成为自然语言处理、图像识别等领域的核心工具。然而,这些大模型在直接应用于特定任务时,往往无法直接达到理想的性能。因此,微调(Fine-tuning)技术应运......
  • 知乎(1-5期)-AI大模型全栈工程师培养计划,做ChatGPT浪潮中顶尖的超级个体
    知乎(1-5期)-AI大模型全栈工程师培养计划,做ChatGPT浪潮中顶尖的超级个体一.前言:1.AI形式目前人工智能和大模型一定是前景非常广阔的赛道,现在陆续出现的模型训练岗,模型技术岗,像有些大厂已经开始不再招聘JAVA开发,所以关于大模型的岗位一定是雨后春笋的喷发2.程序员自身的发展......
  • 探索Native Plugins:开启大模型的技能之门
    前言上一章节我们了解了一下SemanticKernnel中Plugins插件的概念以及学习了的SemanticKernel模板插件的创建,本章节我们来学习NativePlugins原生函数插件使用。通过函数定义插件在之前的章节中我们介绍过在在SemanticKernel中应用FunctionCalling,在文中讲解了Func......
  • 基于 Go 语言实现的 Ollama 大语言模型框架
    大语言模型在现代人工智能领域中扮演着重要角色。Ollama作为一个轻量级且可扩展的框架,帮助开发者在本地机器上构建和运行这些模型。Ollama简介Ollama是一个简单、可扩展的框架,旨在帮助开发者构建和运行大语言模型。它提供了一个简洁的API,用于创建、运行和管理模型。此外,Olla......
  • 从0到1训练私有大模型技能与应用实现 ,企业急迫需求,抢占市场先机
    从0到1训练私有大模型:技能构建与应用实现,助力企业抢占市场先机在当今数字化浪潮中,人工智能(AI)技术已成为企业实现创新和突破的关键。特别是在大模型技术领域,其强大的数据处理能力和泛化能力为企业提供了前所未有的机遇。为了满足企业急迫的需求,抢占市场先机,本文将从0到1探讨如何训......