首页 > 其他分享 >SpringBoot集成WebSocket

SpringBoot集成WebSocket

时间:2024-05-07 09:55:05浏览次数:16  
标签:集成 WebSocket SpringBoot void springframework session org import public

SpringBoot集成WebSocket

1.引jar包

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>

2.书写配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 配置信息
 */
@Configuration
public class WebsocketConfig {

    /**
     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
     * @date 2023/6/20 9:19
     * @return ServerEndpointExporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

3.使用注解开发一个WebSocket接口类

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 监听Websocket接口myWs
 */
@ServerEndpoint("/myWs")
@Component
public class WsServerEndpoint {

    private static Map<String,Session> sessions = new ConcurrentHashMap<>();
    /**
     * 连接成功
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("打开连接");
        sessions.put(session.getId(),session);
    }

    /**
     * 连接关闭
     *
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        System.out.println("关闭连接");
        sessions.remove(session.getId());
    }

    /**
     * 接收到消息
     *
     * @param text
     */
    @OnMessage
    public void onMsg(String text) throws IOException {
        System.out.println("收到消息:"+text);
        sendMessage();
    }

    /**
     * 发送信息
     */
    public void sendMessage(){
        sessions.forEach((k,v)->{
            try {
                v.getBasicRemote().sendText("hello");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

4.测试

​ 使用apipost或其他测试工具新建一个webSocket接口

​ webSocket接口地址:ws://localhost:8080/myWs

SpringBoot实现WebSocket接口转发

1.书写一个代理处理器

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

/**
 * @author wangfan
 */
public class ProxyWebSocketHandler extends AbstractWebSocketHandler {

    private static  final Logger log = LoggerFactory.getLogger(ProxyWebSocketHandler.class);

    private final WebSocketClient client = new StandardWebSocketClient();

    private WebSocketSession targetSession;

    private WebSocketSession thisSession;

    private String targetUrl;


    public ProxyWebSocketHandler(String targetUrl){
        this.targetUrl = targetUrl;
        initTargetSession();
    }
    private void initTargetSession (){
        try {
            ListenableFuture<WebSocketSession> future = client.doHandshake(new ForwardingWebSocketHandler(), this.targetUrl);
            future.addCallback(new WebSocketConnectionCallback());
            targetSession = future.get();
        } catch (Exception e) {
            log.error("目标ws服务链接失败 " + e.getMessage());
        }
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {

        log.info("链接本地会话成功");
        if(targetSession == null || (targetSession != null && !targetSession.isOpen())){
            initTargetSession();
        }else{
            log.info("链接目标会话成功");
        }
        //提取查询参数
        this.thisSession = session;

    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        if (targetSession != null && targetSession.isOpen()) {
            log.info("转发文本消息 {}", message.getPayload());
            targetSession.sendMessage(message);
        } else {
            log.warn("T目标会话不存在或者关闭");
        }
    }

    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        if (targetSession != null && targetSession.isOpen()) {
            log.info("转发二进制消息");
            targetSession.sendMessage(message);
        } else {
            log.warn("目标会话不存在或者关闭");
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        log.error("转换错误: {}", exception.getMessage());
        if (session.isOpen()) {
            session.close();
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        log.info("链接关闭: {}", status);
        if (session != null && session.isOpen()) {
            session.close();
        }
    }

    private class ForwardingWebSocketHandler extends AbstractWebSocketHandler {
        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
            log.info("接收到的消息 {}", message.getPayload());
            thisSession.sendMessage(message);
            //ProxyWebSocketHandler.this.handleTextMessage(session, message);
        }

        @Override
        protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
            ProxyWebSocketHandler.this.handleBinaryMessage(session, message);
        }
    }

    private class WebSocketConnectionCallback implements org.springframework.util.concurrent.ListenableFutureCallback<WebSocketSession> {
        @Override
        public void onSuccess(WebSocketSession result) {
            log.info("发布目标对象成功");
            targetSession = result;
        }

        @Override
        public void onFailure(Throwable t) {
            log.error("链接目标对象失败: {}", t.getMessage());
        }
    }
}

2,代理处理器放入容器并开启Websocket接口

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * @author wangfan
 */
@Configuration
@EnableWebSocket
public class ProxyWebSocketConfig implements WebSocketConfigurer {


    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new ProxyWebSocketHandler("ws://localhost:8080/myWs"),"/webSocket").setAllowedOrigins("*");
    }
}

4.测试

​ 使用apipost或其他测试工具新建一个webSocket接口

​ webSocket接口地址:ws://localhost:8081/webSocket

​ 查看消息被转发到ws://localhost:8080/myWs,且收到该服务的消息

标签:集成,WebSocket,SpringBoot,void,springframework,session,org,import,public
From: https://www.cnblogs.com/WangJingjun/p/18176590

相关文章

  • websocket
    websocket,web版的socket原web中:http协议,无状态&短链接客户端主动连接服务器客户端向服务器发送消息,服务器收到返回数据客户端收到数据断开连接https一些+对数据进行加密。我们在开发过程中想要保留一些状态信息,基于cookie来做现在支持:http协议。一次请求一次响......
  • springboot在2.4以后版本使用application.yml替换bootstrap.yml
    首先确认你的springboot版本是高于2.4的版本的,然后移除以下依赖<!--<dependency>--><!--<groupId>org.springframework.cloud</groupId>--><!--<artifactId>spring-cloud-starter-bootstrap</artifactId>--><!--</d......
  • springboot~CompletableFuture并行计算
    在Spring中,CompletableFuture通常用于异步编程,可以方便地处理异步任务的执行和结果处理,CompletableFuture是Java8引入的一个类,用于支持异步编程和并发操作。它基于Future和CompletionStage接口,提供了丰富的方法来处理异步任务的执行和结果处理。下面是CompletableFuture......
  • web server apache tomcat11-31-websocket
    前言整理这个官方翻译的系列,原因是网上大部分的tomcat版本比较旧,此版本为v11最新的版本。开源项目从零手写实现tomcatminicat别称【嗅虎】心有猛虎,轻嗅蔷薇。系列文章webserverapachetomcat11-01-官方文档入门介绍webserverapachetomcat11-02-setup启动web......
  • 3. SpringBoot 整合第三方技术
    1.整合Junit一般来说是不需要进行处理的,因为在创建SpringBoot工程时,会自动整合junit​的要说怎么配置的话?也可以写一下相关的配置:以下就是SpringBoot整合Junit相关步骤导入相关依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-b......
  • Camunda 整合SpringBoot基本Api
    代码实现:需要接口@AutowiredprivateRuntimeServiceruntimeService;@AutowiredprivateRepositoryServicerepositoryService;@AutowiredprivateTaskServicetaskService;发布流程:@GetMapping("/deploy")publicObjectdeploy(){......
  • SpringBoot3.1.5对应新版本SpringCloud开发(2)-Eureka的负载均衡
    Eureka的负载均衡负载均衡原理负载均衡流程老版本流程介绍当order-servic发起的请求进入Ribbon后会被LoadBalancerInterceptor负载均衡拦截器拦截,拦截器获取到请求中的服务名称,交给RibbonLoadBanlancerCient,然后RibbonLoadBanlancerCient会将服务名称当作服务id交给Dynamic......
  • 测试 springboot 项目苍穹外卖,解决 Unable to connect to Redis 错误问题
       使用IDEA启动springboot项目苍穹外卖后台项目sky-take-out,测试“菜品批量删除”接口时,能够正常完成操作,但是服务器始终显示下面错误信息:2024-05-0320:54:24.134ERROR24360---[nio-8181-exec-3]o.a.c.c.C.[.[.[/].[dispatcherServlet]  :Servlet.service()fo......
  • springboot为什么要用延迟导入?
    SpringBoot使用了多种方式来实现自动配置,其中DeferredImportSelector接口是这些机制之一。DeferredImportSelector是ImportSelector的一个扩展,它允许延迟导入配置类直到所有@Configuration类都被处理完毕。这对于某些自动配置类需要在应用程序上下文的创建过程中的后期阶段才能......
  • webapi添加添加websocket中间件
    添加位置我按照MSDN的例子添加了一个复述客户端响应的中间件。需要注意的时,中间件采用那种方式添加,添加在哪。哪种方式我选择创建一条管道分支,只要时ws的连接请求,就转到这个分支因此,我们需要使用的是MapWhen()来创建管道分支。添加在哪要注意授权的问题,所以应该添加到授权......