首页 > 其他分享 >rust axum sse

rust axum sse

时间:2024-07-24 09:06:36浏览次数:20  
标签:use axum stream let tokio sse event rust

Cargo.toml:

[package]
name = "sse"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.7.5"
axum-extra = { version = "0.9.3", features = ["typed-header"] }
eventsource-stream = "0.2"
futures = "0.3.11"
headers = "0.4.0"
reqwest = "0.12"
reqwest-eventsource = "0.5"
tokio = { version = "1.0.1", features = ["full"] }
tokio-stream = "0.1.0"
tower-http = { version = "0.5.0", features = ["fs", "trace"] }
tracing = "0.1.0"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }

[dev-dependencies]
eventsource-stream = "0.2.0"
reqwest = { version = "0.12.0", features = ["stream"] }
reqwest-eventsource = "0.5"

main.rs:

//! Run with
//!
//! ```not_rust
//! cargo run -p example-sse
//! ```
//! Test with
//! ```not_rust
//! cargo test -p example-sse
//! ```

use axum::{
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use axum_extra::TypedHeader;
use futures::stream::{self, Stream};
use std::{convert::Infallible, path::PathBuf, time::Duration};
use tokio_stream::StreamExt as _;
use tower_http::{services::ServeDir, trace::TraceLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "example_sse=debug,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    // build our application
    let app = app();

    // run it
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .unwrap();
    tracing::debug!("listening on {}", listener.local_addr().unwrap());
    axum::serve(listener, app).await.unwrap();
}

fn app() -> Router {
    let assets_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("assets");
    let static_files_service = ServeDir::new(assets_dir).append_index_html_on_directories(true);
    // build our application with a route
    Router::new()
        .fallback_service(static_files_service)
        .route("/sse", get(sse_handler))
        .layer(TraceLayer::new_for_http())
}

async fn sse_handler(
    TypedHeader(user_agent): TypedHeader<headers::UserAgent>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    println!("`{}` connected", user_agent.as_str());

    // A `Stream` that repeats an event every second
    //
    // You can also create streams from tokio channels using the wrappers in
    // https://docs.rs/tokio-stream
    let stream = stream::repeat_with(|| Event::default().data("hi!"))
        .map(Ok)
        .throttle(Duration::from_secs(1));

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(1))
            .text("keep-alive-text"),
    )
}

#[cfg(test)]
mod tests {
    use eventsource_stream::Eventsource;
    use tokio::net::TcpListener;

    use super::*;

    #[tokio::test]
    async fn integration_test() {
        // A helper function that spawns our application in the background
        async fn spawn_app(host: impl Into<String>) -> String {
            let host = host.into();
            // Bind to localhost at the port 0, which will let the OS assign an available port to us
            let listener = TcpListener::bind(format!("{}:0", host)).await.unwrap();
            // Retrieve the port assigned to us by the OS
            let port = listener.local_addr().unwrap().port();
            tokio::spawn(async {
                axum::serve(listener, app()).await.unwrap();
            });
            // Returns address (e.g. http://127.0.0.1{random_port})
            format!("http://{}:{}", host, port)
        }
        let listening_url = spawn_app("127.0.0.1").await;

        let mut event_stream = reqwest::Client::new()
            .get(format!("{}/sse", listening_url))
            .header("User-Agent", "integration_test")
            .send()
            .await
            .unwrap()
            .bytes_stream()
            .eventsource()
            .take(1);

        let mut event_data: Vec<String> = vec![];
        while let Some(event) = event_stream.next().await {
            match event {
                Ok(event) => {
                    // break the loop at the end of SSE stream
                    if event.data == "[DONE]" {
                        break;
                    }

                    event_data.push(event.data);
                }
                Err(_) => {
                    panic!("Error in event stream");
                }
            }
        }

        assert!(event_data[0] == "hi!");
    }
}

标签:use,axum,stream,let,tokio,sse,event,rust
From: https://www.cnblogs.com/soarowl/p/18320061

相关文章

  • SpringBoot整合SSE技术详解
    SpringBoot整合SSE技术详解1.引言在现代Web应用中,实时通信变得越来越重要。Server-SentEvents(SSE)是一种允许服务器向客户端推送数据的技术,为实现实时更新提供了一种简单而有效的方法。本文将详细介绍如何在SpringBoot中整合SSE,并探讨SSE与WebSocket的区别。2.SS......
  • java中assert用法
    java中assert用法一、java为什么源码框架都用assert调试1、一般是做单元测试的时候用(比如Junit),其它的地方也可以使用,但是基本上没人用,因为在其它的地方判断语句比断言好用。2、如果表达式计算为false,那么系统会报告一个Assertionerror。3、由于assert是一个新关键字,使用老......
  • gofiber sse
    packagemainimport( "bufio" "fmt" "log" "time" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/valyala/fasthttp")varindex=[]byte......
  • 【数据结构】排序算法——Lessen1
    Hi~!这里是奋斗的小羊,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎~~......
  • PyTorch LSTM 模型上的 CrossEntropyLoss,每个时间步一分类
    我正在尝试创建一个LSTM模型来检测时间序列数据中的异常情况。它需要5个输入并产生1个布尔输出(如果检测到异常则为True/False)。异常模式通常连续3-4个时间步长。与大多数LSTM示例不同,它们预测未来数据或对整个数据序列进行分类,我尝试在每个时间步输出True/False检......
  • Rust 发散函数
    发散函数发散函数(divergingfunction)绝不会返回。它们使用!标记,这是一个空类型。fnfoo()->!{panic!("Thiscallneverreturns.");}和所有其他类型相反,这个类型无法实例化,因为此类型可能具有的所有可能值的集合为空。注意,它与()类型不同,后者只有一个可......
  • rust may_minihttp server
    Cargo.toml:[package]name="demo"version="0.1.0"edition="2021"[dependencies]bytes="1.6.1"may="0.3.45"may_minihttp={git="https://github.com/Xudong-Huang/may_minihttp.git"}y......
  • REASSESSMENT TASK
    ResubmissionAssessmentTitle:ResubmissionassignmentUnitLevel:6ReassessmentNumber:1of1CreditValueofUnit:20DateIssued:04/07/2024UnitLeader:PaulDeVriezeSubmissionDueDate:01/08/2024Time:12:30PMOtherMarker(s):N/ASubmissionLocati......
  • Rust语言圣经-流程控制
    提问索引访问集合和for遍历有什么区别回答//第一种letcollection=[1,2,3,4,5];foriin0..collection.len(){letitem=collection[i];//...}//第二种foritemincollection{}使用索引(第一种)访问会因边界访问导致性能损耗;当遍历集合发生改变......
  • dnssec
    ###DNSSEC(DNSSecurityExtensions)**DNSSEC**是一组扩展DNS协议的安全机制,用于确保DNS数据的完整性和真实性。它主要包括以下几个方面:1.**数据完整性**:DNSSEC通过数字签名验证DNS响应的数据是否被篡改,确保从DNS服务器获得的数据未被修改。2.**数据来源认证**:通过DNSSEC,客......