首页 > 系统相关 >基于Linux操作系统的生产消费者队列封装(C++)

基于Linux操作系统的生产消费者队列封装(C++)

时间:2024-06-06 20:00:50浏览次数:18  
标签:task 封装 队列 max C++ num mutex Linux cond

一.先前代码及实现(在该篇中会用到)

1.基于Linux操作系统的锁的封装-CSDN博客

2.基于linux操作系统的线程封装(可实现任意传递任意类型任意个数的参数)-CSDN博客

二.生产消费者模型

        在一个多线程的进程中,通常存在如下关系生产者和消费者,其中生产者负责生产资源(产生任务)消费者负责消耗资源(获取任务),一般而言,消费者和消费者互斥,生产者和生产者互斥,消费者和生产者互斥且同步。我们将存放任务的地方(阻塞队列)称为交易场所。当阻塞队列满时,需要生产者停止发放任务,而为空时则需要消费者停止拿取任务。当阻塞队列由满变非满时,需要消费者唤醒生产者进行生产任务,反之亦然。

生产消费者模型
生产消费者模型

        由于同步和互斥的存在,以及额外的阻塞队列存在,生产者将任务递达至消费者 这个步骤会产生额外的开销,但是多消费者和多生产者可以充分利用CPU的多核,这使得生产者在生产任务,消费者在处理任务可以并行处理,极大的节约了生产任务和处理任务的总时长。

三.条件变量

        当阻塞队列(交易场所)由空变为非空时,可以利用条件变量唤醒消费线程,反之当阻塞队列由满变为非满时,也可以利用条件变量唤醒生产线程,条件变量可以极大的减少锁竞争所带来的开销。

有关条件变量的封装可参考基于Linux操作系统的锁的封装-CSDN博客

class condition_variable
    {
    public:
        condition_variable(const condition_variable&)=delete;
        condition_variable operator=(condition_variable)=delete;
        condition_variable(mutex& mutex):_mutex(mutex) //条件变量创建
        {
            if(pthread_cond_init(&_cond,nullptr))
            {
                std::cerr<<errno<<strerror(errno)<<std::endl;
                exit(1);
            }
        }
        ~condition_variable() //条件变量析构
        {
            if(pthread_cond_destroy(&_cond))
            {
                std::cerr<<errno<<strerror(errno)<<std::endl;
                exit(2);
            }
        }
        void wait() //当没有资源时则等待
        {
            if(pthread_cond_wait(&_cond,&_mutex.getmutex()))
            {
                std::cerr<<errno<<strerror(errno)<<std::endl;
                exit(3);
            }
        }
        void notify_one() //唤醒一个线程
        {
            if(pthread_cond_signal(&_cond))
            {
                std::cerr<<errno<<strerror(errno)<<std::endl;
                exit(4);
            }
        }
        void notify_all() //唤醒所有线程
        {
            if(pthread_cond_broadcast(&_cond))
            {
                std::cerr<<errno<<strerror(errno)<<std::endl;
                exit(5);
            }
        }
    private:
        mutex& _mutex;
        pthread_cond_t _cond;
    };

四.生产消费者队列封装(普通队列)

1.成员变量,构造和析构函数

    template<class T>
    class prodcon
    {
    public:
        typedef T task;
        prodcon(size_t max_task=normal_max_task)
        :_mutex(nullptr),_num_task(0),_max_task(max_task)
        ,_pro_cond(_mutex),_con_cond(_mutex)
        {
        }
        ~prodcon(){}
    private:
        size_t _num_task; //当前任务数量
        size_t _max_task; //最大任务数量
        std::queue<task> _qtask; //阻塞队列 task为放置任务
        condition_variable _pro_cond; //生产者条件变量,自定义封装
        condition_variable _con_cond; //消费者条件变量,自定义封装
        mutex _mutex; //互斥锁,自定义封装
    };

        所有的成员变量均有对应的析构函数,因此析构函数不需要做任何处理

2.放任务和出任务

        void push_task(task& in,int pri=10) //放入任务,pri是为了适配用优先级队列而引入的,无需注意
        {
            {
                lockguard<mutex> L(_mutex); //上锁
                while(isfull()) //判断是否满
                {
                    _pro_cond.wait(); //如果满则等待
                }
                _qtask.push(in); //放入任务
                ++_num_task; //当前任务+1
            }
            _con_cond.notify_one(); //唤醒线程,由于在唤醒线程时队列中一定存在任务,因此
//此处可以不加锁
        }
        void pop_task(task& out) //取得任务
        {
            {
                lockguard<mutex> L(_mutex); //加锁
                while(isempty()) //判断是不是为空
                {
                    _con_cond.wait(); 
                }
                out= std::move(_qtask.front());
                _qtask.pop(); //取任务
                --_num_task; //任务数量-1
            }
            _pro_cond.notify_one(); //唤醒生产者线程
        }
        int pop_some_tasks(task out[],int num)//取出多个任务
        {
            int real_num=0;
            {
                lockguard<mutex> L(_mutex);
                while(isempty())
                {
                    _con_cond.wait();
                }
                for(int i=0;i<num;++i)
                {
                    if(isempty()) break; //如果是空代表任务完成则返回
                    out[i]= std::move(_qtask.front());
                    ++real_num;
                    _qtask.pop();
                    --_num_task;
                }
            }
            _pro_cond.notify_all();//由于一次取出多个任务,因此可以
//全局唤醒所有线程
            return real_num; //返回取出了多少任务
        }
private:
        bool isfull() //判断是否为满
        {
            return !(_max_task-_num_task);
        }
        bool isempty() //判断是否为空
        {
            return !_num_task;
        }

3.其他成员函数

        

        bool full()
        {
            lockguard<mutex> L(_mutex); //上锁
            return !(_max_task-_num_task);
        }
        bool empty()
        {
            lockguard<mutex> L(_mutex); //上锁
            return !_num_task;
        }

        可以外界访问任务是否为空或者满,由于没有上锁因此要上锁,该函数不能成员函数中已上锁的时候调用,会发生死锁

五.生产消费者模型(优先级队列)

        优先级队列和普通队列执行一样,唯一的不同就是优先级队列在放入任务时需要设置该任务的优先级,这也是为什么在普通队列的放入任务中,会出现 int pri=10,这是为了适配优先级队列。另外需要注意,普通队列的插入和取出均为o(1),而优先级队列的插入和取出均为o(logn),如果非必要,不要使用优先级队列

1.完整代码(包括普通队列和优先级队列)

#ifndef _PRODCON_HPP
#define _PRODCON_HPP
#include<iostream>
#include<vector>
#include<memory>
#include<pthread.h>
#include<cstring>
#include<functional>
#include<unistd.h>
#include<queue>
#include"lock.hpp"

static const int normal_max_task=10;
static const int max_pri=100;
static const int min_pri=0;

namespace zwr
{
    template<class T>
    class prodcon
    {
    public:
        typedef T task;
        prodcon(size_t max_task=normal_max_task)
        :_mutex(nullptr),_num_task(0),_max_task(max_task)
        ,_pro_cond(_mutex),_con_cond(_mutex)
        {
        }
        ~prodcon(){}
        void push_task(task& in,int pri=10)
        {
            {
                lockguard<mutex> L(_mutex);
                while(isfull())
                {
                    _pro_cond.wait();
                }
                _qtask.push(in);
                ++_num_task;
            }
            _con_cond.notify_one();
        }
        void pop_task(task& out)
        {
            {
                lockguard<mutex> L(_mutex);
                while(isempty())
                {
                    _con_cond.wait();
                }
                out= std::move(_qtask.front());
                //std::cout<<"::"<<_qtask.front()<<std::endl;
                _qtask.pop();
                --_num_task;
            }
            _pro_cond.notify_one();
        }
        int pop_some_tasks(task out[],int num)
        {
            int real_num=0;
            {
                lockguard<mutex> L(_mutex);
                while(isempty())
                {
                    _con_cond.wait();
                }
                for(int i=0;i<num;++i)
                {
                    if(isempty()) break;
                    out[i]= std::move(_qtask.front());
                    ++real_num;
                    _qtask.pop();
                    --_num_task;
                }
            }
            _pro_cond.notify_all();
            return real_num;
        }
        bool full()
        {
            lockguard<mutex> L(_mutex);
            return !(_max_task-_num_task);
        }
        bool empty()
        {
            lockguard<mutex> L(_mutex);
            return !_num_task;
        }
    private:
        bool isfull()
        {
            return !(_max_task-_num_task);
        }
        bool isempty()
        {
            return !_num_task;
        }
        size_t _num_task;
        size_t _max_task;
        std::queue<task> _qtask;
        condition_variable _pro_cond;
        condition_variable _con_cond;
        mutex _mutex;
    };


    template<class T>
    class priority_prodcon //优先级队列
    {
    public:
        typedef T task;
        priority_prodcon(size_t max_task=normal_max_task)
        :_mutex(nullptr),_num_task(0),_max_task(max_task),_qtask(cmp())
        ,_pro_cond(_mutex),_con_cond(_mutex)
        {
        }
        ~priority_prodcon(){}
        void push_task(task& in,int pri=10) 
        {
            if(pri>max_pri) pri=max_pri;
            if(pri<min_pri) pri=min_pri;
            {lockguard<mutex> L(_mutex);
            while(isfull())
            {
               _pro_cond.wait();
            }
            _qtask.push(std::make_pair(in,pri));
            ++_num_task;}
            _con_cond.notify_one();
        }
        void pop_task(task& out)
        {
            {lockguard<mutex> L(_mutex);
            while(isempty())
            {
                _con_cond.wait();
            }
            out= std::move(_qtask.top().first);
            _qtask.pop();
            --_num_task;}
            _pro_cond.notify_one();
        }
        int pop_some_tasks(task out[],int num)
        {
            int real_num=0;
            {
                lockguard<mutex> L(_mutex);
                while(isempty())
                {
                    _con_cond.wait();
                }
                for(int i=0;i<num;++i)
                {
                    if(isempty()) break;
                    out[i]= std::move(_qtask.top().first);
                    ++real_num;
                    _qtask.pop();
                    --_num_task;
                }
            }
            _pro_cond.notify_all();
            return real_num;
        }
        bool full()
        {
            lockguard<mutex> L(_mutex);
            return !(_max_task-_num_task);
        }
        bool empty()
        {
            lockguard<mutex> L(_mutex);
            return !_num_task;
        }
    private:
     bool isfull()
        {
            return !(_max_task-_num_task);
        }
        bool isempty()
        {
            return !_num_task;
        }
        struct cmp
        {
            cmp(){}
            ~cmp(){}
            bool operator()(std::pair<task,int> a1,std::pair<task,int> a2)
            {
                if(a1.second>a2.second) return true;
                else return false;
            }
        };
        size_t _num_task;
        size_t _max_task;
        std::priority_queue<std::pair<task,int>,std::vector<std::pair<task,int>>,cmp> _qtask; //优先级队列
        condition_variable _pro_cond;
        condition_variable _con_cond;
        mutex _mutex;
    };
}

#endif

 

标签:task,封装,队列,max,C++,num,mutex,Linux,cond
From: https://blog.csdn.net/zwrQAQ/article/details/139442652

相关文章

  • 详解 Protobuf 在 C++ 下 Message、enum、Service 的使用
    这篇文章主要目的是介绍Protobuf的常用知识,包括前置声明,message,service,enum等。声明//使用proto3语法syntax="proto3";//定义一个名为Greeter的包packageGreeter;//开启生成通用服务代码的选项optioncc_generic_services=true;syntax用于提示pro......
  • 基于Linux下的多人聊天室
    基于Linux下的多人聊天室1.涉及知识点2.整体架构流程3.核心功能展示4.详细代码5.复盘总结1.涉及知识点Linux、C语言、TCP通信、epoll、SQL2.整体架构流程服务器:1.搭建TCP连接客户端2.链接数据库3.使用epoll4.处理各种客户端消息的接收与发送客户端:1.搭建TC......
  • 嵌入式Linux系统编程 — 2.1 标准I/O库简介
    目录1标准I/O库简介1.1 标准I/O库简介1.2 标准I/O和文件I/O的区别2 FILE指针3标准I/O库的主要函数简介4 标准输入、标准输出和标准错误4.1标准输入、标准输出和标准错误概念4.2示例程序5 打开文件fopen()5.1 fopen()函数简介5.2 新建文件的权限5.3......
  • linux命令
    **ls[-a-l-h][路径]**(可以指定要查看的文件夹(目录)的内容,如果不给定参数,就查看当前工作目录的内容)-a选项,可以展示出隐藏的内容,以.开头的文件或文件夹默认被隐藏,需要-a才能显示出来-l选项,以列表的形式展示内容,并展示更多细节-h选项,需要和-l选项搭配使用,以更加人性化的......
  • 查看Linux端口占用和开启端口命令
    查看端口的使用的情况lsof命令比如查看80端口的使用的情况lsof-itcp:80列出所有的端口netstat-ntlp查看端口的状态/etc/init.d/iptablesstatus开启端口以开启端口80为例。1用命令开启端口iptables-IINPUT-ptcp--dport80-jaccpet --写入要开放的端......
  • Centos Stream 10 测试版下载:未来的RHEL10&Rocky Linux 10
    简介最近发现Centos最放出了Stream10测试版本,应该是基于Fedora40构建的。未来红帽会基于此版本构建RHEL10。内核版本:6.9.0Python版本:3.12.2RHEL系发行版对应关系Fedora(根发行版-软件实时更新-只支持一年)>>某一版本作为基准版本⏬CentosStream(长期稳定发行版-软......
  • Linux常用命令
    一、目录操作1、cd命令cda//进入a目录cdabc+tab键//如果有多个abc开头的目录,会显示相应的文件cd..//返回上一级目录cd../..//返回上上一级目录,以此类推cd/use/local//进入根目录下面的use/local目录cd—//返回上一次访问目录cd~//回到用户目录2、ls&l......
  • 00-macOS和Linux安装和管理多个Python版本
    在Mac上安装多个Python版本可通过几种不同方法实现。1Homebrew1.1安装Homebrew若安装过,跳过该步。/bin/bash-c"$(curl-fsSLhttps://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"1.2安装Python如安装Python3.7:[email protected]......
  • 每天学一个 Linux 命令(5):grep
    命令简介文本查找或搜索工具。用于查找内容包含指定的范本样式的文件,如果发现某文件的内容符合所指定的范本样式,预设grep会把含有范本样式的那一列显示出来。若不指定任何文件名称,或是所给予的文件名为-,则grep会从标准输入设备读取数据。同样可以配合正则表达式来搜索文本,并将......
  • 写一个linux驱动
    简单一点,写一个字符设备驱动。首先我们希望在/dev下面出现一个新的字符设备文件。1.分配一个设备号;intalloc_chrdev_region(dev_t*dev,unsignedintfirstminor,unsignedintcount,char*name);示例:首先定义一个dev_t的变量,它其实就是一个32位的整数。再使用上述函......