首页 > 系统相关 >【Linux修行路】基于阻塞队列的生产消费者模型

【Linux修行路】基于阻塞队列的生产消费者模型

时间:2024-09-08 09:23:41浏览次数:20  
标签:mutex 修行 队列 Linux cond pthread 线程 唤醒

目录

⛳️推荐

一、生产消费者模型

1.1 生产消费者模型的解藕特性

二、基于BlockingQueue的生产消费者模型

2.1 单生产单消费模型

2.2 伪唤醒、误唤醒造成的问题

2.3 基于任务的多生产多消费模型


⛳️推荐

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站

一、生产消费者模型

生产消费者模型(consumer producter )简称 cp ,是多线程多进程下同步互斥的一种场景。

超市的本质就是一个大号的缓存,超市的存在提高了生产者和消费者的效率,超市支持忙闲不均,即生产者可能一次生产很多,但是消费的很慢。对于生产者来说,它最关心超市里还剩多少空位置,而消费者最关心超市的商品数量。超市的存在,让生产和消费行为进行一定成都的解藕。

image-20240314212208748

将这个现实生活中的例子映射到计算机中:生产者和消费者都由不同的线程来承担,超市就是一个特定结构的内存空间(可以是队列、二叉树、链表、数组…),商品就是数据。所以生产消费者模型本质是执行流在做通信,但是现如今我们的关注点不再是如何进行通信,而是如何进行安全且高效的通信。超市需要被多个线程访问到,因此超市就属于一种共享资源。既然是共享资源,那么在多执行流进行并发访问的时候可能会出现线程安全问题。主要有以下三组并发问题:

  • 生产者 VS 生产者:互斥(各个生产者之间是一种竞争关系)

  • 消费者 VS 消费者:互斥(只剩一个商品了)

  • 生产者 VS 消费者:互斥(生产者要么不生产,要已经生产好放在超市了,消费者要么拿到商品,要么没拿到商品,不存在生产者正在生产,消费者就来拿,也就是说在超市的商品不允许生产者和消费者同时去访问,生产者正在往超市放置商品的时候,消费者不能来拿,为了保证数据的安全,生产者和消费者之间一定需要是互斥关系);同步(如果生产者频繁的访问超市,给超市打电话,问超市需不需要货物,会导致消费者访问不到超市的数据,造成消费者饥饿。正确做法是,生产者生产一部分,然后让消费者来消费,生产和消费要有一定的顺序性,因此需要同步)

生产消费者模型总结:3 种关系;2 种角色;1 个交易场所。该模型的优点是支持忙闲不均,将生产和消费进行解藕。

image-20240315123929242

生产者的数据,一般是从用户、网络中获取的,所以生产者生产的数据也是要花时间获取的,然后才是生产数据到队列。消费者,也不只是获取数据这么简单,消费者在拿到数据后大概率是要对数据做加工处理的,这也需要花时间。生产消费者模型就高效在这里,生产者在向仓库生产数据的时候,消费者虽然不能从仓库获取数据,但是消费者可以在这个时候去加工处理数据,同理,消费者在从队列里获取数据的时候,生产者虽然不能向仓库中生产数据,但是它可以在这个时候获取数据。因此,生产线程和消费线程可以并发的去执行,这就是生产消费者模型高效的点。如果只看上图中的中间部分,那么生产消费者模型其实并不高效,因为生产者放数据和消费者取数据并不能同时进行。

1.1 生产消费者模型的解藕特性

举个栗子,以前我们在 main 函数中调用 add 函数的时候,因为是单执行流,所以main 函数只能等待。假如采用生产消费者模型,让 main 函数是一个线程充当生产者, add 函数是另一个线程充当消费者,此时 main 函数这个线程在 add 线程进行计算的时候就不需要等待了,可以继续生产数据往超市里面放,add 线程现在也不用等 main 线程生产出一组数据再计算一组数据,而是直接去超市里面取数据进行计算。对 main 函数和 add 函数来说,此时就解藕了。

二、基于BlockingQueue的生产消费者模型

BlockQueue:在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通队列区别在于,当队列为空时,从队列获取元素的操作会被阻塞,直到队列中被放入的元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。

2.1 单生产单消费模型

// BlockQueue.hpp
#pragma

#include <iostream>
#include <pthread.h>
#include <queue>

template <class T>
class BlockQueue
{
    static const int defaultmaximum = 20;

public:
    BlockQueue(int maximum = defaultmaximum)
        : maximum_(maximum)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        low_water_ = maximum_ / 3; // 低水位线是队列最大容量的 1/3
        high_water_ = (maximum_*2)/3; // 高水位线是队列最大容量的 2/3
    }

    T pop()
    {
        pthread_mutex_lock(&mutex_);
        if (q_.size() == 0)
        { // 消费条件不满足,去等待
            pthread_cond_wait(&c_cond_, &mutex_);
        }
        // 1. 队列没空 2. 被唤醒
        T out = q_.front();
        q_.pop();
        if(q_.size() == low_water_) pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
        pthread_mutex_unlock(&mutex_);
        return out;
    }

    void push(const T &data)
    {
        pthread_mutex_lock(&mutex_);
        if (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
        {
            // 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
            pthread_cond_wait(&p_cond_, &mutex_);
        }

        // 1、队列没满 2、被唤醒
        q_.push(data);                  // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,

        pthread_mutex_unlock(&mutex_);
        if(q_.size() == high_water_) pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }

private:
    std::queue<T> q_;       // 共享资源
    int maximum_;           // 队列的最大容量
    pthread_mutex_t mutex_; // 定义一个互斥量(锁)
    pthread_cond_t c_cond_; // 定义一个消费者条件变量,消费者在这个条件变量下进行等待
    pthread_cond_t p_cond_; // 定义一个生产者条件变量,生产者在这个条件变量下进行等待
    int low_water_; // 队列的低水位线
    int high_water_; // 队列的高水位线
};
// main.cc
#include "BlockQueue.hpp"
#include <unistd.h>

void *Consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);

    while(true)
    {
        // 消费——从队列里面拿数据
        int data = bq->pop();
        std::cout << "消费了一个数据: " << data << std::endl;
        usleep(1000000);
    }
}

void *Productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int>*>(args);
    int data = 1;
    while(true)
    {
        // 生产——往队列里面放数据
        // int data = rand() % 10 + 1;
        bq->push(data);
        std::cout << "生产了一个数据:" << data << std::endl;
        data++;
        usleep(100000);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<int> *bq = new BlockQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);
    pthread_create(&p, nullptr, Productor, bq);

    // 主线程等待两个子线程退出
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    delete bq; // 释放资源
    return 0;
}

生产消费者模型

注意事项:申请到锁之后,首先要检查资源是否就绪,就绪了才能进行后续操作,而资源是否就绪是通过判断得来的,判断本身就属于访问临界资源,所以判断应该在加锁之后,所以如果资源没有就绪就需要调用 pthread_cond_wait 让该线程去休眠,所以调用pthread_cond_wait 的时候,该线程一定是持有锁的不能持有锁去等待,这样会造成死锁问题,因此在调用该函数时,该函数内部会释放锁,然后让进程去休眠,在进程被重新唤醒,也就是函数返回的时候,需要重新持有锁

为什么结果中夹杂了消费一个数据?不应该是一批数据嘛?而且,队列的高水位线似乎没有起作用?因为,最初一定是生产者先生产13个数据,然后到了高水位线,此时去 c_cond_ 中唤醒消费者线程,刚被唤醒的消费者线程先要去竞争锁,在生产者线程释放锁之前,这个被唤醒的消费者线程其实是处于挂起等待状态的,只不过等待的地方变了,本来是在 c_cond_ 这个条件变量中进行等待条件满足,进入 c_cond_ 是因为资源没有就绪,此时被唤醒,说明资源就绪了,所以 c_cond_ 就给这个在其中等待的消费者线程说:“好了,资源已经就绪了,你出去等吧,别在我这里等了”。此时虽然资源就绪了,但是访问资源需要锁,但是该线程刚“出来”,它并没有锁,此时锁还在生产者线程手里,所以该消费者线程还是只能等待,紧接着生产者线程释放锁,此时注意了!!!生产者线程释放完锁之后,并没有去 p_cond_ 中休眠,因为此时代码中去 p_cond_ 中休眠的前提只有一个,就是资源不就绪,资源不就绪的前提是,你得先去申请锁,有了锁才能去判断资源是否就绪,所以,此时生产者线程和消费者线程,都在等着竞争这个锁,这一次,消费者线程运气比较好,竞争到了锁,它去消费了一个数据,然后把锁释放了,这个过程中,生产者线程并没有去 p_cond_ 中进行休眠,而是一直在等待锁被释放,然后去竞争。这一次,生产者线程运气比较好,它竞争到了锁,它就又去生产数据了,这就是为什么在生产一批数据后,本应该消费一批数据的,但是却只消费了一个数据就继续去生产了,并且后面生成数据的个数明显已经超过了高水位线,但是还一直在生产,本质是因为,这段时间对生产者和消费者来说,资源始终都是就绪的,他们并没有去对应的条件变量下进行休眠,而是都一直处于竞争锁的状态,在仔细观察可以发现,在生产了13个数据之后,消费了1个,紧接着又生产了8个,此时队列已经被生产满了,然后对生产者线程来说,资源处于不就绪状态,所以此时生产者线程去 p_cond_ 下休眠了,然后消费者线程就能安心来进行消费了。我们的高水位线和低水位线貌似并没有起作用,我们希望的是,当生产达到高水位线的时候,让生产者停止,然后让消费者去消费一批线程,达到一种同步的效果,但是我们的希望并没有实现。问题就出在,我们在唤醒一个生产者线程(消费者线程)的同时,消费者线程(生产者线程)也是处于唤醒状态,此时就会存在消费者线程和生产者线程同时去竞争锁,此时他们俩竞争锁的能力可能会不同,因此就达不到我们所希望的同步状态,怎么解决呢?其实也很简单,在唤醒一个线程的时候,让当前线程去休眠。此时就不会出现两个线程都处于被唤醒的状态,去竞争锁的情况,全程只会有一个线程处于被唤醒的状态。将代码修改如下:

T pop()
{
    pthread_mutex_lock(&mutex_);
    while (q_.size() == 0)
    { // 消费条件不满足,去等待
        pthread_cond_wait(&c_cond_, &mutex_);
    }
    // 1. 队列没空 2. 被唤醒
    T out = q_.front();
    q_.pop();
    if (q_.size() <= low_water_)
    {
        pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
        pthread_cond_wait(&c_cond_, &mutex_); // 让当前进程去休眠
        std::cout << "c is sleep..." << std::endl;
    }
    pthread_mutex_unlock(&mutex_);
    return out;
}

void push(const T &data)
{
    pthread_mutex_lock(&mutex_);
    while (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
    {
        // 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
        pthread_cond_wait(&p_cond_, &mutex_);
    }

    // 1、队列没满 2、被唤醒
    q_.push(data); // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,

    if (q_.size() >= high_water_)
    {
        int ret = pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
        pthread_cond_wait(&p_cond_, &mutex_);
        std::cout << "p is sleep.." << std::endl;// 让当前进程去休眠
    }
    pthread_mutex_unlock(&mutex_);
    // pthread_cond_wait(&p_cond_, &mutex_);
}

生产消费者模型同步

2.2 伪唤醒、误唤醒造成的问题

其次是唤醒操作,生产者生产一个数据后,它自己是最清楚的,此时队列里有数据了,可以唤醒一个消费者来取数据。消费者在队列中取数据时,如果没有了,它就可以唤醒一个生产者来进行生产。

T pop()
    {
        pthread_mutex_lock(&mutex_);
        if (q_.size() == 0)
        { // 消费条件不满足,去等待
            pthread_cond_wait(&c_cond_, &mutex_);
        }
        // 1. 队列没空 2. 被唤醒
        T out = q_.front();
        q_.pop();
        if(q_.size() == low_water_) pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
        pthread_mutex_unlock(&mutex_);
        return out;
    }

    void push(const T &data)
    {
        pthread_mutex_lock(&mutex_);
        if (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
        {
            // 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
            pthread_cond_wait(&p_cond_, &mutex_);
        }

        // 1、队列没满 2、被唤醒
        q_.push(data);                  // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,

        pthread_mutex_unlock(&mutex_);
        if(q_.size() == high_water_) pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
    }

伪唤醒和误唤醒是在多生产,多消费的前提下,假设此时队列里已经满了,然后消费者消费了一个数据,此时队列空出来一个位置,紧接着,这个消费线程唤醒了一批正在 p_cond_ 条件变量下进行等待的生产线程,因为重新被唤醒的线程需要重新去竞争锁,所以被唤醒的一批生产线程之间处于一种互斥关系,在前面的消费线程释放锁之后,只有一个生产线程能够抢到锁,剩下被唤醒的生产线程此时就只能在锁那里等待,不能在 p_cond_ 下继续等待,因为他们已经被唤醒了,唤醒就相当于 pthread_cond_wait(&c_cond_, &mutex_); 调用结束返回,该函数是在 if 下的,函数返回后,那些被唤醒却没抢到锁的生产线程此时就只能干巴巴的等着了,不能继续向后执行,只有抢到锁的那个线程可以往后执行。当这个申请锁成功的线程生产完数据,此时队列里数据又满了,会去唤醒一个消费者线程,然后该线程释放锁,此时,被唤醒的线程不止有前一秒才被唤醒的消费线程,还有之前被唤醒但是没抢到锁的那一批生产者线程,此时就会出现,生产线程和消费线程在同时竞争锁,如果锁被一个生产线程抢到了,那么它会从 pthread_cond_wait(&c_cond_, &mutex_); 继续向后执行,也就是继续向队列里生产数据,但是此时队列已经满了,再向队列里生产数据就会出问题。所以,这里应该将 if 换成 while,一个被唤醒的进程在抢到锁之后,不应该直接进行队列操作(不只是上面说的生产会出问题,消费也可能会出同样的问题,总之都是队列操作),而是再进行一次判断,看资源是否就绪,如果就绪再往后进行队列操作,没就绪就继续去调用 pthread_cond_wait(&c_cond_, &mutex_); 将自己休眠。

2.3 基于任务的多生产多消费模型

// task.hpp
#include <iostream>
#include <string>

enum
{
    DIVERROR = 1,
    MODERROR,
    UNKNOWERRROR
};

class Task
{
public:
    Task(int a, int b, char op)
    :data1_(a), data2_(b), op_(op), result_(0), exitcode_(0)
    {}

    void run()
    {
        switch(op_)
        {
            case '+':
                result_ = data1_ + data2_;
                break;
            case '-':
                result_ = data1_ - data2_;
                break;
            case '*':
                result_ = data1_ * data2_;
                break;
            case '/':
                if(data2_ == 0) exitcode_ = DIVERROR;
                else result_ = data1_ / data2_;
                break;
            case '%':
                if(data2_ == 0) exitcode_ = MODERROR;
                else result_ = data1_ % data2_;
                break;
            default:
                exitcode_ = UNKNOWERRROR;
                break;
        }
    }

    std::string result_to_string()
    {
        std::string ret = std::to_string(data1_);
        ret += ' ';
        ret += op_;
        ret += ' ';
        ret += std::to_string(data2_);
        ret += ' ';
        ret += '=';
        ret += ' ';
        ret += std::to_string(result_);
        ret += "[exitcode: ";
        ret += std::to_string(exitcode_);
        ret += ']';

        return ret;
    }

    std::string get_task()
    {
        std::string ret = std::to_string(data1_);
        ret += ' ';
        ret += op_;
        ret += ' ';
        ret += std::to_string(data2_);
        ret += ' ';
        ret += '=';
        ret += ' ';
        ret += '?';
        return ret;
    }    
private:
    int data1_;
    int data2_;
    char op_;
    int result_;
    int exitcode_;
};
// BlockQueue.hpp
#pragma

#include <iostream>
#include <pthread.h>
#include <queue>


template <class T>
class BlockQueue
{
    static const int defaultmaximum = 20;

public:
    BlockQueue(int maximum = defaultmaximum)
        : maximum_(maximum)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        low_water_ = maximum_ / 3; // 低水位线是队列最大容量的 1/3
        high_water_ = (maximum_*2)/3; // 高水位线是队列最大容量的 2/3
    }

    T pop()
    {
        pthread_mutex_lock(&mutex_);
        while (q_.size() == 0)
        { // 消费条件不满足,去等待
            pthread_cond_wait(&c_cond_, &mutex_);
        }
        // 1. 队列没空 2. 被唤醒
        T out = q_.front();
        q_.pop();
        if (q_.size() <= low_water_)
        {
            pthread_cond_signal(&p_cond_); // 消费者消费到低水位线,此时就可以去唤醒生产者进行生产了
            pthread_cond_wait(&c_cond_, &mutex_);
        }
        pthread_mutex_unlock(&mutex_);
        return out;
    }

    void push(const T &data)
    {
        pthread_mutex_lock(&mutex_);
        while (q_.size() == maximum_) // 判断本身就是访问临界资源,所以判断一定要在加锁之后
        {
            // 生产条件不满足,需要去等待。1、调用的时候,自动释放锁 2、
            pthread_cond_wait(&p_cond_, &mutex_);
        }

        // 1、队列没满 2、被唤醒
        q_.push(data); // 得先确保生产条件满足(当前队列中的数据个数小于 maximum_),才能生产,

        if (q_.size() >= high_water_)
        {
            int ret = pthread_cond_signal(&c_cond_); // 生产数据到高水位线,说明队列里一定有数据了,此时去唤醒消费者
            pthread_cond_wait(&p_cond_, &mutex_);
        }
        pthread_mutex_unlock(&mutex_);
        // pthread_cond_wait(&p_cond_, &mutex_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }

private:
    std::queue<T> q_;       // 共享资源
    int maximum_;           // 队列的最大容量
    pthread_mutex_t mutex_; // 定义一个互斥量(锁)
    pthread_cond_t c_cond_; // 定义一个消费者条件变量,消费者在这个条件变量下进行等待
    pthread_cond_t p_cond_; // 定义一个生产者条件变量,生产者在这个条件变量下进行等待
    int low_water_;         // 队列的低水位线
    int high_water_;        // 队列的高水位线
};
//main.cc
#include "BlockQueue.hpp"
#include <unistd.h>
#include "Task.h"

const std::string opers = "+-*/%";

// 消费者
void *Consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);

    while (true)
    {
        // 消费——从队列里面拿数据
        Task task = bq->pop();

        // 模拟数据处理的过程
        task.run();
        std::cout << pthread_self() << "# 处理任务: " << task.get_task().c_str() << ", 运算结果是: " << task.result_to_string().c_str() << std::endl;
        usleep(1000000);
    }
}

// 生产者
void *Productor(void *args)
{
    int len = opers.size();
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    int data = 1;
    while (true)
    {
        // 模拟生产者生产数据
        int data1 = rand() % 10 + 1; // [1, 10]
        usleep(10);

        int data2 = rand() % 13; // [0, 13]
        usleep(10);

        char op = opers[rand() % len];

        Task task(data1, data2, op);

        // 生产——往队列里面放数据
        bq->push(task);
        std::cout << pthread_self() << "@ 生产了一个任务: " << task.get_task().c_str() << std::endl;
        // data++;
        usleep(1000000);
    }
}

int main()
{
    srand((unsigned int)time(nullptr));
    BlockQueue<Task> *bq = new BlockQueue<Task>();
    pthread_t c[3], p[5];
    for (int i = 0; i < 3; i++)
    {
        pthread_create(c + i, nullptr, Consumer, bq);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(p+i, nullptr, Productor, bq);
    }

    // 主线程等待两个子线程退出
    for(int i = 0; i < 3; i++)
    {
        pthread_join(c[i], nullptr);
    }
    
    for(int i = 0; i < 5; i++)
    {
        pthread_join(p[i], nullptr);
    }
    
    delete bq; // 释放资源
    return 0;
}

基于任务的多生产多消费者模型

标签:mutex,修行,队列,Linux,cond,pthread,线程,唤醒
From: https://blog.csdn.net/m0_68662723/article/details/141995926

相关文章

  • Linux高效进程控制的实战技巧
    Linux高效进程控制的实战技巧Linux是一种开源的Unix-like操作系统内核,由林纳斯·托瓦兹(LinusTorvalds)于1991年首次发布。Linux以其稳定性、安全性和灵活性而著称,广泛应用于服务器、桌面、嵌入式系统等多个领域。在Linux系统编程中,进程管理是核心部分之一,它涉及到如何创建......
  • Java-数据结构-栈和队列-Stack和Queue (o゚▽゚)o
    文本目录:❄️一、栈(Stack):  ▶1、栈的概念: ▶ 2、栈的使用和自实现:   ☑1)、Stack():   ☑2)、push(Ee):   ☑3)、empty():     ☑4)、peek(Ee):     ☑5)、pop(Ee):    ☑6)、size(Ee): ▶3、栈自实现的总代码:......
  • 全栈性能优化秘籍--Linux 系统性能调优全攻略:多维度优化技巧大揭秘
           ......
  • [Linux] 操作系统 入门详解
    标题:[Linux]操作系统@水墨不写bug目录一、冯.诺依曼体系结构1.冯诺依曼体系结构简介 2.对冯诺依曼体系结构的理解 二、操作系统定位1.为什么需要操作系统?2.操作系统是什么?三、系统调用和库函数 正文开始:一、冯.诺依曼体系结构1.冯诺依曼体系结构简介......
  • [Linux]netstat
    netstat是一个用于显示网络连接、路由表、接口统计等网络相关信息的命令行工具。在Linux系统中,它可以帮助你了解和分析网络状态。虽然netstat已被ss命令取代,但它在某些系统上仍然可用。以下是netstat的一些常用选项:基本用法netstat-a:显示所有连接和监听端口。netst......