首页 > 其他分享 >42 干货系列从零用Rust编写负载均衡及代理,wmproxy中配置tcp转websocket

42 干货系列从零用Rust编写负载均衡及代理,wmproxy中配置tcp转websocket

时间:2024-01-18 09:00:11浏览次数:33  
标签:domain websocket url 42 tcp ws let

wmproxy

wmproxy已用Rust实现http/https代理, socks5代理, 反向代理, 静态文件服务器,四层TCP/UDP转发,七层负载均衡,内网穿透,后续将实现websocket代理等,会将实现过程分享出来,感兴趣的可以一起造个轮子

项目地址

国内: https://gitee.com/tickbh/wmproxy

github: https://github.com/tickbh/wmproxy

设计目标

通过简单配置方便用户快速使用tcp转websocket及websocket转tcp,也可支持http升级到websocket协议。

改造http升级websocket

因为负载均衡的不确定性,在未读取数据前,未能确定当前的处理逻辑

  • /root/proxy.png 访问当前的件服务器
  • /api/up 通过负载均衡访问后端服务器
  • /ws 将连接升级成websocket
  • 其它情况
    所以我们得预备能支持websocket的可能,那我们将同时设置回调HTTP及websocket,源码在reverse/http.rs
let timeout = oper.servers[0].comm.build_client_timeout();
let mut server = Server::builder()
    .addr(addr)
    .timeout_layer(timeout)
    .stream(inbound);

// 设置HTTP回调
server.set_callback_http(Box::new(Operate { inner: oper }));
// 设置websocket回调,客户端有可能升级到websocket协议
server.set_callback_ws(Box::new(ServerWsOperate::new(servers)));
if let Err(e) = server.incoming().await {
    if server.get_req_num() == 0 {
        log::info!("反向代理:未处理任何请求时发生错误:{:?}", e);
    } else {
        if !e.is_io() {
            log::info!("反向代理:处理信息时发生错误:{:?}", e);
        }
    }
}

ServerWsOperate中定义了服务的内部信息,及向远程websocket发送的sender,以做绑定

pub struct ServerWsOperate {
    inner: InnerWsOper,
    sender: Option<Sender<OwnedMessage>>,
}

在on_open的时候建立和远程websocket的双向绑定:


#[async_trait]
impl WsTrait for ServerWsOperate {
    /// 握手完成后之后的回调,服务端返回了Response之后就认为握手成功
    async fn on_open(&mut self, shake: WsHandshake) -> ProtResult<Option<WsOption>> {
        if shake.request.is_none() {
            return Err(ProtError::Extension("miss request"));
        }
        let mut option = WsOption::new();
        if let Some(location) =
            ReverseHelper::get_location_by_req(&self.inner.servers, shake.request.as_ref().unwrap())
        {
            if !location.is_ws {
                return Err(ProtError::Extension("Not Support Ws"));
            }
            if let Ok((url, domain)) = location.get_reverse_url() {
                println!("connect url = {}, domain = {:?}", url, domain);
                let mut client = Client::builder()
                    .url(url)?
                    .connect_with_domain(&domain)
                    .await?;

                let (serv_sender, serv_receiver) = channel::<OwnedMessage>(10);
                let (cli_sender, cli_receiver) = channel::<OwnedMessage>(10);
                option.set_receiver(serv_receiver);
                self.sender = Some(cli_sender);

                client.set_callback_ws(Box::new(ClientWsOperate {
                    sender: Some(serv_sender),
                    receiver: Some(cli_receiver),
                }));

                tokio::spawn(async move {
                    if let Err(e) = client
                        .wait_ws_operate_with_req(shake.request.unwrap())
                        .await
                    {
                        println!("error = {:?}", e);
                    };
                    println!("client close!!!!!!!!!!");
                });
            }
            return Ok(Some(option));
        }
        return Err(ProtError::Extension("miss match"));
    }
}

在此地方,我们是用负载均衡来做配置location.get_reverse_url(),远程端的域名和ip要映射成本地的ip,所以这边可能要读取负载的ip而不是从dns中解析ip。

获取正确的连接域名和ip地址。

pub fn get_reverse_url(&self) -> ProtResult<(Url, String)> {
    if let Some(addr) = self.get_upstream_addr() {
        if let Some(r) = &self.comm.proxy_url {
            let mut url = r.clone();
            let domain = url.domain.clone().unwrap_or(String::new());
            url.domain = Some(format!("{}", addr.ip()));
            url.port = Some(addr.port());
            Ok((url, domain))
        } else {
            let url = Url::parse(format!("http://{}/", addr).into_bytes())?;
            let domain = format!("{}", addr.ip());
            Ok((url, domain))
        }
    } else {
        Err(ProtError::Extension("error"))
    }
}

此处的处理方式与nginx不同,nginx是将所有升级请求的头信息全部删除,再根据配置的过行补充

Upgrade: websocket
Connection: Upgrade

所以在nginx中配置支持websocket通常如下配置,也就是通常配置的时候需要查找资料进行copy

proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

在wmproxy中并不会对客户端的请求做特殊的处理,也就是发了升级远程的websocket服务器接受了升级,我们当前协议就会升级。所以我们在配置中加入了一个字段is_ws,如果升级成websocket但是并不支持websocket的时候直接进行报错,告知不支持协议

[[http.server.location]]
rule = "/ws"
is_ws = true
reverse_proxy = "http://ws"

如此在该问该url的时候就可以转websocket了,比如websocat ws://127.0.0.1/ws

tcp转websocket

利用上章讲述的StreamToWs,且利用stream流的转发,将转发类型配置tcp2ws非安全的ws,或者tcp2wss带tls的wss,实现源码在reverse/stream.rs

[[stream.server]]
bind_addr = "0.0.0.0:85"
proxy_url = "ws://127.0.0.1:8081/"
bind_mode = "tcp2ws"

这样子,我们就可以将本地监听的85端口的地址,流量转发成8081的websocket远程地址。如果远程端验证域名可以配置上相应的domain = "wmproxy.com"

if s.bind_mode == "tcp2ws" {
    let mut stream_to_ws = StreamToWs::new(inbound, format!("ws://{}", addr))?;
    if domain.is_some() {
        stream_to_ws.set_domain(domain.unwrap());
    }
    let _ = stream_to_ws.copy_bidirectional().await;
}

如此我们就可以轻松的获取tcp流量转websocket的能力。

websocket转tcp

利用上章讲述的WsToStream,且利用stream流的转发,将转发类型配置ws2tcp转发为tcp,实现源码在reverse/stream.rs

[[stream.server]]
bind_addr = "0.0.0.0:86"
up_name = "ws1"
proxy_url = "tcp://127.0.0.1:8082"
bind_mode = "ws2tcp"

这样子,我们就可以将本地监听的86端口websocket的地址,流量转发成8082的tcp远程地址。如果远程端验证域名可以配置上相应的domain = "wmproxy.com"

if s.bind_mode == "ws2tcp" {
    let mut ws_to_stream = WsToStream::new(inbound, addr)?;
    if domain.is_some() {
        ws_to_stream.set_domain(domain.unwrap());
    }
    let _ = ws_to_stream.copy_bidirectional().await;
}

如此我们就可以轻松的获取websocket流量转tcp的能力。

小结

利用wmproxy可以轻松的转化tcp到websocket的流量互转,配置简单。可以利用现成的websocket高速通道辅助我们的tcp程序获取更稳定的流量通道。

点击 [关注][在看][点赞] 是对作者最大的支持

标签:domain,websocket,url,42,tcp,ws,let
From: https://www.cnblogs.com/wmproxy/p/17971725/wmproxy42

相关文章

  • 两种实现TCP并发的方式
    【一】socketserver模块实现TCP并发服务服务端importsocketserverclassRequesterHandle(socketserver.BaseRequestHandler):defhandle(self)->None:print(self.request)print(self.client_address)#self.request相当于TCP协议的conn......
  • P7424 [THUPC2017] 天天爱射击
    [THUPC2017]天天爱射击题目描述小C爱上了一款名字叫做《天天爱射击》的游戏。如图所示,这个游戏有一些平行于\(x\)轴的木板。现在有一些子弹,按顺序沿着\(y\)轴方向向这些木板射去。第\(i\)块木板被\(S_i\)个子弹贯穿以后,就会碎掉消失。一个子弹可以贯穿其弹道上的全部......
  • 使用nginx代理emqx的TCP、WS、WSS连接请求
    项目代理关系: 注:主机上已存在名为:nginx-proxy的一级nginx的代理,将监听了主机的80、443端口docker-compose.ymlversion:"3.7"services:emqx:image:emqx/emqx:4.4.18restart:unless-stoppedcontainer_name:emqxenvironment:EMQX_ADMI......
  • TCP 拥塞控制对数据延迟的影响
    哈喽大家好,我是咸鱼今天分享一篇文章,是关于TCP拥塞控制对数据延迟产生的影响的。作者在服务延迟变高之后进行抓包分析,结果发现时间花在了TCP本身的机制上面:客户端并不是将请求一股脑发送给服务端,而是只发送了一部分,等到接收到服务端的ACK,然后继续再发送,这就造成了额外的RTT......
  • 洛谷 P8426 [JOI Open 2022] 放学路(School Road)
    洛谷传送门LOJ传送门考虑整个图是一个点双怎么做。显然如果有重边并且两条边边权一样就寄了。否则我们可以把它们当成一条边。考虑一个二度点\(u\)和与它相连的边\((v,u),(u,w)\)。我们可以把它缩成边\((v,w)\)。如果新边已经存在并且边权不等于这两条边边权就寄了。......
  • 洛谷题单指南-模拟和高精度-P1042 [NOIP2003 普及组] 乒乓球
    原题链接:https://www.luogu.com.cn/problem/P1042题意解读:分别针对11分制和21分制,输出每局比分。只需要判断一局的结束条件:得分高者如果达到11或者21,且比分间隔大于等于2分,则表示一局结束,可开始下一局,用模拟法即可解决。100分代码:#include<bits/stdc++.h>usingnamespaces......
  • C# websocket服务端实现
    1、创建一个winform项目2、创建websocket服务端类WebSocket_Service.cs1usingSystem;2usingSystem.Collections.Generic;3usingSystem.ComponentModel;4usingSystem.Linq;5usingSystem.Net;6usingSystem.Net.Sockets;7usingSystem.Security......
  • Springboot3+Vue3在进行WebSocket通讯时出现No mapping for GET或者是404
    参考:在SpringBoot中整合、使用WebSocket-spring中文网(springdoc.cn)===============================原代码(此时前端访问后端,后端会出现:NomappingforGET/wspath)前端相关代码:letsocket:WebSocket|null=nullconstsocketURL=`ws://127.0.0.1:8084/w......
  • 网络编程之TCP协议的三次握手和四次挥手
    引言见过比较典型的面试场景是这样的:面试官:请介绍下三次握手求职者:第一次握手就是客户端给服务器端发送一个报文,第二次就是服务器收到报文之后,会应答一个报文给客户端,第三次握手就是客户端收到报文后再给服务器发送一个报文,三次握手就成功了。面试官:然后呢?求职者:这就是三次......
  • 网络编程之基于TCP协议的socket套接字编程
    基于TCP的套接字【1】方法简介tcp是基于链接的必须先启动服务端然后再启动客户端去链接服务端tcp服务端server=socket()#创建服务器套接字server.bind()#把地址绑定到套接字server.listen()#监听链接inf_loop:#服务器无限循环conn=serv......