首页 > 系统相关 >19. 从零开始编写一个类nginx工具, 配置数据的热更新原理及实现

19. 从零开始编写一个类nginx工具, 配置数据的热更新原理及实现

时间:2023-10-27 14:46:20浏览次数:41  
标签:control set sender 19 nginx 从零开始 let 进程 reuse

wmproxy

wmproxy是由Rust编写,已实现http/https代理,socks5代理, 反向代理,静态文件服务器,内网穿透,配置热更新等, 后续将实现websocket代理等,同时会将实现过程分享出来, 感兴趣的可以一起造个轮子法

项目地址

gite: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

配置数据

数据通常配置在配置文件中,如果需要变更配置,我们通常将配置文件进行更新,并通知程序重新加载配置以便生效。

nginx的变更方式

在nginx中,我们通常用

nginx -s reload

进行数据的安全无缝的重载。在nginx中,是多进程的模式,也就是在nginx -s reload信号发出后master进程通知之前的work进程停止接收新的流,也就是accpet暂停,但是会服务完当前的数据请求,并同时会启用新的work进程来接受新的请求

缺点:nginx只能整体的配置做全部重置,且无法查看当前的配置(除非看配置文件,配置可能被重新修改过和内存中的值可能不匹配)

当前选取的方式

当前选择的是用HTTP请求的方式,也就是对本地的端口进行监听(http://127.0.0.1:8837),对本地端口监听也不会造成对外暴露端口带来的安全问题,这样子可以高度的自定义。具有比较高的活跃性,也可以实时查询内存中的数据。

例如访问:

  • http://127.0.0.1:8837/reload即可通知目标进程重载当前的配置
  • http://127.0.0.1:8837/now即可以知道当前的所有的配置列表
  • http://127.0.0.1:8837/stop即可以关闭当前的进程,停止服务,类似于nginx中的nginx -s stop
  • http://127.0.0.1:8837/adapt加载当前配置,看是否错误,但是不进行应用。
    等功能。

功能实现的原理

  • 单进程
    单进程模式的缺点:如果存在内存泄漏之类的情况,无论如何重载进程都无法将内存恢复,会始终保持较高的内存值直到最终不可用的阶段。如果发生未正确处理的异常,可能会使该进程崩溃的风险,处于无服务状态。
    单进程模式的优点:在当前进程存储的一些有利于加速服务的将会很好的被保留下来(如健康检查的数据),异步进程里正在处理的数据等。无需进行进程间通讯,配合tokio的异步处理可以将单进程的优势完美发挥出来。

  • 端口复用
    无论哪种模式,都需要处理数据重载时,绑定对象的转移TcpListener或者重新绑定TcpListener,在Rust中转移绑定对象相对来说较麻烦后续如果拓展成多进程模式也无法进行转移,所以不考虑用转移所有权的问题。那么此时我们的解决方法就是set_reuse_addressset_reuse_port,不同平台该方法上有不同的表现,我们用的是socket2的封装,用该方法的注意事项:

  • 在windows平台上,不存在set_reuse_port方法,仅调用set_reuse_address即可实现一个地址多次绑定

  • 在linux上,不同的版本上,有些只需调用set_reuse_address即可端口复用,有些需要同时调用set_reuse_port

  • 在macos上,需要调用set_reuse_addressset_reuse_port函数才可实现端口复用

所以这里涉及一个分平台的编码,我们在此使用的是,这和C/C++中的#ifdef WINDOWS类似,但是只能在函数级的做调整,所以此处额外在封装了两个函数来做调用。

/// 非windows平台
#[cfg(not(target_os = "windows"))]
fn set_reuse_port(socket: &Socket, reuse: bool) -> io::Result<()> {
    socket.set_reuse_port(true)?;
    Ok(())
}

/// windows平台,空实现
#[cfg(target_os = "windows")]
fn set_reuse_port(_socket: &Socket, _sreuse: bool) -> io::Result<()> {
    Ok(())
}

然后将原来的TcpListener::bind(addr)函数改成Helper::bind即可无缝切换到支持端口复用的功能,针对代理端及反向代理端:

/// 可端口复用的绑定方式,该端口可能被多个进程同时使用
pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
    let addrs = addr.to_socket_addrs()?;
    let mut last_err = None;
    for addr in addrs {
        let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
        socket.set_nonblocking(true)?;
        let _ = socket.set_only_v6(false);
        socket.set_reuse_address(true)?;
        Self::set_reuse_port(&socket, true)?;
        socket.bind(&addr.into())?;
        match socket.listen(128) {
            Ok(_) => {
                let listener: std::net::TcpListener = socket.into();
                return TcpListener::from_std(listener);
            }
            Err(e) => {
                log::info!("绑定端口地址失败,原因: {:?}", addr);
                last_err = Some(e);
            }
        }
    }

    Err(last_err.unwrap_or_else(|| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            "could not resolve to any address",
        )
    }))
}

测试功能

测试配置加载reload,一开始我们绑定81的端口

进程启动后改为绑定82的端口,然后调用reload(curl.exe http://127.0.0.1:8837/reload)

此时,再调用stop(curl.exe http://127.0.0.1:8837/stop),正确的预期应该显示关闭,且82端口不可再访问

符合功能预期,初步测试完毕

相关源码实现

以下是启动及发送重载配置的流程示意图

flowchart TD A[加载配置] B[绑定端口] C[控制端] D[服务1] E[服务2] F[控制窗户端] A -->|加载数据后绑定| B B -->|"(1)绑定端口后启动"| C B -->|"(1)异步的方式启动"| D F -->|发送重载入命令| C C -->|"(3)发送关闭服务命令"| D C -->|"(2)启动新的服务后关闭原服务"| E

以下是中控的定义,消息的通知主要通过Sender/Receiver来进行数据的通知。

/// 控制端,可以对配置进行热更新
pub struct ControlServer {
    /// 控制端当前的配置文件,如果部分修改将直接修改数据进行重启
    option: ConfigOption,
    /// 通知服务进行关闭的Sender,服务相关如果收到该消息则停止Accept
    server_sender_close: Option<Sender<()>>,
    /// 通知中心服务的Sender,每个服务拥有一个该Sender,可反向通知中控关闭
    control_sender_close: Sender<()>,
    /// 通知中心服务的Receiver,收到一次则将当前的引用计数-1,如果为0则表示需要关闭服务器
    control_receiver_close: Option<Receiver<()>>,
    /// 服务的引用计数
    count: i32,
}

启动控制终端,接收HTTP的指令和关闭的指令,此时control已经变成了Arc<Mutex<ControlServer>>,方便在各各线程间传播,同步修改数据。

pub async fn start_control(control: Arc<Mutex<ControlServer>>) -> ProxyResult<()> {
    let listener = {
        let value = &control.lock().await.option;
        TcpListener::bind(format!("127.0.0.1:{}", value.control)).await?
    };

    loop {
        let mut receiver = {
            let value = &mut control.lock().await;
            value.control_receiver_close.take()
        };
        
        tokio::select! {
            Ok((conn, addr)) = listener.accept() => {
                let cc = control.clone();
                tokio::spawn(async move {
                    let mut server = Server::new_data(conn, Some(addr), cc);
                    if let Err(e) = server.incoming(Self::operate).await {
                        log::info!("反向代理:处理信息时发生错误:{:?}", e);
                    }
                });
                let value = &mut control.lock().await;
                value.control_receiver_close = receiver;
            }
            _ = Self::receiver_await(&mut receiver) => {
                let value = &mut control.lock().await;
                value.count -= 1;
                log::info!("反向代理:控制端收到关闭信号,当前:{}", value.count);
                if value.count <= 0 {
                    break;
                }
                value.control_receiver_close = receiver;
            }
        }
    }
    Ok(())
}

处理相关消息:

if req.path() == "/reload" {
    // 将重新启动服务器
    let _ = value.do_restart_serve().await;
    return Ok(Response::text()
    .body("重新加载配置成功")
    .unwrap()
    .into_type());
}

if req.path() == "/stop" {
    // 通知控制端关闭,控制端阻塞主线程,如果控制端退出后进程退出
    if let Some(sender) = &value.server_sender_close {
        let _ = sender.send(()).await;
    }
    return Ok(Response::text()
    .body("关闭进程成功")
    .unwrap()
    .into_type());
}

以下是主要的启动代码:

async fn inner_start_server(&mut self, option: ConfigOption) -> ProxyResult<()>  {
    let sender = self.control_sender_close.clone();
    let (sender_no_listen, receiver_no_listen) = channel::<()>(1);
    let sender_close = self.server_sender_close.take();
    // 每次启动的时候将让控制计数+1
    self.count += 1;
    tokio::spawn(async move {
        let mut proxy = Proxy::new(option);
        // 将上一个进程的关闭权限交由下一个服务,只有等下一个服务准备完毕的时候才能关闭上一个服务
        if let Err(e) = proxy.start_serve(receiver_no_listen, sender_close).await {
            log::info!("处理失败服务进程失败: {:?}", e);
        }
        // 每次退出的时候将让控制计数-1,减到0则退出
        let _ = sender.send(()).await;
    });
    self.server_sender_close = Some(sender_no_listen);
    Ok(())
}

结语

此时以不同于nginx的另一种配置的加载已经开发完毕,配置的热加载可以让您更从容的保护好您的系统。

点击 [关注][在看][点赞] 是对作者最大的支持

标签:control,set,sender,19,nginx,从零开始,let,进程,reuse
From: https://www.cnblogs.com/wmproxy/p/17792338.html

相关文章

  • FastAPI学习-19.response 参数-修改状态码
    前言假设你想默认返回一个HTTP状态码为“OK”200。但如果数据不存在,你想创建它,并返回一个HTTP状态码为“CREATED”201。但你仍然希望能够使用response_model过滤和转换你返回的数据。对于这些情况,你可以使用一个response`参数。使用 response 参数status_code设置默认状态码fr......
  • ABC219 H 区间dp 费用提前计算
    ABC219H跟关路灯很像。很容易注意到我们拿走的只能是一个区间,观察n的范围发现区间dp是个好想法。朴素的想法是定义\(f_{i,j,k,0/1}\)为拿走i到j里面的所有数,走了k秒,现在在i/j的方案数。然后发现k太大了。咱当时的想法是希望优化复杂度,把k去掉结果发现不能保证正确性。......
  • P4198 楼房重建
    P4198楼房重建(RE:题解再改造!!)码#include<bits/stdc++.h>#defineMAXN2000010usingnamespacestd;intn,m;intx[MAXN],y[MAXN],ans[MAXN];doubleK[MAXN];intquery(intp,intl,intr,doublemaxn){if(K[p]<=maxn)return0;if(l==r)returnK[p]>maxn;......
  • 干货!分享Nginx搭建web测试报告服务器的落地方案
    Nginx搭建web测试报告服务器的实现思路有这样一个需求:把自动化测试过程中生成的html测试报告能够通过浏览器直接访问查看!实现思路很简单,就是部署一个web服务器,然后把测试报告部署到web服务器的指定目录即可,然后通过http://ip:port/path/报告名称.html的形式进行访问。我们通过ngin......
  • 从零开始:开发知识付费小程序的入门指南
    当下,知识付费小程序成为了一个独具潜力的领域。本篇文章将为您提供一份从零开始的知识付费小程序开发入门指南,让您能够进入这个领域并开始赚取您的专业知识。第一步:什么是知识付费小程序?知识付费小程序是一种基于微信小程序平台的应用,允许用户购买和访问特定领域的知识内容,如在线课......
  • 数据平面的故障排查-《Istio最佳实践》第19章
    https://blog.csdn.net/qq_36073886/article/details/1311169791、krew是一个可以轻松使用kubectl插件的工具,类似于apt、dnf或brew等工具。安装过程见链接1。wgethttps://github.com/kubernetes-sigs/krew/releases/latest/download/krew-linux_amd64.tar.gztar-zxvfkrew-linux......
  • 20231019NOIP训练赛
    20231019NOIP训练赛时间安排7:50-8:50写T18:50-9:30写T29:30-10:30写T3T410:30-11:50写T1总结T2没花时间想,没想到建图题解T1枚举最大公约数,然后统计最大公约数的倍数T2并查集,设u=\(X_{b_i}\),v=\(X_{a_i}\),在u和v间建一条长度为\(c_i\)的边,可以用并查集维护,如果u和v已......
  • windows nginx 开机启动
    1、在安装目录打开命令行,redis-server--service-installredis.windows.conf2、windows+R---->services.msc找到Redis,右键属性,启动类型选择自动,服务状态选择启动。即可开机自启动3、小tips:卸载服务:redis-server--service-uninstall开启服务:redis-server--service-start......
  • 【洛谷 2347】[NOIP1996 提高组] 砝码称重
    题目描述设有 1g1g、2g2g、3g3g、5g5g、10g10g、20g20g 的砝码各若干枚(其总重≤1000≤1000),可以表示成多少种重量?输入格式输入方式:�1,�2,�3,�4,�5,�6a1​,a2​,a3​,a4​,a5​,a6​(表示 1g1g 砝码有 �1a1​ 个,2g2g 砝码有 �2a2​ 个,…,20g20g 砝码有 �6a6​ 个)输出格式......
  • 将nginx的access.log访问日志发送到rsyslog服务器并写入数据库
    nginx.conf(将原日志路径改为rsyslog服务器地址)access_logsyslog:server=10.10.14.64:514,facility=local6main;如果需要入库需要安装相应数据库的依赖包;mysql依赖:yuminstall-y rsyslog-mysql   pgsql依赖:yuminstall-y rsyslog-pgsql  还有很多其他依赖可以用......