首页 > 编程语言 >C++多线程:async、future、packaged_task、promise、shared_future的学习与使用(九)

C++多线程:async、future、packaged_task、promise、shared_future的学习与使用(九)

时间:2024-04-06 23:59:49浏览次数:37  
标签:std __ task get future 线程 async 多线程

1、异步任务线程
  • 异步线程的概念:

    • 异步:就是非同步,同步就是必须一个一个的执行,异步可以两个事情一起干
    • 异步线程:异步线程就相当于把非关联的两件事分开找两个线程去执行,而分开的那个就是异步线程
    • 举例:例如登录信息,用户登录完毕主线程肯定是需要去及时响应用户的请求的,而系统设计的时候通常会保存用户的登录信息(日志)等等,如果处理这些任务的时间过长就可能无法及时响应用户的请求,而处理这些日志和响应用户是两个独立的事件,因此可以开启异步线程来处理日志,响应用户的操作继续由主线程向下执行,且无需等待异步线程的结果。
  • std::async、std::future

    • C++11线程库中提供了这std::async函数创建后台任务(异步线程)并且返回值
    • std::future类模板来支持获取异步线程std::async创建后台任务最后执行返回的值,
    • std::async是一个函数模板,用来启动一个异步任务,启动起来一个异步任务之后,它返回一个std::future对象,std::future是一个类模板
    • std::future类模板提供了一些函数,get可以获取返回值,wait系列获取不到
  • 启动异步任务:

    • 创建一个async线程并且开始执行对应的线程入口函数,它返回一个std::future对象
    • std::future对象里边就含有线程入口函数所返回的结果(线程返回的结果),我们可以通过调用future对象的成员函数get()来获取结果
    • std::future提供了一种访问异步操作结果的机制,就是说这个结果你可能没有办法马上拿到,但在不就的将来线程执行完毕的时候,就能够拿到结果了
    • 因此如果在std::future调用get之前就会出现两种情况:
      • 异步线程async已经计算完毕,那么std::future调用get直接获取结果
      • 异步线程async没有计算完毕,那么std::future调用get将会阻塞,等待async执行完毕并且返回。
1.1、std::async源码浅析
template<typename _Fn, typename... _Args>
inline future<__async_result_of<_Fn, _Args...>>
async(_Fn&& __fn, _Args&&... __args)
{
    return std::async(launch::async|launch::deferred,
                      std::forward<_Fn>(__fn),
                      std::forward<_Args>(__args)...);
}


template<typename _Fn, typename... _Args>
future<__async_result_of<_Fn, _Args...>> async(launch __policy, _Fn&& __fn, _Args&&... __args){
    std::shared_ptr<__future_base::_State_base> __state;
    if ((__policy & launch::async) == launch::async){				// here
        __try{
            __state = __future_base::_S_make_async_state(
                    std::thread::__make_invoker(std::forward<_Fn>(__fn),
                                                std::forward<_Args>(__args)...)
            );
        }
#if __cpp_exceptions
        catch(const system_error& __e) {
            if (__e.code() != errc::resource_unavailable_try_again
                || (__policy & launch::deferred) != launch::deferred)
                throw;
        }
#endif
    }
    if (!__state){
        __state = __future_base::_S_make_deferred_state(
                std::thread::__make_invoker(std::forward<_Fn>(__fn),
                                            std::forward<_Args>(__args)...));
    }
    return future<__async_result_of<_Fn, _Args...>>(__state);
}
  • std::launch::async|std::launch::deferred:这两个是常量标记。

    • async表示立即创建异步线程
    • deferred表示推迟创建线程,推迟到future调用get或者wait时在创建
  • 默认情况下可以看到源码here处以async的默认形式创建线程,但是这只是Linux,并不知道其他编译器是什么样子!因此使用时根据自身的需求最好指定一下,防止编译器默认!

  • 而可以看到使用std::launch::deferred的并没有看到源码处有创建线程的代码,这里就被推迟了!

  • std::async就两个构造函数

    • 一个不指定launch方式的,只传入线程入口函数和入口函数参数的构造
    • 指定launch方式的构造,并且传入线程入口函数和入口函数参数的构造
1.2、future源码浅析
template<>
class future<void> : public __basic_future<void>{};

template<typename _Res>
class future<_Res&> : public __basic_future<_Res&>{};

template<typename _Res>
class future : public __basic_future<_Res>{
    friend class promise<_Res>;
    template<typename> friend class packaged_task;
    template<typename _Fn, typename... _Args>
    friend future<__async_result_of<_Fn, _Args...>> async(launch, _Fn&&, _Args&&...);

    typedef __basic_future<_Res> _Base_type;
    typedef typename _Base_type::__state_type __state_type;

    explicit future(const __state_type& __state) : _Base_type(__state) { }

public:
    constexpr future() noexcept : _Base_type() { }

    /// Move constructor
    future(future&& __uf) noexcept : _Base_type(std::move(__uf)) { }

    // Disable copying
    future(const future&) = delete;
    future& operator=(const future&) = delete;

    future& operator=(future&& __fut) noexcept {
        future(std::move(__fut))._M_swap(*this);
        return *this;
    }
    /// Retrieving the value
    _Res get(){
        typename _Base_type::_Reset __reset(*this);
        return std::move(this->_M_get_result()._M_value());
    }
    shared_future<_Res> share() noexcept;
};
  • future模板类提供了:void、Res、Res&引用三大类返回类型的处理,因此async可以void空返回、可以类型返回、可以引用返回
  • future最主要的就是get函数,get函数可以看到从一个方法中将值std::move移动出来
  • __basic_future是future的基类,里面提供了wait、wait_for、wait_unitl…函数,所以future可以调用到父类的这些方法
  • get这个函数只能调用一次,因为可以看到get函数中的实际操作是一个std::move,这个值移动之后就为空了。
2、async的使用
#include <iostream>
#include <thread>
#include <future>
#include <string>

#ifndef INC_08_ASYNC_ASYNC_FUTURE_H
#define INC_08_ASYNC_ASYNC_FUTURE_H

class AsyncFuture{
public:
    int class_thread_func(std::string msg){
        std::cout << msg << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
        std::chrono::milliseconds duration(5000);
        std::this_thread::sleep_for(duration);
        std::cout << msg << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
    }
};


int thread_func()
{
    std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
    std::chrono::milliseconds duration(5000);
    std::this_thread::sleep_for(duration);
    std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;

    return 5;
}

#endif //INC_08_ASYNC_ASYNC_FUTURE_H\

2.1、普通成员函数
void async_test1()
{
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
    std::future<int> async_result = std::async(std::launch::deferred, thread_func);
    std::cout << "continue......" <<  std::endl;
    std::cout << async_result.get() <<  std::endl;
}
/*
    由于get会阻塞等待,因此输出结果
    main thread_id = 139778198386496
    continue......
    这里阻塞等待async执行完毕
    thread_func 开始执行! thread_id =139778198386496
    thread_func 执行完毕! thread_id =139778198386496
    5
*/
2.2、类成员函数
void async_test2()
{
    std::string msg = "class_thread_func";
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
    AsyncFuture asyncFuture;
    std::future<int> async_result = std::async(&AsyncFuture::class_thread_func, &asyncFuture, msg);
    std::cout << "continue......" <<  std::endl;
    std::cout << async_result.get() <<  std::endl;
}
/*由于get会阻塞等待,因此会阻塞
    main thread_id = 139783108294464
    这里阻塞等待async执行完毕
    continue......
    class_thread_func 开始执行! thread_id =139783090120448
    class_thread_func 执行完毕! thread_id =139783090120448
    3
*/
2.3、std::launch::deffered
void async_test3()
{
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
    std::future<int> async_result = std::async(std::launch::deferred, thread_func);
    std::cout << "continue......" <<  std::endl;
    std::cout << async_result.get() <<  std::endl;
}
/*
    由于deffered是延迟创建,因此主线程在执行完之后调用get会导致两个线程串行,最后子线程不会new出来, 由主线程执行!
    main thread_id = 140214074877760
    continue......
    thread_func 开始执行! thread_id =140214074877760
    thread_func 执行完毕! thread_id =140214074877760
    5
*/

void async_test4()
{
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
    std::future<int> async_result = std::async(std::launch::deferred, thread_func);
    std::cout << "continue......" <<  std::endl;
}
/*
    没有调用get,不进行执行。但是可能在析构的时候会调用get或者wait执行,这个需要看编译器的情况来定,上面没有看到析构
     main thread_id = 139630206793536
     continue......
 */
3、std::packaged_task类模板的使用
  • std::packaged_task是一个类模板,打包任务,把任务都装起来,

  • 它的模板参数是各种可调用对象,方便将来作为线程入口函数来调用

  • 写法很抽象,大概就是可以把一个线程进行打包起来,需要用的是可以以各种方式进行调用和返回一些东西, C++真是一门玄学

#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <string>

int thread_func1(int t)
{
    std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
    std::chrono::milliseconds duration(t);
    std::this_thread::sleep_for(duration);
    std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
    return t / 1000;
}

void packaged_task_test()
{
    std::vector<std::packaged_task<int(int)>> container;
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
    std::packaged_task<int(int)> my_packaged_task1(thread_func1);
    std::packaged_task<int(int)> my_packaged_task2([](int t){
        std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
        std::chrono::milliseconds duration(t);
        std::this_thread::sleep_for(duration);
        std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
        return t / 1000;
    });
    container.emplace_back(std::move(my_packaged_task1));
    container.emplace_back(std::move(my_packaged_task2));

    int t = 1000;
    for(auto it = container.begin();it != container.end();it++){
        std::packaged_task<int(int)> my_packaged_task = std::move(*it);
        my_packaged_task(t);
        t += 2000;
        std::future<int> result = my_packaged_task.get_future();
        std::cout << result.get() << std::endl;
    }
}
  • 这一串代码就是把一个自定义函数和lambda函数都以packaged_task的格式打包,然后放到一个容器里
  • 最后需要的时候从容器里取出来,执行并且获取它们各自的返回值
4、promise类模板的使用
  • promise的作用主要是可以在一个线程中计算的结果可以通过它传入到另外一个线程中去
  • 当线程中存在大量的数据需要返回时,不仅可以通过线程入口函数的返回值带回,可以通过promise类模板进行带回
  • 基于这一点,其实在多个线程中进行数据通信或者需要协同等可以使用promise进行数据传递
  • 例如下面的例子:
    • get_result_thread_func线程入口函数依赖calculate_thread_func函数计算的结果
    • 在主线程中可以将calculate_thread_func进行异步创建,并且传入promise的引用
    • calculate_thread_func计算完毕将值塞入到promise中,结束线程
    • 主线程在创建一个get_result_thread_func线程,然后将上面的计算结果的get_future传入到该函数中
    • 在该函数中获取值,此时如果calculate_thread_func计算完毕就直接返回值,如果没有计算完毕就继续等待!
      • 为什么可以实现,其实很好理解,因为get_result_thread_func传入了calculate_thread_func的get_future类模板对象
      • 通过future类模板对象可以获取到线程的执行权、自然就可以获得这个传入的值。
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <string>

void calculate_thread_func(std::promise<int> &prom, int t)
{
    std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
    std::chrono::milliseconds duration(1000);
    std::this_thread::sleep_for(duration);
    t += 1;
    t *= 10;
    std::cout << "calculate_thread_func()::t = " << t << std::endl;
    std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
    prom.set_value(t);
}


void get_result_thread_func(std::future<int> &result)
{
    int calculate = result.get();
    std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
    std::cout << "get_result_thread_func()::calculate result = " << calculate << std::endl;
    std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
}


void test_promise()
{
    std::promise<int> prom;
    std::async(std::launch::async, calculate_thread_func, std::ref(prom), 365);
    std::future<int> calculate_result = prom.get_future();
    std::future<void> final_result = std::async(std::launch::async, get_result_thread_func, std::ref(calculate_result));
    final_result.get();
}
/*
calculate_thread_func 开始执行! thread_id =139767452948224
calculate_thread_func()::t = 3660
calculate_thread_func 执行完毕! thread_id =139767452948224
get_result_thread_func 开始执行! thread_id =139767452948224
get_result_thread_func()::calculate result = 3660
get_result_thread_func 执行完毕! thread_id =139767452948224
*/
6、future模板函数补充
  • future模板类其主要的方法就是get,并且这个get不能重复获取,其原因在之前的源码浅析中看过,get里面是对值的std::move,因此不能重复get获取线程的返回值。

  • 此外future模板类提供了除get外的一些其他函数例如wait_for、wait_until、vaild这些函数

    • wait_for:等待线程执行到指定的时间长度
    • wait_until:等待线程执行到一个固定的时间点
    • valid:判断是否可以get取值,可以为true,否则唯false
  • 如果等待的时间内线程会出现三种情况:

    • ready:在规定的时间内等到了线程的执行,future_status的状态为ready

    • timeout:在规定的时间内线程没有执行或者没有执行完毕,future_status的状态为timeout

    • deferred:线程创建时采用延迟创建,规定的时间内不会执行,那么future_status的状态为deferred

#include <iostream>
#include <string>
#include <thread>
#include <future>

int thread_func(int t)
{
    std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
    std::chrono::seconds duration(t);
    std::this_thread::sleep_for(duration);
    std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;

    return 5;
}

void future_func_test1()
{
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
//    std::future<int> async_result = std::async(std::launch::async, thread_func, 1);				// ready
//    std::future<int> async_result = std::async(std::launch::async, thread_func, 3);				// timeout
    std::future<int> async_result = std::async(std::launch::deferred, thread_func, 1);			  // deferred
    std::cout << "continue......" <<  std::endl;

    std::future_status status = async_result.wait_for(std::chrono::seconds(2));
    if(status == std::future_status::ready){
        std::cout << "线程已经执行完毕了!" << std::endl;
        std::cout << "thread_func return value = " << async_result.get() << std::endl;
    }
    else if(status == std::future_status::timeout){
        std::cout << "线程已经超时了!" << std::endl;  
    }
    else if(status == std::future_status::deferred){
        std::cout << "线程被延迟创建了!" << std::endl;
        std::cout << async_result.get() << std::endl;
    }
}
6.1、shared_future类模板
  • shared_future类模板和future类模板几乎没有区别,唯一的区别就是get()函数的返回值是否可以重复获取。

  • 如果线程A的值需要被线程B和线程C的都需要使用,那么就需要重复获取这个值了。

  • 为了解决future无法重复获取的这个烦恼,C++11提供了shared_future类,其实就是可以重复get获取线程返回值的future

  • 由于返回的是一个引用因此多次获取都是同一个地址,一处修改这个值将会导致这个值的更改

6.1.1、future和shared_future的get函数源码对比
// future
_Res get(){
        typename _Base_type::_Reset __reset(*this);
        return std::move(this->_M_get_result()._M_value());
}

// shared_future
const _Res&  get() const { return this->_M_get_result()._M_value(); }
6.1.2、简单使用
int thread_func(int t)
{
    std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
    std::chrono::seconds duration(t);
    std::this_thread::sleep_for(duration);
    std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;

    return 5;
}


void shared_future_func_test2()
{
    std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
    std::shared_future<int> async_result = std::async(std::launch::deferred, thread_func, 1);
    std::cout << "continue......" <<  std::endl;
    for(int i = 0;i < 3;i++) {
        if (async_result.valid()) {
            int a = async_result.get();
            std::cout << "a = " << a << ", &a = " << &a << std::endl;
        }
    }
}
/*
输出
main thread_id = 140671767070528
continue......
thread_func 开始执行! thread_id =140671767070528
thread_func 执行完毕! thread_id =140671767070528
a = 5, &a = 0x7ffc0452ec00
a = 5, &a = 0x7ffc0452ec00
a = 5, &a = 0x7ffc0452ec00
*/
7、总结
  • 学习这些东西都是为了将其灵活使用,能够把它们融入到我们自己的实际开发当中

  • 能够灵活的写出稳定的多线程并发程序

  • 便于我们认识新的东西,通过学习它们可以通过阅读一些高手写的代码从而实现快速的代码积累和开拓眼界

标签:std,__,task,get,future,线程,async,多线程
From: https://blog.csdn.net/weixin_43808717/article/details/137238449

相关文章

  • 【JAVA】JAVA多线程基础4
    目录一、synchronized关键字特性1、互斥2、刷新内存3、可重入二、synchronized使用方法1、直接修饰普通方法2、修饰静态方法3、修饰代码块三、volatile关键字一、synchronized关键字特性1、互斥synchronized会起到互斥效果,某个线程执行到某个对象的synchronized......
  • 我给你列举一个详细生动的例子来说明,多个任务数据混合在一起的泛化性能好,还是利用多任
    假设你正在学习做三种不同的手工艺品:编织毛衣、雕刻木雕、和折纸。现在你有两种方法来学习这些手工艺品:将所有材料混合在一起学习:你把毛线、木头和纸张都混在一起,然后学习如何制作所有这些手工艺品。这种方法会让你对各种材料和技术有一定的了解,但可能会导致你在某个特......
  • 多线程(33)ConcurrentHashMap
    ConcurrentHashMap是Java并发包中提供的一个线程安全的哈希表实现。与传统的同步容器相比,ConcurrentHashMap通过一种分段锁的机制实现了更高的并发度。本节将深入探讨其设计原理,结合源码进行分析,并通过代码示例来演示其使用方法。设计原理ConcurrentHashMap的设计理......
  • 多线程(34)CopyOnWriteArrayList
    CopyOnWriteArrayList是Java中一个线程安全的ArrayList变体,属于java.util.concurrent包。它通过在所有修改操作(如add,set等)上执行显式复制来实现线程安全。这种设计适用于列表读操作的数量远远大于写操作的场景。设计原理CopyOnWriteArrayList的基本思想是,每当......
  • C#-多线程
    线程 被定义为程序的执行路径。每个线程都定义了一个独特的控制流。如果您的应用程序涉及到复杂的和耗时的操作,那么设置不同的线程执行路径往往是有益的,每个线程执行特定的工作。线程是**轻量级进程**。一个使用线程的常见实例是现代操作系统中并行编程的实现。使用线程节省了C......
  • 【Java EE】多线程(一)
    ......
  • easyExcel通用导出(非注解,多线程)
    1、基础类描述ExcelWriter(导出工具类)Query(通用查询)Consumer(函数参数)SpringBeanUtil(获取bean)2、代码ExcelWriterimportcn.hutool.core.collection.CollUtil;importcn.hutool.core.collection.ListUtil;importcn.hutool.core.util.PageUtil;importcn.hutool.core.u......
  • 在Python中用concurrent.futures创建线程池进程池
    简介Python3.2带来了concurrent.futures模块,借此能够快速使用线程池和进程池。对于不需要控制优先级与资源分配的多任务,使用concurrent.futures模块快捷优雅。示例代码与效果importconcurrent.futuresimporttimedefa_task(x):"""模拟一个耗时的任务"""de......
  • Java:多线程-继承Thread类
    在Java中,通过继承Thread类是实现多线程的一种方式。这种方式允许你创建自己的线程类,并定义线程执行的具体内容。以下是关于继承Thread类的详细讲解:继承Thread类的步骤创建线程类:创建一个继承自Thread的子类。重写run方法:在子类中重写run方法,定义线程执行的任务。run方法是......
  • Java:多线程相关知识概念
    Java中的多线程是指在单个程序中并行执行多个线程(即执行路径或任务)的能力。多线程在Java中是一个核心概念,它允许应用程序更有效地利用CPU资源,同时还能进行并发操作。以下是Java中多线程相关的详细知识:线程的基本概念线程(Thread):是操作系统能够进行运算调度的最小单位。它......