在并发编程中,常常需要用到线程安全的队列。常见的线程安全队列的设计分为两种:
- 阻塞队列:常用于生产者和消费者的场景,其中,生产者存放元素,而消费者获取元素。常用的实现方法是在入队和出队时使用同一把锁,或者入队和出队使用不同的两把锁。
- 非阻塞队列:与阻塞队列不同,非阻塞队列在队列为空时获取元素会直接返回,在队列为满时插入元素会抛出异常。常用的实现方法是使用自旋锁 + CAS。
而对于阻塞队列而言,从队列容量大小的角度来看,队列又可分为如下两种:
- 有界队列:即固定容量大小的队列。队列一旦满了,不会进行扩容,也就无法添加新的元素。
- 无界队列:即没有设置固定大小的队列,队列会进行动态扩容。
在 Muduo 的实现中,设计了两种队列,一种是无界阻塞队列 BlockingQueue
,一种是有界阻塞队列 BoundedBlockingQueue
。向队列中添加、取出元素均采用互斥量和条件变量结合的方式来完成线程同步。
BlockingQueue
BlockingQueue
底层采用了标准库中的双端队列 deque
,由于 STL 库中的容器并非线程安全,因此采用互斥锁和条件变量的方式来完成线程同步:
template<typename T>
class BlockingQueue : nocopyable {
public:
using queue_type = std::queue<T>;
private:
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
queue_type queue_ GUARDED_BY(mutex_);
};
其中,变量 mutex_
使用 mutable
关键字进行修饰,以表示该变量在常成员函数中也是可变的。由于在如下方法中,需要对互斥锁进行加解锁操作,因此采用 mutable 关键字进行修饰:
size_t size() const {
MutexLockGuard lock(mutex_);
return queue_.size();
}
放入元素的方法如下:
void put(const T& x) {
MutexLockGuard lock(mutex_);
queue_.push_back(x);
notEmpty_.notify();
}
void put(T&& x) {
MutexLockGuard lock(mutex_);
queue_.push_back(std::move(x));
notEmpty_.notify();
}
上述重载函数,一个用于传递右值,一个用于传递左值,且均在最后通过 notify()
唤醒阻塞线程来取出队列元素。
如果 T 是一个实际类型,而不是模板参数,那么 string&&
这样的参数为右值引用类型,只能匹配一个右值。而 const string&
这样的参数是常左值引用,可以匹配左值和右值。一个右值会优先匹配右值引用,来实现移动语义。但它也可以匹配常左值引用。
但是,如果 T 是函数模板的类型参数,const T&
的意义不变,可以匹配左值和右值。而 T&&
的意义就发生变化了,这时,T&&
是一个 “转发引用”,可以接受左值或右值的参数,并一般配合 std::forward
来完美转发到另外的函数中。
在 Scott Meyers 的《Effective Modern C++》中称为 “万能引用”。
但是,如果 T&&
是一个模板类成员函数的函数参数,那么它就只能代表右值,因为在编译期类型 T 就已经确认了。
从队列中取出元素的方法如下:
T take() {
MutexLockGuard lock(mutex_);
while (queue_.empty()) {
notEmpty_.wait();
}
assert(!queue_.empty());
T front(std::move(queue_.front()));
queue_.pop_front();
return front;
}
queue_type drain() {
std::deque<T> queue;
{
MutexLockGuard lock(mutex_);
queue = std::move(queue_);
assert(queue_.empty());
}
return queue;
}
其中,drain()
方法则是将队列中的所有元素取出来,放入到一个新的队列中。大括号的作用是给互斥锁一个函数内作用域,使得互斥锁在离开此作用域时调用析构函数进行解锁。
BoundedBlockingQueue
BoundedBlockingQueue
是一个有界的阻塞队列,其底层与 BlockingQueue
有所不同,它使用了一块环形缓冲区来作为底层容器,源于 Boost 库的 circular_buffer
。
boost::circular_buffer<T> queue_ GUARDED_BY(mutex_);
circular_buffer
是一块环形的内存区域,当该区块塞满数据时,新的数据会覆盖旧的数据。支持随机访问,在头尾的插入、删除操作耗时均为常数,并且可以使用标准库中的算法对其中的元素进行操作,比如排序、求和等。
其只在创建、显式调整容量或者需要赋值时才分配内存。
它可能的使用场景如下:
- 用新收到的数据覆盖旧的数据的场景;
- 对指定数目的元素进行某种缓存的场景;
- 高效的固定容量先进先出场景;
- 高效的固定容量的后进先出队列,当队列满时需要删除头元素的场景;
在 BoundedBlockingQueue
中,有两个成员变量来作为条件变量,一个表示队列非空,一个表示队列未满。
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
其插入元素的方法如下:
void put(const T& x) {
MutexLockGuard lock(mutex_);
while (queue_.full()) {
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(x);
notEmpty_.notify();
}
其中,在队列已满时,它并未采用 circular_buffer
中覆盖队头元素的方法,而是通过条件变量 notFull_
让线程等待队列未满,然后再进行插入操作。