目录
简介
记录开发时自用的小轮子:线程安全队列
线程安全队列
#ifndef THREADSAFEQUEUE_H
#define THREADSAFEQUEUE_H
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
/**
 * @brief FIFO queue whose member functions take an internal mutex before
 *        touching the underlying std::queue.
 *
 * Consumers can block on a condition variable until an element is pushed.
 *
 * @tparam T Type of the stored elements.
 */
template <typename T>
class ThreadSafeQueue
{
public:
    /// Creates an empty queue.
    ThreadSafeQueue();

    /// Copies the elements of @p other while holding @p other's mutex.
    ThreadSafeQueue(const ThreadSafeQueue& other);

    /// Replaces the contents with a copy of @p other's elements; both mutexes are held.
    ThreadSafeQueue& operator=(const ThreadSafeQueue& other);

    ~ThreadSafeQueue();

    /// Appends @p new_value at the back and signals one waiting consumer.
    void push(T new_value);

    // std::queue::pop only removes the front element and does not return it.
    // To keep the interface simple, the pop operations below hand back the
    // removed element as part of the same call.
    // The timeout is given in milliseconds.
    // With the default argument the call blocks until an element is available.

    /**
     * @brief Waits for an element, moves it into @p value and removes it.
     * @param value Receives the removed front element.
     * @param timeout_millisecond Maximum time to wait; milliseconds::max() waits without a timeout.
     * @return true if an element was popped, false if the timeout expired first.
     */
    bool wait_and_pop(T& value, std::chrono::milliseconds timeout_millisecond = std::chrono::milliseconds::max());

    /**
     * @brief Waits for an element, removes it and returns it in a shared_ptr.
     * @param timeout_millisecond Maximum time to wait; milliseconds::max() waits without a timeout.
     * @return Pointer to the popped element, or an empty pointer if the timeout expired first.
     */
    std::shared_ptr<T> wait_and_pop(std::chrono::milliseconds timeout_millisecond = std::chrono::milliseconds::max());

    /// @return true when the queue holds no elements at the time of the call.
    bool empty() const;

    /// @return Number of elements held at the time of the call.
    size_t size() const;

    /// @return Reference to the front element. @throws std::runtime_error if the queue is empty.
    T& front();

    /// @return Const reference to the front element. @throws std::runtime_error if the queue is empty.
    const T& front() const;

    /// @return shared_ptr to a copy of the front element, or an empty pointer if the queue is empty.
    std::shared_ptr<T> front_ptr();

private:
    mutable std::mutex mtx;             // mutable so const members can lock it
    std::queue<T> data_queue;           // underlying storage
    std::condition_variable data_cond;  // signalled by push()
};
/// Default constructor: every member is default-initialized, giving an empty queue.
template <typename T>
ThreadSafeQueue<T>::ThreadSafeQueue() = default;
/**
 * @brief Copy constructor: duplicates the elements of @p other.
 *
 * Only the stored elements are copied; this object gets its own fresh mutex
 * and condition variable.
 *
 * @param other Queue to copy from.
 */
template <typename T>
ThreadSafeQueue<T>::ThreadSafeQueue(const ThreadSafeQueue& other)
    : mtx(), data_queue(), data_cond()
{
    // Hold the source's mutex for the duration of the copy so its contents
    // cannot change underneath us.
    const std::lock_guard<std::mutex> source_guard(other.mtx);
    data_queue = other.data_queue;
}
/**
 * @brief Copy assignment: replaces this queue's elements with a copy of @p other's.
 *
 * Both mutexes are acquired together through std::lock, so two threads
 * assigning in opposite directions (a = b and b = a) cannot deadlock.
 *
 * If the queue is non-empty after the assignment, all consumers blocked in
 * wait_and_pop() on this queue are woken; without this they would keep
 * sleeping until the next push() even though data is available.
 *
 * @param other Queue to copy from.
 * @return Reference to this queue.
 */
template <typename T>
ThreadSafeQueue<T>& ThreadSafeQueue<T>::operator=(const ThreadSafeQueue& other)
{
    // Self-assignment: nothing to do, and locking the same mutex twice must be avoided.
    if (this == &other)
    {
        return *this;
    }

    // Lock both mutexes atomically, then hand ownership to RAII guards.
    std::lock(mtx, other.mtx);
    std::lock_guard<std::mutex> self_lock(mtx, std::adopt_lock);
    std::lock_guard<std::mutex> other_lock(other.mtx, std::adopt_lock);

    data_queue = other.data_queue;

    // Several elements may have arrived at once, so wake every waiter;
    // each one re-checks its predicate under the mutex.
    if (!data_queue.empty())
    {
        data_cond.notify_all();
    }
    return *this;
}
/// Destructor: no custom cleanup; the members release their own resources.
template <typename T>
ThreadSafeQueue<T>::~ThreadSafeQueue() = default;
/**
 * @brief Appends an element at the back of the queue and signals one waiter.
 * @param new_value Element to store; taken by value and moved into the queue.
 */
template <typename T>
void ThreadSafeQueue<T>::push(T new_value)
{
    const std::lock_guard<std::mutex> guard(mtx);
    data_queue.emplace(std::move(new_value));
    // Signalled while the mutex is still held.
    data_cond.notify_one();
}
/**
 * @brief Removes the front element, waiting for one to arrive if necessary.
 *
 * @param value Receives the removed element (move-assigned from the front).
 * @param timeout_millisecond Maximum time to wait. The value
 *        std::chrono::milliseconds::max() selects an untimed wait.
 * @return true if an element was popped into @p value; false if the timeout
 *         expired while the queue was still empty (@p value is untouched).
 */
template <typename T>
bool ThreadSafeQueue<T>::wait_and_pop(T& value, std::chrono::milliseconds timeout_millisecond)
{
    const auto has_data = [this] { return !data_queue.empty(); };

    std::unique_lock<std::mutex> guard(mtx);
    if (!has_data())
    {
        const bool wait_forever = (timeout_millisecond == std::chrono::milliseconds::max());
        if (wait_forever)
        {
            data_cond.wait(guard, has_data);
        }
        else if (!data_cond.wait_for(guard, timeout_millisecond, has_data))
        {
            // Timed out with nothing to pop.
            return false;
        }
    }

    value = std::move(data_queue.front());
    data_queue.pop();
    return true;
}
/**
 * @brief Removes the front element and returns it wrapped in a shared_ptr,
 *        waiting for one to arrive if necessary.
 *
 * @param timeout_millisecond Maximum time to wait. The value
 *        std::chrono::milliseconds::max() selects an untimed wait.
 * @return shared_ptr owning the popped element (move-constructed from the
 *         front), or an empty shared_ptr if the timeout expired while the
 *         queue was still empty.
 */
template <typename T>
std::shared_ptr<T> ThreadSafeQueue<T>::wait_and_pop(std::chrono::milliseconds timeout_millisecond)
{
    const auto has_data = [this] { return !data_queue.empty(); };

    std::unique_lock<std::mutex> guard(mtx);
    if (!has_data())
    {
        const bool wait_forever = (timeout_millisecond == std::chrono::milliseconds::max());
        if (wait_forever)
        {
            data_cond.wait(guard, has_data);
        }
        else if (!data_cond.wait_for(guard, timeout_millisecond, has_data))
        {
            // Timed out with nothing to pop.
            return nullptr;
        }
    }

    // Build the result before removing the element from the queue.
    auto popped = std::make_shared<T>(std::move(data_queue.front()));
    data_queue.pop();
    return popped;
}
/**
 * @brief Reports whether the queue currently holds no elements.
 * @return true if the queue is empty while the mutex is held. The mutex is
 *         released on return, so the answer may be stale by the time the
 *         caller acts on it.
 */
template <typename T>
bool ThreadSafeQueue<T>::empty() const
{
    const std::lock_guard<std::mutex> guard(mtx);
    const bool is_empty = data_queue.empty();
    return is_empty;
}
/**
 * @brief Returns the number of stored elements.
 * @return Element count read while the mutex is held. The mutex is released
 *         on return, so the value may be stale by the time the caller uses it.
 */
template <typename T>
size_t ThreadSafeQueue<T>::size() const
{
    const std::lock_guard<std::mutex> guard(mtx);
    const size_t count = data_queue.size();
    return count;
}
/**
 * @brief Gives access to the front element without removing it.
 *
 * The mutex is held only for the duration of this call. The returned
 * reference refers to the element inside the queue and is not protected once
 * the call returns.
 *
 * @return Reference to the front element.
 * @throws std::runtime_error if the queue is empty.
 */
template <typename T>
T& ThreadSafeQueue<T>::front()
{
    const std::lock_guard<std::mutex> guard(mtx);
    if (!data_queue.empty())
    {
        return data_queue.front();
    }
    throw std::runtime_error("queue is empty");
}
/**
 * @brief Gives read-only access to the front element without removing it.
 *
 * The mutex is held only for the duration of this call. The returned
 * reference refers to the element inside the queue and is not protected once
 * the call returns.
 *
 * @return Const reference to the front element.
 * @throws std::runtime_error if the queue is empty.
 */
template <typename T>
const T& ThreadSafeQueue<T>::front() const
{
    const std::lock_guard<std::mutex> guard(mtx);
    if (!data_queue.empty())
    {
        return data_queue.front();
    }
    throw std::runtime_error("queue is empty");
}
/**
 * @brief Returns a copy of the front element without removing it.
 *
 * The copy is made while the mutex is held, so the result is independent of
 * later changes to the queue.
 *
 * @return shared_ptr owning a copy of the front element, or an empty
 *         shared_ptr if the queue is empty.
 */
template <typename T>
std::shared_ptr<T> ThreadSafeQueue<T>::front_ptr()
{
    const std::lock_guard<std::mutex> guard(mtx);
    return data_queue.empty() ? std::shared_ptr<T>()
                              : std::make_shared<T>(data_queue.front());
}
#endif // THREADSAFEQUEUE_H
标签:std,return,lock,C++,queue,自用,线程,data,ThreadSafeQueue
From: https://www.cnblogs.com/paw5zx/p/18112408