写在前面:本文学习自基于C++11实现线程池,代码部分均属于该博主,自己只是想记录以下自己的认知,并以这种方式加深一下自己对于多线程的理解
1 前置知识
左值和右值
左值(Lvalue):左值是指具有持久性和地址的表达式。简单来说,左值是可以被引用的、可以取地址的表达式。左值可以是变量、对象、函数或表达式的结果,它们在内存中有对应的存储位置。
int x = 10; // x 是左值
int* ptr = &x; // &x 是左值,表示 x 的地址
右值(Rvalue):右值是指临时对象、将要销毁的对象或被明确标记为右值的表达式。右值没有持久性,它们通常在语句执行后立即被销毁。右值不能被取地址,不能被引用,只能被移动(通过移动语义)或复制(通过复制构造函数)。
int y = 5 + 3; // 5 + 3 是右值
int&& rvalueRef = 5 + 3; // 5 + 3 是右值,rvalueRef 是右值引用
移动语义和完美转发
右值引用:&&用于声明右值引用类型。右值引用是C++11引入的特性,用于绑定到临时对象、将要销毁的对象或标记为右值的表达式。
右值引用的语法是在类型名称后加上两个连续的&&符号,例如T&&,其中T是类型。右值引用的主要目的是支持移动语义和完美转发。
T&& var; // 右值引用声明
移动语义:右值引用与移动语义紧密相关。通过将资源从一个对象“移动”到另一个对象,而不是资源复制,提高性能。移动语义通常用于支持容器、智能指针和自定义类型等的高效移动操作。
class MyObject {
public:
// 移动构造函数
MyObject(MyObject&& other) {
// 执行资源的移动操作
}
};
MyObject obj1;
MyObject obj2(std::move(obj1)); // 使用 std::move 将左值转换为右值引用,再启动移动语义
完美转发:右值引用也用于实现完美转发,即将函数参数以原样转发给其他函数,保持原参数的值类别(左值或右值)。完美转发通常用于实现泛型函数或模板函数,以处理传递给它们的参数。
template<typename T>
void forwardFunc(T&& arg) {
otherFunc(std::forward<T>(arg)); // 使用 std::forward 进行完美转发
}
std::string str = "Hello";
forwardFunc(str); // 传递左值
forwardFunc(std::string("World")); // 传递右值
other
在C++中,"other"并没有特定的预定义含义或语义。它通常是一个通用的标识符,用于表示不属于特定类别或类型的对象、变量或参数。
当在代码中使用"other"作为标识符时,其具体含义会取决于上下文和使用者的意图。它可以用作占位符或临时变量名,表示未知或未指定的实体。
尾返回类型推导
C++11 引入了尾返回类型推导(Trailing Return Type Deduction),它允许在函数定义的尾部推导出函数的返回类型。尾返回类型推导的语法使用了 auto 关键字和尾部的类型声明符号 ->。
在C++11之前,函数的返回值类型必须显示的定义在函数前面,在C++11及以后的标准中可以使用尾返回类型推导来让编译器推导函数的返回类型,例如:
auto add(int a, int b) -> int // 函数定义
{
return a + b;
}
在上述示例中,-> int 部分表示函数的尾返回类型,指定了函数的返回类型为 int。通过尾返回类型推导,可以根据函数体中的返回语句来推导出函数的返回类型。
尾返回类型推导的优势在于可以根据函数体中的具体实现来确定函数的返回类型,特别是对于返回类型复杂或依赖于模板参数的函数。它可以减少代码中的重复,使函数的定义更加简洁和可读。
template<typename T, typename U>
auto multiply(T a, U b) -> decltype(a * b)
{
return a * b;
}
decltype
decltype 是 C++11 中引入的关键字,用于获取表达式的类型。它可以在编译时推导出表达式的类型,并将其作为 decltype 的结果,它往往与尾返回类型推导一起可以有很好的效果。其使用很简单,decltype(需要获取类型的表达式),如:
int x = 42;
decltype(x) y; // y 的类型为 int
double calculateValue();
decltype(calculateValue()) result; // result 的类型为函数 calculateValue() 的返回类型
int array[] = {1, 2, 3};
decltype(array) anotherArray; // anotherArray 的类型为 int[3]
struct Person {
std::string name;
int age;
};
Person p;
decltype(p) q; // q 的类型为 Person
那么我们是如何结合使用尾返回类型与类型推导decltype呢,按照标准,auto关键字不能用于函数形参的类型推导,在C++14以前,也不能直接用auto func()的形式来推导函数的返回类型。
因此传统C++中我们必须这么写:
template<typename R, typename T, typename U>
R add(T x, U y) {
return x+y;
}
这样存在很明显的缺陷:事实上很多时候我们并不知道add()这个函数会进行什么操作,获取什么样的返回类型。
因此使用decltype来推导函数的返回类型,我们并不能直接写出这种形式的代码:
decltype(x+y) add(T x, U y)
因为编译器在读到decltype(x+y)时,x和y尚未定义。而这个问题的解决方案,正是尾返回类型推导。C++11引入了一个尾返回类型(trailing return type),利用auto关键字将返回类型后置:
template<typename T, typename U>
auto add2(T x, U y) -> decltype(x+y){
return x + y;
}
function
std::function
是 C++ 标准库中的一个类模板,用于封装可调用对象(函数、函数指针、成员函数指针、函数对象等)以及其参数和返回类型。它提供了一种通用的、类型安全的方式来存储、传递和调用各种可调用对象。
std::function 的使用方式类似于函数指针,但更加灵活。它可以接受不同的可调用对象,而不仅限于特定的函数签名。通过 std::function,可以将函数或函数对象作为参数传递给其他函数,或者将其存储在容器中,以便在需要时进行调用。
简单示例:
#include <iostream>
#include <functional>
int add(int a, int b) {
return a + b;
}
struct Multiply {
int operator()(int a, int b) {
return a * b;
}
};
int main() {
std::function<int(int, int)> func1 = add;
std::function<int(int, int)> func2 = Multiply();
std::cout << "Result 1: " << func1(3, 4) << std::endl;
std::cout << "Result 2: " << func2(3, 4) << std::endl;
return 0;
}
在上述示例中,我们定义了一个普通函数 add 和一个函数对象 Multiply,它们都可以用来执行加法和乘法运算。通过 std::function<int(int, int)>,我们定义了两个 std::function 对象 func1 和 func2,它们都接受两个 int 类型的参数并返回一个 int 类型的结果。func1 存储了 add 函数,而 func2 存储了 Multiply 的实例。我们可以像调用普通函数一样,通过 func1 和 func2 来调用相应的函数或函数对象。
总之,其最重要的作用是:std::function对C++中各种可调用实体(普通函数、Lambda表达式、函数指针、以及其它函数对象等)的封装,形成一个新的可调用的std::function对象,简化调用;
bind
有关具体示例和详细内容参考C++11的std::function和std::bind用法详解
std::bind可以看作一个通用的函数适配器,它接受一个可调用对象,生成一个新的可调用对象来适应原对象的参数列表。
std::bind将可调用对象与其参数一起进行绑定,绑定后的结果可以使用std::function保存。std::bind主要有以下两个作用:
- 将可调用对象和其参数绑定成一个仿函数;
- 只绑定部分参数,减少可调用对象传入的参数。
调用bind的一般形式:
auto newCallable = bind(callable, arg_list);
该形式表达的意思是:当调用newCallable时,会调用callable,并传给它arg_list中的参数。
- std::bind的返回值是一个可调用实体,可以直接赋给std::function
2 实现一个线程池
首先我们应该实现一个任务队列,将要执行的任务按照到来的顺序存储起来,以便线程池启动的时候进行执行。对于线程池应该有两个主体要做,提交函数(submit)和内置线程工作类,我们的线程池应该是能够接收任何类型函数并返回其结果,而在提交函数(submit)中,我们的目的在于用function来封装这些任意可调用的函数,使之变成相同类型的对象,以一个统一的方式来保存,然后才能将他们送入到任务队列中去,方便我们调用和执行。而在内置线程工作类中我们应该取出这些任务并执行他们。
1 任务队列
我们是不可以直接定义一个队列来存放任务的,应当确保它是一个安全的队列,即任意时刻队列只能执行一个操作,因为队列是公共资源,应该由一把互斥锁来保护,否则会出现很多问题,比如当前任务队列中还剩一个任务,有一个线程来查询任务队列是否为空,但由于任务队列的操作不受限,另一个线程把这个任务取走了,这时前一个线程想要取任务就出错了。而多个线程同时存在时,这样随意且不受限制的行为造成的后果要更加严重。
template <typename T>
class SafeQueue
{
private:
//任务队列
std::queue<T> m_queue;
//确保任意时刻任务队列只有一个操作在执行,比如在读大小的同时有线程进行了增删操作,就出问题了
std::mutex m_mutex;
public:
//每一个操作都要拿到锁以后才执行
bool empty(){
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size(){
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
//队列添加任务
void enqueue(T &t){
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);//接收参数并在队尾构造一个对象
}
//队列取出任务
bool dequeue(T &t){
std::unique_lock<std::mutex> lock(m_mutex);
if(m_queue.empty()){
return false;
}
//用move将资源所有权转移而不是复制,可提高程序性能
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
};
3 提交函数submit
明确提交函数的作用:
- 接收任何参数的任何函数。(普通函数,Lambda,成员函数……)
- 返回一个包含执行结果的对象
// Submit a function to be executed asynchronously by the pool
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> ①
{
// Create a function with bounded parameter ready to execute
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); ②// 连接函数和参数定义,特殊函数类型,避免左右值错误
// Encapsulate it into a shared pointer in order to be able to copy construct
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func); ③
// Warp packaged task into void function
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
}; ④
// 队列通用安全封包函数,并压入安全队列
m_queue.enqueue(warpper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one(); ⑤
// 返回先前注册的任务指针
return task_ptr->get_future();
}
对提交函数逐行理解:
①:submit()是一个模板函数,而template<typename F, typename... Args>中的typename... Args是C++11引入的可变模版参数(variadic templates)。
用decltype推导出传入函数的返回值类型,用尾返回类型推导来得到返回类型为一个future类型的对象,future可以启动一个异步任务并访问异步操结果,方便我们后续获取结果。
std::future<ReturnType> futureObj
//ReturnType:是异步任务的返回类型。
//futureObj:是一个 std::future 对象,用于接收异步任务的结果或状态
总的来说,submit函数会获取一个函数和它的参数列表来作为参数,并返回一个包含结果的future类型的对象
②从第二步开始,主要的操作就是对传入的函数进行封装,以形成一个统一且简单的调用形式。这里首先用std::bind将可调用对象和其参数绑定为仿函数,然后再将返回的可调用实体直接赋给std::function,此时我们的任务函数就变成了一个返回值为传入函数的返回类型、不需要关注参数的可调用对象func,此时调用func就等价于调用最初传入submit的函数。
这里我们可以注意到在bind中使用了std::forward,用于实现完美转发,这是由于函数头中的格式F&& f和Args&&... args,这并不代表函数接收的参数类型应就是右值引用,而是一种特殊现象,这个现象被称作万能引用(universal reference)。
万能引用可以简单理解为,当T是模板参数时,T&&的作用主要是保持值类别进行转发。然而,一个绑定到universial reference上的对象可能具有lvaluesness或者rvalueness,正是因为有这种二义性,所以产生了std::forward。
关于完美转发、万能语义可参考这篇细致的文章现代C++之万能引用、完美转发、引用折叠
③ 使用std::make_shared<>()方法,来声明一个std::packaged_task<decltype(f(args...))()>类型的智能指针,并将前面std::function方法声明的特殊函数包装func传入作为std::packaged_task的实例化参数,创建异步任务,以便后续唤醒线程来执行。智能指针将更方便我们对该std::packaged_task对象进行管理。
std::packaged_task可以用来异步任务的封装:它允许将一个任务封装成一个可调用对象,使得该任务可以在另一个线程中执行。
④ 这里我们再次利用std::function,将task_ptr指向的std::packaged_task对象取出并包装为返回值为void的函数warpper_func。这样我们的代码将更加美观优雅。
⑤接下来的部分就是将封装好的函数压入任务队列,并唤醒一个线程来执行异步任务,返回的时候调用get_future,这个方法主要是用于将packaged_task与一个future对象相关联,也可以可以理解为做了一个future类型的转换,使得返回值是一个future类型的对象。
提交函数到此结束,综上,在使用submit函数时,我们只需要传入一个函数和其参数列表,创建一个future类型的对象来接收,并获取函数运行的接结果。
4 线程内置工作类
线程内置工作类是真正执行任务的函数。
class ThreadWorker // 内置线程工作类
{
private:
int m_id; // 工作id
ThreadPool *m_pool; // 所属线程池
public:
// 构造函数
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
{
}
// 重载()操作
void operator()()
{
std::function<void()> func; // 定义基础函数对象func
bool dequeued; // 是否正在取出队列中元素
// 判断线程池是否关闭,没有关闭则从任务队列中循环提取任务
while (!m_pool->m_shutdown)
{
// 为线程环境加锁,互访问工作线程的休眠和唤醒
std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
// 如果任务队列为空,阻塞当前线程
if (m_pool->m_queue.empty())
{
m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
}
// 取出任务队列中的元素
dequeued = m_pool->m_queue.dequeue(func);
}
// 如果成功取出,执行工作函数
if (dequeued)
func();
}
};
这里最重要的是重载()的操作,我们希望可以像调用函数一样来调用对象。
线程池完整代码
class ThreadPool
{
private:
bool m_shutdown;//用于判断线程池是否关闭
SafeQueue<std::function<void()>> m_queue;//任务队列
std::vector<std::thread> m_threads;//工作线程队列
std::mutex m_conditional_mutex;//线程池加锁,用以确保不会有两个线程取到同一个任务
std::condition_variable m_conditional_lock;//条件变量,用来阻塞或唤醒线程
//内置线程工作类
class ThreadWorker
{
private:
int m_id;//线程id
ThreadPool *m_pool;
public:
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id){
}
//重载(),使得调用对象和调用函数一样
void operator()(){
std::function<void()> func;//定义基础函数对象
bool dequeued;//是否取出队列中元素
while(!m_pool -> m_shutdown){
std::unique_lock<std::mutex> lock(m_pool ->m_conditional_mutex);
//如果任务队列为空,阻塞当前线程
if(m_pool -> m_queue.empty()){
m_pool -> m_conditional_lock.wait(lock);
}
//取出任务队列中的元素
dequeued = m_pool->m_queue.dequeue(func);
//取出成功,执行func
if (dequeued){
func();
}
}
}
};
public:
//线程池构造函数
ThreadPool(const int n_threads = 4)
: m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false){
}
//初始化线程池
void init(){
for(int i = 0; i < m_threads.size(); ++i){
m_threads.at(i) = std::thread(ThreadWorker(this, i));
}
}
void shutdown()
{
m_shutdown = true;
m_conditional_lock.notify_all(); // 通知,唤醒所有工作线程
for (int i = 0; i < m_threads.size(); ++i)
{
if (m_threads.at(i).joinable()) // 判断线程是否在等待
{
m_threads.at(i).join(); // 将线程加入到等待队列
}
}
}
// Submit a function to be executed asynchronously by the pool
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
// Create a function with bounded parameter ready to execute
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
// Encapsulate it into a shared pointer in order to be able to copy construct
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
// Warp packaged task into void function
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
// 队列通用安全封包函数,并压入安全队列
m_queue.enqueue(warpper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
};
注意一下init()函数和shutdown()函数:
- 在线程池初始化函数init()中,我们声明并分配工作线程,将工作线程放入工作线程队列m_threads中。
- 在线程池关闭函数shutdown()中,我们唤醒所有工作线程,并等待期完成所有工作后关闭线程池。
3 线程池完整代码
#ifndef PRACTICE_THREAD_POOL_H
#define PRACTICE_THREAD_POOL_H
#include <mutex>
#include <queue>
#include <functional>
#include <future>
#include <thread>
#include <utility>
#include <vector>
template <typename T>
class SafeQueue
{
private:
//任务队列
std::queue<T> m_queue;
//确保任意时刻任务队列只有一个操作在执行,比如在读大小的同时有线程进行了增删操作,就出问题了
std::mutex m_mutex;
public:
bool empty(){
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
int size(){
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.size();
}
//队列添加任务
void enqueue(T &t){
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.emplace(t);
}
//队列取出任务
bool dequeue(T &t){
std::unique_lock<std::mutex> lock(m_mutex);
if(m_queue.empty()){
return false;
}
t = std::move(m_queue.front());
m_queue.pop();
return true;
}
};
class ThreadPool
{
private:
bool m_shutdown;//用于判断线程池是否关闭
SafeQueue<std::function<void()>> m_queue;//任务队列
std::vector<std::thread> m_threads;//工作线程队列
std::mutex m_conditional_mutex;//线程池加锁,用以确保不会有两个线程取到同一个任务
std::condition_variable m_conditional_lock;//条件变量,用来阻塞或唤醒线程
//内置线程工作类
class ThreadWorker
{
private:
int m_id;//线程id
ThreadPool *m_pool;
public:
ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id){
}
//重载(),使得调用对象和调用函数一样
void operator()(){
std::function<void()> func;//定义基础函数对象
bool dequeued;//是否取出队列中元素
while(!m_pool -> m_shutdown){
std::unique_lock<std::mutex> lock(m_pool ->m_conditional_mutex);
//如果任务队列为空,阻塞当前线程
if(m_pool -> m_queue.empty()){
m_pool -> m_conditional_lock.wait(lock);
}
//取出任务队列中的元素
dequeued = m_pool->m_queue.dequeue(func);
//取出成功,执行func
if (dequeued){
func();
}
}
}
};
public:
//线程池构造函数
ThreadPool(const int n_threads = 4)
: m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false){
}
//初始化线程池
void init(){
for(int i = 0; i < m_threads.size(); ++i){
m_threads.at(i) = std::thread(ThreadWorker(this, i));
}
}
void shutdown()
{
m_shutdown = true;
m_conditional_lock.notify_all(); // 通知,唤醒所有工作线程
for (int i = 0; i < m_threads.size(); ++i)
{
if (m_threads.at(i).joinable()) // 判断线程是否在等待
{
m_threads.at(i).join(); // 将线程加入到等待队列
}
}
}
// Submit a function to be executed asynchronously by the pool
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
// Create a function with bounded parameter ready to execute
std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
// Encapsulate it into a shared pointer in order to be able to copy construct
auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
// Warp packaged task into void function
std::function<void()> warpper_func = [task_ptr]()
{
(*task_ptr)();
};
// 队列通用安全封包函数,并压入安全队列
m_queue.enqueue(warpper_func);
// 唤醒一个等待中的线程
m_conditional_lock.notify_one();
// 返回先前注册的任务指针
return task_ptr->get_future();
}
};
#endif //PRACTICE_THREAD_POOL_H
4 测试代码
#include <iostream>
#include <random>
#include "thread_pool.h"
std::random_device rd; // 真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt
std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
auto rnd = std::bind(dist, mt);
// 设置线程睡眠时间
void simulate_hard_computation()
{
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
// 添加并输出结果
void multiply_output(int &out, const int a, const int b)
{
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
// 结果返回
int multiply_return(const int a, const int b)
{
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
void example()
{
// 创建3个线程的线程池
ThreadPool pool(3);
// 初始化线程池
pool.init();
// 提交乘法操作,总共30个
for (int i = 1; i <= 3; ++i)
for (int j = 1; j <= 10; ++j)
{
pool.submit(multiply, i, j);
}
// 使用ref传递的输出参数提交函数
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
// 等待乘法输出完成
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
// 使用return参数提交函数
auto future2 = pool.submit(multiply_return, 5, 3);
// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
// 关闭线程池
pool.shutdown();
}
int main()
{
example();
return 0;
}
标签:11,std,函数,int,lock,C++,线程,function
From: https://www.cnblogs.com/dreamer-q/p/17558941.html