首页 > 其他分享 >rust async

rust async

时间:2023-11-04 14:12:34浏览次数:40  
标签:std use stream let rust async fn

async

  • SendSync 在跨线程时需要关注

    • Send: ownership(所有权) 可以 send 到其他线程
      • A type is Send if it is safe to send it to another thread.
    • Sync: 可以并发,无线程安全问题
      • A type is Sync if it is safe to share between threads (T is Sync if and only if &T is Send).
  • Why import: 因为有些异步执行引擎时可以多线程执行的(比如 Tokio)

  • Rc<T>: !Send, !Sync

    use std::rc::Rc;
    
    #[derive(Debug)]
    struct Handler;
    
    #[tokio::main]
    async fn main() {
        let handler = Rc::new(Handler);
        let handler_clone = Rc::clone(&handler);
        tokio::spawn(process(handler_clone));
    }
    
    async fn process(_handler: Rc<Handler>) {
        unimplemented!()
    }
    

    Rc 不是 Send

    future returned by process is not Send

    error: future cannot be sent between threads safely
    --> src\main.rs:10:18
        |
    10  |     tokio::spawn(process(handler_clone));
        |                  ^^^^^^^^^^^^^^^^^^^^^^ future returned by `process` is not `Send`
        |
        = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<Handler>`
    note: captured value is not `Send`
    --> src\main.rs:13:18
        |
    13  | async fn process(handler: Rc<Handler>) {
        |                  ^^^^^^^ has type `Rc<Handler>` which is not `Send`
    note: required by a bound in `tokio::spawn`
    --> C:\Users\liuzonglin\.cargo\registry\src\mirrors.ustc.edu.cn-61ef6e0cd06fb9b8\tokio-1.29.1\src\task\spawn.rs:166:21
        |
    164 |     pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
        |            ----- required by a bound in this function
    165 |     where
    166 |         T: Future + Send + 'static,
        |                     ^^^^ required by this bound in `spawn`
    
  • Arc<T>: Send & Sync if : T is Send & Sync

    Rc 替换 Arc

    示例:_01_example.rs

    use std::sync::Arc;
    
    #[derive(Debug)]
    struct Handler;
    
    #[tokio::main]
    async fn main() {
        let handler = Arc::new(Handler);
        let handler_clone = Arc::clone(&handler);
        tokio::spawn(process(handler_clone));
    }
    
    async fn process(_handler: Arc<Handler>) {
        unimplemented!()
    }
    

async fn

  • 异步函数语法:
    • async fn do_something() {/* ... */}
    • async fn 返回的是 Future,Future 需要由一个执行者来运行
  • futures::executor::block_on;
    • block_on 阻塞当前线程,直到提供的 Future 运行完成
    • 其它执行者提供更复杂的行为,例如将多个 Future 安排到同一个线程上

示例:async_example_01_.rs

use futures::executor::block_on;
use std::future::Future;

async fn hello_world1() {   // 等同于 hello_world2
    println!("你好,世界!");
}

fn hello_world2() -> impl Future<Output = ()> {
    async {
        println!("你好,世界!");
    }
}

fn main() {
    let fu1 = hello_world1();
    let fu2 = hello_world2();
    block_on(fu1); // 运行 `fu1` 并打印 “你好,世界!”
    block_on(fu2); // 运行 `fu2` 并打印 “你好,世界!”
}

block_on 阻塞当前线程,直到提供的未来完成执行。其他执行器提供更复杂的行为,例如将多个 future 调度到相同的 thread 上。

Await

  • 在 async fn 中,可以使用 .await 来等待另一个实现 Future trait 的完成
  • 与 block_on 不同,.await不会阻塞当前线程,而是异步的等待 Future 的完成(如果该 Future 目前无法取得进展,就允许其他任务执行)
use futures::executor::block_on;

async fn learn_and_sing() {
    // 在唱歌之前等待歌曲被学习。
    // 我们在这里使用`.await`而不是`block_on`,是为了防止阻塞线程,这样我们就可以在同时跳舞。
    let str = "hello";
    let song = learn_song(str).await;
    sing_song(song).await;
}

async fn dance() {
    // 随着歌曲的节奏跳舞。
}

async fn learn_song(song: &'static str) -> String {
    // 学习这首歌的歌词。
    // 这是一个简单示例,实际上,您可能希望使用异步 API 来获取歌词。
    let lyrics = song.to_string();
    lyrics
}

async fn sing_song(song: String) {
    // 唱这首歌的歌词。
    // 这是一个简单示例,实际上,您可能希望使用异步 API 来播放音乐。
    println!("Singing: {}", song);
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();

    // `join!` 像 `.await` 一样,但可以同时等待多个未来。
    // 如果我们在 `learn_and_sing` 未来中暂时阻塞,那么 `dance`
    // 未来将接管当前线程。如果 `dance` 变得阻塞,
    // `learn_and_sing` 可以重新接管。如果两个未来都被阻塞,
    // 那么 `async_main` 就被阻塞,并会交给执行器。
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

learn_and_sing 的异步函数和一个名为 async_main 的异步主函数。learn_and_sing 函数异步地学习一首歌并演唱,dance 函数用于跳舞。async_main 函数启动了 learn_and_singdance 两个异步函数,并使用 futures::join! 宏等待这两个函数的完成。
main 函数是程序的入口点,它使用 block_on 宏阻塞当前线程,等待 async_main 函数的完成。

充分利用多核 CPU 的性能。在异步编程中,任务可以在不同的线程之间切换执行,从而提高程序的性能和响应能力。

示例:async_example_02_.rs

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    for stream in tcp_listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contens = fs::read_to_string(filename).unwrap();
    let response = format!("{}, {}", status_line, contens);
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

~> curl -v -X GET http://127.0.0.1:7878/random_url
Note: Unnecessary use of -X or --request, GET is already inferred.
*   Trying 127.0.0.1:7878...
* Connected to 127.0.0.1 (127.0.0.1) port 7878 (#0)
> GET /random_url HTTP/1.1
> Host: 127.0.0.1:7878
> User-Agent: curl/8.0.1
> Accept: */*
>
< HTTP/1.1 404 NOT FOUND
* no chunk, no close, no size. Assume close to signal end
<
, <!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
</body>

</html>* Closing connection 0

~> curl -X GET http://127.0.0.1:7878/random_url
, <!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
</body>

</html>

并发地处理连接

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    tcp_listener
        .incoming()
        .for_each_concurrent(None, |tcp_stream| async move {
            let tcp_stream = tcp_stream.unwrap();
            handle_connection(tcp_stream).await;
        })
        .await
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contens = fs::read_to_string(filename).unwrap();
    let response = format!("{}, {}", status_line, contens);
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

使用多线程并行处理请求

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    tcp_listener
        .incoming()
        .for_each_concurrent(None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            spawn(handle_connection(tcpstream));
        })
        .await;
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };

    let contens = fs::read_to_string(filename).unwrap();
    let response = format!("{}, {}", status_line, contens);
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

use async_std::io::{Read, Write};
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::marker::Unpin;
use std::time::Duration;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

    listener
        .incoming()
        .for_each_concurrent(None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            spawn(handle_connection(tcpstream));
        })
        .await;
}

async fn handle_connection(mut stream: impl Read + Write + Unpin) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::Error;
    use futures::task::{Context, Poll};

    use std::cmp::min;
    use std::pin::Pin;

    struct MockTcpStream {
        read_data: Vec<u8>,
        write_data: Vec<u8>,
    }

    impl Read for MockTcpStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let size: usize = min(self.read_data.len(), buf.len());
            buf[..size].copy_from_slice(&self.read_data[..size]);
            Poll::Ready(Ok(size))
        }
    }

    impl Write for MockTcpStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            self.write_data = Vec::from(buf);

            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    use std::marker::Unpin;
    impl Unpin for MockTcpStream {}

    use std::fs;

    #[async_std::test]
    async fn test_handle_connection() {
        let input_bytes = b"GET / HTTP/1.1\r\n";
        let mut contents = vec![0u8; 1024];
        contents[..input_bytes.len()].clone_from_slice(input_bytes);
        let mut stream = MockTcpStream {
            read_data: contents,
            write_data: Vec::new(),
        };

        handle_connection(&mut stream).await;
        let mut buf = [0u8; 1024];
        stream.read(&mut buf).await.unwrap();

        let expected_contents = fs::read_to_string("hello.html").unwrap();
        let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
        assert!(stream.write_data.starts_with(expected_response.as_bytes()));
    }
}

标签:std,use,stream,let,rust,async,fn
From: https://www.cnblogs.com/liuzonglin/p/17809265.html

相关文章

  • Rust 结构体的方法描述
    Rust结构体的方法描述原文地址:https://rustwiki.org/zh-CN/rust-by-example/fn/methods.htmlRust的方法(method)是依附于对象的函数。这些方法通过关键字self来访问对象中的数据和其他。方法在impl代码块中定义。静态方法(staticmethod)静态方法不需要实例来调用,把结构体......
  • 信宇宙 TrustVerse:Web3.0数字资产管理
    在数字时代,个人和机构越来越依赖数字资产,这包括加密货币、数字证券、艺术品、不动产等各种价值存储形式。随着数字资产的重要性不断增加,安全性和管理变得至关重要。元信宇宙(TrustVerse)应运而生,旨在成为数字资产管理的未来,充分整合了Web3.0技术,智能生态网络(IEN)作为一种创新性的网络......
  • 22. 从零用Rust编写正反向代理,一个数据包的神奇HTTP历险记!
    wmproxywmproxy已用Rust实现http/https代理,socks5代理,反向代理,静态文件服务器,四层TCP/UDP转发,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子项目地址国内:https://gitee.com/tickbh/wmproxygithub:https://github.com/tickbh/wmpro......
  • 通过tide库如何使用Rust语言采集汽车之家
    身边许多有车的朋友,在日常用车养车过程中,经常会碰到这样那样的问题,很多人都喜欢去汽车之家寻求帮助。那么今天,我将为大家带来的是使用tide库来编写的一个爬虫程序,并使用Rust语言来采集汽车之家的相关内容,不是特别难,有需要的朋友可以看看学习一下喔。```rustusetide::{Request,Re......
  • 使用async和await获取axios的数据注意事项
    使用async和await获取axios的数据的注意事项确定正确使用asyncfunctiongetInfo(){constres=awaitaxios.get('http://example.com')returnres.data}上述代码等同于asyncfunctiongetInfo(){constresult=(awaitaxios.get('http://example.com')).data......
  • 安装rust
    https://blog.csdn.net/xiaojin21cen/article/details/129767672 使用rust语言碰到linker`link.exe`notfound|=note:programnotfoundnote:themsvctarge 运行rustuptoolchaininstallstable-x86_64-pc-windows-gnurustupdefaultstable-x86_64-pc-windows-g......
  • 系统编程:控制文件I/O的内核缓冲之sync(),fsync()和fdatasync()
        通过系统编程:从write()和fwrite()谈开来我们知道了系统调用和glibc库函数为了提升性能而设立的缓冲区,那么,什么情况下数据会从上一次缓冲区刷新到下一层存储介质(可能是缓冲区,也可能是永久存储介质)呢?fflush()库函数提供了强制将stdio库函数缓冲区数据刷新到内核缓冲......
  • rust中使用zip crate解压.gz文件
    添加所需的库到Cargo.toml文件中:zip="0.6.6"直接上代码,都在酒里了.usestd::fs::File;usestd::io::{Read,Write};usestd::process::exit;usestd::path::{Path,PathBuf};usezip::ZipArchive;fnmain(){//======设置输入输出路径======letzip_......
  • `async` 函数没有使用 `await` 的执行顺序
    async函数没有使用await的执行顺序什么是async函数?async是JavaScript中的一个关键字,用于定义异步函数。异步函数返回一个Promise对象,但如果没有使用await,它将不会等待异步操作的完成。基本概念在async函数内没有使用await时,执行顺序遵循以下基本原则:立即执行async函......
  • 理解 JavaScript 的 async/await
    1.async和await在干什么任意一个名称都是有意义的,先从字面意思来理解。async是“异步”的简写,而await可以认为是asyncwait的简写。所以应该很好理解async用于申明一个function是异步的,而await用于等待一个异步方法执行完成。另外还有一个很有意思的语法规定,awai......