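async
- Send and Sync matter whenever a value crosses a thread boundary
  - Send: ownership of the value can be transferred to another thread
    - A type is Send if it is safe to send it to another thread.
  - Sync: the value can be accessed concurrently without thread-safety problems
    - A type is Sync if it is safe to share between threads (T is Sync if and only if &T is Send).
- Why this matters: some async executors, such as Tokio, can run tasks on multiple threads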
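The two definitions above can be checked at compile time. The following is a minimal sketch using only the standard library; `assert_send`, `assert_sync` and `sum_on_other_thread` are hypothetical helper names introduced for illustration, not part of any library.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helpers: each call compiles only if `T` satisfies the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Moves `data` into a new thread (allowed because `Vec<i32>` is Send)
/// and returns the sum computed there.
fn sum_on_other_thread(data: Vec<i32>) -> i32 {
    thread::spawn(move || data.iter().sum::<i32>())
        .join()
        .expect("worker thread panicked")
}

fn main() {
    assert_send::<Vec<i32>>();
    assert_sync::<Vec<i32>>();
    assert_send::<Arc<Mutex<i32>>>();
    assert_sync::<Arc<Mutex<i32>>>();
    // assert_send::<std::rc::Rc<i32>>();        // would not compile: Rc is !Send
    // assert_sync::<std::cell::RefCell<i32>>(); // would not compile: RefCell is !Sync

    println!("sum = {}", sum_on_other_thread(vec![1, 2, 3]));
}
```

Uncommenting either of the two commented lines should turn the violation into a compile error, which is the same mechanism that rejects a non-Send future passed to a multi-threaded executor.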
- Rc<T>: !Send, !Sync
use std::rc::Rc;

#[derive(Debug)]
struct Handler;

#[tokio::main]
async fn main() {
    let handler = Rc::new(Handler);
    let handler_clone = Rc::clone(&handler);
    // Does not compile: the future captures an Rc, which is not Send.
    tokio::spawn(process(handler_clone));
}

async fn process(_handler: Rc<Handler>) {
    unimplemented!()
}
Rc is not Send, so the future returned by process is not Send either, and the compiler rejects the call to tokio::spawn:
error: future cannot be sent between threads safely
   --> src\main.rs:10:18
    |
10  |     tokio::spawn(process(handler_clone));
    |                  ^^^^^^^^^^^^^^^^^^^^^^ future returned by `process` is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<Handler>`
note: captured value is not `Send`
   --> src\main.rs:13:18
    |
13  | async fn process(handler: Rc<Handler>) {
    |                  ^^^^^^^ has type `Rc<Handler>` which is not `Send`
note: required by a bound in `tokio::spawn`
   --> C:\Users\liuzonglin\.cargo\registry\src\mirrors.ustc.edu.cn-61ef6e0cd06fb9b8\tokio-1.29.1\src\task\spawn.rs:166:21
    |
164 |     pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
    |            ----- required by a bound in this function
165 |     where
166 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
- Arc<T>: Send & Sync if T is Send & Sync
Replacing Rc with Arc makes the example compile:
use std::sync::Arc;

#[derive(Debug)]
struct Handler;

#[tokio::main]
async fn main() {
    let handler = Arc::new(Handler);
    let handler_clone = Arc::clone(&handler);
    // Compiles: Arc<Handler> is Send because Handler is Send + Sync.
    tokio::spawn(process(handler_clone));
}

async fn process(_handler: Arc<Handler>) {
    unimplemented!()
}
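The same rule applies to plain threads, which makes it easy to try without an async runtime. The sketch below shares one `Arc<Handler>` across several `std::thread` workers. The `prefix` field, the `handle` method and `run_workers` are hypothetical additions for illustration; the `Handler` in the example above is a unit struct.

```rust
use std::sync::Arc;
use std::thread;

#[derive(Debug)]
struct Handler {
    prefix: String, // hypothetical field, only here to have something to read
}

impl Handler {
    fn handle(&self, id: usize) -> String {
        format!("{}-{}", self.prefix, id)
    }
}

/// Spawns `n` threads that all read the same Handler through an Arc.
fn run_workers(n: usize) -> Vec<String> {
    let handler = Arc::new(Handler { prefix: String::from("job") });
    let handles: Vec<_> = (0..n)
        .map(|id| {
            // Each thread owns its own Arc clone; the Handler itself is shared.
            let handler = Arc::clone(&handler);
            thread::spawn(move || handler.handle(id))
        })
        .collect();
    // Joining in spawn order keeps the result order deterministic.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    println!("{:?}", run_workers(3));
}
```

Swapping `Arc` for `Rc` here should fail to compile for the same reason as the tokio::spawn example: `thread::spawn` also requires its closure to be Send.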
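async fn
- Async function syntax:
async fn do_something() { /* ... */ }
- An async fn returns a Future, and a Future has to be run by an executor, for example futures::executor::block_on
  - block_on blocks the current thread until the provided Future has run to completion
  - Other executors provide more complex behavior, such as scheduling multiple Futures onto the same thread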
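use futures::executor::block_on;
use std::future::Future;

// Equivalent to hello_world2.
async fn hello_world1() {
    println!("Hello, world!");
}

// The desugared form: a plain fn that returns a Future.
fn hello_world2() -> impl Future<Output = ()> {
    async {
        println!("Hello, world!");
    }
}

fn main() {
    // Nothing is printed yet: futures do nothing until they are run.
    let fu1 = hello_world1();
    let fu2 = hello_world2();
    block_on(fu1); // runs `fu1` and prints "Hello, world!"
    block_on(fu2); // runs `fu2` and prints "Hello, world!"
}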
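In this example block_on blocks the calling thread until each future has finished executing. Other executors offer more complex behavior, such as scheduling multiple futures onto the same thread.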
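To make "a Future has to be run by an executor" concrete, here is a minimal sketch of a block_on-style executor built only on the standard library. It is not how futures::executor::block_on is implemented; `simple_block_on`, `ThreadWaker` and `add` are hypothetical names used for illustration.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread that is blocked in `simple_block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Polls `fut` on the current thread until it is ready, parking in between.
fn simple_block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until the future's waker signals that progress is possible.
            Poll::Pending => thread::park(),
        }
    }
}

async fn add(a: i32, b: i32) -> i32 {
    a + b
}

fn main() {
    let fut = add(1, 2); // creating the future runs nothing
    println!("{}", simple_block_on(fut)); // polling it produces the value
}
```

The loop is the whole executor: poll, and if the future is not ready, give up the thread until woken. Real executors add task queues and I/O integration on top of this idea.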
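Await
- Inside an async fn, .await waits for another value that implements the Future trait to complete
- Unlike block_on, .await does not block the current thread; it waits asynchronously for the Future to complete (if that Future cannot make progress right now, other tasks are allowed to run)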
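use futures::executor::block_on;

async fn learn_and_sing() {
    // Wait for the song to be learned before singing it.
    // `.await` is used here instead of `block_on` so that the thread is not
    // blocked, which makes it possible to dance at the same time.
    let str = "hello";
    let song = learn_song(str).await;
    sing_song(song).await;
}

async fn dance() {
    // Dance to the rhythm of the song.
}

async fn learn_song(song: &'static str) -> String {
    // Learn the lyrics of the song.
    // This is a simplified example; in practice an async API would
    // probably be used to fetch the lyrics.
    let lyrics = song.to_string();
    lyrics
}

async fn sing_song(song: String) {
    // Sing the lyrics of the song.
    // This is a simplified example; in practice an async API would
    // probably be used to play the music.
    println!("Singing: {}", song);
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();
    // `join!` is like `.await` but can wait for several futures at once.
    // If `learn_and_sing` is temporarily blocked, `dance` takes over the
    // current thread. If `dance` becomes blocked, `learn_and_sing` can take
    // over again. If both futures are blocked, `async_main` is blocked and
    // yields to the executor.
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}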
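This code defines an async function named learn_and_sing and an async main function named async_main. learn_and_sing learns a song asynchronously and then sings it; dance does the dancing. async_main creates the learn_and_sing and dance futures and uses the futures::join! macro to wait for both of them to complete.
main is the program entry point. It calls the block_on function to block the current thread until async_main completes.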
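The hand-off described above (one future yields, the other takes over the thread) can be made visible with a small standard-library sketch. This is not how futures::join! is implemented; it is a hand-written polling loop, and `YieldNow`, `NoopWaker`, `task` and `run_interleaved` are hypothetical names for illustration.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the loop below simply polls again.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// A future that returns Pending on the first poll and Ready on the second.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Logs a step, yields once, then logs a second step.
async fn task(name: &'static str, log: &RefCell<Vec<String>>) {
    log.borrow_mut().push(format!("{name}: step 1"));
    YieldNow(false).await; // gives the other future a chance to run
    log.borrow_mut().push(format!("{name}: step 2"));
}

/// Polls two futures alternately on one thread until both are done.
fn run_interleaved() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        let mut a = pin!(task("learn_and_sing", &log));
        let mut b = pin!(task("dance", &log));
        let (mut a_done, mut b_done) = (false, false);
        while !(a_done && b_done) {
            if !a_done {
                a_done = a.as_mut().poll(&mut cx).is_ready();
            }
            if !b_done {
                b_done = b.as_mut().poll(&mut cx).is_ready();
            }
        }
    }
    log.into_inner()
}

fn main() {
    for line in run_interleaved() {
        println!("{line}");
    }
}
```

The log shows step 1 of both tasks before step 2 of either, all on a single thread: the `.await` on `YieldNow` returns control to the polling loop instead of blocking.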
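With a multi-threaded executor, async code can also make full use of a multi-core CPU: tasks may be moved between threads as they are scheduled, which can improve the program's throughput and responsiveness.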
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    // Connections are handled one at a time, in the order they arrive.
    for stream in tcp_listener.incoming() {
        let stream = stream.unwrap();
        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // Read the first 1024 bytes of the request.
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    // Serve hello.html for "GET /", and the 404 page for anything else.
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();
    // Note: the ", " in this format string is sent as part of the body,
    // which is why the curl output starts with ", ".
    let response = format!("{}, {}", status_line, contents);
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
~> curl -v -X GET http://127.0.0.1:7878/random_url
Note: Unnecessary use of -X or --request, GET is already inferred.
* Trying 127.0.0.1:7878...
* Connected to 127.0.0.1 (127.0.0.1) port 7878 (#0)
> GET /random_url HTTP/1.1
> Host: 127.0.0.1:7878
> User-Agent: curl/8.0.1
> Accept: */*
>
< HTTP/1.1 404 NOT FOUND
* no chunk, no close, no size. Assume close to signal end
<
, <!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>* Closing connection 0
~> curl -X GET http://127.0.0.1:7878/random_url
, <!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
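The leading ", " in both curl outputs comes from the ", " inside the format string used to build the response. A minimal sketch of the same routing and response-building logic as pure functions, without that separator, is shown below; `route` and `build_response` are hypothetical helper names, not part of the original server.

```rust
/// Picks a status line and a file name from the start of the raw request.
fn route(request: &[u8]) -> (&'static str, &'static str) {
    if request.starts_with(b"GET / HTTP/1.1\r\n") {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    }
}

/// Concatenates the status line and the body with nothing in between.
fn build_response(status_line: &str, contents: &str) -> String {
    format!("{status_line}{contents}")
}

fn main() {
    let (status_line, filename) = route(b"GET /random_url HTTP/1.1\r\n");
    println!("{filename}");
    print!("{}", build_response(status_line, "<h1>Oops!</h1>\n"));
}
```

Keeping these two steps free of I/O also makes them easy to unit test without opening a socket.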
Handling connections concurrently
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;

// async-std types are used throughout, so the async-std runtime attribute is
// used as well (it requires the "attributes" feature of async-std).
#[async_std::main]
async fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    tcp_listener
        .incoming()
        // `None` means no limit on the number of connections handled concurrently.
        .for_each_concurrent(None, |tcp_stream| async move {
            let tcp_stream = tcp_stream.unwrap();
            handle_connection(tcp_stream).await;
        })
        .await
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        // Simulate a slow request without blocking the thread.
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}
Serving requests in parallel with multiple threads
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;

// async-std types are used throughout, so the async-std runtime attribute is
// used as well (it requires the "attributes" feature of async-std).
#[async_std::main]
async fn main() {
    let tcp_listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    tcp_listener
        .incoming()
        .for_each_concurrent(None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            // `spawn` hands each connection to the runtime as its own task,
            // so connections can be processed on different threads.
            spawn(handle_connection(tcpstream));
        })
        .await;
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{}{}", status_line, contents);
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}
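Because spawn may run the task on another thread, the future passed to it has to be Send, which ties back to the Rc and Arc examples at the start. The sketch below checks this with the standard library only. `require_send` is a hypothetical stand-in for the bound on spawn, and `tick`, `with_arc`, `rc_dropped_before_await` and `poll_once` are illustrative names.

```rust
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the bound on `spawn`: accepts only Send + 'static futures.
fn require_send<F: Future + Send + 'static>(fut: F) -> F {
    fut
}

async fn tick() {}

async fn with_arc(data: Arc<String>) -> usize {
    tick().await; // `data` is held across this await; Arc<String> is Send
    data.len()
}

async fn rc_dropped_before_await() -> usize {
    let len = {
        let local = Rc::new(String::from("hello"));
        local.len()
    }; // the Rc is dropped here, before the await below
    tick().await;
    len
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once; enough here because `tick` never returns Pending.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn main() {
    let a = require_send(with_arc(Arc::new(String::from("hello"))));
    let b = require_send(rc_dropped_before_await());
    println!("{:?} {:?}", poll_once(a), poll_once(b));
}
```

An Rc used only between two await points does not make the future non-Send; what matters is whether a non-Send value is still alive across an `.await`.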
use async_std::io::{Read, Write};
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::marker::Unpin;
use std::time::Duration;

#[async_std::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
    listener
        .incoming()
        .for_each_concurrent(None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            spawn(handle_connection(tcpstream));
        })
        .await;
}

// Generic over any async stream, so a mock can stand in for TcpStream in tests.
async fn handle_connection(mut stream: impl Read + Write + Unpin) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();
    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();
    let response = format!("{status_line}{contents}");
    // write_all keeps writing until the whole response has been sent.
    stream.write_all(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::Error;
    use futures::task::{Context, Poll};
    use std::cmp::min;
    use std::fs;
    use std::marker::Unpin;
    use std::pin::Pin;

    // In-memory stand-in for TcpStream.
    struct MockTcpStream {
        read_data: Vec<u8>,
        write_data: Vec<u8>,
    }

    impl Read for MockTcpStream {
        // Copies as much of read_data as fits into buf and is always ready.
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let size: usize = min(self.read_data.len(), buf.len());
            buf[..size].copy_from_slice(&self.read_data[..size]);
            Poll::Ready(Ok(size))
        }
    }

    impl Write for MockTcpStream {
        // Records the written bytes so the test can inspect them.
        fn poll_write(
            mut self: Pin<&mut Self>,
            _: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            self.write_data = Vec::from(buf);
            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Error>> {
            Poll::Ready(Ok(()))
        }
    }

    impl Unpin for MockTcpStream {}

    #[async_std::test]
    async fn test_handle_connection() {
        let input_bytes = b"GET / HTTP/1.1\r\n";
        let mut contents = vec![0u8; 1024];
        contents[..input_bytes.len()].clone_from_slice(input_bytes);
        let mut stream = MockTcpStream {
            read_data: contents,
            write_data: Vec::new(),
        };

        handle_connection(&mut stream).await;
        let mut buf = [0u8; 1024];
        stream.read(&mut buf).await.unwrap();

        let expected_contents = fs::read_to_string("hello.html").unwrap();
        let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
        assert!(stream.write_data.starts_with(expected_response.as_bytes()));
    }
}
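The idea behind the mock (write the handler against stream traits, then substitute an in-memory stream) can be tried without any async runtime. The sketch below is a synchronous analogue built on `std::io::Read` and `std::io::Write`, not the async-std traits used above. `MockStream` and `respond_to` are hypothetical names, and the page body is passed in as a parameter rather than read from disk so that the example stays self-contained.

```rust
use std::io::{self, Cursor, Read, Write};

/// In-memory stream: reads come from `read_data`, writes are collected in `write_data`.
struct MockStream {
    read_data: Cursor<Vec<u8>>,
    write_data: Vec<u8>,
}

impl Read for MockStream {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_data.read(buf)
    }
}

impl Write for MockStream {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Synchronous stand-in for the handler, generic over any Read + Write stream.
fn handle_connection<S: Read + Write>(stream: &mut S, page: &str) -> io::Result<()> {
    let mut buffer = [0u8; 1024];
    let n = stream.read(&mut buffer)?;
    let status_line = if buffer[..n].starts_with(b"GET / HTTP/1.1\r\n") {
        "HTTP/1.1 200 OK\r\n\r\n"
    } else {
        "HTTP/1.1 404 NOT FOUND\r\n\r\n"
    };
    stream.write_all(format!("{status_line}{page}").as_bytes())?;
    stream.flush()
}

/// Feeds `request` through the handler and returns the bytes it wrote.
fn respond_to(request: &[u8], page: &str) -> Vec<u8> {
    let mut stream = MockStream {
        read_data: Cursor::new(request.to_vec()),
        write_data: Vec::new(),
    };
    handle_connection(&mut stream, page).expect("in-memory I/O should not fail");
    stream.write_data
}

fn main() {
    let response = respond_to(b"GET / HTTP/1.1\r\n", "<h1>Hello!</h1>");
    println!("{}", String::from_utf8_lossy(&response));
}
```

The async test above follows the same shape; the extra `poll_*` methods and the `Unpin` impl are what the async traits need in place of the two blocking methods here.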
From: https://www.cnblogs.com/liuzonglin/p/17809265.html