首页 > 编程语言 >C++ std::thread 实现生产者消费者模型

C++ std::thread 实现生产者消费者模型

时间:2023-03-06 15:36:25浏览次数:71  
标签:std thread lock C++ locker 线程 缓冲区

一、OS中的生产者消费者问题

1.1 问题描述

  • 系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。

  • 生产者、消费者共享一个初始为空、大小为n的缓冲区。

    • 只有缓冲区没满时,生产者才能把产品放入缓冲区,否则必须等待。
    • 只有缓冲区不空时,消费者才能从中取出产品,否则必须等待。
    • 缓冲区是临界资源,各进程必须互斥地访问。

1.2 伪代码实现

semaphore mutex = 1;
semaphore empty = n;
semaphore full = 0;

producer(){
    while(1){
        生产一个产品;
        P(empty);
        P(mutex);
        将产品放入缓冲区;
        V(mutex);
        V(empty);
    }
}

comsumer(){
    while(1){
        P(full);
        P(mutex);
        从缓冲区取出一个产品;
        V(mutex);
        V(full);
        使用产品;
    }
}

二、C++ 多线程代码实现

OS中的生产者-消费者模型是多进程模型,但对于多线程的情况也同样适用。

2.1 前置知识

C++中多线程实现相关的库函数非常多,这里只针对上文所写的伪代码所要求的功能找到对应的库函数,以此实现最简洁的代码,达到快速入门了解C++多线程的目的。

线程的创建与运行

头文件:#include <thread>

void Hello(int num){
    cout << num << ": Hello thread!" << endl;
}

int main(void){
    thread t[3];
    // 创建线程
    for (int i = 0; i < 3; i++) {
        t[i] = thread(Hello, i);
    }
    for (int i = 0; i < 3; i++) {
        // 等待线程t[i]完成
        t[i].join();
    }
    return 0;
}

缓冲区互斥锁 mutex

头文件:#include <mutex>

为了体现mutex的作用,先来写一个多线程打印而不加锁的版本,看看效果:

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

mutex mtx;
void print_thread_id(int id) {
    cout << "thread " << id << '\n';
}
int main() {
    thread threads[10];
    for (int i = 0; i < 10; i++) {
        threads[i] = thread(print_thread_id, i + 1);
    }
    for (int i = 0; i < 10; i++) 
        threads[i].join();
    return 0;
}

打印结果:

thread 1thread 
thread 4
2
thread 3
thread 5
thread 6
thread 7
thread 8
thread 9
thread 10

可以看到,由于线程执行print_thread_id()进行打印的过程未加锁,有的线程在打印完thread id后还没来得及打印\n就被切换了,导致打印结果不合预期。

下面对cout << "thread " << id << '\n';加互斥锁:

...
void print_thread_id(int id) {
    mtx.lock();
    cout << "thread " << id << '\n';
    mtx.unlock();
}
...

打印结果:

thread 2
thread 1
thread 3
thread 4
thread 6
thread 7
thread 5
thread 8
thread 9
thread 10

加锁之后每个线程都能完整地打印完一行输出信息。

std::lock_guard与std::unique_lock

  • 为什么不用mutex

    mutex只是最基本的互斥锁,其加锁解锁都要手动编写代码,在ifwhile等涉及条件判断的代码中很容易发生问题,如忘记解锁等。(更详细的实验见:https://blog.csdn.net/qq_45662588/article/details/116882720)

  • std::lock_guard

    为了解决mutex的问题,C++标准库提供了std::lock_guard

    它的特点在于将互斥锁与lock_guard对象的生命周期关联起来:在lock_guard对象的生命周期内,所管理的临界区保持上锁状态;在其生命周期结束后,所管理的临界区自动解锁。

    由于上述特性,之前需要手动写的lock()unlock()都不再需要了。

    std::lock_guard使用示例:

    mutex mtx;
    void print_thread_id(int id) {
        lock_guard<mutex> lock(mtx); // lock构造即上锁
    	cout << "thread " << id << '\n';
        // 函数结束时lock也被自动析构,意味着自动解锁
    }
    int main() {
    	thread threads[10];
    	for (int i = 0; i < 10; i++) {
    		threads[i] = thread(print_thread_id, i + 1);
    	}
    	for (int i = 0; i < 10; i++) 
    		threads[i].join();
    	return 0;
    }
    
  • 加强版:std::unique_lock

    std::unique_lockstd::lock_guard的加强版。

    加强的地方有:

    • lock_guard将加锁解锁与对象的生命周期绑定了,不能手动加锁解锁;而unique_lock可以手动加锁解锁
    • unique_lock支持的参数更多,用法更丰富

条件变量 condition_variable

头文件:#include <condition_variable>

注意到伪代码中的fullempty信号量其实隐含了一个功能:当有线程执行了V操作使信号量≥0时,应唤醒/通知需要该信号量的进程。

C++中的mutex类只是一个简单的互斥锁,不具备上述功能。所以需要condition_varible来补充。

condition_varible与一个互斥锁结合使用时,该互斥锁必须是std::unique_lock<std::mutex>对象。

互斥锁与条件变量结合使用的示例:

std::queue<int> buffer; // 缓冲区
std::mutex mtx; // 缓冲区互斥锁 
std::condition_variable cond; // 条件变量

void producer_do() {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mtx); // 声明即加锁
        buffer.push(count); // 往缓冲区放入数据
        locker.unlock(); // 手动解锁缓冲区
        cond.notify_one(); // 唤醒一个需要数据的阻塞线程(如果有)
        std::this_thread::sleep_for(std::chrono::seconds(1));
        // 睡1s,体现consumer会因缓冲区无数据而跟着阻塞1s
        count--;
    }
    // 依次往缓冲区中放入数据:10、9、8……2、1
}

void consumer_do() {
    int data = 0;
    while (data != 1) { //取到数据1后停止
        std::unique_lock<std::mutex> locker(mtx); // 声明即加锁
        while(buffer.empty())
            cond.wait(locker); // 等待缓冲区出现数据
        data = buffer.front();
        buffer.pop(); // 从缓冲区取走数据
        locker.unlock(); // 手动解锁缓冲区
        std::cout << "消费者线程取得数据:" << data << std::endl;
    }
}

int main() {
    std::thread producer(producer_do);
    std::thread consumer(consumer_do);
    producer.join();
    consumer.join();
    return 0;
}

运行结果:

消费者线程取得数据:10
消费者线程取得数据:9
消费者线程取得数据:8
消费者线程取得数据:7
消费者线程取得数据:6
消费者线程取得数据:5
消费者线程取得数据:4
消费者线程取得数据:3
消费者线程取得数据:2
消费者线程取得数据:1

细节问题解释

  1. 理论上来说当cond.wait(locker)返回时,缓冲区一定非空,为什么还要在cond.wait(locker)外面套一个while循环判断缓冲区是否为空?

cond.wait(locker)返回不一定是因为其他线程执行了notify_one(),还可能是因为其他的与程序无关的原因,这种情况的唤醒被称为“伪唤醒”。当“伪唤醒”发生时,缓冲区可能还是空的,因此需要额外套个while循环判断,避免被“伪唤醒”影响。

  1. 为什么condition_varible必须与std::unique_lock结合使用?消费者线程在判定缓冲区是否非空之前就已经创建了locker对象锁住缓冲区,不会影响生产者线程吗?

消费者进程创建locker对象确实锁住了缓冲区,但紧接其后的条件变量操作cond.wait(locker)其实是先执行了locker.unlock(),再阻塞消费者线程,因此不会影响生产者线程使用缓冲区。

之前提过,std::lock_guard不支持手动加锁解锁,即没有unlock()函数,因此condition_varible只能与std::unique_lock结合使用。

2.2 完整代码实现

前面的示例代码其实已经实现了单个消费者线程和单个生产者线程。要完整实现“一组”消费者线程和“一组”生产者线程,只需对线程数量稍作修改即可。

此外,原题目中对缓冲区大小有限制,只需在生产者函数部分增加while循环判断即可。

完整的代码实现如下:(2个消费者线程+3个生产者线程)

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include<queue>
using namespace std;

const int BUFFER_SIZE = 15; // 缓冲区大小
std::queue<int> buffer; // 缓冲区
mutex mtx;
std::condition_variable cond; // 条件变量

void producer_do() {
    int count = 1;
    while (1) {
        std::unique_lock<std::mutex> locker(mtx); // 声明即加锁
        while(buffer.size() >= BUFFER_SIZE){
            cond.wait(locker); //缓冲区已满,等待消费者线程取走数据
        }
        buffer.push(count); // 往缓冲区放入数据
        std::cout << "生产者线程 " << this_thread::get_id();
        std::cout<< " 放入数据:" << count << std::endl;
        locker.unlock();
        cond.notify_one(); // // 相当于V(full)
        // std::this_thread::sleep_for(std::chrono::seconds(1));
        count++;
    }
}

void consumer_do() {
    int data = 0;
    while (1) {
        std::unique_lock<std::mutex> locker(mtx); // 声明即加锁
        while(buffer.empty())
            cond.wait(locker); // 等待缓冲区出现数据
        data = buffer.front();
        buffer.pop(); // 从缓冲区取走数据
        std::cout << "消费者线程 " << this_thread::get_id();
        std::cout<< " 取得数据:" << data << std::endl;
        locker.unlock();
        cond.notify_one(); // 相当于V(free)
    }
}

int main() {
    const int p_num = 3;
    const int c_num = 2;
    std::thread producers[p_num];
    std::thread consumers[c_num];
    for(int i = 0;i < p_num;i ++){
        producers[i] = std::thread(producer_do);
    }
    for(int i = 0;i < c_num;i ++){
        consumers[i] = std::thread(consumer_do);
    }
    for(int i = 0;i < p_num;i ++){
        producers[i].join();
    }
    for(int i = 0;i < p_num;i ++){
        consumers[i].join();
    }
    return 0;
}

部分结果如下:

...
生产者线程 140230731507264 放入数据:737
消费者线程 140230706329152 取得数据:618
生产者线程 140230723114560 放入数据:619
消费者线程 140230697936448 取得数据:724
生产者线程 140230714721856 放入数据:671
消费者线程 140230706329152 取得数据:725
...

标签:std,thread,lock,C++,locker,线程,缓冲区
From: https://www.cnblogs.com/streamazure/p/17184064.html

相关文章

  • C++重写(覆盖)、重载、重定义、多态
    引用:https://www.cnblogs.com/DannyShi/p/4593735.html1重写(覆盖)overrideoverride是重写(覆盖)了一个方法,以实现不同的功能。一般用于子类在继承父类时,重写(覆盖)父类......
  • C++ 面向对象程序设计(中)
    (上)讲述了基于对象,(中)则是在基于对象的基础上,建立类与类之间的联系,即面向对象编程以及面向对象设计。主要讲述以下三点:Inheritance(继承)Composition(复合)Delega......
  • C++学生成绩管理系统[2023-03-06]
    C++学生成绩管理系统[2023-03-06]C++课程设计说明参与专业信息和数学专业所有学生时间安排完成需求分析、类设计以及代码的实现答辩注意:答辩未过的需要参加下届C++......
  • C/C++飞机订票系统[2023-03-06]
    C/C++飞机订票系统[2023-03-06]三、飞机订票系统1.某公司每天有10航班(航班号、价格),每个航班的飞机,共有80个坐位,分20排,每排4个位子。编号为A,B,C,D·如座位号:10D......
  • C/C++课程设计题目[2023-03-06]
    C/C++课程设计题目[2023-03-06]课题1:公司考勤管理系统(一)、课程设计题目:某公司的考勤管理系统(二)、目的与要求:1、目的:(1)要求学生达到熟练掌握C++语言的基本知识和技能;(2......
  • 序章 高质量C++/C编程指南
    一、文件结构避免头文件被重复引用,用#pragmaonce进行预处理用<>引用标注库头文件,用""引用自定义库头文件C语言头文件只进行函数声明,不进行函数定义;C++类的成员......
  • Python、C++、Swift或任何其他语言会取代Java吗?为什么?
    很难预测Python,C++,Swift或任何其他编程语言是否会取代Java作为最受欢迎的语言,但在不久的将来不太可能.以下是一些原因:受欢迎程度并建立的用法:Java已经存在了25年以上,并......
  • C++类和对象
                       ......
  • C++--引用和函数调用
             ......
  • C++编译小技巧
    1.单文件//math.cppintmultiply(inta,intb){returna*b;}//main.cpp#include<iostream>intmain(){ std::cout<<"Hello,world!"<<std::endl; s......