首页 > 其他分享 >Rust | 实现 API 限速操作 Example

Rust | 实现 API 限速操作 Example

时间:2024-05-09 14:13:07浏览次数:18  
标签:std use governor rate API let tower Example Rust

在这篇文章中,我们将讨论如何在 Rust 中实现 API 限速。当涉及到生产中的服务时,是为了确保不良行为者不会滥用 API——这就是 API 限速的作用所在。

我们将实现 “滑动窗口” 算法,通过动态周期来检查请求历史,并使用基本的内存 hashmap 来存储用户 IP 及其请求时间。我们还将研究如何使用 tower-governor 库来实现限速。

实现一个简单的滑动窗口限速器

让我们从头开始编写一个简单的基于 IP 的滑动窗口限速器。

在项目中加入以下依赖项:

[dependencies]
chrono = { version = "0.4.34", features = ["serde", "clock"] }
serde = { version = "1.0.196", features = ["derive"] }

我们将声明一个新的结构体 RateLimiter,它保存 IP 地址作为键的 HashMap,值为 Vec<DateTime>(Utc 时区的时间戳)。

use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use std::net::IpAddr;
use chrono::{DateTime, Utc};

// 這是用戶訪問端點的請求限制(每分鐘)
// 如果用戶試圖超過這個限制,返回一個錯誤
const REQUEST_LIMIT: usize = 120;

#[derive(Clone, Default)]
pub struct RateLimiter {
    requests: Arc<Mutex<HashMap<IpAddr, Vec<DateTime<Utc>>>>>,
}

首先,我们想要通过使用.lock() 来锁定我们的 HashMap,这给了我们写访问权限。然后,我们需要检查hashmap 是否包含一个键,其中包含我们想要使用.entry() 函数检查的IP 地址,然后通过保留有效的时间戳来修改它,并根据长度是否在请求限制之下push 一个新记录。然后检查记录长度是否大于请求限制—如果是,则返回错误;如果不是,则返回 Ok(())。

impl RateLimiter {
    fn check_if_rate_limited(&self, ip_addr: IpAddr) -> Result<(), String> {
        // 我們只想保留60秒前的時間戳
        let throttle_time_limit = Utc::now() - std::time::Duration::from_secs(60);

        let mut requests_hashmap = self.requests.lock().unwrap();

        let mut requests_for_ip = requests_hashmap
            // 檢索記錄,並允許我們在適當的地方修改它
            .entry(ip_addr)
            // 如果記錄爲空,則插入帶有當前時間戳的vec
            .or_default();

        requests_for_ip.retain(|x| x.to_utc() > throttle_time_limit);
        requests_for_ip.push(Utc::now());

        if requests_for_ip.len() > REQUEST_LIMIT {
            return Err("IP is rate limited :(".to_string());
        }

        Ok(())
    }
}

测试如下:

fn main() {
    let rate_limiter = RateLimiter::default();

    let localhost_v4 = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

    // 這裏我們請求120次
    for _ in 1..80 {
        assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

    // 等待30秒
    std::thread::sleep(std::time::Duration::from_secs(30));

    // 在這裏再做40個請求
    for _ in 1..40 {
        assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }

    // 再等30秒
    std::thread::sleep(std::time::Duration::from_secs(30));

    // 現在我們可以再做80個請求
    for _ in 1..80 {
        assert!(rate_limiter.check_if_rate_limited(localhost_v4).is_ok())
    }
}

然而,生产上的限速系统通常比这先进得多。我们将在下面讨论如何利用 tower-governor crate 进行速率限制。

使用 tower-governor 实现限速器

提供速率限制的 tower 服务和层,后端是 governo 库。 tower-governor 很大程度上基于 actix-governor 所做的工作,可以与 Axum, Hyper, Tonic 和其他任何基于 tower 组件库的框架一起使用!

tower-governor 的特点:

  • 速率限制请求基于对端的 IP 地址,IP 地址头或通过自定义设置

  • 自定义流量限制的标准是按每秒计算

  • 使用简单

  • 高可定制性

  • 高性能

  • 健壮而灵活的 API

它是如何工作的?

每个调控器中间件都有一个存储配额的配置,配额指定可以从一个 IP 地址发送多少请求,如果超过配额则阻止进一步请求。

例如,如果配额允许 10 个请求,客户机可以在中间件开始阻塞之前的短时间内发送 10 个请求。

一旦使用了至少一个配额元素,配额元素将在指定时间后补充。

例如,如果周期为 2 秒,并且配额为空,则需要 2 秒来补充配额的一个元素。这意味着可以平均每两秒钟发送一个请求。

如果配额允许在同一时间段内发送10 个请求,则客户端可以再次发送10 个请求的突发事件,然后必须等待2 秒才能发送进一步的请求,或者在完整配额被补充之前等待20 秒,他可以发送另一个突发事件。

下面我们来实现一个例子。

首先,在项目中加入以下依赖项:

建议版本一致

[dependencies]
tokio = {version = "1.36.0", features = ["full"] }
tower_governor = "0.3.2"
axum = "0.7" 
tracing = {version="0.1.37", features=["attributes"]}
tracing-subscriber = "0.3"
tower = "0.4.13"

然后,在 main.rs 文件中写入以下代码:

use axum::{routing::get, Router};
use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::TcpListener;
use tower_governor::{governor::GovernorConfigBuilder, GovernorLayer};

async fn hello() -> &'static str {
   "Hello world"
}

#[tokio::main]
async fn main() {
   // 配置跟蹤
   // 構造一個訂閱者,將格式化的跟蹤信息打印到標準輸出
   let subscriber = tracing_subscriber::FmtSubscriber::new();
   // 使用subscriber處理在此點之後發出的跟蹤
   tracing::subscriber::set_global_default(subscriber).unwrap();

   // 允許每個IP地址最多有五個請求,每兩秒鐘補充一個
   // 我們將其裝箱是因爲Axum 0.6要求所有層都是克隆的,因此我們需要一個靜態引用
   let governor_conf = Box::new(
       GovernorConfigBuilder::default()
           .per_second(2)
           .burst_size(5)
           .finish()
           .unwrap(),
   );

   let governor_limiter = governor_conf.limiter().clone();
   let interval = Duration::from_secs(60);
   // 一個單獨的後臺任務
   std::thread::spawn(move || {
       loop {
           std::thread::sleep(interval);
           tracing::info!("rate limiting storage size: {}", governor_limiter.len());
           governor_limiter.retain_recent();
       }
   });

   // 構建路由
   let app = Router::new()
       // `GET /` 
       .route("/", get(hello))
       .layer(GovernorLayer {
           // 我們可以泄漏它,因爲它是一次性創建的
           config: Box::leak(governor_conf),
       });

   let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
   tracing::debug!("listening on {}", addr);
   let listener = TcpListener::bind(addr).await.unwrap();
   axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
       .await
       .unwrap();
}

运行服务器后,在浏览器中刷新请求,2 秒内超过 5 次会提示以下错误:

服务器后台跟踪日志如下:

2024-05-09T05:41:10.065323Z  INFO limit_api2: rate limiting storage size: 1
2024-05-09T05:41:20.067175Z  INFO limit_api2: rate limiting storage size: 1
2024-05-09T05:41:30.071745Z  INFO limit_api2: rate limiting storage size: 1
2024-05-09T05:41:40.073187Z  INFO limit_api2: rate limiting storage size: 1
2024-05-09T05:41:50.077540Z  INFO limit_api2: rate limiting storage size: 0
2024-05-09T05:42:00.077448Z  INFO limit_api2: rate limiting storage size: 0

标签:std,use,governor,rate,API,let,tower,Example,Rust
From: https://www.cnblogs.com/RioTian/p/18182017

相关文章

  • 【转】[C#] WebAPI 防止并发调用二(冥等性)
    来自:阿里的通义灵码使用幂等性设计来防止C#WebAPI方法的并发调用是一种推荐的方法,因为它不会阻塞其他请求,而是确保多次调用同一个操作会产生相同的结果。这里有一个简单的示例,说明如何在WebAPI控制器中实现幂等性的API:usingSystem;usingSystem.Web.Http;usingSystem.Lin......
  • rust搭建交叉编译环境
    最近尝试了一下rust交叉编译,简单记录一下。原理1、使用rust的编译器将rust源码编译到汇编或者.o的状态(具体是汇编还是.o没有考证过)。2、使用目标平台的toolchain将rust生成的汇编或者.o链接成ELF等可执行的格式。基于上述原理,需要解决两个问题:首先,怎么让rust将rust代......
  • 【转】[C#] WebAPI 防止并发调用一(锁)
    来源:阿里的通义灵码在C#WebAPI中,如果你想要使用锁来防止并发调用,你可以使用System.Threading.Mutex或System.Threading.Lock来实现。但是,这种方法通常不推荐,因为它可能会导致请求阻塞,从而影响整个Web服务的性能。在Web环境中,更好的做法是使用幂等性设计或数据库事务来处理并发......
  • k8s——api
    api概述api是k8s系统的重要部分,组件之间的所有操作和通信均由apiserver处理的restapi调用,大多数情况下,api定义和实现都符合标准的httprest格式,可以通过kubctl命令管理工具或其他命令行工具来执行api类型alpha包含alpha名称的版本(例如v1alpha1)该软件可能会包含错误。......
  • 关于Java Chassis 3的契约优先(API First)开发
    本文分享自华为云社区《JavaChassis3技术解密:契约优先(APIFirst)开发》,作者:liubao68。契约优先(APIFirst)开发是指应用程序开发过程中,将API设计作为第一优先级的任务。契约优先开发随着WebServices概念的发展而不断得到重视,特别是微服务架构出现以后,API设计成为影响功能开放、......
  • Face Detection API
    一个针对图像中的人脸进行识别的底层加速平台组件<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"/><metaname="viewport"content="width=device-width,user-scalable=no,initial-scale=1.0"......
  • 经过dnat后访问kube-apiserver证书认证失败
    问题现象iptables-tnat-IOUTPUT-d10.10.10.10-ptcp--dport443-jDNAT--to-destination192.168.0.105:6443#报错requesteddomainnamedoesnotmatchtheserver'scertificate,无法通过证书认证。curlhttps://10.10.10.10:443/livez--key./client.key--cert......
  • 三维API sheder 基础
    这个shader是靠三维数学影响二维像素导致像素颜色改变它是每个像素走一遍脚本算法写的时候注意语言格式写错了shader脚本是不能用的,根本就不好使这个可以用区域用xyzy为0没第三坐标,Y是三维的高度xz才是地面用数字限制出是椭圆啊,还是正方形长方形啥的全影响就不锁......
  • 5.8——前端api
    文章分类模块importrequestfrom"@/utils/request.js";//import{useTokenStore}from"@/stores/token.js";//文章分类列表查询//文章分类列表查询exportconstarticleCategoryListService=()=>{//获取token状态//consttokenStore=useTokenStore();//通过......
  • 实时股票数据API接口websocket接入方法
    一、使用websocket的协议提升传输速度实时金融股票API接口对于投资者和交易员来说至关重要。通过使用WebSocket接入方法,可以轻松获取实时金融股票API接口的数据并及时做出决策。WebSocket是一种高效的双向通信协议,它允许数据的实时推送,避免了不断的轮询请求。这种接入方法具有多......