生产者消费者模式的代码(以下代码参考链接):
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <windows.h>
#include <condition_variable>
std::mutex mtx; // 全局互斥锁
std::queue<int> que; // 全局消息队列
std::condition_variable cr; // 全局条件变量
int cnt = 1; // 数据
void producer() {
while(true) {
{
std::unique_lock<std::mutex> lck(mtx);
// 在这里也可以加上wait 防止队列堆积 while(que.size() >= MaxSize) que.wait();
que.push(cnt);
std::cout << "向队列中添加数据:" << cnt ++ << std::endl;
// 这里用大括号括起来了 为了避免出现虚假唤醒的情况 所以先unlock 再去唤醒
}
cr.notify_all(); // 唤醒所有wait
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lck(mtx);
while (que.size() == 0) { // 这里防止出现虚假唤醒 所以在唤醒后再判断一次
cr.wait(lck);
}
int tmp = que.front();
std::cout << "从队列中取出数据:" << tmp << std::endl;
que.pop();
}
}
int main()
{
std::thread thd1[2], thd2[2];
for (int i = 0; i < 2; i++) {
thd1[i] = std::thread(producer);
thd2[i] = std::thread(consumer);
thd1[i].join();
thd2[i].join();
}
return 0;
}
while (que.size() == 0)
的作用:while (que.size() == 0)
不能替换为if(que.size() == 0)
,这是为了防止虚假唤醒。如果while (que.size() == 0)
替换为if(que.size() == 0)
可能会出现如下结果:
- th2[0]拿完了队列里最后一个产品正在处理,此时队列为空。
- th2[1]想去队列里拿发现已经空了,所以停在了wait上。
- th1[0]拿到mtx后,往队列添加了一个产品,并执行了notify_one通知处于等待状态的消费者。
- 由于收到了notify,th2[1]准备要被调度,但是th2[0]此时恰好处理完了手头的任务,并进行了下一轮循环,抢在th2[1]之前拿到了mtx并取走了th1[0]刚放进去的产品,此时th2[1]被阻塞,随后th2[0]释放了mtx。
- th2[0]释放了mtx后,th2[1]终于拿到了mtx却发现队列又是空的,这就是一次虚假唤醒,对于这种情况th2[1]需要继续wait。要想实现“继续wait”,就需要使用
while (que.size() == 0)
,而不是if(que.size() == 0)
wait()有的第二个形参可用于代替while (que.size() == 0)
:
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
#include <windows.h>
#include <condition_variable>
std::mutex mtx; // 全局互斥锁
std::queue<int> que; // 全局消息队列
std::condition_variable cr; // 全局条件变量
int cnt = 1; // 数据
void producer() {
while(true) {
{
std::unique_lock<std::mutex> lck(mtx);
// 在这里也可以加上wait 防止队列堆积 while(que.size() >= MaxSize) que.wait();
que.push(cnt);
std::cout << "向队列中添加数据:" << cnt ++ << std::endl;
// 这里用大括号括起来了 为了避免出现虚假唤醒的情况 所以先unlock 再去唤醒
}
cr.notify_all(); // 唤醒所有wait
}
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lck(mtx);
cr.wait(lck,[]{return que.size() > 0;}); // wait被唤醒以后,如果判断return为假时,继续调用wait
int tmp = que.front();
std::cout << "从队列中取出数据:" << tmp << std::endl;
que.pop();
}
}
int main()
{
std::thread thd1[2], thd2[2];
for (int i = 0; i < 2; i++) {
thd1[i] = std::thread(producer);
thd2[i] = std::thread(consumer);
thd1[i].join();
thd2[i].join();
}
return 0;
}
标签:size,std,mtx,C++,que,variable,include,condition,wait
From: https://www.cnblogs.com/codingbigdog/p/16758459.html