线程安全的队列:使用Monitor模式和C++11多线程库
引言
在多线程编程中,数据共享是一个关键的问题。如果多个线程需要访问同一个数据结构,不正确的管理会导致数据不一致甚至程序崩溃。本文将介绍如何使用C++11的多线程库和Monitor模式来实现一个线程安全的队列。
Monitor模式
Monitor模式是一种同步原语,用于封装对共享资源的访问。在C++中,我们可以通过组合使用std::mutex
和std::condition_variable
来实现Monitor模式。
// 定义一个通用的Monitor模板类
template <typename T>
class Monitor {
public:
// 嵌套结构体,用于管理互斥锁和条件变量
struct UnlockAndNotify {
std::mutex d_mutex; // 互斥锁
std::condition_variable d_condition; // 条件变量
// 锁定互斥锁
void lock() { d_mutex.lock(); }
// 解锁互斥锁,并通知一个等待的线程
void unlock() { d_mutex.unlock(); d_condition.notify_one(); }
};
private:
// 由于可能在const成员函数中需要修改这两个成员,因此将它们声明为mutable
mutable UnlockAndNotify d_combined; // UnlockAndNotify实例
mutable T d_data; // 存储的数据
public:
// 用于生产者的锁定函数
// 返回一个包含数据引用和锁的tuple
std::tuple<T&, std::unique_lock<UnlockAndNotify>> makeProducerLock() const {
return { d_data, std::unique_lock{d_combined} };
}
// 用于消费者的锁定函数
// 只有当满足某个条件(PRED predicate)时,才返回数据和锁
template <typename PRED>
std::tuple<T&, std::unique_lock<std::mutex>> makeConsumerLockWhen(PRED predicate) const {
std::unique_lock lock{d_combined.d_mutex}; // 获取锁
// 等待条件满足
d_combined.d_condition.wait(lock, [this, predicate]{ return predicate(d_data); });
return { d_data, std::move(lock) }; // 返回数据和锁
}
};
线程安全的队列实现
我们定义了一个名为ThreadQueue
的模板类,它使用一个Monitor
实例来封装其内部的std::deque
。
// 定义一个模板类 ThreadQueue
template <typename T>
class ThreadQueue {
// 使用 Monitor 模板类封装一个 std::deque
// 以保证其线程安全性
Monitor<std::deque<T>> d_monitor;
public:
// 添加一个元素到队列中
void add(T number) {
// 使用 Monitor 的 makeProducerLock 方法获取一个唯一锁和队列引用
// 这确保了在添加元素时队列不会被其他线程修改
auto[numberQueue, lock] = d_monitor.makeProducerLock();
// 在获取到锁的情况下,将元素添加到队列的末尾
numberQueue.push_back(number);
}
// 从队列中移除并返回一个元素
T remove() {
// 使用 Monitor 的 makeConsumerLockWhen 方法在满足某个条件(队列非空)时
// 获取一个唯一锁和队列引用
auto[numberQueue, lock] = d_monitor.makeConsumerLockWhen([](auto& numberQueue) { return !numberQueue.empty(); });
// 在获取到锁和确认队列非空的情况下,从队列前端移除一个元素
const auto number = numberQueue.front();
numberQueue.pop_front();
// 返回被移除的元素
return number;
}
};
添加元素
在add
函数中,我们首先使用makeProducerLock
方法获取一个锁和队列的引用。然后,我们在获取锁的情况下,安全地将元素添加到队列中。
移除元素
在remove
函数中,我们使用makeConsumerLockWhen
方法。该方法会等待队列非空的条件成立,然后才获取锁和队列的引用。
测试
class Dice {
public:
int operator()(){ return rand(); }
private:
std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
std::default_random_engine());
};
int main(){
std::cout << '\n';
constexpr auto NumberThreads = 100;
ThreadQueue<int> safeQueue;
auto addLambda = [&safeQueue](int val){ safeQueue.add(val);
std::cout << val << " "
<< std::this_thread::get_id() << "; ";
};
auto getLambda = [&safeQueue]{ safeQueue.remove(); };
std::vector<std::thread> addThreads(NumberThreads);
Dice dice;
for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
std::vector<std::thread> getThreads(NumberThreads);
for (auto& thr: getThreads) thr = std::thread(getLambda);
for (auto& thr: addThreads) thr.join();
for (auto& thr: getThreads) thr.join();
std::cout << "\n\n";
}
文章由ChatGPT-4模型协助完成。
参考:Thread-Safe Queue: Two Serious Errors