线程同步
同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,这就叫做同步(饥饿问题:某些线程无法得到资源而长时间无法执行,常见的就是申请不到锁)
竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
单纯的加锁会引起问题。
如果某个线程竞争锁的能力非常强,则会一直申请锁,释放锁。就会导致线程的饥饿问题。
加锁保护了临界资源,但也阻止了高效让每一个线程使用这份资源。
现在规定线程竞争锁必须排队,一个线程得到锁后释放了锁,如果想要重新得到锁必须从队尾等待锁的申请。这样就保证了线程按照某种顺序访问资源!也就是同步。
条件变量
条件变量是一种维持线程同步的一种机制。
如果一个线程申请到锁,条件也不成立,就直接释放锁。轮到下一个线程申请锁,条件不成立,释放锁。反复申请释放,什么都不做。对于系统来说这是一种资源浪费。
我们希望的是,当线程申请到锁时,如果条件不满足,就先阻塞等待。等到条件满足后,唤醒线程。线程执行后续的任务。
条件变量的主要动作
- 当前线程等待条件变量成立而被挂起。
- 条件变量成立,线程被唤醒。
条件变量通过指得是在互斥底下。
条件变量的用法与锁基本一致。
条件变量函数
定义
pthread_cond_t cond;
初始化
全局的初始化为PTHREAD_COND_INITIALIZER
局部初始化为
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
成功返回0,失败返回错误码。
参数: cond条件变量 attr:初始化变量的属性(一般设为空)
// 初始化互斥锁和条件变量
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
销毁
全局会自动销毁
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
条件条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数
- cond:为条件变量
- mutex:当前的互斥锁
- 成功返回0,失败返回错误码
作用是让线程进入挂起状态,等待被唤醒。
注意:进入等待时,实际上会释放掉锁。
唤醒线程
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
- pthread_cond_signal是唤醒等待队列的首个线程。
- pthread_cond_broadcast唤醒等待队列的所有线程。
如果唤醒多个线程,那么线程也要先竞争锁,再去访问资源。
生产者消费模型
生产者消费者模型是OS中的一个重要的模型。它描述了一种通知和等待的机制。
这个模型包括生产者,消费者,交易产所(通常称为缓冲区)。
生产者负责生产数据,将其放入交易场所(一块内存)。同一时间只能有一个生产者放数据,如果交易场所满了,生产者不能继续生产,必须等待消费者从仓库拿走数据。消费者从仓库取出数据。同样,同一时间只能有一个消费者拿走数据,如果仓库满了,消费者必须等待。
生产者和消费者存在复杂的关系。首先必须是互斥的。生产者和消费者都需要面对交易场所。如果生产和消费同时对交易场所操作,可能会互相干扰,因此要对这个临界资源互斥的访问,即加锁。
同时生产和消费也是同步的,必须保证在适当的时机对数据的生产和消费。
这个模型的一个主要优点是解耦,即降低生产者和消费者之间的依赖关系。通过引入交易场所(缓冲区),生产者不需要直接关注消费者的状态,只需要将数据放入缓冲区;同样,消费者也不需要关心生产者的状态,只需要从缓冲区中取出数据进行处理。
生产者和生产者关系,消费者和消费者的关系,生产和消费者的关系为什么是互斥的?
- 生产者和生产者会竞争对临界资源的访问,使用是互斥的。
- 消费者和消费者会竞争取出数据,所以是互斥的。
- 生产者竞争消费者,消费者竞争生产者。所以生产者和消费者是互斥的。
生产和消费者为什么也是同步的?
要保证一个合适的生产和消费数据的顺序。既不能一直生产(仓库满了),也不能一直消费(仓库空了),同时也要防止饥饿问题。
所以生产者消费者模型就是利用锁和条件变量的维护。
基于BlockingQueue的生产者消费者模型
基于阻塞队列,是常用的生产消费模型。
- 当队列为空时,往队列里取数据就会被阻塞,直到队列里有数据。
- 当队列为满时,往队列时放数据会被阻塞,直到队列有数据被取出。
我们以一个单生产者,单消费者来模拟实现。
阻塞队列的框架
- 由于库里面的队列是malloc的空间,所以不是线程安全的,需要我们封装。
- 需要cappcity来判断阻塞队列是空满。
- 需要mutex来保护临界资源(仓库)。
- 需要条件变量 _c_cond _p_cond分别维护消费者和生产者的同步。
- 成员函数需要生产者push,消费者pop
生产者生产数据push
void Push(const T &in)
{
pthread_mutex_lock(&_mutex);
// 满了就不生产
while (IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
首先是加锁
为什么条件判断是while?
为了保持代码的鲁棒性。如果是if判断,在if条件变量等待后,锁被释放。这时候就绕开(不成立)的条件执行后续的代码。因此每一轮都要判断。如果被阻塞了,就必须一直判断。
往队列中push数据后唤醒消费者来消费。
最后释放锁。
消费者消费数据pop
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
//空了就不消费
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out=_q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
}
out是一个输出型参数。
条件同样需要一直判断。
消费一个数据后,就通知生产者来生产数据。
#include <queue>
#include <pthread.h>
const int defultcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(int cap = defultcap) : _capacity(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_p_cond, nullptr);
pthread_cond_init(&_c_cond, nullptr);
}
bool IsFull()
{
return _capacity==_q.size();
}
bool IsEmpty()
{
return _q.size()==0;
}
void Push(const T &in)
{
pthread_mutex_lock(&_mutex);
// 满了就不生产
while (IsFull())
{
pthread_cond_wait(&_p_cond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_mutex);
}
void Pop(T *out)
{
pthread_mutex_lock(&_mutex);
//空了就不消费
while (IsEmpty())
{
pthread_cond_wait(&_c_cond, &_mutex);
}
*out=_q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _p_cond; // 生产者
pthread_cond_t _c_cond; // 消费者
};
我们创建的阻塞队列是否适用于多生产者多消费者?
适合。因为生产的过程是进行了保护,即使是多生产的竞争也不会影响放数据。消费也受到保护,不会因为多消费导致数据的读取干扰。
生产和消费的过程本来就是串行的?为什么会高效呢?
- 生产者和消费者模型由于锁的保护,没有改变串行的特点。
- 但是数据的生产是可以并行的!在放数据的那一刻是互斥的。
- 数据效率的过程也是并行的!在取数据的一瞬间是互斥的。
- 故而快就是快在数据怎么来和数据处理的过程。
这种解耦的特点就是高效的原因。
在软件开发过程中,生产者消费者模型常被用于处理数据的生产和消费问题。例如,在并发编程中,可以使用生产者消费者模型来避免数据的竞争条件和提高系统的吞吐量。此外,该模型也可以用于实现异步编程、任务调度和消息队列等功能。
总的来说,生产者消费者模型是一种强大的并发处理模型,能够有效地平衡生产者和消费者的处理能力,并降低它们之间的依赖关系。
利用阻塞队列模拟计算任务。
利用2生产者,3消费者处理任务。
随机生成+ - */%的任务,并且展示结果的正确性,以及错误码。
#pragma once
#include <string>
const int defulvalue = 0;
const std::string opers = "+-*/%()";
enum
{
right = 0,
div_zore,
mod_zore,
unkonw
};
class Task
{
public:
Task()
{
}
Task(int x, int y, char op)
: _x(x), _y(y), _oper(op), _result(0), _code(0)
{
}
void Run()
{
switch (_oper)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_code = div_zore;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_code = mod_zore;
else
_result = _x % _y;
}
break;
default:
_code = unkonw;
break;
}
}
void operator()()
{
Run();
}
std::string PrintTask()
{
std::string s;
s = std::to_string(_x);
s += _oper;
s += std::to_string(_y);
s += "=?";
return s;
}
std::string PrintResult()
{
std::string s;
s = std::to_string(_x);
s += _oper;
s += std::to_string(_y);
s += "=";
s += std::to_string(_result);
s += " [";
s += std::to_string(_code);
s += "]";
return s;
}
~Task()
{
}
private:
int _x;
int _y;
char _oper;
int _result;
int _code;
};
创建多生产多消息模型
#include "BlockQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <string>
#include "Task.hpp"
class Data
{
public:
BlockQueue<Task> *_bq;
std::string _name;
};
void *consurmer(void *args)
{
Data*td=(Data*)args;
while (true)
{
Task t;
td->_bq->Pop(&t);
t();
std::cout <<td->_name<< "pop :" << t.PrintResult() << std::endl;
}
}
void *productor(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
int data1 = rand() % 10; // 0-9
usleep(rand() % 123);
int data2 = rand() % 10;
usleep(rand() % 123);
char oper = opers[rand() % opers.size()];
usleep(rand() % 123);
Task t(data1, data2, oper);
std::cout << "_p push: " << t.PrintTask() << std::endl;
bq->Push(t);
}
return nullptr;
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self()); // 更加随机的种子
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c[3], p[2];
Data *td1=new Data();
td1->_bq=bq,td1->_name="_c_1";
pthread_create(&c[0], nullptr, consurmer, td1);
Data *td2=new Data();
td2->_bq=bq,td2->_name="_c_2";
pthread_create(&c[1], nullptr, consurmer, td2);
Data *td3=new Data();
td3->_bq=bq,td3->_name="_c_3";
pthread_create(&c[2], nullptr, consurmer, td3);
pthread_create(&p[0], nullptr, productor, bq);
pthread_create(&p[1], nullptr, productor, bq);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
return 0;
}
由于生产的速度小于效率。最终消费者会竞争商品。
在我们的生产者消费者模型中是一个无限循环的,在实际中需要添加信号或者共享的布尔类型让程序停下。
标签:线程,消费者,生产者,模型,Linux,&_,mutex,pthread,cond From: https://blog.csdn.net/m0_73299809/article/details/136791858