提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
目录
前言
世上有两种耀眼的光芒,一种是正在升起的太阳,一种是正在努力学习编程的你!一个爱学编程的人。各位看官,我衷心的希望这篇博客能对你们有所帮助,同时也希望各位看官能对我的文章给与点评,希望我们能够携手共同促进进步,在编程的道路上越走越远!
提示:以下是本篇文章正文内容,下面案例可供参考
Linux线程同步
条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
条件变量函数
全局或静态的条件变量:
pthread_cond_t 一个条件变量的类型
如果条件变量是静态的或全局的,那么对条件变量初始化
pthread_cond_t cond = PTHREAD_ COND_INITIALIZER
局部的条件变量:
初始化:
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr); 参数:
cond:要初始化的条件变量
attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)
等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);// 唤醒所有在cond下等待的线程
int pthread_cond_signal(pthread_cond_t *cond);// 唤醒一个线程
条件变量使用规范
- 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
- 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
条件变量接口的使用
// 信号变量
#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include <unistd.h>
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER; // 定义一个全局的条件变量
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER; // 定义一个全局的互斥锁
// Master去控制Slaver,让Slaver的线程排队一个一个的打印
void* SlaverCore(void* args)
{
std::string name = static_cast<const char*>(args);
while (true)
{
// 1. 加锁
pthread_mutex_lock(&gmutex);
// 2. 一般条件变量是在加锁和解锁之间使用的
pthread_cond_wait(&gcond, &gmutex);
// gmutex:这个是,是用来被释放的[前一半]
// Slaver线程进行等待的时候,不能抱着锁等待,得把锁释放,让其它的Slaver线程去申请
// 所有的Slaver线程都要先申请锁,然后在全局的条件变量下的队列中排队,等待被Master线程唤醒
std::cout << "当前被叫醒的线程是: " << name << std::endl;
// 3. 解锁
pthread_mutex_unlock(&gmutex);
}
}
void* MasterCore(void* args)
{
sleep(3);// Master线程休眠三秒是为了让Slaver线程都去队列中等待
std::cout << "master 开始工作..." << std::endl;
std::string name = static_cast<const char*>(args);
while (true)
{
// pthread_cond_signal(&gcond);// 唤醒其中一个队列首部的线程
pthread_cond_broadcast(&gcond);// 唤醒队列中所有的线程
std::cout << "master 唤醒一个线程..." << std::endl;
sleep(1);
}
}
void StartMaster(std::vector<pthread_t>* tidsptr)
{
pthread_t tid;
int n = pthread_create(&tid, nullptr, MasterCore, (void*)"Master Thread");
if (n == 0)
{
std::cout << "create master success" << std::endl;
}
tidsptr->emplace_back(tid);// 将主线程的线程id放入vector中
}
void StartSlaver(std::vector<pthread_t>* tidsptr, int threadnum = 3)
{
for (int i = 0; i < threadnum; i++)
{
char* name = new char[64];
snprintf(name, 64, "slaver-%d", i + 1); // thread-1
pthread_t tid;
int n = pthread_create(&tid, nullptr, SlaverCore, name);
if (n == 0)
{
std::cout << "create success: " << name << std::endl;
tidsptr->emplace_back(tid);
}
}
}
void WaitThread(std::vector<pthread_t>& tids)
{
for (auto& tid : tids)
{
pthread_join(tid, nullptr);
}
}
int main()
{
std::vector<pthread_t> tids;
StartMaster(&tids);// 主线程去控制其它的线程
StartSlaver(&tids, 5);
WaitThread(tids);
return 0;
}
生产者消费者模型
为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型优点
- 生产和消费数据,进行解耦(只是在数据层面上做交互)
- 并发
- 支持忙闲不均
生产者和消费者的互斥关系:生产者往超市的货架上放100包方便面,然后入库超市的系统,才能把方便面卖出去,再此之前不能买出去,不然,超市的系统对不上号了。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>
namespace ThreadModule
{
template<typename T>
using func_t = std::function<void(T&)>;
// typedef std::function<void(const T&)> func_t;
template<typename T>
class Thread
{
public:
void Excute()
{
_func(_data);
}
public:
// 构造函数参数1:回调方法,让所有的线程做什么
Thread(func_t<T> func, T& data, const std::string& name = "none-name")
: _func(func), _data(data), _threadname(name), _stop(true)
{}
static void* threadroutine(void* args) // 类成员函数,形参是有this指针的!!
{
Thread<T>* self = static_cast<Thread<T> *>(args);
self->Excute();
return nullptr;
}
bool Start()
{
int n = pthread_create(&_tid, nullptr, threadroutine, this);
if (!n)
{
_stop = false;
return true;
}
else
{
return false;
}
}
void Detach()
{
if (!_stop)
{
pthread_detach(_tid);
}
}
void Join()
{
if (!_stop)
{
pthread_join(_tid, nullptr);
}
}
std::string name()
{
return _threadname;
}
void Stop()
{
_stop = true;
}
~Thread() {}
private:
pthread_t _tid;
std::string _threadname;
T& _data; // 为了让所有的线程访问同一个全局变量
func_t<T> _func;
bool _stop;
};
}
#endif
BlockQueue.hpp
// 阻塞队列
#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
template <typename T>
class BlockQueue
{
private:
bool IsFull()
{
return _block_queue.size() == _cap;
}
bool IsEmpty()
{
return _block_queue.empty();
}
public:
// cap:阻塞队列所对应的容量
BlockQueue(int cap) : _cap(cap)
{
_productor_wait_num = 0;
_consumer_wait_num = 0;
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_product_cond, nullptr);
pthread_cond_init(&_consum_cond, nullptr);
}
void Enqueue(T& in) // 生产者用的接口
{
// 生产与消费者之间形成互斥
pthread_mutex_lock(&_mutex);
// 生产之前,要先判断阻塞队列中是否满了
while (IsFull()) // 保证代码的健壮性
{
// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的_mutex锁
// c. 当条件满足,线程唤醒,pthread_cond_wait要求线程必须重新竞争_mutex锁,竞争成功,方可返回!!!
// 之前:安全
_productor_wait_num++;
// 只要等待,必定会有唤醒,唤醒的时候,就要继续从这个位置向下运行!!
// 该线程被唤醒的时候,此时线程的状态是没有锁的,要求重新竞争锁
pthread_cond_wait(&_product_cond, &_mutex);
_productor_wait_num--;
// 之后:安全
}
// 进行生产
// _block_queue.push(std::move(in));// 右值引用,自定义类型可以,内置类型不行
// std::cout << in << std::endl;
_block_queue.push(in);
// 通知消费者来消费
// 如果消费者有等待的,就唤醒消费者
if (_consumer_wait_num > 0)
pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
pthread_mutex_unlock(&_mutex);
}
void Pop(T* out) // 消费者用的接口 --- 5个消费者
{
pthread_mutex_lock(&_mutex);
// 保证代码的健壮性:当5个线程都被唤醒时,但是队列中的数据没有了,可以通过while让其它的线程重新等待
while (IsEmpty())
{
// 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
_consumer_wait_num++;
pthread_cond_wait(&_consum_cond, &_mutex); // 伪唤醒
_consumer_wait_num--;
}
// 进行消费
*out = _block_queue.front();
_block_queue.pop();
// 通知生产者来生产
// 队列中有一个被消费者消费了,那么队列中空出来一个位置,通知生产者生产
// 如果生产者有等待的,就唤醒生产者
if (_productor_wait_num > 0)
pthread_cond_signal(&_product_cond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_product_cond);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_product_cond);
pthread_cond_destroy(&_consum_cond);
}
private:
std::queue<T> _block_queue; // 阻塞队列,是被整体使用的!!!
int _cap; // 总上限
pthread_mutex_t _mutex; // 保护_block_queue的锁
pthread_cond_t _product_cond; // 专门给生产者提供的条件变量,生产满了,生产者就到_product_cond条件变量中等待被唤醒
pthread_cond_t _consum_cond; // 专门给消费者提供的条件变量,没有数据了,消费者去_consum_cond条件变量等待被话唤醒
int _productor_wait_num;// 有多少个生产者在等待
int _consumer_wait_num;// 有多少个消费者在等待
};
#endif
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
using Task = std::function<void()>;
// 任务变成了一个函数对象,返回值为void,参数为空
class Task
{
public:
Task() {}
Task(int a, int b) : _a(a), _b(b), _result(0)
{
}
void Excute()
{
_result = _a + _b;
}
std::string ResultToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
}
std::string DebugToString()
{
return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
}
private:
int _a;
int _b;
int _result;
};
Main.cc
#define _CRT_SECURE_NO_WARNINGS 1
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>
#include <ctime>
using namespace ThreadModule;
int a = 10;
// 重定义,从此以后想要阻塞队列中是什么类型,就直接改Task
using blockqueue_t = BlockQueue<Task>;
void PrintHello()
{
std::cout << "hello world" << std::endl;
}
// class ThreadData
// {
// private:
// blockqueue_t &bq;
// std::string who;
// };
// 消费者的执行任务 --- 生产者和消费者看到和访问同一个阻塞队列
void Consumer(blockqueue_t& bq)
{
while (true)
{
// 1. 从blockqueue取下来任务
Task t;
bq.Pop(&t);
// 2. 处理这个任务
//t.Excute();
t(); //消费者私有
// std::cout << "Consumer Consum data is : " << t.ResultToString() << std::endl;
}
}
// 生产者的执行任务
void Productor(blockqueue_t& bq)
{
srand(time(nullptr) ^ pthread_self());
int cnt = 10;
while (true)
{
sleep(1);
// // 1. 获取任务
// int a = rand() % 10 + 1;
// usleep(1234);
// int b = rand() % 20 + 1;
// Task t(a, b);
// 2. 把获取的任务,放入blockqueue
Task t = PrintHello;
bq.Enqueue(t);
// std::cout << "Productor product data is : " << t.DebugToString() << std::endl;
}
}
// 生产者和消费者共同的线程
void StartComm(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq, func_t<blockqueue_t> func)
{
for (int i = 0; i < num; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
threads->emplace_back(func, bq, name);// 执行的方法是func
threads->back().Start();
}
}
void StartConsumer(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{
StartComm(threads, num, bq, Consumer);
}
void StartProductor(std::vector<Thread<blockqueue_t>>* threads, int num, blockqueue_t& bq)
{
StartComm(threads, num, bq, Productor);
}
void WaitAllThread(std::vector<Thread<blockqueue_t>>& threads)
{
for (auto& thread : threads)
{
thread.Join();
}
}
int main()
{
blockqueue_t* bq = new blockqueue_t(5);
std::vector<Thread<blockqueue_t>> threads;
// std::vector<Thread<ThreadData>> threads;// 线程在初始化时,传ThreadData
StartConsumer(&threads, 3, *bq);// 消费者线程
StartProductor(&threads, 1, *bq);// 生产者线程
WaitAllThread(threads);
return 0;
}
如果只有一个生产者线程,有5个消费者线程,当生产者线程生产了一个数据,然后通过pthread_cond_broadcast()函数在条件变量下唤醒了5个消费者线程,那么这5个线程就不会在条件变量下等待锁了,这5个线程都会自动竞争这把锁,假如线程1竞争成功了,那么其余4个线程也会阻塞休眠,但是不会在条件变量下休眠了,而是在互斥锁上进行等待的,此时线程1去消费完数据之后解锁,其余4个线程立马继续竞争锁,当消费者线程2竞争锁成功,线程2进行消费,但是此时队列中的数据已经完了,那么就会出错。这4个消费者线程为伪唤醒。
生产者生产出来的任务交给消费者。
生产者在队列中放任务,和消费者在队列中消费任务,在生产消费的过程之中是串行的
一个信号量就是一个计数器 ---> 是衡量资源数目的,只要申请成功,就一定有对应的资源提供给你,有效减少内部判断。
总结
好了,本篇博客到这里就结束了,如果有更好的观点,请及时留言,我会认真观看并学习。
不积硅步,无以至千里;不积小流,无以成江海。