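这里模拟了一个有界同步队列：5 个生产者线程和 5 个消费者线程共同存取队列中的数据。队列同时提供了阻塞和非阻塞两种入队、出队方法。注意在创建线程时使用了 std::ref：std::thread 的构造函数会把传入的参数按值拷贝（或移动）到线程内部的存储中，再把这份副本传给线程函数，因此直接传入 queue 并不能得到对原队列的引用；又因为线程函数的形参是非 const 的左值引用，而队列内部含有 std::mutex、本身不可拷贝，所以直接传入 queue 时编译无法通过。用 std::ref(queue) 包装之后，传递的是一个 std::reference_wrapper，线程函数拿到的才是对原队列本身的引用。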
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
using namespace std;
/// Bounded FIFO queue shared between threads.
///
/// Holds at most `capacity` elements of type `T`. Every member function takes
/// the same internal mutex, so the queue may be used concurrently by several
/// producers and consumers. Two flavours of each operation are offered:
///   - blocking:     Eequeue / Dequeue wait until the operation can proceed;
///   - non-blocking: TryEequeue / TryDequeue return false immediately instead.
///
/// @tparam T        element type (must be movable or copyable).
/// @tparam capacity maximum number of elements stored at once.
template<typename T, std::size_t capacity>
class SyncQueue {
public:
    SyncQueue() : count(0) {}

    /// Appends `value`, blocking while the queue already holds `capacity`
    /// elements.
    void Eequeue(T value) {
        std::unique_lock<std::mutex> lock(m);
        not_full.wait(lock, [this]() { return count != capacity; });
        syncQueue.push(std::move(value));
        ++count;
        // Release the mutex before notifying so the woken consumer does not
        // immediately block on it again.
        lock.unlock();
        not_empty.notify_one();
    }

    /// Appends `value` only if there is room.
    /// @return true if the value was stored, false if the queue was full.
    bool TryEequeue(T value) {
        std::lock_guard<std::mutex> lock(m);
        if (count == capacity) return false;
        syncQueue.push(std::move(value));
        ++count;
        not_empty.notify_one();
        return true;
    }

    /// Removes and returns the oldest element, blocking while the queue is
    /// empty.
    T Dequeue() {
        std::unique_lock<std::mutex> lock(m);
        not_empty.wait(lock, [this]() { return count != 0; });
        T value = std::move(syncQueue.front());
        syncQueue.pop();
        --count;
        lock.unlock();
        not_full.notify_one();
        return value;
    }

    /// Removes the oldest element into `value` if one is available.
    /// @param value receives the dequeued element; left untouched on failure.
    /// @return true if an element was removed, false if the queue was empty.
    bool TryDequeue(T &value) {
        std::lock_guard<std::mutex> lock(m);
        if (count == 0) return false;
        value = std::move(syncQueue.front());
        syncQueue.pop();
        --count;
        not_full.notify_one();
        return true;
    }

    /// @return the number of elements stored at the moment of the call; with
    ///         other threads active the value may be stale once it is returned.
    std::size_t getCurrentSize() const {
        std::lock_guard<std::mutex> lock(m);
        return count;
    }

private:
    mutable std::mutex m;               // guards syncQueue and count
    std::condition_variable not_full;   // signalled after an element is removed
    std::condition_variable not_empty;  // signalled after an element is added
    std::queue<T> syncQueue;            // underlying FIFO storage
    std::size_t count;                  // number of elements in syncQueue
};
/// Producer thread body: pushes the ten values `ThreadID * 100 + 0..9` into
/// `queue` with the non-blocking TryEequeue, retrying each value until the
/// queue accepts it.
///
/// @param queue    shared queue to fill.
/// @param ThreadID identifier used to derive the produced values.
void producer(SyncQueue<int, 10>& queue, int ThreadID) {
    for (int i = 0; i < 10; ++i) {
        const int value = ThreadID * 100 + i;
        // Retry the same value until it is accepted. Yield after a failed
        // attempt so a full queue does not turn this loop into a hot spin
        // that starves the consumers it is waiting for.
        while (!queue.TryEequeue(value)) {
            std::cout << "TryEnqueue faild value = " << value << std::endl;
            std::this_thread::yield();
        }
        std::cout << "TryEnqueue success value = " << value << std::endl;
    }
}
/// Consumer thread body: removes ten values from `queue` with the
/// non-blocking TryDequeue and prints each one.
///
/// An empty queue only means the producers have not caught up yet, so a failed
/// attempt is retried instead of ending the thread. Stopping on the first
/// empty poll would let consumers exit early and leave producers retrying
/// forever against a full queue. The count of ten matches the ten values each
/// producer pushes, with one consumer per producer.
///
/// @param queue    shared queue to drain.
/// @param ThreadID consumer identifier (currently unused).
void consumer(SyncQueue<int, 10>& queue, int ThreadID) {
    (void)ThreadID;  // kept for a signature parallel to producer()
    const int itemsToConsume = 10;
    int value = 0;
    int consumed = 0;
    while (consumed < itemsToConsume) {
        if (queue.TryDequeue(value)) {
            std::cout << "TryDequeue success : " << value << std::endl;
            ++consumed;
        } else {
            // Nothing available right now; let other threads run and retry.
            std::this_thread::yield();
        }
    }
}
int main() {
SyncQueue<int, 10> queue;
std::vectorstd::thread producers;
std::vectorstd::thread consumers;
for (int i = 0; i < 5; i++) {
producers.emplace_back(std::thread(producer,std::ref(queue),i));
}
for (int i = 0; i < 5; i++) {
consumers.emplace_back(std::thread(consumer, std::ref(queue), i));
}
for (auto &thread: producers) {
thread.join();
}
for (auto& thread : consumers) {
thread.join();
}
return 0;
}