首页 > 编程语言 >Websocket集群解决方案以及实战(附图文源码)

Websocket集群解决方案以及实战(附图文源码)

时间:2023-09-26 14:32:14浏览次数:44  
标签:Websocket session 消息 服务器 message 图文 源码 服务端 客户端


最近在项目中在做一个消息推送的功能,比如客户下单之后通知给给对应的客户发送系统通知,这种消息推送需要使用到全双工的websocket推送消息。

所谓的全双工表示客户端和服务端都能向对方发送消息。不使用同样是全双工的http是因为http只能由客户端主动发起请求,服务接收后返回消息。websocket建立起连接之后,客户端和服务端都能主动向对方发送消息。

websocket在单机模式下进行消息的发送和接收:

Websocket集群解决方案以及实战(附图文源码)_全双工

用户A和用户B和web服务器建立连接之后,用户A发送一条消息到服务器,服务器再推送给用户B,在单机系统上所有的用户都和同一个服务器建立连接,所有的session都存储在同一个服务器中。

单个服务器是无法支撑几万人同时连接同一个服务器,需要使用到分布式或者集群将请求连接负载均衡到到不同的服务下。消息的发送方和接收方在同一个服务器,这就和单体服务器类似,能成功接收到消息:

Websocket集群解决方案以及实战(附图文源码)_全双工_02


但负载均衡使用轮询的算法,无法保证消息发送方和接收方处于同一个服务器,当发送方和接收方不是在同一个服务器时,接收方是无法接受到消息的:

Websocket集群解决方案以及实战(附图文源码)_搭建_03

websocket集群问题解决思路
客户端和服务端每次建立连接时候,会创建有状态的会话session,服务器的保存维持连接的session。客户端每次只能和集群服务器其中的一个服务器连接,后续也是和该服务器进行数据传输。

要解决集群的问题,应该考虑session共享的问题,客户端成功连接服务器之后,其他服务器也知道客户端连接成功。

方案一:session 共享(不可行)
和websocket类似的http是如何解决集群问题的?解决方案之一就是共享session,客户端登录服务端之后,将session信息存储在Redis数据库中,连接其他服务器时,从Redis获取session,实际就是将session信息存储在Redis中,实现redis的共享。

session可以被共享的前提是可以被序列化,而websocket的session是无法被序列化的,http的session记录的是请求的数据,而websocket的session对应的是连接,连接到不同的服务器,session也不同,无法被序列化。

方案二:ip hash(不可行)
http不使用session共享,就可以使用Nginx负载均衡的ip hash算法,客户端每次都是请求同一个服务器,客户端的session都保存在服务器上,而后续请求都是请求该服务器,都能获取到session,就不存在分布式session问题了。

websocket相对http来说,可以由服务端主动推动消息给客户端,如果接收消息的服务端和发送消息消息的服务端不是同一个服务端,发送消息的服务端无法找到接收消息对应的session,即两个session不处于同一个服务端,也就无法推送消息。如下图所示:

Websocket集群解决方案以及实战(附图文源码)_全双工_04

解决问题的方法是将所有消息的发送方和接收方都处于同一个服务器下,而消息发送方和接收方都是不确定的,显然是无法实现的。

方案三:广播模式

将消息的发送方和接收方都处于同一个服务器下才能发送消息,那么可以转换一下思路,可以将消息以消息广播的方式通知给所有的服务器,可以使用消息中间件发布订阅模式,消息脱离了服务器的限制,通过发送到中间件,再发送给订阅的服务器,类似广播一样,只要订阅了消息,都能接收到消息的通知:

Websocket集群解决方案以及实战(附图文源码)_搭建_05


发布者发布消息到消息中间件,消息中间件再将发送给所有订阅者:

Websocket集群解决方案以及实战(附图文源码)_集群_06

广播模式的实现
搭建单机 websocket
参考以前写的websocket单机搭建 文章,先搭建单机websocket实现消息的推送。

  1. 添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 创建 ServerEndpointExporter 的 bean 实例
    ServerEndpointExporter 的 bean 实例自动注册 @ServerEndpoint 注解声明的 websocket endpoint,使用springboot自带tomcat启动需要该配置,使用独立 tomcat 则不需要该配置。
@Configuration
public class WebSocketConfig {
    //tomcat启动无需该配置
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
  1. 创建服务端点 ServerEndpoint 和 客户端端
    服务端点
@Component
@ServerEndpoint(value = "/message")
@Slf4j
public class WebSocket {

 private static Map<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();

 private Session session;

 @OnOpen
 public void onOpen(Session session) throws SocketException {
  this.session = session;
  webSocketSet.put(this.session.getId(),this);

  log.info("【websocket】有新的连接,总数:{}",webSocketSet.size());
 }

 @OnClose
 public void onClose(){
  String id = this.session.getId();
  if (id != null){
   webSocketSet.remove(id);
   log.info("【websocket】连接断开:总数:{}",webSocketSet.size());
  }
 }

 @OnMessage
 public void onMessage(String message){
  if (!message.equals("ping")){
   log.info("【wesocket】收到客户端发送的消息,message={}",message);
   sendMessage(message);
  }
 }

 /**
  * 发送消息
  * @param message
  * @return
  */
 public void sendMessage(String message){
  for (WebSocket webSocket : webSocketSet.values()) {
   webSocket.session.getAsyncRemote().sendText(message);
  }
  log.info("【wesocket】发送消息,message={}", message);

 }

}

客户端点

<div>
    <input type="text" name="message" id="message">
    <button id="sendBtn">发送</button>
</div>
<div style="width:100px;height: 500px;" id="content">
</div>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.6.0/jquery.js"></script>
<script type="text/javascript">
    var ws = new WebSocket("ws://127.0.0.1:8080/message");
    ws.onopen = function(evt) {
        console.log("Connection open ...");
    };

    ws.onmessage = function(evt) {
        console.log( "Received Message: " + evt.data);
        var p = $("<p>"+evt.data+"</p>")
        $("#content").prepend(p);
        $("#message").val("");
    };

    ws.onclose = function(evt) {
        console.log("Connection closed.");
    };

    $("#sendBtn").click(function(){
        var aa = $("#message").val();
        ws.send(aa);
    })

</script>

服务端和客户端中的OnOpen、onclose、onmessage都是一一对应的。

服务启动后,客户端ws.onopen调用服务端的@OnOpen注解的方法,储存客户端的session信息,握手建立连接。
客户端调用ws.send发送消息,对应服务端的@OnMessage注解下面的方法接收消息。
服务端调用session.getAsyncRemote().sendText发送消息,对应的客户端ws.onmessage接收消息。
添加 controller

@GetMapping({"","index.html"})
public ModelAndView index() {
 ModelAndView view = new ModelAndView("index");
 return view;
}

效果展示
打开两个客户端,其中的一个客户端发送消息,另一个客户端也能接收到消息。

Websocket集群解决方案以及实战(附图文源码)_全双工_07

添加 RabbitMQ 中间件
这里使用比较常用的RabbitMQ作为消息中间件,而RabbitMQ支持发布订阅模式:

添加消息订阅
交换机使用扇形交换机,消息分发给每一条绑定该交换机的队列。以服务器所在的IP + 端口作为唯一标识作为队列的命名,启动一个服务,使用队列绑定交换机,实现消息的订阅:

@Configuration
public class RabbitConfig {

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("PUBLISH_SUBSCRIBE_EXCHANGE");
    }

    @Bean
    public Queue psQueue() throws SocketException {
        // ip + 端口 为队列名 
        String ip = IpUtils.getServerIp() + "_" + IpUtils.getPort();
        return new Queue("ps_" + ip);
    }

    @Bean
    public Binding routingFirstBinding() throws SocketException {
        return BindingBuilder.bind(psQueue()).to(fanoutExchange());
    }
}

修改服务端点 ServerEndpoint
在WebSocket添加消息的接收方法,@RabbitListener 接收消息,队列名称使用常量命名,动态队列名称使用 #{name},其中的name是Queue的bean 名称:

@RabbitListener(queues= "#{psQueue.name}")
public void pubsubQueueFirst(String message) {
  System.out.println(message);
  sendMessage(message);
}

然后再调用sendMessage方法发送给所在连接的客户端。

修改消息发送
在WebSocket类的onMessage方法将消息发送改成RabbitMQ方式发送:

@OnMessage
public void onMessage(String message){
  if (!message.equals("ping")){
    log.info("【wesocket】收到客户端发送的消息,message={}",message);
    //sendMessage(message);
    if (rabbitTemplate == null) {
      rabbitTemplate = (RabbitTemplate) SpringContextUtil.getBean("rabbitTemplate");
    }
    rabbitTemplate.convertAndSend("PUBLISH_SUBSCRIBE_EXCHANGE", null, message);
  }
}

消息通知流程如下所示:

Websocket集群解决方案以及实战(附图文源码)_集群_08


启动两个实例,模拟集群环境

打开idea的Edit Configurations:

Websocket集群解决方案以及实战(附图文源码)_全双工_09


点击左上角的COPY,然后添加端口server.port=8081:

Websocket集群解决方案以及实战(附图文源码)_网络协议_10


启动两个服务,端口分别是8080和8081。在启动8081端口的服务,将前端连接端口改成8081:

var ws = new WebSocket("ws://127.0.0.1:8081/message");

效果展示

Websocket集群解决方案以及实战(附图文源码)_websocket_11


标签:Websocket,session,消息,服务器,message,图文,源码,服务端,客户端
From: https://blog.51cto.com/u_12763213/7608482

相关文章

  • 在线直播系统源码,取CTreeCtrl控件选中节点的文字
    在线直播系统源码,取CTreeCtrl控件选中节点的文字 voidCAboutDlg::OnSelchangedTree1(NMHDR*pNMHDR,LRESULT*pResult) {NM_TREEVIEW*pNMTreeView=(NM_TREEVIEW*)pNMHDR;//TODO:Addyourcontrolnotificationhandlercodehere    MessageBox(m_tree1.GetIte......
  • LIS实验室(检验科)信息系统源码 C# +.Net+Oracle
    LIS实验室(检验科)信息系统,一体化设计,与其他系统无缝连接,全程化条码管理。集申请、采样、核收、计费、检验、审核、发布、质控、查询、耗材控制等检验科工作为一体的网络管理系统。技术细节:体系结构:Client/Server架构客户端:WPF+WindowsForms服务端:C#+.Net数据库:Oracle接口技术:RESTf......
  • 基于springboot学生请假管理系统-计算机毕业设计源码+LW文档
    摘要:本学生请假管理系统是针对目前学生请假的实际需求,从实际工作出发,对过去的学生请假管理系统存在的问题进行分析,完善用户的使用体会。采用计算机系统来管理信息,取代人工管理模式,查询便利,信息准确率高,节省了开支,提高了工作的效率。本系统结合计算机系统的结构、概念、模型、原理......
  • 基于Spring的大学生竞赛活动平台-计算机毕业设计源码+LW文档
    摘要:本大学生课余休闲平台是针对目前大学生课余休闲平台的实际需求,从实际工作出发,对过去的大学生课余休闲平台存在的问题进行分析,完善用户的使用体会。采用计算机系统来管理信息,取代人工管理模式,查询便利,信息准确率高,节省了开支,提高了工作的效率。本系统结合计算机系统的结构、概......
  • 基于vue.js的社区健康服务管理系统-计算机毕业设计源码+LW文档
    摘要:本社区健康服务管理系统是针对目前社区健康服务管理的实际需求,从实际工作出发,对过去的社区健康服务管理系统存在的问题进行分析,完善用户的使用体会。采用计算机系统来管理信息,取代人工管理模式,查询便利,信息准确率高,节省了开支,提高了工作的效率。本系统结合计算机系统的结构、......
  • PHP手机购物商场源码 麦淘商城 互站价值过万
    这东西打开看一眼看上去搭建不简单,据说价值五位数有兴趣的可以拿去试一下:内有压缩包,提取码:4a6d......
  • pip安装包如何强制使用源码构建
    pip--no-binary是pip命令的一个选项,用于控制是否从二进制分发包中下载Python包。当您使用pip安装Python包时,默认情况下,pip会尝试从预编译的二进制分发包中下载包,以提高安装速度。但有时候,您可能希望强制pip从源代码构建包,而不是使用预编译的二进制包,这可以在以下情况......
  • 非常简洁好看的APP软件下载导航网站源码/APP分享下载页引流导航网站源码带后台
        非常简洁好看的APP软件下载导航网站源码/APP分享下载页引流导航网站源码带后台版,这款源码 安装非常便捷干净,源码只有十几兆只需要上传源码修改连接信息即可。    后台添加应用及轮播广告也非常方便,小白看了都会!tp的后台响应也特别丝滑。材料自取:压缩包内附详细......
  • 2022年抖音最近很火的游戏直播:挤地铁教程+源码+软件
    音最近很火的游戏直播:挤地铁教程+源码+软件先上车先吃肉,卡好后带货,卖号,引私域,接星途广告,接小程序广告,带小游戏赚收益均可。有需要的材料自取:提取码:9jbw ......
  • 使用IDEA下载源码时,始终报错cannot download sources
    注释settings.xml文件以下内容<mirror><id>maven-default-http-blocker</id><mirrorOf>external:http:*</mirrorOf><name>PseudorepositorytomirrorexternalrepositoriesinitiallyusingHTTP.</name&g......