首页 > 系统相关 >【Linux】线程间同步

【Linux】线程间同步

时间:2024-07-14 23:55:23浏览次数:27  
标签:std 同步 int Linux cond mutex pthread 线程

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

目录

前言

Linux线程同步

条件变量

同步概念与竞态条件

条件变量函数

条件变量使用规范

条件变量接口的使用

生产者消费者模型

为何要使用生产者消费者模型

生产者消费者模型优点

基于BlockingQueue的生产者消费者模型

总结



前言

世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!


提示:以下是本篇文章正文内容,下面案例可供参考

Linux线程同步

条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

条件变量函数

全局或静态的条件变量:

pthread_cond_t  一个条件变量的类型
如果条件变量是静态的或全局的,那么对条件变量初始化
pthread_cond_t cond = PTHREAD_ COND_INITIALIZER

局部的条件变量:

初始化:

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 参数:

cond:要初始化的条件变量

attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:

cond:要在这个条件变量上等待

mutex:互斥量,后面详细解释

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond);// 唤醒所有在cond下等待的线程

int pthread_cond_signal(pthread_cond_t *cond);// 唤醒一个线程

条件变量使用规范

  • 等待条件代码
pthread_mutex_lock(&mutex); 
while (条件为假) 
    pthread_cond_wait(cond, mutex); 
修改条件 
pthread_mutex_unlock(&mutex); 
  • 给条件发送信号代码
pthread_mutex_lock(&mutex); 
设置条件为真 
pthread_cond_signal(cond); 
pthread_mutex_unlock(&mutex); 

条件变量接口的使用

// 信号变量
#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include <unistd.h>

pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; // 定义一个全局的条件变量
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER; // 定义一个全局的互斥锁

// Master去控制Slaver,让Slaver的线程排队一个一个的打印
void* SlaverCore(void* args)
{
    std::string name = static_cast<const char*>(args);
    while (true)
    {
        // 1. 加锁
        pthread_mutex_lock(&gmutex);
        // 2. 一般条件变量是在加锁和解锁之间使用的
        pthread_cond_wait(&gcond, &gmutex); 
        // gmutex:这个是,是用来被释放的[前一半]
        // Slaver线程进行等待的时候,不能抱着锁等待,得把锁释放,让其它的Slaver线程去申请
        // 所有的Slaver线程都要先申请锁,然后在全局的条件变量下的队列中排队,等待被Master线程唤醒
        std::cout << "当前被叫醒的线程是: " << name << std::endl;
        // 3. 解锁
        pthread_mutex_unlock(&gmutex);
    }
}

void* MasterCore(void* args)
{
    sleep(3);// Master线程休眠三秒是为了让Slaver线程都去队列中等待
    std::cout << "master 开始工作..." << std::endl;
    std::string name = static_cast<const char*>(args);
    while (true)
    {
        // pthread_cond_signal(&gcond);// 唤醒其中一个队列首部的线程
        pthread_cond_broadcast(&gcond);// 唤醒队列中所有的线程
        std::cout << "master 唤醒一个线程..." << std::endl;
        sleep(1);
    }
}

void StartMaster(std::vector<pthread_t>* tidsptr)
{
    pthread_t tid;
    int n = pthread_create(&tid, nullptr, MasterCore, (void*)"Master Thread");
    if (n == 0)
    {
        std::cout << "create master success" << std::endl;
    }
    tidsptr->emplace_back(tid);// 将主线程的线程id放入vector中
}

void StartSlaver(std::vector<pthread_t>* tidsptr, int threadnum = 3)
{
    for (int i = 0; i < threadnum; i++)
    {
        char* name = new char[64];
        snprintf(name, 64, "slaver-%d", i + 1); // thread-1
        pthread_t tid;
        int n = pthread_create(&tid, nullptr, SlaverCore, name);
        if (n == 0)
        {
            std::cout << "create success: " << name << std::endl;
            tidsptr->emplace_back(tid);
        }
    }
}

void WaitThread(std::vector<pthread_t>& tids)
{
    for (auto& tid : tids)
    {
        pthread_join(tid, nullptr);
    }
}

int main()
{
    std::vector<pthread_t> tids;
    StartMaster(&tids);// 主线程去控制其它的线程
    StartSlaver(&tids, 5);
    WaitThread(tids);
    return 0;
}

生产者消费者模型

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点

  • 生产和消费数据,进行解耦(只是在数据层面上做交互)
  • 并发
  • 支持忙闲不均

生产者和消费者的互斥关系:生产者往超市的货架上放100包方便面,然后入库超市的系统,才能把方便面卖出去,再此之前不能买出去,不然,超市的系统对不上号了。

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t = std::function<void(T&)>;
    // typedef std::function<void(const T&)> func_t;

    template<typename T>
    class Thread
    {
    public:
        void Excute()
        {
            _func(_data);
        }
    public:
        // 构造函数参数1:回调方法,让所有的线程做什么
        Thread(func_t<T> func, T& data, const std::string& name = "none-name")
            : _func(func), _data(data), _threadname(name), _stop(true)
        {}
        static void* threadroutine(void* args) // 类成员函数,形参是有this指针的!!
        {
            Thread<T>* self = static_cast<Thread<T> *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if (!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
        T& _data;  // 为了让所有的线程访问同一个全局变量
        func_t<T> _func;
        bool _stop;
    };
} 
#endif
BlockQueue.hpp
// 阻塞队列
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

template <typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size() == _cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
    // cap:阻塞队列所对应的容量
    BlockQueue(int cap) : _cap(cap)
    {
        _productor_wait_num = 0;
        _consumer_wait_num = 0;
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
        pthread_cond_init(&_consum_cond, nullptr);
    }
    void Enqueue(T& in) // 生产者用的接口
    {
        // 生产与消费者之间形成互斥
        pthread_mutex_lock(&_mutex);
        // 生产之前,要先判断阻塞队列中是否满了
        while (IsFull()) // 保证代码的健壮性
        {
            // 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
            // 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的_mutex锁 
            // c. 当条件满足,线程唤醒,pthread_cond_wait要求线程必须重新竞争_mutex锁,竞争成功,方可返回!!!
            // 之前:安全
            _productor_wait_num++;
            // 只要等待,必定会有唤醒,唤醒的时候,就要继续从这个位置向下运行!!
            // 该线程被唤醒的时候,此时线程的状态是没有锁的,要求重新竞争锁
            pthread_cond_wait(&_product_cond, &_mutex);  
            _productor_wait_num--;
            // 之后:安全
        }
        // 进行生产
        // _block_queue.push(std::move(in));// 右值引用,自定义类型可以,内置类型不行
        // std::cout << in << std::endl;
        _block_queue.push(in);
        // 通知消费者来消费
        // 如果消费者有等待的,就唤醒消费者
        if (_consumer_wait_num > 0)
            pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T* out) // 消费者用的接口 --- 5个消费者
    {
        pthread_mutex_lock(&_mutex);
        // 保证代码的健壮性:当5个线程都被唤醒时,但是队列中的数据没有了,可以通过while让其它的线程重新等待
        while (IsEmpty()) 
        {
            // 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
            // 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
            _consumer_wait_num++;
            pthread_cond_wait(&_consum_cond, &_mutex);  // 伪唤醒
            _consumer_wait_num--;
        }

        // 进行消费
        *out = _block_queue.front();
        _block_queue.pop();
        // 通知生产者来生产
        // 队列中有一个被消费者消费了,那么队列中空出来一个位置,通知生产者生产
        // 如果生产者有等待的,就唤醒生产者
        if (_productor_wait_num > 0)
            pthread_cond_signal(&_product_cond);
        pthread_mutex_unlock(&_mutex);
        // pthread_cond_signal(&_product_cond);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_product_cond);
        pthread_cond_destroy(&_consum_cond);
    }

private:
    std::queue<T> _block_queue;   // 阻塞队列,是被整体使用的!!!
    int _cap;                     // 总上限
    pthread_mutex_t _mutex;       // 保护_block_queue的锁
    pthread_cond_t _product_cond; // 专门给生产者提供的条件变量,生产满了,生产者就到_product_cond条件变量中等待被唤醒
    pthread_cond_t _consum_cond;  // 专门给消费者提供的条件变量,没有数据了,消费者去_consum_cond条件变量等待被话唤醒

    int _productor_wait_num;// 有多少个生产者在等待
    int _consumer_wait_num;// 有多少个消费者在等待
};

#endif
Task.hpp
#pragma once

#include <iostream>
#include <string>
#include <functional>

using Task = std::function<void()>;
// 任务变成了一个函数对象,返回值为void,参数为空

 class Task
 {
 public:
     Task() {}
     Task(int a, int b) : _a(a), _b(b), _result(0)
     {
     }
     void Excute()
     {
         _result = _a + _b;
     }
     std::string ResultToString()
     {
         return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
     }
     std::string DebugToString()
     {
         return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
     }

 private:
     int _a;
     int _b;
     int _result;
 };
Main.cc
#define _CRT_SECURE_NO_WARNINGS 1
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>

using namespace ThreadModule;
int a = 10;
// 重定义,从此以后想要阻塞队列中是什么类型,就直接改Task
using blockqueue_t = BlockQueue<Task>;

void PrintHello()
{
    std::cout << "hello world" << std::endl;
}
// class ThreadData
// {
// private:
//     blockqueue_t &bq;
//     std::string who;
// };

// 消费者的执行任务 --- 生产者和消费者看到和访问同一个阻塞队列
void Consumer(blockqueue_t& bq)
{
    while (true)
    {
        // 1. 从blockqueue取下来任务
        Task t;
        bq.Pop(&t);
        // 2. 处理这个任务
        //t.Excute();
        t(); //消费者私有
        // std::cout << "Consumer Consum data is : " << t.ResultToString() << std::endl;
    }
}
// 生产者的执行任务
void Productor(blockqueue_t& bq)
{
    srand(time(nullptr) ^ pthread_self());
    int cnt = 10;
    while (true)
    {
        sleep(1);
        // // 1. 获取任务
        // int a = rand() % 10 + 1;
        // usleep(1234);
        // int b = rand() % 20 + 1;
        // Task t(a, b);
        // 2. 把获取的任务,放入blockqueue
        Task t = PrintHello;
        bq.Enqueue(t);
        // std::cout << "Productor product data is : " << t.DebugToString() << std::endl;
    }
}

// 生产者和消费者共同的线程
void StartComm(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq, func_t<blockqueue_t> func)
{
    for (int i = 0; i < num; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        threads->emplace_back(func, bq, name);// 执行的方法是func
        threads->back().Start();
    }
}

void StartConsumer(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{
    StartComm(threads, num, bq, Consumer);
}

void StartProductor(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{
    StartComm(threads, num, bq, Productor);
}

void WaitAllThread(std::vector<Thread<blockqueue_t>>& threads)
{
    for (auto& thread : threads)
    {
        thread.Join();
    }
}

int main()
{
    blockqueue_t* bq = new blockqueue_t(5);
    std::vector<Thread<blockqueue_t>> threads;
    // std::vector<Thread<ThreadData>> threads;// 线程在初始化时,传ThreadData

    StartConsumer(&threads, 3, *bq);// 消费者线程
    StartProductor(&threads, 1, *bq);// 生产者线程
    WaitAllThread(threads);

    return 0;
}

如果只有一个生产者线程,有5个消费者线程,当生产者线程生产了一个数据,然后通过pthread_cond_broadcast()函数在条件变量下唤醒了5个消费者线程,那么这5个线程就不会在条件变量下等待锁了,这5个线程都会自动竞争这把锁,假如线程1竞争成功了,那么其余4个线程也会阻塞休眠,但是不会在条件变量下休眠了,而是在互斥锁上进行等待的,此时线程1去消费完数据之后解锁,其余4个线程立马继续竞争锁,当消费者线程2竞争锁成功,线程2进行消费,但是此时队列中的数据已经完了,那么就会出错。这4个消费者线程为伪唤醒

生产者生产出来的任务交给消费者。

生产者在队列中放任务,和消费者在队列中消费任务,在生产消费的过程之中是串行的

一个信号量就是一个计数器 ---> 是衡量资源数目的,只要申请成功,就一定有对应的资源提供给你,有效减少内部判断。


总结

好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。

标签:std,同步,int,Linux,cond,mutex,pthread,线程
From: https://blog.csdn.net/2301_79585944/article/details/140163703

相关文章

  • 【C++11常见新特性(三)】线程库
    文章目录thread类线程函数参数并行与并发的区别原子性操作库关于atomic类模板对比锁和原子操作lock_guard与unique_locklock_guardunique_lock两个线程交替打印奇数和偶数thread类C++11新特性支持线程,使得C++在并行编程中不需要使用第三方库,并且在原子操作中还引......
  • linux进程周边知识——内核对硬件的管理——计算机世界的管理
        前言:本节主要讲解内核也就是操作系统对于硬件的管理,本节内容同样为进程的周边知识。主要是关于软件方面,和我的上一篇——冯诺依曼体系结构可以说是兄弟文章,这篇文章主要是关于硬件方面。两篇文章都是为学习进程做准备。但不能说本篇文章内容不重要,本篇文章......
  • 在deepin linux系统中安装sqlynx数据库管理工具
    一、官网下载:https://www.sqlynx.com/#/home/probation/SQLynx二、解压后在终端输入命令1.进入目录输入命令./maicong-sqlynx.sh2.修改权限chmod+xjdk1.8.0_351//bin/java3.安装./maicong-sqlynx.sh4.启动sudoshmaicong-sqlynx.shstart5.查询端口号taillog/......
  • C++11 标准库 线程库<thread>梳理
    目录<thread>this_thread命名空间1.get_id()2.sleep_for()3.sleep_until()4.yield()thread类构造函数:类方法1.get_id()2.join()3.detach()4.joinable()5.operator=6.hardware_concurrency(static)多线程的两种计算场景<thread>this_thread命名空间在C++11中不仅添加......
  • [操作系统]线程
    线程线程的状态转换参考文章线程在一定条件下,状态会发生变化。线程一共有以下几种状态:新建状态(New):新创建了一个线程对象。就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于“可运行线程池”中,变得可运行,只等待获取CPU的使用权。即......
  • 1、多线程同步——CPU、core核、线程、内存
    CPU的运行原理控制单元在时序脉冲的作用下,将指令计数器里所指向的指令地址(这个地址是在内存里的)送到地址总线上去,然后CPU将这个地址里的指令读到指令寄存器进行译码。对于执行指令过程中所需要用到的数据,会将数据地址也送到地址总线,然后CPU把数据读到CPU的内部存储单元(就......
  • [深入理解Java虚拟机]线程
    Java与协程在Java时代的早期,Java语言抽象出来隐藏了各种操作系统线程差异性的统一线程接口,这曾经是它区别于其他编程语言的一大优势。在此基础上,涌现过无数多线程的应用与框架,譬如在网页访问时,HTTP请求可以直接与ServletAPI中的一条处理线程绑定在一起,以“一对一服务”的方式处......
  • STAThread与Windows UI线程模型
    STAThread与WindowsUI线程模型1.STAThread属性标注Main方法,使主线程运行在STA模式。用于兼容需要STA环境的COM组件,特别是UI组件。2.线程模式STA(SingleThreadedApartment)一个线程处理所有STA组件。UI线程通常为STA,负责消息处理和UI更新。MTA(MultiThreaded......
  • Debug Log - Linux下出现 cmake: command not found
    Bug情况:在用脚本安装一些环境时,出现了cmake:commandnotfound的情况,故需要安装cmake。踩坑:网上有人说通过yum来安装cmake,但我先通过apt安装yum(sudoaptinstallyum),再通过yum安装cmake(sudoyuminstallcmake),发现yum找不到对应匹配的包。解决过程:使用cmake--version......
  • Linux驱动加载源码分析(安全加载 、签名、校验)
    PS:要转载请注明出处,本人版权所有。PS:这个只是基于《我自己》的理解,如果和你的原则及想法相冲突,请谅解,勿喷。环境说明  无前言  很久很久以前,在android上面移植linux驱动的时候,由于一些条件限制,导致我们测试驱动非常的麻烦。其中有一个麻烦就是驱动校验失败,然后内核拒......