昇腾 - AscendCL C++应用开发 线程安全的队列
flyfish
C++ mutex 各种各样的互斥锁 mutex、timed_mutex、recursive_mutex、shared_mutex
C++ 线程间同步的条件变量 std::condition_variable 和 std::condition_variable_any
C++提供的智能指针 unique_ptr、shared_ptr、weak_ptr
C++中的左值(lvalue)和 右值(rvalue),移动语义(move semantics)和完美转发(perfect forwarding)
std::thread非常详细的解释
在写推理视频代码时,需要线程安全的队列,例如可以一个线程存储视频帧,另一个线程取出帧,然后推理,再将推理结果写入另一个队列。
#ifndef THREAD_SAFE_QUEUE_H
#define THREAD_SAFE_QUEUE_H
#include <mutex>
#include <queue>
#include <condition_variable>
#include <stdexcept>
namespace aclcustom {
template<typename T>
class ThreadSafeQueue {
public:
explicit ThreadSafeQueue(uint32_t capacity = kDefaultQueueCapacity)
{
if (capacity < kMinQueueCapacity) {
queueCapacity = kDefaultQueueCapacity;
} else if (capacity > kMaxQueueCapacity) {
queueCapacity = kMaxQueueCapacity;
} else {
queueCapacity = capacity;
}
}
~ThreadSafeQueue() = default;
bool Push(T input_value)
{
std::unique_lock<std::mutex> lock(mutex_);
cond_full_.wait(lock, [this] { return queue_.size() < queueCapacity; });
queue_.push(std::move(input_value));
cond_empty_.notify_one();
return true;
}
T Pop()
{
std::unique_lock<std::mutex> lock(mutex_);
cond_empty_.wait(lock, [this] { return !queue_.empty(); });
T tmp_ptr = std::move(queue_.front());
queue_.pop();
cond_full_.notify_one();
return tmp_ptr;
}
bool Empty() const
{
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
uint32_t Size() const
{
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
bool ExtendCapacity(uint32_t newSize) {
std::lock_guard<std::mutex> lock(mutex_);
if (newSize < queue_.size()) {
return false; // Indicate that the operation failed
}
queueCapacity = newSize;
cond_full_.notify_all(); // Notify all producers if needed
return true; // Indicate that the operation succeeded
}
private:
std::queue<T> queue_;
uint32_t queueCapacity;
mutable std::mutex mutex_;
std::condition_variable cond_empty_;
std::condition_variable cond_full_;
static constexpr uint32_t kMinQueueCapacity = 1;
static constexpr uint32_t kMaxQueueCapacity = 10000;
static constexpr uint32_t kDefaultQueueCapacity = 10;
};
} // namespace aclcustom
#endif /* THREAD_SAFE_QUEUE_H */
调用示例
#include "ThreadSafeQueue.h"
#include <iostream>
#include <thread>
void producer(aclcustom::ThreadSafeQueue<int>& queue) {
for (int i = 0; i < 10; ++i) {
queue.Push(i);
std::cout << "Produced: " << i << std::endl;
}
}
void consumer(aclcustom::ThreadSafeQueue<int>& queue) {
for (int i = 0; i < 10; ++i) {
int value = queue.Pop();
std::cout << "Consumed: " << value << std::endl;
}
}
int main() {
aclcustom::ThreadSafeQueue<int> queue(100001); // This will be clamped to 10000
std::thread producer_thread(producer, std::ref(queue));
std::thread consumer_thread(consumer, std::ref(queue));
producer_thread.join();
consumer_thread.join();
return 0;
}
队列容量
如果传入的 capacity
小于 kMinQueueCapacity
,队列容量会设置为 kDefaultQueueCapacity
。
如果 capacity
超过了 kMaxQueueCapacity
,队列容量会被限制在 kMaxQueueCapacity
。
否则,容量按用户输入的 capacity
设置。确保了队列的容量始终在合理的范围内,不会超出预设的最大容量,也不会过小。
Push 方法
Push
方法负责将新元素插入队列中:
使用 std::unique_lock
锁住互斥量 mutex_
以保护共享数据。
使用 cond_full_
条件变量等待队列有空间可供插入,如果队列已满,生产者线程会阻塞在这里,直到有空间释放。
插入元素后,使用 cond_empty_
条件变量通知至少一个等待的消费者线程,表明队列中有数据可供消费。
Pop 方法
Pop
方法用于从队列中取出元素:
同样使用 std::unique_lock
锁住互斥量。
使用 cond_empty_
条件变量等待队列中有数据可供消费,如果队列为空,消费者线程会阻塞在这里,直到有数据被插入。
取出元素后,使用 cond_full_
条件变量通知至少一个等待的生产者线程,表明队列中有空间可供插入新数据。
Empty 方法
Empty
方法检查队列是否为空,使用 std::lock_guard
进行加锁,提供快速访问:
Size 方法
Size
方法返回当前队列的大小,也使用 std::lock_guard
进行加锁:
ExtendCapacity 方法
ExtendCapacity
方法允许动态调整队列容量:
如果新容量小于当前队列大小,可以返回一个错误代码或状态,表示扩容失败
更新容量后,通知所有等待的生产者线程以便他们可以继续插入数据。
线程通知机制
通过使用 std::condition_variable
,我们实现了生产者和消费者之间的同步:
当队列满时,生产者线程等待空间释放。
当队列空时,消费者线程等待数据生成。
这种机制有效避免了不必要的忙等待(Busy Waiting),从而减少了 CPU 占用。
线程间同步 :std::condition_variable
用于让一个或多个线程等待,直到另一个线程发出某种信号或条件满足时,才会被唤醒继续执行。通常与 std::mutex
一起使用,保证条件判断和线程唤醒的原子性。
避免忙等待 :在没有 std::condition_variable
的情况下,线程可能会通过循环不断检查某个条件是否满足,这会消耗大量的 CPU 资源。std::condition_variable
允许线程在条件不满足时进入等待状态,不占用 CPU,直到条件满足时才被唤醒。