首页 > 其他分享 >11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

时间:2023-10-12 09:04:02浏览次数:35  
标签:11 let addr value health fall 健康检查 Rust wmproxy

11. 用Rust手把手编写一个wmproxy(代理,内网穿透等), 实现健康检查

项目 ++wmproxy++

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

健康检查的意义

健康检查维持着系统的稳定运行, 极大的加速着服务的响应时间, 并保证服务器不会把消息包转发到不能响应的服务器上, 从而使系统快速稳定的运转
在LINUX系统中,系统默认TCP建立连接超时时间为127秒。通常网络不可达或者网络连接被拒绝或者网络连接超时需要耗时的时长较长。此时会超成服务器的响应时间变长很多,而且重复发起不可达的连接尝试也在耗着大量的IO资源。
当健康检查介入后,如果短时间内多次建立连接失败,则暂时判定该地址不可达,状态设置为不可达。如果此时接收到该地址的请求时直接返回错误。大大提高了响应的时间。
所以健康检查是必不可少的存在。

如何实现

由于健康状态需要调用的地方可能在任意处需要发起连接的地方,如果通过参数透传也会涉及到多线程的数据共用,如Arc<Mutex<Data>>,取用的时候也是要通过锁共用,且编码的复杂度和理解成本急剧升高,所以此处健康检查选用的是多线程共用的静态处理变量。

Rust中的静态变量

在Rust中,全局变量可以分为两种:

  • 编译期初始化的全局变量
  • 运行期初始化的全局变量

编译期初始化的全局变量有:

const创建的常量,如 const MAX_ID:usize=usize::MAX/2;
static创建的静态变量,如 static mut REQUEST_RECV:usize=0;

运行期初始化的全局变量有lazy_static用于懒初始化。例如:

lazy_static! {
    static ref HEALTH_CHECK: RwLock<HealthCheck> = RwLock::new(HealthCheck::new(60, 3, 2));
}

此外还有

  • 实现你自己的运行时初始化:std::sync::Once + static mut T
  • 单线程运行时初始化的特殊情况:thread_local

我们此处维持一个HealthCheck的全局变量,因为程序是多线程,用thread_local,无法共用其它线程的检测,不条例预期,所以此处用读写锁来保证全局变量的正确性,读写锁的特点是允许存在多个读,但如果获取写必须保证唯一。

源码解析,暂时不做主动性的健康检查

接下来我们看HealthCheck的定义

pub struct HealthCheck {
    /// 健康检查的重置时间, 失败超过该时间会重新检查, 统一单位秒
    fail_timeout: usize,
    /// 最大失败次数, 一定时间内超过该次数认为不可访问
    max_fails: usize,
    /// 最小上线次数, 到达这个次数被认为存活
    min_rises: usize,
    /// 记录跟地址相关的信息
    health_map: HashMap<SocketAddr, HealthRecord>,
}

/// 每个SocketAddr的记录值
struct HealthRecord {
    /// 最后的记录时间
    last_record: Instant,
    /// 失败的恢复时间
    fail_timeout: Duration,
    /// 当前连续失败的次数
    fall_times: usize,
    /// 当前连续成功的次数
    rise_times: usize,
    /// 当前的状态
    failed: bool,
}

主要有最后记录时间,失败次数,成功次数,最大失败惩罚时间等元素组成

我们通过函数is_fall_down判定是否是异常状态,未检查前默认为正常状态,超出一定时间后,解除异常状态。

/// 检测状态是否能连接
pub fn is_fall_down(addr: &SocketAddr) -> bool {
    // 只读,获取读锁
    if let Ok(h) = HEALTH_CHECK.read() {
        if !h.health_map.contains_key(addr) {
            return false;
        }
        let value = h.health_map.get(&addr).unwrap();
        if Instant::now().duration_since(value.last_record) > value.fail_timeout {
            return false;
        }
        h.health_map[addr].failed
    } else {
        false
    }
}

如果连接TCP失败则调用add_fall_down将该地址失败连接次数+1,如果失败次数达到最大失败次数将状态置为不可用。

/// 失败时调用
pub fn add_fall_down(addr: SocketAddr) {
    // 需要写入,获取写入锁
    if let Ok(mut h) = HEALTH_CHECK.write() {
        if !h.health_map.contains_key(&addr) {
            let mut health = HealthRecord::new(h.fail_timeout);
            health.fall_times = 1;
            h.health_map.insert(addr, health);
        } else {
            let max_fails = h.max_fails;
            let value = h.health_map.get_mut(&addr).unwrap();
            // 超出最大的失败时长,重新计算状态
            if Instant::now().duration_since(value.last_record) > value.fail_timeout {
                value.clear_status();
            }
            value.last_record = Instant::now();
            value.fall_times += 1;
            value.rise_times = 0;

            if value.fall_times >= max_fails {
                value.failed = true;
            }
        }
    }
}

如果连接TCP成功则调用add_rise_up将该地址成功连接次数+1,如果成功次数达到最小次数将状态置为不可用。

/// 成功时调用
pub fn add_rise_up(addr: SocketAddr) {
    // 需要写入,获取写入锁
    if let Ok(mut h) = HEALTH_CHECK.write() {
        if !h.health_map.contains_key(&addr) {
            let mut health = HealthRecord::new(h.fail_timeout);
            health.rise_times = 1;
            h.health_map.insert(addr, health);
        } else {
            let min_rises = h.min_rises;
            let value = h.health_map.get_mut(&addr).unwrap();
            // 超出最大的失败时长,重新计算状态
            if Instant::now().duration_since(value.last_record) > value.fail_timeout {
                value.clear_status();
            }
            value.last_record = Instant::now();
            value.rise_times += 1;
            value.fall_times = 0;

            if value.rise_times >= min_rises {
                value.failed = false;
            }
        }
    }
}

接下来我们将TcpStream::connect函数统一替换成HealthCheck::connect外部修改几乎为0,可实现开启健康检查,后续还会有主动式的健康检查。

pub async fn connect<A>(addr: &A) -> io::Result<TcpStream>
    where
        A: ToSocketAddrs,
    {
        let addrs = addr.to_socket_addrs()?;
        let mut last_err = None;

        for addr in addrs {
            // 健康检查失败,直接返回错误
            if Self::is_fall_down(&addr) {
                last_err = Some(io::Error::new(io::ErrorKind::Other, "health check falldown"));
            } else {
                match TcpStream::connect(&addr).await {
                    Ok(stream) => 
                    {
                        Self::add_rise_up(addr);
                        return Ok(stream)
                    },
                    Err(e) => {
                        Self::add_fall_down(addr);
                        last_err = Some(e)
                    },
                }
            }
        }

        Err(last_err.unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "could not resolve to any address",
            )
        }))
    }

效果

在前三次请求的时候,将花费5秒左右才抛出拒绝链接的错误

connect server Err(Os { code: 10061, kind: ConnectionRefused, message: "由于目标计算机积极拒绝,无
法连接。" })

可以发现三次之后,将会快速的抛出错误,达成健康检查的目标

connect server Err(Custom { kind: Other, error: "health check falldown" })

此时被动式的健康检查已完成,后续按需要的话将按需看是否实现主动式的健康检查。

标签:11,let,addr,value,health,fall,健康检查,Rust,wmproxy
From: https://www.cnblogs.com/luojiawaf/p/17758649.html

相关文章

  • 10.11总结
    1.解决了好几个报错①数据库表设置自增,在application.properties中mybatiesplus中要写id_type=auto,实现不写入id数据,数据库进行自增操作②没写注释@Autowired导致的一系列错误③mapper映射找不到,包对不上忽略一个问题就是创建包的时候要用com/example/mapper而不是com.example.......
  • 1011打卡
    1.串联所有单词的子串(30) 思想:哈希表+滑动窗口classSolution{publicList<Integer>findSubstring(Strings,String[]words){intlen=words.length;intwordlen=words[0].length();HashMap<String,Integer>map=newHashMap<>......
  • 每日总结20231011
    代码时间(包括上课)3h代码量(行):100行博客数量(篇):1篇相关事项:1、今天是周三,今天上午上的是软件构造,软件构造讲的是程序规范化。2、今天下午我们进行了献血的演讲的观看,明白了献血的意义。3、今天还打算看看软件设计师相关的题目,我要过,我要通过,我要高分通过!......
  • 20231011
    //eclectic,extreme,halfway,medium,request,tradeoff,giveground,meanmethod,middleway,midwaypoint,strikeahappymediumeclectic-折中的,多元的Eclecticreferstoastyleorapproachthatcombineselementsorideasfromvarioussourcesorstyles.......
  • 144-11
    给定二叉树,删除结点值为x的左右子树利用层次遍历找到结点值为x的左右子树,分别删除;删除算法voidDelete(Tree&T){if(T){Delete(T->lchild);Delete(T->rchild);free(T);}}完整算法#include<stdio.h>#include<stdlib.h>#d......
  • 2023.10.11 一些好题
    A你有\(m\)个相同的球,球有性能\(c\),你可以测试\(x\),若\(x\gec\),那么球会碎掉,若\(x<c\),那么球不碎。性能的范围\(n\le1e5\)。求最多要测试多少次。首先答案有一个上限是\(\logn\)。所以令\(m\to\min(m,\logn)\)所以我们记状态可以记\(dp_{l,r,k}\)表示当前确......
  • 大二打卡(10.11)
    今天做了什么:英语课,今天对于老师上课的小发问都能回答上来,不知道是不是因为坐的稍微靠后心情没那么紧张,脑子活了,听力最后一部分听的不行,比上回好了一点,但有限弄了半天建民的测试,我现在已经不知道自己搞错哪一步了,tomcat重装,connector也下了重装,web项目建立的也没问题,代码也没保......
  • 2023.10.11——每日总结
    学习所花时间(包括上课):9h代码量(行):0行博客量(篇):1篇今天,上午学习,下午学习;我了解到的知识点:1.java的一些特性;明日计划:学习......
  • 10.11(动物园管理员)
    第一版:把每种动物都定义为一个类,漏洞大,每天加一种动物或者动物数量发生变化都会需要对代码进行调整每种动物的喂食函数名也不同packagehomework;publicclasstext{publicstaticvoidmain(Stringargs[]){Feederf=newFeeder("小王");......
  • 2023/10/11 网络的学习
    学习笔记1网络基础1.1 什么是网络网络:计算机网络,电脑和电脑之间通过线缆或其他介质连接起来,并实现相互之间的通信。通信:人与人,人与物,物与物之间通过某种媒介和行为进行沟通。1.2 网络的分类局域网:作用于相对较小区域。例如企业内部网络,校园内部网络等。城域网:作用于城......