首页 > 系统相关 >【Linux】 生产消费者模型

【Linux】 生产消费者模型

时间:2024-03-19 22:34:08浏览次数:37  
标签:线程 消费者 生产者 模型 Linux &_ mutex pthread cond

线程同步

同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,这就叫做同步(饥饿问题:某些线程无法得到资源而长时间无法执行,常见的就是申请不到锁)

竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。

单纯的加锁会引起问题。

如果某个线程竞争锁的能力非常强,则会一直申请锁,释放锁。就会导致线程的饥饿问题。

加锁保护了临界资源,但也阻止了高效让每一个线程使用这份资源。

现在规定线程竞争锁必须排队,一个线程得到锁后释放了锁,如果想要重新得到锁必须从队尾等待锁的申请。这样就保证了线程按照某种顺序访问资源!也就是同步。

条件变量

条件变量是一种维持线程同步的一种机制。

如果一个线程申请到锁,条件也不成立,就直接释放锁。轮到下一个线程申请锁,条件不成立,释放锁。反复申请释放,什么都不做。对于系统来说这是一种资源浪费。

我们希望的是,当线程申请到锁时,如果条件不满足,就先阻塞等待。等到条件满足后,唤醒线程。线程执行后续的任务。

条件变量的主要动作

  • 当前线程等待条件变量成立而被挂起。
  • 条件变量成立,线程被唤醒。

条件变量通过指得是在互斥底下。

条件变量的用法与锁基本一致。

条件变量函数

定义

pthread_cond_t cond;

初始化

全局的初始化为PTHREAD_COND_INITIALIZER

局部初始化为

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

成功返回0,失败返回错误码。

参数: cond条件变量     attr:初始化变量的属性(一般设为空)

    // 初始化互斥锁和条件变量  
    pthread_mutex_init(&mutex, NULL);  
    pthread_cond_init(&cond, NULL);  

销毁

全局会自动销毁

    // 销毁互斥锁和条件变量  
    pthread_mutex_destroy(&mutex);  
    pthread_cond_destroy(&cond);  

条件条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数

  • cond:为条件变量
  • mutex:当前的互斥锁
  • 成功返回0,失败返回错误码

作用是让线程进入挂起状态,等待被唤醒。

注意:进入等待时,实际上会释放掉锁。


唤醒线程

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
  • pthread_cond_signal是唤醒等待队列的首个线程。
  • pthread_cond_broadcast唤醒等待队列的所有线程。

如果唤醒多个线程,那么线程也要先竞争锁,再去访问资源

生产者消费模型

生产者消费者模型是OS中的一个重要的模型。它描述了一种通知和等待的机制。

这个模型包括生产者,消费者,交易产所(通常称为缓冲区)。

生产者负责生产数据,将其放入交易场所(一块内存)。同一时间只能有一个生产者放数据,如果交易场所满了,生产者不能继续生产,必须等待消费者从仓库拿走数据。消费者从仓库取出数据。同样,同一时间只能有一个消费者拿走数据,如果仓库满了,消费者必须等待。

生产者和消费者存在复杂的关系。首先必须是互斥的。生产者和消费者都需要面对交易场所。如果生产和消费同时对交易场所操作,可能会互相干扰,因此要对这个临界资源互斥的访问,即加锁。

同时生产和消费也是同步的,必须保证在适当的时机对数据的生产和消费。

这个模型的一个主要优点是解耦,即降低生产者和消费者之间的依赖关系。通过引入交易场所(缓冲区),生产者不需要直接关注消费者的状态,只需要将数据放入缓冲区;同样,消费者也不需要关心生产者的状态,只需要从缓冲区中取出数据进行处理。

生产者和生产者关系,消费者和消费者的关系,生产和消费者的关系为什么是互斥的?

  • 生产者和生产者会竞争对临界资源的访问,使用是互斥的。
  • 消费者和消费者会竞争取出数据,所以是互斥的。
  • 生产者竞争消费者,消费者竞争生产者。所以生产者和消费者是互斥的。

生产和消费者为什么也是同步的?

要保证一个合适的生产和消费数据的顺序。既不能一直生产(仓库满了),也不能一直消费(仓库空了),同时也要防止饥饿问题。

所以生产者消费者模型就是利用锁和条件变量的维护。


基于BlockingQueue的生产者消费者模型

基于阻塞队列,是常用的生产消费模型。

  • 当队列为空时,往队列里取数据就会被阻塞,直到队列里有数据。
  • 当队列为满时,往队列时放数据会被阻塞,直到队列有数据被取出。

我们以一个单生产者,单消费者来模拟实现。

阻塞队列的框架

  • 由于库里面的队列是malloc的空间,所以不是线程安全的,需要我们封装。
  • 需要cappcity来判断阻塞队列是空满。
  • 需要mutex来保护临界资源(仓库)。
  • 需要条件变量 _c_cond   _p_cond分别维护消费者和生产者的同步。
  • 成员函数需要生产者push,消费者pop

生产者生产数据push

    void Push(const T &in)
    {
        pthread_mutex_lock(&_mutex);

        // 满了就不生产
        while (IsFull())
        {
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        _q.push(in);
        pthread_cond_signal(&_c_cond);

        pthread_mutex_unlock(&_mutex);
    }

首先是加锁

为什么条件判断是while?

为了保持代码的鲁棒性。如果是if判断,在if条件变量等待后,锁被释放。这时候就绕开(不成立)的条件执行后续的代码。因此每一轮都要判断。如果被阻塞了,就必须一直判断。

往队列中push数据后唤醒消费者来消费。

最后释放锁。

消费者消费数据pop

    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);

        //空了就不消费
        while (IsEmpty())
        {
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        *out=_q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);
    }

out是一个输出型参数。

条件同样需要一直判断。

消费一个数据后,就通知生产者来生产数据。


#include <queue>
#include <pthread.h>

const int defultcap = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defultcap) : _capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }

    bool IsFull()
    {
        return _capacity==_q.size();
    }

    bool IsEmpty()
    {
        return _q.size()==0;
    }
    void Push(const T &in)
    {
        pthread_mutex_lock(&_mutex);

        // 满了就不生产
        while (IsFull())
        {
            pthread_cond_wait(&_p_cond, &_mutex);
        }

        _q.push(in);
        pthread_cond_signal(&_c_cond);

        pthread_mutex_unlock(&_mutex);
    }

    void Pop(T *out)
    {
        pthread_mutex_lock(&_mutex);

        //空了就不消费
        while (IsEmpty())
        {
            pthread_cond_wait(&_c_cond, &_mutex);
        }

        *out=_q.front();
        _q.pop();
        pthread_cond_signal(&_p_cond);

        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

private:
    std::queue<T> _q;
    int _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond; // 生产者
    pthread_cond_t _c_cond; // 消费者
};

我们创建的阻塞队列是否适用于多生产者多消费者?
适合。因为生产的过程是进行了保护,即使是多生产的竞争也不会影响放数据。

消费也受到保护,不会因为多消费导致数据的读取干扰。

生产和消费的过程本来就是串行的?为什么会高效呢?

  • 生产者和消费者模型由于锁的保护,没有改变串行的特点。
  • 但是数据的生产是可以并行的!在放数据的那一刻是互斥的。
  • 数据效率的过程也是并行的!在取数据的一瞬间是互斥的。
  • 故而快就是快在数据怎么来和数据处理的过程。

这种解耦的特点就是高效的原因。

在软件开发过程中,生产者消费者模型常被用于处理数据的生产和消费问题。例如,在并发编程中,可以使用生产者消费者模型来避免数据的竞争条件和提高系统的吞吐量。此外,该模型也可以用于实现异步编程、任务调度和消息队列等功能。

总的来说,生产者消费者模型是一种强大的并发处理模型,能够有效地平衡生产者和消费者的处理能力,并降低它们之间的依赖关系。

利用阻塞队列模拟计算任务。

利用2生产者,3消费者处理任务。

随机生成+ - */%的任务,并且展示结果的正确性,以及错误码。

#pragma once
#include <string>

const int defulvalue = 0;
const std::string opers = "+-*/%()";

enum
{
    right = 0,
    div_zore,
    mod_zore,
    unkonw
};

class Task
{
public:
    Task()
    {
    }

    Task(int x, int y, char op)
        : _x(x), _y(y), _oper(op), _result(0), _code(0)
    {
    }

    void Run()
    {
        switch (_oper)
        {
        case '+':
            _result = _x + _y;
            break;
        case '-':
            _result = _x - _y;
            break;
        case '*':
            _result = _x * _y;
            break;
        case '/':
        {
            if (_y == 0)
                _code = div_zore;
            else
                _result = _x / _y;
        }
        break;
        case '%':
        {
            if (_y == 0)
                _code = mod_zore;
            else
                _result = _x % _y;
        }
        break;
        default:
            _code = unkonw;
            break;
        }
    }

    void operator()()
    {
        Run();
    }

    std::string PrintTask()
    {
        std::string s;
        s = std::to_string(_x);
        s += _oper;
        s += std::to_string(_y);
        s += "=?";

        return s;
    }

    std::string PrintResult()
    {
        std::string s;
        s = std::to_string(_x);
        s += _oper;
        s += std::to_string(_y);
        s += "=";
        s += std::to_string(_result);
        s += " [";
        s += std::to_string(_code);
        s += "]";

        return s;
    }

    ~Task()
    {
    }

private:
    int _x;
    int _y;
    char _oper;

    int _result;
    int _code;
};

创建多生产多消息模型

#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include "Task.hpp"

class Data
{
public:
  BlockQueue<Task> *_bq;
  std::string _name;
};

void *consurmer(void *args)
{
  Data*td=(Data*)args;
  while (true)
  {
    Task t;
    td->_bq->Pop(&t);

    t();
    std::cout <<td->_name<< "pop :" << t.PrintResult() << std::endl;
  }
}

void *productor(void *args)
{
  BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
  while (true)
  {
    int data1 = rand() % 10; // 0-9
    usleep(rand() % 123);

    int data2 = rand() % 10;
    usleep(rand() % 123);

    char oper = opers[rand() % opers.size()];
    usleep(rand() % 123);
    Task t(data1, data2, oper);
    std::cout << "_p push: " << t.PrintTask() << std::endl;

    bq->Push(t);
  }
  return nullptr;
}
int main()
{
  srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self()); // 更加随机的种子

  BlockQueue<Task> *bq = new BlockQueue<Task>();
  pthread_t c[3], p[2];

  Data *td1=new Data();
  td1->_bq=bq,td1->_name="_c_1";
  pthread_create(&c[0], nullptr, consurmer, td1);

    Data *td2=new Data();
  td2->_bq=bq,td2->_name="_c_2";
  pthread_create(&c[1], nullptr, consurmer, td2);

    Data *td3=new Data();
  td3->_bq=bq,td3->_name="_c_3";
  pthread_create(&c[2], nullptr, consurmer, td3);

  pthread_create(&p[0], nullptr, productor, bq);
  pthread_create(&p[1], nullptr, productor, bq);
 
 
  pthread_join(c[0], nullptr);
  pthread_join(c[1], nullptr);
  pthread_join(c[2], nullptr);
  pthread_join(p[0], nullptr);
  pthread_join(p[1], nullptr);
  return 0;
}

由于生产的速度小于效率。最终消费者会竞争商品。

在我们的生产者消费者模型中是一个无限循环的,在实际中需要添加信号或者共享的布尔类型让程序停下。

标签:线程,消费者,生产者,模型,Linux,&_,mutex,pthread,cond
From: https://blog.csdn.net/m0_73299809/article/details/136791858

相关文章

  • [Linux]文件缓冲区
    文件fd输出重定向除了用dup2()改变数组下标外,还可以用命令来完成所有的命令执行,都必须有操作系统将其运行起来变成进程,然后根据>>,<<来判断是输入重定向,还是输出重定向。缓冲区之所以有缓冲区,是为了提高效率的。就类比快递一样,如果你送一个东西给北京的朋友,那么你自己......
  • 软件工程 第二章 过程模型
    软件工程第二章过程模型通用过程模型通用过程框架:框架活动:沟通,策划,建模,构建,部署普适性活动:项目跟踪控制,风向管理,质量保证,配置管理,技术评审等常见的过程流(processflow):线性过程流(linearprocessflow)迭代过程流(iterativeprocessflow)演化过程流(e......
  • 大语言模型的参数级别和能力之间的关系
    模型的参数数量通常被视为模型能力的一个重要指标,更多的参数意味着模型有更大的能力来学习、存储和泛化不同类型的数据。以下是这种关系的几个关键点:学习能力:参数数量越多,模型学习复杂模式的能力通常越强。这意味着大模型能够理解和生成更复杂的文本,更准确地执行特定任务......
  • whisper-large-v3:速度快的令人翻译模型三种实用的调用方法
    1、whisper-large-v3是openai公司的模型,可使用Python代码调用;2、whisper-large-v3基础上chenxwh制作了开源库insanely-fast-whisper,可本地指令运行,或GoogleColabT4GPU运行;3、以上两个模型应用,如果觉得使用复杂难度大,国内软件工程师制作了更简单的版本fast-whisper3。......
  • zhipuai的GLM-4模型API访问出现错误: ConnectError: TLS/SSL connection has been clo
    1简介访问zhipuai的GLM-4模型的API时,挂上梯子后访问失败,显示ConnectError:TLS/SSLconnectionhasbeenclosed(EOF)(_ssl.c:1131)报错信息如下{ "name":"ConnectError", "message":"TLS/SSLconnectionhasbeenclosed(EOF)(_ssl.c:1131)",......
  • 操作系统实践之路——五、初始化(2.Linux初始化)
    文章目录一、全局流程二、从BIOS到GRUB三、GRUB是如何启动的四、详解vmlinuz文件结构五、流程梳理-1六、内核初始化从_start开始七、流程梳理-2参考资料前言​本章节将讨论一下Linux如何去做初始化。一、全局流程​在机器加电后,BIOS会进行自检,然后由BIOS加载......
  • 从 Linux 内核角度探秘 JDK MappedByteBuffer
    本文涉及到的内核源码版本为:5.4,JVM源码为:OpenJDK17,RocketMQ源码版本为:5.1.1在之前的文章《一步一图带你深入剖析JDKNIOByteBuffer在不同字节序下的设计与实现》中,笔者为大家详细剖析了JDKBuffer的整个设计体系,从总体上来讲,JDKNIO为每一种Java基本类型定义了对......
  • Linux TCP/UDP CS模型
    LinuxTCP/UDPCS模型目录LinuxTCP/UDPCS模型TCPServer/TCPClientUDPServer/UDPClientTCPServer/TCPClient在C语言中实现一个TCP服务器时,使用select函数可以帮助我们同时监控多个文件描述符(包括socket)的状态,从而实现非阻塞的I/O操作。以下是一个简单的TCP服务器示例,它......
  • linux命令 --简化版--快速上手
    linux命令--简化版--快速上手系统信息arch显示机器的处理器架构(1)uname-m显示机器的处理器架构(2)uname-r显示正在使用的内核版本dmidecode-q显示硬件系统部件-(SMB[IOS](https://www.2cto.com/kf/yidong/iphone/)/DMI)hdparm-i/dev/hda罗列一个磁盘的......
  • linux--shell 一般把脚本文件放到哪里
    linux--shell一般把脚本文件放到哪里shell在Linux系统中,脚本文件的存放位置取决于其用途和类型。以下是几个常见的脚本存放位置:系统级脚本:这些脚本通常与系统管理、初始化、配置或权限认证相关。/usr/bin:主要存放所有用户都可用的系统程序,即普通的基本命令。/etc:存放系统......