直接上代码啦:
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
// 定义一个同步的队列类
class SyncQueue {
public:
SyncQueue(size_t capacity) : max_capacity(capacity) {}
void push(int value) {
std::unique_lock<std::mutex> lock(mtx);
// 等待直到队列有足够空间
space_cond.wait(lock, [this] { return queue.size() < max_capacity; });
queue.push(value);
lock.unlock();
data_cond.notify_one(); // 通知等待的线程数据已经可用
}
int pop() {
std::unique_lock<std::mutex> lock(mtx);
data_cond.wait(lock, [this] { return !queue.empty(); }); // 等待直到队列非空
int value = queue.front();
queue.pop();
lock.unlock();
space_cond.notify_one(); // 通知等待的线程现在有空间了
return value;
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return queue.size();
}
private:
std::queue<int> queue;
size_t max_capacity;
std::mutex mtx;
std::condition_variable data_cond;
std::condition_variable space_cond;
};
void producer(SyncQueue& q) {
for (int i = 0; i < 10; ++i) {
std::cout << "Producing: " << i << std::endl;
q.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(SyncQueue& q) {
for (int i = 0; i < 10; ++i) {
int value = q.pop();
std::cout << "Consuming: " << value << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
int main() {
SyncQueue q(5); // 设置队列的最大容量为5
std::thread t1(producer, std::ref(q));
std::thread t2(consumer, std::ref(q));
t1.join();
t2.join();
return 0;
}
在这个版本中,我们增加了以下几点:
SyncQueue
构造函数接收一个参数capacity
,表示队列的最大容量。- 在
push
方法中,我们使用space_cond
条件变量来等待直到队列中有足够的空间可以插入新元素。 - 在
pop
方法中,我们同样使用data_cond
条件变量来等待直到队列非空。 - 我们还添加了一个
size
成员函数,用于返回当前队列中的元素数量,这可以用来进行测试或调试。
这样,当队列已满时,生产者线程将会阻塞,等待消费者线程消费数据以腾出空间。同样,当队列为空时,消费者线程也会阻塞,等待生产者线程填入数据。
标签:std,生产者,lock,c++,queue,队列,cond,编写,size From: https://blog.csdn.net/semicolon_hello/article/details/140910740