首页 > 编程语言 >异步编程:promise and future

异步编程:promise and future

时间:2023-08-20 10:04:33浏览次数:48  
标签:std 异步 thread stop future promise __ async

本文介绍C++中异步编程相关的基础操作类,以及借鉴promise and future思想解决回调地狱介绍。

std::thread and std::jthread

std::thread为C++11引入,一个简单的例子如下:

class Worker final {
public:
    void Execute()
    {
        std::cout << __FUNCTION__ << std::endl;
    }
};

int main()
{
    Worker w;
    auto thread = std::thread(&Worker::Execute, &w);
    thread.join();
    return 0;
}

这里如果少调用了thread.join(),类析构了但线程仍在运行,导致代码异常终止;

添加封装Worker:

class Worker final {
public:
    Worker()
    {   
        m_thread = std::thread(&Worker::execute, this);
    }   

    ~Worker()
    {   
        m_thread.join();
    }   

private:
    void execute()
    {   
        std::cout << __FUNCTION__ << std::endl;
    }   

private:
    std::thread m_thread;
};

这里应用了 RAII ,封装了Worker类在析构时自动调用join函数等待线程运行结束。

而std::jthread为C++20引入,是对std::thread的扩展:

/// A thread that can be requested to stop and automatically joined.
    ~jthread()
    {   
      if (joinable())
        {
          request_stop();
          join();
        }
    }
Worker w;
    auto thread = std::jthread(&Worker::Execute, &w); // 无需再单独调用join

同时std::jthread增加了能够主动停止线程执行的新特性,修改上述例子为:

class Worker final {
public:
    void operator()(std::stop_token st)
    {
        while (!st.stop_requested()) {
            sleep(1);
            std::cout << __FUNCTION__ << std::endl;
        }
    }
};

int main()
{
    Worker w;
    auto thread = std::jthread(w);
    sleep(3);
    std::cout << "request stop" << std::endl;
    thread.request_stop();
    return 0;
}

进一步,我们打开std::jthread的源码可以看到其实现原理,stop_token会作为入参送给可调用对象,通过stop_source获取到stop_token对象,两者共享stop_token::_Stop_state_ref状态;

class jthread
  {  
  public:
    template<typename _Callable, typename... _Args,
         typename = enable_if_t<!is_same_v<remove_cvref_t<_Callable>,
                           jthread>>>                                                                                    
      explicit
      jthread(_Callable&& __f, _Args&&... __args)
      : _M_thread{_S_create(_M_stop_source, std::forward<_Callable>(__f),
                std::forward<_Args>(__args)...)}
      { }
      // ...
    [[nodiscard]] stop_token
    get_stop_token() const noexcept
    {
      return _M_stop_source.get_token();
    }

    bool request_stop() noexcept
    {
      return _M_stop_source.request_stop(); // 通知线程停止
    }
      
  private:
    template<typename _Callable, typename... _Args>                                                                 
      static thread
      _S_create(stop_source& __ssrc, _Callable&& __f, _Args&&... __args)
      { 
    if constexpr(is_invocable_v<decay_t<_Callable>, stop_token,                                                     
                    decay_t<_Args>...>)
      return thread{std::forward<_Callable>(__f), __ssrc.get_token(),                                               
            std::forward<_Args>(__args)...}; // 将stop_token作为入参送入到可调用对象
    else      
      {
        static_assert(is_invocable_v<decay_t<_Callable>,
                     decay_t<_Args>...>,
              "std::thread arguments must be invocable after"
              " conversion to rvalues");
        return thread{std::forward<_Callable>(__f),
              std::forward<_Args>(__args)...};
      }
      }
    
    stop_source _M_stop_source; 
      // 通过stop_source获取到stop_token对象,两者共享stop_token::_Stop_state_ref状态
    thread _M_thread;
  };

std::async

但软件线程是有限的资源,当试图创建的线程数量大于系统能够提供的最大数量,则会抛std::system_error异常,因此使用std::thread就需要考虑这种异常处理;

使用std::async则将线程管理的责任转交给标准库的实现者,使用std::async时系统不保证会创建一个新的软件线程,基本用法如下:

int main()
{
    std::cout << "threadId:" << std::this_thread::get_id() << std::endl;
    std::future<int> f1 = std::async(std::launch::async, [](){
        std::cout << "threadId:" << std::this_thread::get_id() << std::endl;
        return 1;
    }); // 创建新的线程
    std::cout << f1.get() << std::endl;

    std::future<int> f2 = std::async(std::launch::deferred, [](){
        std::cout << "threadId:" << std::this_thread::get_id() << std::endl;
        return 2;
    });
    std::cout << "wait:" << std::endl;
    f2.wait(); // 在主线程调用回调,没有创建新的线程
    std::cout << f2.get() << std::endl;

    std::future<int> f3 = std::async(std::launch::async, [](){
        sleep(5);
        return 3;
    });
    std::future_status status;
    do {
        status = f3.wait_for(std::chrono::seconds(1));
       if (status == std::future_status::timeout) {
            std::cout << "timeout" << std::endl;
        } else if (status == std::future_status::ready) {
            std::cout << "ready" << std::endl;
        }
    } while (status != std::future_status::ready);
    return 0;
}

std::async有两种启动策略:

  1. std::launch::async:在调用async时就开始创建线程
  2. std::launch::deferred:延迟运行,函数只有在调用了get或者wait时才会运行;

默认启动策略是std::launch::async|std::launch::deferred,运行函数以同步或者异步的方式运行,这里有线程管理组件承担起负载均衡的责任【3】

注意:std::future的get()为移动语义,不能进行多次调用,否则会抛异常

改成std::shared_future则是拷贝语义,可以进行多次调用

std::promise and std::future

std::future在上一节中已经使用,而std::promise将数据与std::future绑定,为获取线程函数中的某个值提供便利:

std::promise<int> p;
    {
        auto thread = std::jthread([&p](){
            p.set_value(10);
        });
    }
    auto fu = p.get_future();
    std::cout << fu.get() << std::endl; // 获取promise在线程中set的值

std::package_task

std::package_task用于封装可调用对象,并且可获取future以便异步获取可调用对象的返回值:

auto func = [](){
        return 10;
    };
    std::packaged_task<int()> task(func);
    auto fu = task.get_future();
    {
        auto thread = std::jthread(std::ref(task));
    }
    std::cout << fu.get() << std::endl;

callback hell

借鉴future and promise思想,可以用于解决“回调地狱的问题”,什么是“回调地狱”?

template<typename Callback>
void Async(Callback&& call) // 异步处理
{
    // ...
}

// callX为可调用对象需要进行异步处理,并且可调用对象依赖上一个可调用的输出
// outputA = callA(input) // outputB = callB(outputA) // outputC = callC(outputB) // outputD = callD(outputC)
// 传统的实现方式可以为
Async([](input){
    outputA = callA(input);
    Async([outputA](){
        outputB = callB(outputA);
        Async([outputB](){
            outputC = callC(outputB);
            Async([outputC](){
                outputD = callD(outputC);
                ...
            });
        });
    });
})

上述代码中异步回调函数会出现不断的嵌套, 形成一个横向的金字塔,对于代码阅读者带来不必要的负担,不利用维护与扩展。

Facebook开源的folly future库、ananas都提供了 promise/future技术的解决方案,可以实现类似如下的效果:

future
.then(&loop, [](){ return ouputA; })
.then(&loop, [](ouputA){ return ouputB; })
.then(&loop, [](ouputB){ return ouputC; })
.then(&loop, [](ouputC){ return ouputD; })

详细实现源码下次再展开分析~

参考资料

【1】https://github.com/loveyacper/ananas

【2】https://en.cppreference.com/w/cpp/thread/async

【3】Effective Modern C++

标签:std,异步,thread,stop,future,promise,__,async
From: https://blog.51cto.com/u_13137973/7157569

相关文章

  • Future是什么
    Future代表异步执行async:在方法体前面是使用,定义该方法为一个异步方法。await:等待并获得异步表达式的执行结果,并且给关键字只能在async修饰的方法中。Future是单线程,先执行完全部微任务,再执行事件队列任务Future修饰的关键字,会将事件加入到队列任务中Future如何获取异步......
  • Future和Isolate有什么区别?
    future是异步编程,调用本身立即返回,并在稍后的某个时候执行完成时再获得返回结果。在普通代码中可以使用await等待一个异步调用结束。 isolate是并发编程,isolate是有自己的内存和单线程控制的运行实体。isolate本身的意思是“隔离”Dart是单线程,Dart为我们提供了isolate,i......
  • SyntaxError: /xxxx.vue: Unexpected token, expected “,“,[object Promise]export {
    本地老工程vue2.7.x+webpack4在升级webpack5的时候遇启动和打包报错:SyntaxError:SyntaxError:/xxxxx.vueUnexpectedtoken,expected","(1:8)>1|[objectPromise]|^2|export{render,staticRenderFns}最后才发现是prettier导致的。推荐看......
  • 深入理解后端开发中的消息队列与异步处理
    在现代的应用开发中,消息队列和异步处理已成为构建高性能、可伸缩的后端系统的重要工具。本文将深入探讨消息队列的原理、优势,以及如何在后端开发中应用消息队列和异步处理。什么是消息队列?消息队列是一种将消息从一个应用传递到另一个应用的通信方式。它具有以下特点:解耦:消息队列可......
  • 构建高效可靠的后端服务:使用消息队列和异步处理
    在现代应用开发中,构建高效可靠的后端服务是至关重要的。本文将深入探讨如何使用消息队列和异步处理来优化后端服务,实现高性能和可扩展性。为什么使用消息队列和异步处理?消息队列是一种用于在应用程序之间传递消息的机制,而异步处理是一种执行任务的方式,它不会阻塞主要的应用程序流程......
  • C#异步调用Process(),后台静默调用cmd控制台
    C#调用cmd控制台操作,网上有太多的教程了,但是大多数都是执行完一条指令,退出Process,下次执行指令,再次newProcess(),(只为了接收到cmd指令的回复,不然会进程阻塞,程序至此不会再跑。)这种情形如果是执行bat文件,或者执行类似ping这种对执行下条指令没有运行环境要求的指令,影响不大。同......
  • C++ 多线程详解之异步编程 std::packaged_task
    std::packaged_task将任何可调用对象(比如函数、lambda表达式等等)封装成一个task,可以异步执行。执行结果可以使用std::future获取。比如下面的例子,构造一个std::packaged_task后,get_future()函数返回一个std::future对象,可以获取task异步或者同步执行的结果。#includ......
  • 异步代码微任务宏任务案例
    案例一案例01process.nextTick(()=>{console.log("111");})constpromise=newPromise(resolve=>{console.log("222");resolve()})setTimeout(()=>{console.log("333");},100)promise.then(()=>{console.log(&quo......
  • Promise的理解和使用
    一:Promise是什么?(1)Promise是JS中进行异步编程的解决方案备注:旧方案是单纯使用回调函数异步编程包括:fs文件操作、数据库操作、AJAX、定时器......(2)从语法上来说:Promise是一个构造函数(3)从功能上来说:Promise对象用来封装一个异步操作并可以获取其成功/失败的结果值二......
  • 在C++中实现多线程异步TCP消息发送
    本文主要解释了在C++中如何实现一个多线程,异步发送TCP消息的系统。我们将会用到C++的标准库和Boost.Asio库。基础知识TCP(TransmissionControlProtocol):是一种面向连接的、可靠的、基于字节流的通信协议。它在两个网络节点之间创建一个稳定的连接,以便可以交换字节流。多线程编程:......