1、异步任务线程
-
异步线程的概念:
- 异步:就是非同步,同步就是必须一个一个的执行,异步可以两个事情一起干
- 异步线程:异步线程就相当于把非关联的两件事分开找两个线程去执行,而分开的那个就是异步线程
- 举例:例如登录信息,用户登录完毕主线程肯定是需要去及时响应用户的请求的,而系统设计的时候通常会保存用户的登录信息(日志)等等,如果处理这些任务的时间过长就可能无法及时响应用户的请求,而处理这些日志和响应用户是两个独立的事件,因此可以开启异步线程来处理日志,响应用户的操作继续由主线程向下执行,且无需等待异步线程的结果。
-
std::async、std::future
- C++11线程库中提供了这std::async函数创建后台任务(异步线程)并且返回值
- std::future类模板来支持获取异步线程std::async创建后台任务最后执行返回的值,
- std::async是一个函数模板,用来启动一个异步任务,启动起来一个异步任务之后,它返回一个std::future对象,std::future是一个类模板
- std::future类模板提供了一些函数,get可以获取返回值,wait系列获取不到
-
启动异步任务:
- 创建一个async线程并且开始执行对应的线程入口函数,它返回一个std::future对象
- std::future对象里边就含有线程入口函数所返回的结果(线程返回的结果),我们可以通过调用future对象的成员函数get()来获取结果
- std::future提供了一种访问异步操作结果的机制,就是说这个结果你可能没有办法马上拿到,但在不就的将来线程执行完毕的时候,就能够拿到结果了
- 因此如果在std::future调用get之前就会出现两种情况:
- 异步线程async已经计算完毕,那么std::future调用get直接获取结果
- 异步线程async没有计算完毕,那么std::future调用get将会阻塞,等待async执行完毕并且返回。
1.1、std::async源码浅析
template<typename _Fn, typename... _Args>
inline future<__async_result_of<_Fn, _Args...>>
async(_Fn&& __fn, _Args&&... __args)
{
return std::async(launch::async|launch::deferred,
std::forward<_Fn>(__fn),
std::forward<_Args>(__args)...);
}
template<typename _Fn, typename... _Args>
future<__async_result_of<_Fn, _Args...>> async(launch __policy, _Fn&& __fn, _Args&&... __args){
std::shared_ptr<__future_base::_State_base> __state;
if ((__policy & launch::async) == launch::async){ // here
__try{
__state = __future_base::_S_make_async_state(
std::thread::__make_invoker(std::forward<_Fn>(__fn),
std::forward<_Args>(__args)...)
);
}
#if __cpp_exceptions
catch(const system_error& __e) {
if (__e.code() != errc::resource_unavailable_try_again
|| (__policy & launch::deferred) != launch::deferred)
throw;
}
#endif
}
if (!__state){
__state = __future_base::_S_make_deferred_state(
std::thread::__make_invoker(std::forward<_Fn>(__fn),
std::forward<_Args>(__args)...));
}
return future<__async_result_of<_Fn, _Args...>>(__state);
}
-
std::launch::async|std::launch::deferred:这两个是常量标记。
- async表示立即创建异步线程
- deferred表示推迟创建线程,推迟到future调用get或者wait时在创建
-
默认情况下可以看到源码here处以async的默认形式创建线程,但是这只是Linux,并不知道其他编译器是什么样子!因此使用时根据自身的需求最好指定一下,防止编译器默认!
-
而可以看到使用std::launch::deferred的并没有看到源码处有创建线程的代码,这里就被推迟了!
-
std::async就两个构造函数
- 一个不指定launch方式的,只传入线程入口函数和入口函数参数的构造
- 指定launch方式的构造,并且传入线程入口函数和入口函数参数的构造
1.2、future源码浅析
template<>
class future<void> : public __basic_future<void>{};
template<typename _Res>
class future<_Res&> : public __basic_future<_Res&>{};
template<typename _Res>
class future : public __basic_future<_Res>{
friend class promise<_Res>;
template<typename> friend class packaged_task;
template<typename _Fn, typename... _Args>
friend future<__async_result_of<_Fn, _Args...>> async(launch, _Fn&&, _Args&&...);
typedef __basic_future<_Res> _Base_type;
typedef typename _Base_type::__state_type __state_type;
explicit future(const __state_type& __state) : _Base_type(__state) { }
public:
constexpr future() noexcept : _Base_type() { }
/// Move constructor
future(future&& __uf) noexcept : _Base_type(std::move(__uf)) { }
// Disable copying
future(const future&) = delete;
future& operator=(const future&) = delete;
future& operator=(future&& __fut) noexcept {
future(std::move(__fut))._M_swap(*this);
return *this;
}
/// Retrieving the value
_Res get(){
typename _Base_type::_Reset __reset(*this);
return std::move(this->_M_get_result()._M_value());
}
shared_future<_Res> share() noexcept;
};
- future模板类提供了:void、Res、Res&引用三大类返回类型的处理,因此async可以void空返回、可以类型返回、可以引用返回
- future最主要的就是get函数,get函数可以看到从一个方法中将值std::move移动出来
- __basic_future是future的基类,里面提供了wait、wait_for、wait_unitl…函数,所以future可以调用到父类的这些方法
- get这个函数只能调用一次,因为可以看到get函数中的实际操作是一个std::move,这个值移动之后就为空了。
2、async的使用
#include <iostream>
#include <thread>
#include <future>
#include <string>
#ifndef INC_08_ASYNC_ASYNC_FUTURE_H
#define INC_08_ASYNC_ASYNC_FUTURE_H
class AsyncFuture{
public:
int class_thread_func(std::string msg){
std::cout << msg << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::milliseconds duration(5000);
std::this_thread::sleep_for(duration);
std::cout << msg << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
}
};
int thread_func()
{
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::milliseconds duration(5000);
std::this_thread::sleep_for(duration);
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
return 5;
}
#endif //INC_08_ASYNC_ASYNC_FUTURE_H\
2.1、普通成员函数
void async_test1()
{
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
std::future<int> async_result = std::async(std::launch::deferred, thread_func);
std::cout << "continue......" << std::endl;
std::cout << async_result.get() << std::endl;
}
/*
由于get会阻塞等待,因此输出结果
main thread_id = 139778198386496
continue......
这里阻塞等待async执行完毕
thread_func 开始执行! thread_id =139778198386496
thread_func 执行完毕! thread_id =139778198386496
5
*/
2.2、类成员函数
void async_test2()
{
std::string msg = "class_thread_func";
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
AsyncFuture asyncFuture;
std::future<int> async_result = std::async(&AsyncFuture::class_thread_func, &asyncFuture, msg);
std::cout << "continue......" << std::endl;
std::cout << async_result.get() << std::endl;
}
/*由于get会阻塞等待,因此会阻塞
main thread_id = 139783108294464
这里阻塞等待async执行完毕
continue......
class_thread_func 开始执行! thread_id =139783090120448
class_thread_func 执行完毕! thread_id =139783090120448
3
*/
2.3、std::launch::deffered
void async_test3()
{
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
std::future<int> async_result = std::async(std::launch::deferred, thread_func);
std::cout << "continue......" << std::endl;
std::cout << async_result.get() << std::endl;
}
/*
由于deffered是延迟创建,因此主线程在执行完之后调用get会导致两个线程串行,最后子线程不会new出来, 由主线程执行!
main thread_id = 140214074877760
continue......
thread_func 开始执行! thread_id =140214074877760
thread_func 执行完毕! thread_id =140214074877760
5
*/
void async_test4()
{
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
std::future<int> async_result = std::async(std::launch::deferred, thread_func);
std::cout << "continue......" << std::endl;
}
/*
没有调用get,不进行执行。但是可能在析构的时候会调用get或者wait执行,这个需要看编译器的情况来定,上面没有看到析构
main thread_id = 139630206793536
continue......
*/
3、std::packaged_task类模板的使用
-
std::packaged_task是一个类模板,打包任务,把任务都装起来,
-
它的模板参数是各种可调用对象,方便将来作为线程入口函数来调用
-
写法很抽象,大概就是可以把一个线程进行打包起来,需要用的是可以以各种方式进行调用和返回一些东西, C++真是一门玄学
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <string>
int thread_func1(int t)
{
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::milliseconds duration(t);
std::this_thread::sleep_for(duration);
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
return t / 1000;
}
void packaged_task_test()
{
std::vector<std::packaged_task<int(int)>> container;
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
std::packaged_task<int(int)> my_packaged_task1(thread_func1);
std::packaged_task<int(int)> my_packaged_task2([](int t){
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::milliseconds duration(t);
std::this_thread::sleep_for(duration);
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
return t / 1000;
});
container.emplace_back(std::move(my_packaged_task1));
container.emplace_back(std::move(my_packaged_task2));
int t = 1000;
for(auto it = container.begin();it != container.end();it++){
std::packaged_task<int(int)> my_packaged_task = std::move(*it);
my_packaged_task(t);
t += 2000;
std::future<int> result = my_packaged_task.get_future();
std::cout << result.get() << std::endl;
}
}
- 这一串代码就是把一个自定义函数和lambda函数都以packaged_task的格式打包,然后放到一个容器里
- 最后需要的时候从容器里取出来,执行并且获取它们各自的返回值
4、promise类模板的使用
- promise的作用主要是可以在一个线程中计算的结果可以通过它传入到另外一个线程中去
- 当线程中存在大量的数据需要返回时,不仅可以通过线程入口函数的返回值带回,可以通过promise类模板进行带回
- 基于这一点,其实在多个线程中进行数据通信或者需要协同等可以使用promise进行数据传递
- 例如下面的例子:
- get_result_thread_func线程入口函数依赖calculate_thread_func函数计算的结果
- 在主线程中可以将calculate_thread_func进行异步创建,并且传入promise的引用
- calculate_thread_func计算完毕将值塞入到promise中,结束线程
- 主线程在创建一个get_result_thread_func线程,然后将上面的计算结果的get_future传入到该函数中
- 在该函数中获取值,此时如果calculate_thread_func计算完毕就直接返回值,如果没有计算完毕就继续等待!
- 为什么可以实现,其实很好理解,因为get_result_thread_func传入了calculate_thread_func的get_future类模板对象
- 通过future类模板对象可以获取到线程的执行权、自然就可以获得这个传入的值。
#include <iostream>
#include <thread>
#include <future>
#include <vector>
#include <string>
void calculate_thread_func(std::promise<int> &prom, int t)
{
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::milliseconds duration(1000);
std::this_thread::sleep_for(duration);
t += 1;
t *= 10;
std::cout << "calculate_thread_func()::t = " << t << std::endl;
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
prom.set_value(t);
}
void get_result_thread_func(std::future<int> &result)
{
int calculate = result.get();
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::cout << "get_result_thread_func()::calculate result = " << calculate << std::endl;
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
}
void test_promise()
{
std::promise<int> prom;
std::async(std::launch::async, calculate_thread_func, std::ref(prom), 365);
std::future<int> calculate_result = prom.get_future();
std::future<void> final_result = std::async(std::launch::async, get_result_thread_func, std::ref(calculate_result));
final_result.get();
}
/*
calculate_thread_func 开始执行! thread_id =139767452948224
calculate_thread_func()::t = 3660
calculate_thread_func 执行完毕! thread_id =139767452948224
get_result_thread_func 开始执行! thread_id =139767452948224
get_result_thread_func()::calculate result = 3660
get_result_thread_func 执行完毕! thread_id =139767452948224
*/
6、future模板函数补充
-
future模板类其主要的方法就是get,并且这个get不能重复获取,其原因在之前的源码浅析中看过,get里面是对值的std::move,因此不能重复get获取线程的返回值。
-
此外future模板类提供了除get外的一些其他函数例如wait_for、wait_until、vaild这些函数
- wait_for:等待线程执行到指定的时间长度
- wait_until:等待线程执行到一个固定的时间点
- valid:判断是否可以get取值,可以为true,否则唯false
-
如果等待的时间内线程会出现三种情况:
-
ready:在规定的时间内等到了线程的执行,future_status的状态为ready
-
timeout:在规定的时间内线程没有执行或者没有执行完毕,future_status的状态为timeout
-
deferred:线程创建时采用延迟创建,规定的时间内不会执行,那么future_status的状态为deferred
-
#include <iostream>
#include <string>
#include <thread>
#include <future>
int thread_func(int t)
{
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::seconds duration(t);
std::this_thread::sleep_for(duration);
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
return 5;
}
void future_func_test1()
{
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
// std::future<int> async_result = std::async(std::launch::async, thread_func, 1); // ready
// std::future<int> async_result = std::async(std::launch::async, thread_func, 3); // timeout
std::future<int> async_result = std::async(std::launch::deferred, thread_func, 1); // deferred
std::cout << "continue......" << std::endl;
std::future_status status = async_result.wait_for(std::chrono::seconds(2));
if(status == std::future_status::ready){
std::cout << "线程已经执行完毕了!" << std::endl;
std::cout << "thread_func return value = " << async_result.get() << std::endl;
}
else if(status == std::future_status::timeout){
std::cout << "线程已经超时了!" << std::endl;
}
else if(status == std::future_status::deferred){
std::cout << "线程被延迟创建了!" << std::endl;
std::cout << async_result.get() << std::endl;
}
}
6.1、shared_future类模板
-
shared_future类模板和future类模板几乎没有区别,唯一的区别就是get()函数的返回值是否可以重复获取。
-
如果线程A的值需要被线程B和线程C的都需要使用,那么就需要重复获取这个值了。
-
为了解决future无法重复获取的这个烦恼,C++11提供了shared_future类,其实就是可以重复get获取线程返回值的future
-
由于返回的是一个引用因此多次获取都是同一个地址,一处修改这个值将会导致这个值的更改
6.1.1、future和shared_future的get函数源码对比
// future
_Res get(){
typename _Base_type::_Reset __reset(*this);
return std::move(this->_M_get_result()._M_value());
}
// shared_future
const _Res& get() const { return this->_M_get_result()._M_value(); }
6.1.2、简单使用
int thread_func(int t)
{
std::cout << __func__ << " 开始执行! thread_id =" << std::this_thread::get_id() << std::endl;
std::chrono::seconds duration(t);
std::this_thread::sleep_for(duration);
std::cout << __func__ << " 执行完毕! thread_id =" << std::this_thread::get_id() << std::endl;
return 5;
}
void shared_future_func_test2()
{
std::cout << "main thread_id = " << std::this_thread::get_id() << std::endl;
std::shared_future<int> async_result = std::async(std::launch::deferred, thread_func, 1);
std::cout << "continue......" << std::endl;
for(int i = 0;i < 3;i++) {
if (async_result.valid()) {
int a = async_result.get();
std::cout << "a = " << a << ", &a = " << &a << std::endl;
}
}
}
/*
输出
main thread_id = 140671767070528
continue......
thread_func 开始执行! thread_id =140671767070528
thread_func 执行完毕! thread_id =140671767070528
a = 5, &a = 0x7ffc0452ec00
a = 5, &a = 0x7ffc0452ec00
a = 5, &a = 0x7ffc0452ec00
*/
7、总结
-
学习这些东西都是为了将其灵活使用,能够把它们融入到我们自己的实际开发当中
-
能够灵活的写出稳定的多线程并发程序
-
便于我们认识新的东西,通过学习它们可以通过阅读一些高手写的代码从而实现快速的代码积累和开拓眼界