一、生产者&消费者模式
生产者-消费者模式(Producer-Consumer Pattern)是一种常见的并发设计模式,这种模式最常见,所以把它单独拿出来,这种模式用于处理生产者和消费者之间的协调问题。生产者和消费者之间不直接关联或依赖,而是用一个第三方来协调双方的供需关系。这种模式解决了生产者生成数据和消费者处理数据之间的同步与缓冲问题。特别适合于需要协调多个线程生产和消费数据的场景。
二、生产者&消费者的设计案例
代码说明:通过对象池存放生产者产生的产品。即生产者生产的产品存入对象池中,消费者从对象池获取产品,生产者和消费者通过供需关系时间比例调节平衡。
完整代码
producer_consumer.cpp
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <thread>
#include <queue>
#include <map>
#include <unordered_set>
#include <atomic>
#include <shared_mutex>
#include <algorithm>
#include <chrono>
#include <atomic>
std::mutex productMutex;
// 对象池
template <typename T>
class ObjectPool {
public:
ObjectPool() : createdCount_(0) {}
~ObjectPool() {
std::lock_guard<std::mutex> lock(mutex_);
pool_.clear();
creationTimes_.clear();
destructionTimes_.clear();
}
std::shared_ptr<T> acquireObject() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !pool_.empty(); });
auto obj = pool_.back().second;
pool_.pop_back();
return obj;
}
void addObject(std::shared_ptr<T> obj, int PID) {
std::lock_guard<std::mutex> lock(mutex_);
pool_.emplace_back(PID, obj);
++createdCount_;
creationTimes_.insert({PID, std::make_pair(obj, std::chrono::system_clock::now())});
cv_.notify_one();
}
void destructObject(std::shared_ptr<T> obj, int PID) {
std::lock_guard<std::mutex> lock(mutex_);
destructionTimes_.insert({PID, std::make_pair(obj, std::chrono::system_clock::now())});
}
void printStatus() {
std::lock_guard<std::mutex> lock(mutex_);
std::cout << "对象池总创建产品数: " << createdCount_ << std::endl;
std::cout << "对象池中剩余产品数: " << pool_.size() << std::endl;
for (const auto& [PID, obj] : pool_) {
std::cout << "剩余产品PID: " << PID << std::endl;
}
std::cout << "\n*********** 生产者生产的所有产品信息 ************\n" << std::endl;
for (const auto& [PID, pair] : creationTimes_) {
std::time_t t = std::chrono::system_clock::to_time_t(pair.second);
std::cout << "PID:" << PID << " 产品创建时间: " << std::ctime(&t);
}
std::cout << "\n------------ 消费者消耗的所有产品信息 ------------\n" << std::endl;
for (const auto& [PID, pair] : destructionTimes_) {
std::time_t t = std::chrono::system_clock::to_time_t(pair.second);
std::cout << "产品" << PID << " 被买走时间: " << std::ctime(&t);
}
}
private:
std::vector<std::pair<int, std::shared_ptr<T>>> pool_;
std::mutex mutex_;
std::condition_variable cv_;
size_t createdCount_;
std::multimap<int, std::pair<std::shared_ptr<T>, std::chrono::system_clock::time_point>> creationTimes_;
std::multimap<int, std::pair<std::shared_ptr<T>, std::chrono::system_clock::time_point>> destructionTimes_;
};
// 产品
class Product {
public:
Product() : data_(0) {}
void setData(int data) { data_ = data; }
int getData() const { return data_; }
private:
int data_;
};
// 生产者
class Producer {
public:
Producer(ObjectPool<Product>& pool, std::atomic<bool>& running,const int PRODUCETIME) : pool_(pool), running_(running), PRODUCETIME_(PRODUCETIME) {}
~Producer() {
generated_.clear();
}
void produce() {
while (running_) {
// 生产产品
auto product = std::make_shared<Product>();
if (generated_.size() < capacity_) {
int data = std::rand() % capacity_;
if (generated_.find(data) == generated_.end()) {
generated_.insert(data);
product->setData(data);
std::this_thread::sleep_for(std::chrono::milliseconds(PRODUCETIME_));
{
std::lock_guard<std::mutex> lock(productMutex);
std::cout << "已生产数字币: " << product->getData() << std::endl;
}
// 将产品放入池中
pool_.addObject(product, product->getData());
}
} else {
std::cerr << "Error: 已超过最大产能负荷,生产者崩溃!" << std::endl;
return ;
}
}
}
private:
ObjectPool<Product>& pool_;
std::atomic<bool>& running_;
int PRODUCETIME_;
std::unordered_set<int> generated_;
const int capacity_ = 10000; // 焊四的最大产能
};
// 消费者
class Consumer {
public:
Consumer(ObjectPool<Product>& pool, std::atomic<bool>& running, const int CONSUMETIME) : pool_(pool), running_(running), CONSUMETIME_(CONSUMETIME) {}
void consume() {
while (running_) {
auto product = pool_.acquireObject();
// 消费产品
{
std::lock_guard<std::mutex> lock(productMutex);
std::cout << "已真金白银购买数字币: " << product->getData() << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(CONSUMETIME_));
// 将产品消耗掉
pool_.destructObject(product, product->getData());
}
}
private:
ObjectPool<Product>& pool_;
std::atomic<bool>& running_;
int CONSUMETIME_;
};
int main(int argc, char* argv[]) {
int PRODUCETIME = 100; // default produce time
int CONSUMETIME = 100; // default consume time
if (argc > 1) {
PRODUCETIME = std::atoi(argv[1]);
}
if (argc > 2) {
CONSUMETIME = std::atoi(argv[2]);
}
// 创建对象池
ObjectPool<Product> pool;
std::atomic<bool> running(true);
// 创建生产者和消费者
Producer producer(pool, running, PRODUCETIME);
Consumer consumer(pool, running, CONSUMETIME);
// 启动生产者和消费者线程
std::thread producerThread(&Producer::produce, &producer);
std::thread consumerThread(&Consumer::consume, &consumer);
// 模拟1分钟的产品生产和交易行为
std::this_thread::sleep_for(std::chrono::minutes(1));
running = false;
// 数据缓冲
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "\n一分钟已到,正在清理资源! 即将停止程序运行 " << std::endl;
// 查询对象池
pool.printStatus();
// 停止生产和交易行为
producerThread.join();
consumerThread.join();
std::cout << "\n程序退出 " << std::endl;
return 0;
}
demo演示
供给等于需求
设置了16秒的限时,默认供需时间关系比例为 1:1,demo如下
代码执行方式:
g++ producer_consumer.cpp
./a.out
对象池打印结果:
对象池总创建产品数: 159
对象池中剩余产品数: 1
剩余产品PID: 2379
供给小于需求
设置了16秒的限时,设置供需时间关系比例为 4:1,demo如下
代码执行方式:
g++ producer_consumer.cpp
./a.out 400 100
对象池打印结果:
对象池总创建产品数: 40
对象池中剩余产品数: 0
供给大于需求
设置了16秒的限时,设置供需时间关系比例为 1:4,demo如下
代码执行方式:
g++ producer_consumer.cpp
./a.out 100 400
对象池打印结果:
对象池总创建产品数: 159
对象池中剩余产品数: 119
剩余产品PID: 886
剩余产品PID: 2777
剩余产品PID: 7793
剩余产品PID: 8335
剩余产品PID: 5386
剩余产品PID: 6649
剩余产品PID: 1421
剩余产品PID: 2362
......
三、总结
从上面的演示中可知,当供给等于需求,即生产&消费时间比例 = 1:1,生产者生产产品速度和消费者消费产品的速度基本保持一致,达到供需平衡。当供给小于需求,即生产&消费时间比例 = 4:1,输出信息明显放慢,说明生产者生产产品速度放慢,导致供给紧张,需求旺盛。当供给大于需求,即生产&消费时间比例 = 1:4,生产者生产产品速度过快,导致产能过剩。
预测:如果不限时,当供给小于需求,运行效果中,产出始终慢(效率低下),当供给大于需求,运行效果中,产品越堆越多,产能过剩(内存很可能不足!)。
通过c++代码可以模拟供需关系,有趣也有成就感。
标签:std,生产者,lock,PID,c++,_.,int,供需,pool From: https://blog.csdn.net/qq_55610255/article/details/141754951