首页 > 其他分享 >多线程模型之生产者消费者 -- 转载

多线程模型之生产者消费者 -- 转载

时间:2023-03-02 10:33:38浏览次数:41  
标签:std queue cnt 多线程 thread -- param repo 转载

https://www.cnblogs.com/pandamohist/p/13852197.html

互斥量 std::mutex:解决多个线程对共享数据的访问问题。
条件变量 std::condition_variable:是一种线程间的通讯机制,解决线程的执行问题。

#pragma once

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>



std::mutex _mtx;
std::condition_variable _cv_not_full;
std::condition_variable _cv_not_empty;

const int max_queue_size_10 = 10;

enum {
	// 总生产数目
	cnt_total_10 = 10,
};


template<typename T>
struct repo_ {
	// 用作互斥访问缓冲区
	std::mutex				_mtx_queue;

	// 缓冲区最大size
	unsigned int			_count_max_queue_10 = 10;

	// 缓冲区
	std::queue<T>			_queue;

	// 缓冲区没有满,通知生产者继续生产
	std::condition_variable _cv_queue_not_full;

	// 缓冲区不为空,通知消费者继续消费
	std::condition_variable _cv_queue_not_empty;



	// 用于生产者之间的竞争
	std::mutex				_mtx_pro;
	// 计算当前已经生产了多少数据了
	unsigned int			_cnt_cur_pro = 0;


	// 用于消费者之间的竞争
	std::mutex				_mtx_con;
	// 计算当前已经消费多少数据了
	unsigned int			_cnt_cur_con = 0;


	repo_(const unsigned int count_max_queue = 10) :_count_max_queue_10(count_max_queue)
		, _cnt_cur_con(0)

	{
		;
	}

	repo_(const repo_& instance) = delete;
	repo_& operator = (const repo_& instance) = delete;
	repo_(const repo_&& instance) = delete;
	repo_& operator = (const repo_&& instance) = delete;

};

template <typename T>
using repo = repo_<T>;





//----------------------------------------------------------------------------------------

// 生产者生产数据
template <typename T>
void thread_produce_item(const int& thread_index, repo<T>& param_repo, const T& repo_item) {
	std::unique_lock<std::mutex> lock(param_repo._mtx_queue);

	// 1. 生产者只要发现缓冲区没有满, 就继续生产
	param_repo._cv_queue_not_full.wait(lock, [&] { return param_repo._queue.size() < param_repo._count_max_queue_10; });

	// 2. 将生产好的商品放入缓冲区
	param_repo._queue.push(repo_item);

	// log to console
	std::cout << "生产者" << thread_index << "生产数据:" << repo_item << "\n";

	// 3. 通知消费者可以消费了
	//param_repo._cv_queue_not_empty.notify_one();
	param_repo._cv_queue_not_empty.notify_one();
}


//----------------------------------------------------------------------------------------
// 消费者消费数据

template <typename T>
T thread_consume_item(const int thread_index, repo<T>& param_repo) {
	std::unique_lock<std::mutex> lock(param_repo._mtx_queue);

	// 1. 消费者需要等待【缓冲区不为空】的信号
	param_repo._cv_queue_not_empty.wait(lock, [&] {return !param_repo._queue.empty(); });

	// 2. 拿出数据
	T item;
	item = param_repo._queue.front();
	param_repo._queue.pop();

	std::cout << "消费者" << thread_index << "从缓冲区中拿出一组数据:" << item << std::endl;

	// 3. 通知生产者,继续生产
	param_repo._cv_queue_not_full.notify_one();

	return item;
}


//----------------------------------------------------------------------------------------

/**
*  @ brief: 生产者线程
*  @ thread_index - 线程标识,区分是哪一个线程
*  @ count_max_produce - 最大生产次数
*  @ param_repo - 缓冲区
*  @ return - void

*/
template< typename T >
void thread_pro(const int thread_index, const int count_max_produce, repo<T>* param_repo) {
	if (nullptr == param_repo || NULL == param_repo)
		return;


	while (true) {
		bool is_running = true;

		{
			// 用于生产者之间竞争
			std::unique_lock<std::mutex> lock(param_repo->_mtx_pro);

			// 缓冲区没有满,继续生产
			if (param_repo->_cnt_cur_pro < cnt_total_10) {
				thread_produce_item<T>(thread_index, *param_repo, param_repo->_cnt_cur_pro);
				++param_repo->_cnt_cur_pro;
			} else
				is_running = false;
		}

		std::this_thread::sleep_for(std::chrono::microseconds(16));
		if (!is_running)
			break;
	}
}



/**
*  @ brief: 消费者线程
*  @ thread_index - 线程标识,区分线程
*  @ param_repo - 缓冲区
*  @ return - void

*/
template< typename T >
void thread_con(const int thread_index, repo<T>* param_repo) {
	if (nullptr == param_repo || NULL == param_repo)
		return;

	while (true) {
		bool is_running = true;
		{
			std::unique_lock<std::mutex> lock(param_repo->_mtx_con);
			// 还没消费到指定的数目,继续消费
			if (param_repo->_cnt_cur_con < cnt_total_10) {
				thread_consume_item<T>(thread_index, *param_repo);
				++param_repo->_cnt_cur_con;
			} else
				is_running = false;

		}

		std::this_thread::sleep_for(std::chrono::microseconds(16));

		// 结束线程
		if ((!is_running))
			break;
	}
}


// 入口函数
//----------------------------------------------------------------------------------------

int main(int argc, char* argv[], char* env[]) {
	// 缓冲区
	repo<int> repository;
	// 线程池
	std::vector<std::thread> vec_thread;

	// 生产者
	vec_thread.push_back(std::thread(thread_pro<int>, 1, cnt_total_10, &repository));
	vec_thread.push_back(std::thread(thread_pro<int>, 2, cnt_total_10, &repository));

	// 消费者
	vec_thread.push_back(std::thread(thread_con<int>, 1, &repository));
	vec_thread.push_back(std::thread(thread_con<int>, 2, &repository));



	for (auto& item : vec_thread) {
		item.join();
	}

	return 0;
}

标签:std,queue,cnt,多线程,thread,--,param,repo,转载
From: https://www.cnblogs.com/txtp/p/17170975.html

相关文章

  • 用例需注意的点-UI自动化
    记几条--用例注意事项:用例从功能里面转化而来,并且不能脱离业务(针对某一个页面功能\某一个流程业务,写一条用例:即将界面操作间接转化为代码去操作!)1用例要尽量独立,相互不影响!(......
  • AIGC背后的人工智能理论及应用
    随着人工智能技术的不断发展,越来越多的AI技术被应用到了各个领域。其中,AIGC(ArtificialIntelligenceGeneralChip)作为一种通用人工智能芯片,受到了广泛的关注。本文将从理论......
  • 智能前置仓(微仓)如何高效低成本迎合市场随意更换商品?
    我们不得不面对一个非常尴尬的事实,那就是,伏尔泰曾经说过,不经巨大的困难,不会有伟大的事业。这不禁令我们深思问题的关键究竟为何?满足消费者下单后最快速的一小时内甚至30分......
  • 阿里云短信服务
    1、登入阿里云官网  2、申请签名和模板         3、记住签名名称和模板CODE,后续代码里需要用到      4、获取accessKeyId及acce......
  • SQLSERVER 内存管理
    查看每个数据库对内存的占用SELECTISNULL(DB_NAME(DATABASE_ID),\'RESOURCEDB\')ASDATABASENAME,CAST(COUNT(ROW_COUNT)*8.0/(1024.0)ASDECIMAL(28,2)......
  • 总算能生产出能看的了AI绘图
      ......
  • 第一个程序
    HelloWorld!随便新建一个txt文件设置文件名为Hello,将后缀名改为java,即Hello.java编写代码publicclassHello{ publicstaticvoidmain(String[]args){ ......
  • MyBatis基础
    概念      快速入门创建user表,添加数据CREATEDATABASEmybatis;USEmybatis;DROPTABLEIFEXISTStb_user;CREATETABLEtb_user(idINTPRIM......
  • docker+gunicorn+fastapi部署
    一、准备工作1、先确保项目可以正常运行2、使用pipfreeze导出第三方库3、在项目根目录新建pip.conf文件,写入一下内容[global]index-url=http://......
  • squal
    1,执行安装程序mysql-installer-community-8.0.23.0.exe 启动mysqlinstaller1.4 该安装程序可以安装,配置,卸载mysql8各个产品2,选择安装类别:developerdefaulit:安装mys......