首页 > 编程语言 >Rust异步编程

Rust异步编程

时间:2022-12-21 18:00:29浏览次数:41  
标签:异步 一个 self 编程 线程 poll 我们 Rust

概要

就像其名字所暗示的一样,异步编程,就是非同步的编程。从高层来看,一个异步操作就是在后台执行的操作——程序不会等待异步操作结束,而是立即开始继续执行下一行代码。

如果你已经熟悉了异步编程,这个定义可能不会让你满意,因为它并没有实际解释什么才是异步编程。想要真的理解异步编程模型,以及理解它在Rust中如何工作,我们首先需要挖掘与之相对的概念,我们需要理解同步编程模型。这对阐明概念以及展示异步编程的权衡都是很重要的,是的,异步的解决办法不一定是最优的。我们将从“首先促成异步编程这一概念的动机”开始这一章,然后,我们将深挖Rust中的异步编程在底层是如何工作的。

什么是异步

在我们进入同步以及异步编程模型的细节前,我们首先快速地看一下,当你运行你的程序时,你的计算机实际在做什么。

计算机的速度很快,真的很快。实际上,它花费了大量的时间来等待事情发生,除非你正在解压一个文件;编码一个音频;或者处理数字......否则,你的CPU大部分时间都是空闲的,以等待操作完成。它等待一个网络包到达;等待鼠标移动;等待磁盘写完字节,或者仅仅是等待从主存读取完成。从CPU的视角来看,大多数这样的事件之间像是相隔了一万年,当一个发生了,CPU运行少量几条指令,然后继续回到等待状态。看看你的CPU利用率,它很可能是某些很低的数字,而且,在大部分时间里都是这样。

同步接口

同步接口允许你的程序(或者只是你程序中的一个线程)在同一时间只执行单一操作,每一个操作必须等待前一个同步操作结束,然后它才能开始运行。你看到的大多数接口都是同步的:你调用他们,他们做某些事,然后最终当操作完成时返回,此时,你的程序可以从此处继续了。其原因我们将在本章的稍后看到,这是因为使用一个异步操作需要相当一些额外的机制,除非你需要异步操作所带来的好处,坚持使用同步模型,它们需要更少的环境。

同步接口隐藏了所有这些等待,一个应用程序调用一个函数,比如“写入一些字节到文件中”,稍后,这个函数结束了,然后下一行代码将背执行。在底层实际发生的是,操作系统将一个磁盘写入操作入队,然后让应用睡眠,直到磁盘报告它已经结束了写入。应用可能认为这个函数需要花费很长时间来执行,但其实,它根本没有真的执行,只是在等待。

一个像这样按顺序执行的接口也经常被称作“阻塞”的,因为接口中的操作不得不等待一些外部事件发生才能推进,从而阻塞进一步的执行,直到该事件发生。无论你怎么称呼它,基本的理念是不会变的:程序不会推进,直到当前操作结束,当操作在等待的同事,程序也在等待。

同步接口通常被认为是易于使用和推理的,因为你的代码在同一时间仅仅执行一行。

但是问题就出在它只允许应用程序在给定时间只能做一件事,这意味着如果你想要你的程序同事等待用户输入或者一个网络包,那你就没办法了,除非你的操作系统专门提供这样一个操作。同样的,你的应用无法在写一个磁盘文件时做任何其它有用的事,你只能让文件写入操作阻塞执行。

多线程

迄今,允许并发执行的最通用的解决办法是使用多线程。在一个多线程程序中,每一个线程负责执行一个特定的独立的阻塞操作序列,然后操作系统在线程间进行多路复用,一旦任一线程可以推进,就让它推进,如果一个线程阻塞了,一些其它线程可以继续执行,所以,应用可以继续做一些有用的工作。

通常,为了让程序中的每个工作仍能协作,这些线程使用如锁或通道的同步原语来互相交流。例如,你可能有一个线程等待用户输入,而另一个线程正在等待网络包,另一个线程等待这两个线程,以在一个在三个线程间共享的通道上发送一条消息。

多线程给了你并发——在任一时间执行多个独立操作的能力,由运行应用程序的系统(在本例中是操作系统)来在没有阻塞的线程中进行选择并决定下一个执行哪个。如果一个线程阻塞了,它(系统)可以选择运行另一个继续推进进度。

将多线程与阻塞接口结合可以让你走的更远,并且很多商用软件都基于此构建,但是这种方法并不是没有缺点。首先,持续地跟踪这些线程很快就会变得麻烦起来,如果你不得不为每个并发任务创建一个线程,包括简单的等待键盘输入,线程将很快躲起来,并且为了跟踪所有这些线程间如何交互、交流以及写作也会引入额外的复杂性。

其次,线程越多,在其间进行切换的成本就越高。每当一个线程结束运行然后另一个开始接替它的位置时,你将需要对操作系统的调度器做一次往返,这并不是无开销的。在一些平台上,生成一个新的线程也是一个相当笨重的操作,高性能的程序经常通过重用这些线程以及使用一些系统调用来避免这种开销,这些系统调用允许你在许多相关方法上阻塞(译者认为是暂时阻塞一些任务以避免生成过多的线程),但最后,你又遇到了相同的问题:阻塞接口需要你有和你希望产生的阻塞调用数同等数量的线程。

最后,线程给你的程序带来了并行,并发和并行之间的区别是精妙但重要的:并发意味着你的任务是交替执行的,而并行意味着多个任务在同一时间执行。如果你有两个任务,以ASCII来表示它们的执行,看起来是这样的:_-_-_(并发),=====(并行)。多线程并不一定暗示着并行,甚至你有许多线程,但你可能只有一个核心,所以在给定时间只有一个线程可以执行,但是两者通常是齐头并进的。你可以使用一个Mutex或者其它同步原语来创造两个在它们的执行过程中互斥的线程,但是这会引入额外的复杂性,线程想要并行执行。虽然并行总是好事(谁会不像它们的程序在多个核心上运行的更快呢),但你必须处理真正的共享数据结构的同步访问。这意味着你需要从RcCell以及RefCell移动到更加强大但更慢的Arc以及Mutex上。虽然你可能想要使用后面的那些类型在你的并发程序中以开启并行,但线程将强制你使用它们。我们将在第十章中看到多线程的更多细节。

异步接口

现在我们已经探索了同步接口,我们可以看另一个了:异步,或者说非阻塞接口。异步接口,是一个并不直接产生结果,但是可能会指示你结果将在晚些时间可用的接口。这给了调用者在同时做一些其它事情的机会,而不是不得不睡眠,直到特定操作完成。在Rust的说法中,一个异步接口就是一个返回一个Poll(轮询)的方法,Poll在Listing8-1中定义:

enum Poll<T> {
   Ready(T),
   Pending
}

Poll通常展示在名字以poll开头的函数的返回值类型上,poll开头的函数代表它是一个可以不阻塞的尝试某个操作的方法。我们将会在本章的稍后部分详细介绍它们是如何做到这一点的,但是通常来说,它们会在阻塞之前尽可能地尝试更多的操作,然后返回。并且,关键的是,它们会记住它们是在哪里离开的,所以在稍后可以推进额外的进度时,它们可以恢复执行。

这些非阻塞函数允许我们简单地执行多个并发任务,比如,如果你想要从网络或用户的键盘中读取,而不管哪一个先有事件可用,你要做的所有就是在一个循环中poll它们,直到其中的一个返回Poll::Ready。你不需要任何其它的线程或同步。

这里这个loop单词可能会让你有一点紧张,当下一个输入可能还有几分钟才会到达时,你会不希望你的程序在一秒钟之内穿过一个循环三十亿次。在阻塞接口的世界中这并不是一个问题,因为操作系统会让一个线程睡眠,并且负责在相关事件到达时唤醒它,但是在这个全新的非阻塞世界中,我们该如何避免在等待时的不断循环呢?

标准化的Polling

为了让每个库都能以非阻塞的风格使用,我们可以让每个库的作者编写它们自己的poll方法,所有的方法名称、签名和返回类型都会略有不同,这很快就会让事情变得麻烦。在Rust中,取而代之的是,轮询通过Future trait被标准化,Listing8-2是一个简单版本的Future(我们会在这一章的后面回到真实的Future上)

trait Future {
   type Output;
   fn poll(&mut self) -> Poll<Self::Output>;
}

实现了Futuretrait的类型被称作futures,它代表当前可能还无法使用的值。一个future可能代表下一个进入的网络包;下一次鼠标移动;或者只是一段时间之后时间点。你可以将Future<Output = Foo>读作“一个将会在未来产生一个Foo的类型”。这样的类型在其它的语言中通常称为promise——它们最终会返回指定的类型。当一个future最终返回了一个Poll::Ready(T),我们说future resolve(解决)到了一个T

有了这个trait,我们就可以描述提供poll方法的模式。与其使用poll_recvpoll_keypress这样的接口,不如使用像返回一个具有合适的Output类型的impl Futurerecv以及keypress函数。这不会改变你必须轮询它们的事实——我们稍后将处理它——但是至少现在我们有了这些pending值的标准接口,并且我们不需要在任何地方使用poll_前缀。

注意:通常来说,你不需要在一个future返回Poll::Ready后再次轮询它,如果你这样做了,future有权利panic,一个在返回Ready后仍能安全地poll的future有时被称为fused future。

Ergonomic Futures

使用我目前为止介绍的方式编写一个实现了Future的类型是非常痛苦的,为了体会到为什么,首先先看Listing8-3中非常简单直接的异步代码块,它只是简单的尝试从输入通道rx转发消息到输出通道tx

async fn forward<T>(rx: Recevier<T>, tx: Sender<T>) {
   while let Some(t) = rx.next().await {
      tx.send(t).await;
   }
}

这个代码,使用asyncawait语法,看起来非常像与之对应的同步代码并且易于阅读。我们简单的发送我们在循环中接收到的每一条消息,直到没有更多的消息了,并且每一个await的点对应着其同步变体中可能发生阻塞的地方。现在,想象如果你必须通过手动实现Future trait来编写这段代码,由于每一个poll调用都从函数顶部开始,因此你需要打包必要的状态,以便从代码返回(yielded 或者说让步)的最后一个位置继续。结果十分怪诞,就像Listing8-4所展示的:

enum Forward<T> {
	WaitingForReceive(ReceiveFuture<T>, Option<Sender<T>>),
	WatingForSend(SendFuture<T>, Option<Receiver<T>>),
}

impl<T> Future for Forward<T> {
	type Output = ();
	fn poll(&mut self) -> Poll<Self::Output> {
		match self {
			Forward::WaitingForReceive(recv, tx) => {
				if let Poll::Ready((rx, v)) = recv.poll() {
					let tx = tx.take().unwrap();
					*self = Forward::WatingForSend(tx.send(v), Some(rx));
					// 尝试在sending上推进
					return self.poll();
				} else {
					// 没有更多项
					Poll::Ready(())
				}
			}
			Forward::WatingForSend(send, receiver) => {
				if let Poll::Ready(tx) = send.poll() {
					let rx = rx.take().unwrap();
					*self = Forward::WaitingForReceive(rx.receive(), Some(tx));
					// 尝试在receiving上推进
					return self.poll();
				} else {
					Poll::Pending
				}
			}
		}
	}
}

你几乎不用在Rust中编写这样的代码,但是它给了你其底层是如何工作的重要见解,所以,我们谈论下它们。首先,我们定义我们的future类型为一个enum,用于跟踪我们当前正在等待什么。这是由于当我们返回了Poll::Pending,下一个poll调用将会在方法顶部开始执行,我们需要一些手段来了解我们之前正在做什么,这样我们就知道应该继续进行哪一个操作了。此外,我们需要持续跟踪不同的信息,这取决于我们当前正在做什么:如果我们正在等待一个receive结束,我们需要保存ReceiveFuture(它的定义没有在这个示例中展示)为了我们在下次自己被轮询时轮询它。对于SendFuture也一样,这里的Options可能让你感到奇怪,我们将会在稍后看回到这里。

当我们为Forward实现Future时,我们定义了它的输出类型为(),因为这个future并不实际地返回任何东西,取而代之,当future完成了从输入通道到输出通道的所有的转发后,它会解决(没有任何结果)。在一个更复杂的示例中,我们的转发类型的Output可能是一个Result,这样它就可以把来自receive()send()的错误传回堆栈,传给正在轮询转发完成的函数。但是这个代码已经足够复杂了,所以我们把它留到以后再说。

Forward被轮询,它需要恢复到它上次离开的位置,这是通过match匹配当前保存在self上的enum变体实现的。不论我们进入哪个分支,第一步都是轮询阻塞当前操作进度的future,如果我们正尝试接收,我们轮询ReceiveFuture,如果我们正尝试发送,我们轮询SendFuture。如果调用poll返回了一个Poll::Pending,我们可以不做任何推进,我们也返回Poll::Pending,但是如果当前的future解决了,我们必须推进!

当一个内部future解决,我们必须通过切换self中保存的enum变体来更新当前的操作。为了这样做,我们必须移出self,以调用Receiver::reveiveSender::send——但我们不能这样做,因为我们只有一个&mut self。所以,我们保存了需要移动的状态到一个Option中,我们会使用Option::take将它移出。这有点愚蠢,因为我们无论如何都要重写self,并且Option永远为Some,但为了让借用检查器开心,我们需要用上这些技巧。

原文:When one of the inner futures resolves, we need to update what the current operation is by switching which enum variant is stored in self. In order to do so, we have to move out of self to call Receiver::receive or Sender::send—but we can’t do that because all we have is &mut self. So, we store the state we have to move in an Option, which we move out of with Option::take. This is silly since we’re about to overwrite self anyway, and hence the Options will always be Some, but sometimes tricks are needed to make the borrow checker happy.

最后,如果我们确实取得了进展,我们会再次轮询自己。因此,(上面的代码中)如果我们可以立即在pending的send或receive上去的进展,我们就轮询了自己。实际上,当你要实现一个真实的Futuretrait时,为了正确性,这样做是必要的。我们会在稍后讨论它,现在,你可以认为这是一种优化。

我们刚刚手写了一个状态机:一种具有几种可能的状态,并且在接收到特定事件时在这些状态间转换的类型。这只是一个十分简单的状态机,想象你必须为更复杂的用例(其中有额外的中间步骤)编写这样的代码!

除了编写笨拙的状态机之外,我们必须知道Sender::sendReceiver::receive返回的future类型让我们可以将它们存储在自己的类型中。如果这些方法没有返回impl Future,我们不可能写出我们的变体类型。这个sendreceive方法也必须持有sender以及receiver的所有权,如果它们没有,它们返回的future将与self借用绑定在一起,其生命周期将在我们从poll中返回时结束。这是行不通的,因为我们尝试将这些future存储在self中。

注意:你可能已经注意到了,Receiver看起来很像一个异步版本的Iterator。其他人也注意到了这件事,并且标准库正准备为能够有意义地实现poll_next的类型添加一个特质。接下来,这些异步迭代器(通常被称为流)最终可能会有一流的语言支持,比如直接在它们上面进行循环的能力。

最后,这个代码很难编写,很难阅读,并且很难修改。比如如果我们想添加错误处理,代码的复杂度将显著提高!很幸运,现在有一条更好的路。

async/await

未完...

标签:异步,一个,self,编程,线程,poll,我们,Rust
From: https://www.cnblogs.com/lilpig/p/16996834.html

相关文章