首页 > 其他分享 >【生产-消费模型】生产者 - 消费者模型原理及实现

【生产-消费模型】生产者 - 消费者模型原理及实现

时间:2023-01-12 10:46:45浏览次数:58  
标签:消费者 生产者 模型 int 线程 缓冲区 原理 include

一、什么是生产者-消费者模型

1.简单理解生产者-消费者模型

假设有两个进程(或线程)A、B和一个固定大小的缓冲区,A进程生产数据放入缓冲区,B进程从缓冲区中取出数据进行计算,这就是一个简单的生产者-消费者模型。这里的A进程相当于生产者,B进程相当于消费者。

img

2.为什么要使用生产者-消费者模型

在多线程开发中,如果生产者生产数据的速度很快,而消费者消费数据的速度很慢,那么生产者就必须等待消费者消费完数据才能够继续生产数据,因为生产过多的数据可能会导致存储不足;同理如果消费者的速度大于生产者那么消费者就会经常处理等待状态,所以为了达到生产者和消费者生产数据和消费数据之间的平衡,那么就需要一个缓冲区用来存储生产者生产的数据,所以就引入了生产者-消费者模式

这里缓冲区的作用是为了平衡生产者和消费者的数据处理能力,一方面起缓存作用,另一方面达到解耦合作用。

3.生产者-消费者模型特点

  • 保证生产者不会在缓冲区满的时候继续向缓冲区放入数据,而消费者也不会在缓冲区空的时候,消耗数据
  • 当缓冲区满的时候,生产者会进入休眠状态,当下次消费者开始消耗缓冲区数据时,生产者才会被唤醒,开始往缓冲区中添加数据;当缓冲区空的时候,消费者也会进入休眠状态,知道生产者往缓冲区添加数据时才会被唤醒

img

img

4.生产者-消费者模型的应用场景

生产者-消费者模型一般用于将生产数据的一方和消费数据的一方分割开来,将生产数据与消费数据的过程解耦开来。

(1)Excutor任务执行框架

通过将任务的提交和任务的执行解耦,提交任务的操作相当于生产者,执行任务的操作相当于消费者;例如使用Excutor构建web服务器,用于处理线程的请求:生产者将任务提交给线程池,线程池创建线程处理任务,如果需要运行的任务数大于线程池的基本线程数,那么就把任务扔到阻塞队列。(通过线程池+阻塞队列的方式比只使用一个阻塞队列的效率高很多,因为消费者能够处理就直接处理掉了,不用每个消费者都要先从阻塞队列中取出任务再执行)

(2)消息中间件active MQ

在短时间内产生大量的订单的场景下,不可能同时处理那么多的订单,需要将订单放入一个队列里面,然后由专门的线程处理订单。这里用户下单就是生产者,然后再由专门处理订单的线程慢慢处理,这样可以短时间内支持高并发服务。

(3)任务的处理时间比较长的情况下:

上传附近并处理的情景下,那么这个时候可以将用户上传和处理附件分为两个过程,用一个队列暂时存储用户上传的附近,然后立即返回用户上传成功,然后由专门的线程处理队列中的附件。

5.生产者-消费者模型的优点

(1)解耦合:将生产者类和消费者类进行解耦,消除代码之间的依赖性,简化工作负载的管理

(2)复用:通过生产者和消费者类独立开来,那么可以对生产者类型和消费者类进行独立的复用和扩展

(3)调整并发数:由于生产者和消费者的处理速度是不一样的,可以调整并发数,给予慢的一方多的并发数,来提高任务的处理速度

(4)异步:对于生产者和消费者来说能各司其职,生产者只需要关心缓冲区是否还有数据,不需要等待消费者处理完;同样对于消费者涞水,也只需要关注缓冲区的内容,不需要关注生产者,通过异步的方式支持高并发,将一个耗时的流程拆成生产和消费两个阶段,这样生产者因为执行put()的时间比较短,而支持高并发。

(5)支持分布式:生产者和消费者通过队列进行通信,所有不需要在同一台机器上,在分布式环境中可以通过redis的list作为队列,而消费者只需要轮询队列中是否有数据。同事还能支持集群的伸缩性,当某台机器宕掉的时候不会导致整个集群宕掉。

二、C++实现生产者-消费者模型

1.依赖

(1) C++提供的thread

(2) 互斥锁mutex

(3) 条件变量condition_variable

(4) 队列queue

(5) 原子操作

2.实现细节

(1)具体的实体逻辑是构建一个queue来存储生产的数据,queue不满时可以生产的数据,queue不满时可以生产,不空时可以消费。对于这个队列可以采用阻塞队列的实现思路

(2)先实现构造函数,初始化一个unique_lock供condition_variable使用。

(3)条件变量需要申请两个,分别控制Consumer和producer

(4)出队和入队的细节

(5)首先加锁

(6)循环判断一下目前队列情况,对于各自的特殊情况(队满和队空)进行处理

(7)唤醒一个线程来处理特殊情况

(8)处理入队和出队操作

(9)最后释放锁

(10)对于输出std::cout可能由于多线程紊乱的问题,加入临界区。另外因为std::cout缓存问题,可能存在其他问题

3.问题

(1) 出现的bug:在多个consumer线程情况下,会出现有线程无法退出的情况。在析构函数中,加入stop,并且唤醒因条件变量阻塞的线程。在pop函数中加入对stop的判断,当队列为空并且stop时,退出pop函数。对 consumer 的条件变量 wait 调用加入 pred,队列为空或者没有停止时阻塞。

(2) 条件变量的 wait 函数理解:单参数版本,此时传入一个 unique_lock 类型的变量,并且已经加锁,调用 wait 之后释放锁,并阻塞等待 notify 唤醒,唤醒后加锁,要注意的是被唤醒后有可能加锁失败,此时继续阻塞;双参数版本,此时需要再加入一个 Predicate 类型的变量,应该是一个返回 bool 的函数,可用 lamda 表达式代替,返回 false 阻塞,true 解除,要注意这里的意思是即使 notify 了,如果后面的条件不满足,也不会解除阻塞。

(3) 对于多 consumer 的消息同步暂时搁置,是在外部程序完成调用的 stop。

4.代码

Demo1:

BlockQueue.cpp

#include "BlockQueue.hpp"

CBlockQueue::CBlockQueue() : _capacity(TASK_NUM),_stopped(false) {}

CBlockQueue::~CBlockQueue()
{
    stop();
    _cv_con.notify_all();
    _cv_prod.notify_all();
}

void CBlockQueue::push(const int &data)
{
    unique_lock<mutex> _lck(_mt);
    while(full())
    {
        _cv_con.notify_one();
        _cv_prod.wait(_lck);
    }
    _tasks.push(data);
    _cv_con.notify_one();
}

void CBlockQueue::pop(int &data)
{
    unique_lock<mutex> _lck(_mt);
    while(empty())
    {
        if(this->stopped())
            return ;
        _cv_prod.notify_one();
        _cv_con.wait(_lck,[this](){return this->stopped() || !this->empty();});
    }
    data = _tasks.front();
    _tasks.pop();
    _cv_prod.notify_one();
}

BlockQueue.h

#pragma  once
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>

#define TASK_NUM    8

using namespace std;

class CBlockQueue
{
private:
    mutex _mt;
    condition_variable _cv_con;
    condition_variable _cv_prod;
    queue<int> _tasks;
    atomic<bool> _stopped;

    const int _capacity;

    bool stopped()
    {
        return _stopped.load();
    }

    bool empty()
    {
        return _tasks.size() == 0 ? true:false;
    }

    bool full()
    {
        return _tasks.size() == _capacity ? true:false;
    }
public:
    CBlockQueue();
    ~CBlockQueue();

    void stop()
    {
        _stopped.store(true);
        _cv_con.notify_all();
    }

    bool available()
    {
        return !stopped() || !empty();
    }
    void push(const int &data);
    void pop(int &data);
};

main.cpp

#include <iostream>
#include "BlockQueue.hpp"

#ifdef WIN32
#include <windows.h>
#define sleep(x) (Sleep((x) * 1000))
#else
#include <unistd.h>
#endif

CRITICAL_SECTION cs;
// mutex mt_prod;

void consumer(CBlockQueue *bq)
{
    // CBlockQueue *bq = static_cast<CBlockQueue *>(arg);
    while (bq->available())
    {
        int data = -1;
        bq->pop(data);
        EnterCriticalSection(&cs);
        cout << "<" << this_thread::get_id() << ">: " << data << " comsumed.\n";
        LeaveCriticalSection(&cs);
        // sleep(0.5);
    }
    cout << "<" << this_thread::get_id() << ">: " << "consumer is done.\n";
}

void producer(CBlockQueue *bq, int start, int maxNum)
{
    // CBlockQueue *bq = static_cast<CBlockQueue *>(arg);
    // unique_lock<mutex> lck(mt_prod);

    int i = 0;
    while (i++ < maxNum)
    {
        // int data = rand() .% 1024;
        int data = i + start;
        bq->push(data);
        EnterCriticalSection(&cs);
        cout << "[" << this_thread::get_id() << "]: " << data << " produced.\n";
        LeaveCriticalSection(&cs);
        // sleep(0.2);
    }

    // if(start + i >= maxNum) bq->stop();
    cout << "[" << this_thread::get_id() << "]: " << "producer is done.\n";
}

int main()
{
    CBlockQueue bqueue;
    InitializeCriticalSection(&cs);

    vector<thread> th_prods;
    const int num_prod = 3;
    for (int i = 0; i < num_prod; ++i)
    {
        th_prods.emplace_back(producer, &bqueue, i * 100, num_prod * 100);
    }

    vector<thread> th_cons;
    const int num_con = 3;
    for (int i = 0; i < num_con; ++i)
    {
        th_cons.emplace_back(consumer, &bqueue);
    }

    for (auto &t : th_prods)
    {
        t.join();
    }
    bqueue.stop();
    for (auto &t : th_cons)
    {
        t.join();
    }
    
    DeleteCriticalSection(&cs);
    
    return 0;
}

Demo2:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <list>
#include <iomanip>

std::mutex mtx;
std::condition_variable oneFinished;

const int MAX_QUEUE_LENGTH = 10;
int id = 1;
const int TOTAL_TEST_NUMBER = 100;

std::list<int> arr;

void print_buffer(void)
{
    // 打印queue中存在的数据
    std::cout << "queue[" << arr.size() << "] :";
    for (auto i : arr)
    {
        std::cout<<std::setw(3) << i << " ";
    }
    std::cout << "\n";
}

void producer()
{
    while(true)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(id >= TOTAL_TEST_NUMBER)
        {
            break;
        }
        // 当不满足arr.size() < MAX_QUEUE_LENGTH条件时阻塞在此处
        oneFinished.wait(lock,[](){return arr.size() < MAX_QUEUE_LENGTH;});
        std::cout << "producer is producer " << id << "\n";
        // 从arr尾部push插入
        arr.push_back(id++);
        print_buffer();
        // 唤醒所有线程
        oneFinished.notify_all();
    }
}

void consumer()
{
    while(true)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(id >= TOTAL_TEST_NUMBER && arr.empty())
        {
            break;
        }
        // 当不满足arr.size() > 0条件时阻塞在此处
        oneFinished.wait(lock,[]() {return arr.size() > 0;});
        std::cout << "consumer is consumer " << arr.front() << "\n";
        // 从arr头部pop提取
        arr.pop_front();
        print_buffer();
        oneFinished.notify_all();
    }
}

int main(int argc, char *argv[])
{
    std::thread c1(consumer);
    std::thread c2(consumer);
    std::thread p1(producer);
    std::thread p2(producer);

    c1.join();
    c2.join();
    p1.join();
    p2.join();

    return 0;
}












转载文章:

https://www.cnblogs.com/horacle/p/15425808.html

标签:消费者,生产者,模型,int,线程,缓冲区,原理,include
From: https://www.cnblogs.com/Wangzx000/p/17045634.html

相关文章

  • 常见的思维模型:5W2H和2W1H
    5W2HW(Who)谁来做:关键人物是谁?W(What)做什么:明确做什么?W(When)何时完成:时间节点是?W(Where)在哪里做?W(Why)为什么做:了解原因。H(How)如何做:写方案,梳理关键步骤等。Mors......
  • NetCore模型绑定之FromBodyFromUriFromQueryFromRoute
    title:.NetCore模型绑定之FromBody、FromUri、FromQuery、FromRoutecategories:后端date:2022-10-2917:21:11tags:-.NETFromRoute[FromRoute]属性处理“?”之......
  • CQF学习笔记M1L2二叉树模型
    https://blog.csdn.net/weixin_42859140/article/details/107018914?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCo......
  • 0/1分数规划模型
    0/1分数规划模型给定n个物品,每个物品有两个权值ai,bi,从这n个物品选取k(0≤k≤n)个,使得\[\frac{\sum_{i=1}^{k}{a_i}}{\sum_{i=1}^{k}{b_i}}\]最大.对于该类......
  • 【table master mmocr】Windows下模型训练的配置
    processed_data就是mmocr_pubtabnet_recognition,注意统一命名由图可以看出,那个processed_data就是mmocr_pubtabnet_recognition,而且后面后缀_0927之类的都是日期,可能是......
  • PostGIS之维数扩展的九交模型
    1.概述PostGIS是PostgreSQL数据库一个空间数据库扩展,它添加了对地理对象的支持,允许在SQL中运行空间查询PostGIS官网:AboutPostGIS|PostGISPostGIS官方教程:PostGIS......
  • Vue.js 双向数据绑定原理
    Vue双向数据绑定原理涉及到Vue中的响应式系统和模板编译。在Vue中,响应式系统是通过Object.defineProperty或者Proxy来实现的。当Vue创建一个Vue实例时,它会遍......
  • 数据分享|逻辑回归、随机森林、SVM支持向量机预测心脏病风险数据和模型诊断可视化|附
    原文链接:http://tecdat.cn/?p=24973 最近我们被客户要求撰写关于预测心脏病风险的研究报告,包括一些图形和统计输出。世界卫生组织估计全世界每年有1200万人死于心脏病......
  • R语言如何做马尔可夫转换模型markov switching model|附代码数据
    全文链接:http://tecdat.cn/?p=6962最近我们被客户要求撰写关于马尔可夫转换模型的研究报告,包括一些图形和统计输出。假设有时间序列数据,如下所示。经验表明,目标变量y似......
  • RNN原理介绍
    RNN原理RNN在实际使用的频率并不多,大多使用LSTM替代RNN,因此对RNN进行简单的介绍。RNN是研究LSTM的基础,毕竟LSTM是基于RNN的改良,二者循环原理大体一致。初识RNN,要记住在时......