首页 > 编程语言 >006 Rust 异步编程,Stream 介绍

006 Rust 异步编程,Stream 介绍

时间:2022-11-07 11:36:33浏览次数:31  
标签:Stream stream sum next try item 006 Rust


Stream 介绍

​Stream​​​和​​Future​​​类似,但是​​Future​​​对应的是一个​​item​​​的状态的变化,而​​Stream​​​则是类似于​​iterator​​​,在结束之前能够得到多个值。或者我们可以简单的理解为,​​Stream​​​是由一系列的​​Future​​​组成,我们可以从​​Stream​​​读取各个​​Future​​​的结果,直到​​Stream​​结束。

Stream trait定义

定义如下:

trait Stream {
type Item;

fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker)
-> Poll<Option<Self::Item>>;
}

​poll_next​​函数有三种可能的返回值,分别如下:

  • ​Poll::Pending​​ 说明下一个值还没有就绪,仍然需要等待。
  • ​Poll::Ready(Some(val))​​​ 已经就绪,成功返回一个值,程序可以通过调用​​poll_next​​再获取下一个值。
  • ​Poll::Ready(None)​​​ 表示​​Stream​​​已经结束,不应该在调用​​poll_next​​。

迭代

和同步的​​Iterator​​​类似,​​Stream​​​可以迭代处理其中的值,如使用​​map, filter, fold, try_map, try_filter, and try_fold​​​等。但是​​Stream​​​不支持使用​​for​​​,而​​while let​​​和 ​​next/try_next​​则是允许的。 例子如下:

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}

async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}

并发

上面的使用的迭代处理,如果我们要并发的处理流,则应该使用​​for_each_concurrent​​​和 ​​try_for_each_concurrent​​,示例如下:

async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;

stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;

Ok(())
}

参考资料

Rust异步编程


标签:Stream,stream,sum,next,try,item,006,Rust
From: https://blog.51cto.com/u_15862521/5828346

相关文章

  • 005 Rust异步编程,Pin介绍
    为了对Future调用poll,需要使用到Pin的特殊类型。本节就介绍一下Pin类型。异步背后的一些原理例子1源码//文件src/main.rsusefutures::executor;asyncfnasync_function1()......
  • Rust 编程中使用 leveldb 的简单例子
    前言最近准备用Rust写一个完善的blockchain的教学demo,在持久化时考虑到使用leveldb。通过查阅文档,发现Rust中已经提供了使用leveldb的接口。将官方的例子修改了下,能够运行通......
  • 用Rust刷leetcode第八题
    ProblemImplement ​​atoi​​​ which convertsastringtoaninteger.Thefunctionfirstdiscardsasmanywhitespacecharactersasnecessaryuntilthefirst......
  • 023 通过链表学Rust之非安全方式实现链表1
    介绍视频地址:https://www.bilibili.com/video/av78062009/相关源码:https://github.com/anonymousGiga/Rust-link-list链表定义我们重新定义链表如下:pubstructList<T>{......
  • 022 通过链表学Rust之为什么要非安全的单链表
    介绍视频地址:https://www.bilibili.com/video/av78062009/相关源码:https://github.com/anonymousGiga/Rust-link-list详细内容前面我们都是使用安全的Rust编程来实现链表,但......
  • 025 通过链表学Rust之使用栈实现双端队列
    介绍视频地址:https://www.bilibili.com/video/av78062009/相关源码:https://github.com/anonymousGiga/Rust-link-list详细内容本节我们使用栈来实现双端队列。实现栈栈的实......
  • 024 通过链表学Rust之非安全方式实现链表2
    介绍视频地址:https://www.bilibili.com/video/av78062009/相关源码:https://github.com/anonymousGiga/Rust-link-list详细内容本节实现剩余的迭代器、Drop等。IntoIter实现......
  • 用Rust实现一个多线程的web server
    在本文之前,我们用Rust实现一个单线程的webserver的例子,但是单线程的webserver不够高效,所以本篇文章就来实现一个多线程的例子。单线程webserver存在的问题请求只能串行处......
  • 用Rust创建一个简单的webserver
    目的本节的例子教大家用Rust语言创建一个简单的webserver程序。webserver中涉及到的两个主要协议是超文本传输协议(HypertextTransferProtocol,HTTP)和传输控制协议(Tran......
  • java--Stream流基础
    Stream流操作方法执行完此方法之后,Stream流依然可以继续执行其他操作Stream流的思想 Stream流的三类方法  获取Stream流    好比创建一条流水线,并把数据放......