首页 > 其他分享 >SpringBoot整合WebSocket实践

SpringBoot整合WebSocket实践

时间:2024-07-23 16:33:01浏览次数:17  
标签:WebSocket SpringBoot 实践 wave 会话 muly import public String

简介

先来看下维基百科WebSocket的简介:

WebSocket是一种与HTTP不同的协议。两者都位于OSI模型应用层,并且都依赖于传输层的TCP协议。 虽然它们不同,但是RFC 6455中规定:it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries(WebSocket通过HTTP端口80和443进行工作,并支持HTTP代理和中介),从而使其与HTTP协议兼容。 为了实现兼容性,WebSocket握手使用HTTP Upgrade头[1]从HTTP协议更改为WebSocket协议。

WebSocket协议支持Web浏览器(或其他客户端应用程序)与Web服务器之间的交互,具有较低的开销,便于实现客户端与服务器的实时数据传输。 服务器可以通过标准化的方式来实现,而无需客户端首先请求内容,并允许消息在保持连接打开的同时来回传递。通过这种方式,可以在客户端和服务器之间进行双向持续对话。 通信通过TCP端口80或443完成,这在防火墙阻止非Web网络连接的环境下是有益的。另外,Comet之类的技术以非标准化的方式实现了类似的双向通信。

大多数浏览器都支持该协议,包括Google ChromeFirefoxSafariMicrosoft EdgeInternet ExplorerOpera

WebSocket协议规范将ws(WebSocket)和wss(WebSocket Secure)定义为两个新的统一资源标识符(URI)方案,分别对应明文和加密连接。除了方案名称和片段ID(不支持#)之外,其余的URI组件都被定义为此URI的通用语法。

使用浏览器开发人员工具,开发人员可以检查WebSocket握手以及WebSocket框架。

WebSocket用于前端(Web浏览器)和后端(Web服务器)保持长时间连接及数据实时传输。

客户端首先请求内容,并允许消息在保持连接打开的同时来回传递,通过这种方式,可以在客户端和服务器之间进行双向持续对话。

扩展:维持http长连接的几种方式有以下几种,具体可网上搜索相关实现,这里只介绍WebSocket保持长连接。

  • ajax 轮询
  • long poll 轮询
  • iframe 长连接
  • WebSocket.

以下开始SpringBoot整合WebSocket

引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.7.9</version>
    </dependency>

开启WebSocket支持

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 开启WebSocket支持
 *
 * @author wave-muly
 * @date 2021/6/21 下午5:01
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

会话操作接口封装

如果需要客户端服务端消息传输,WebSocket提供了javax.websocket.Session类,这里只用到其中几个方法

  • socketChannel.getBasicRemote().sendText(msg);
  • socketChannel.getAsyncRemote().sendText(msg);
  • socketChannel.isOpen();
  • socketChannel.close();

这里定义四个方法,主要是对javax.websocket.Session类操作的封装

以下是该接口的包装实现类

/**
 * socket会话操作接口
 * <p>
 * 该接口面向会话,须基于会话的通道调用。
 * 该接口支持扩展,可参考WebSocket模块中{@link com.muly.wave.socket.websocket.operator.channel}包下的类
 *
 * @author wave-muly
 * @date 2021/6/1 上午11:46
 */
public interface SocketSessionOperatorApi {

    /**
     * 写出数据,经过责任链
     *
     * @author wave-muly
     * @date 2021/6/1 上午11:48
     **/
    void writeAndFlush(Object obj);

    /**
     * 写出数据,不经过责任链
     *
     * @author wave-muly
     * @date 2021/6/1 上午11:48
     **/
    void writeToChannel(Object obj);

    /**
     * 关闭会话
     *
     * @author wave-muly
     * @date 2021/6/1 上午11:48
     **/
    void close();

    /**
     * 是否存活
     *
     * @return {@link boolean}
     * @author wave-muly
     * @date 2021/6/1 上午11:50
     **/
    boolean isInvalid();
}
import com.muly.wave.socket.api.session.SocketSessionOperatorApi;

/**
 * 对Api模块的操作类进行扩展
 * <p>
 * 暂时只写接口,SocketOperatorApi方法不够用时再对此类进行扩展
 *
 * @author wave-muly
 * @date 2021/6/1 下午3:44
 */
public interface SocketChannelExpandInterFace extends SocketSessionOperatorApi {

}
import com.alibaba.fastjson.JSON;

import javax.websocket.Session;
import java.io.IOException;

/**
 * Socket操作类实现
 * <p>
 * 简单封装Spring Boot的默认WebSocket
 *
 * @author wave-muly
 * @date 2021/6/1 下午3:41
 */
public class WaveSocketOperator implements SocketChannelExpandInterFace {

    /**
     * 实际操作的通道
     */
    private Session socketChannel;

    public WaveSocketOperator(Session socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void writeAndFlush(Object obj) {
        try {
            if (socketChannel.isOpen()) {
                socketChannel.getBasicRemote().sendText(JSON.toJSONString(obj));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void writeToChannel(Object obj) {
        if (socketChannel.isOpen()) {
            socketChannel.getAsyncRemote().sendText(JSON.toJSONString(obj));
        }
    }

    @Override
    public void close() {
        try {
            if (socketChannel.isOpen()) {
                socketChannel.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean isInvalid() {
        return socketChannel.isOpen();
    }
}

SocketSession

该类持有会话操作接口SocketSessionOperatorApi

import com.muly.wave.socket.api.session.SocketSessionOperatorApi;
import lombok.Data;

/**
 * Socket会话
 *
 * @author wave-muly
 * @date 2021/6/1 上午11:28
 */
@Data
public class SocketSession<T extends SocketSessionOperatorApi> {

    /**
     * 会话ID,每一个新建的会话都有(目前使用通道ID)
     */
    private String sessionId;

    /**
     * 会话唯一标识
     */
    private String userId;

    /**
     * 该会话监听的消息类型
     */
    private String messageType;

    /**
     * token信息
     */
    private String token;

    /**
     * 连接时间
     */
    private Long connectionTime;

    /**
     * 最后活跃时间
     */
    private Long lastActiveTime;

    /**
     * 操作API
     */
    private T socketOperatorApi;

    /**
     * 自定义数据
     */
    private Object data;

}

接下来再定义个存储SocketSession的会话中心类

SessionCenter会话中心

import cn.hutool.core.util.ObjectUtil;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * 会话中心
 * <p>
 * 维护所有的会话
 *
 * @author wave-muly
 * @date 2021/6/1 下午1:43
 */
public class SessionCenter {

    /**
     * 所有用户会话维护
     */
    private static ConcurrentMap<String, List<SocketSession<WaveSocketOperator>>> socketSessionMap = new ConcurrentHashMap<>();

    /**
     * 获取维护的所有会话
     *
     * @return {@link ConcurrentMap< String, SocketSession< WaveSocketOperator >>}
     * @author wave-muly
     * @date 2021/6/1 下午2:13
     **/
    public static ConcurrentMap<String, List<SocketSession<WaveSocketOperator>>> getSocketSessionMap() {
        return socketSessionMap;
    }

    /**
     * 根据用户ID获取会话信息列表
     *
     * @param userId 用户ID
     * @return {@link SocketSession <GunsSocketOperator>}
     * @author wave-muly
     * @date 2021/6/1 下午1:48
     **/
    public static List<SocketSession<WaveSocketOperator>> getSessionByUserId(String userId) {
        return socketSessionMap.get(userId);
    }

    /**
     * 根据用户ID和消息类型获取会话信息列表
     *
     * @param userId 用户ID
     * @return {@link SocketSession <GunsSocketOperator>}
     * @author wave-muly
     * @date 2021/6/1 下午1:48
     **/
    public static List<SocketSession<WaveSocketOperator>> getSessionByUserIdAndMsgType(String userId) {
        return socketSessionMap.get(userId);
    }

    /**
     * 根据会话ID获取会话信息
     *
     * @param sessionId 会话ID
     * @return {@link SocketSession <GunsSocketOperator>}
     * @author wave-muly
     * @date 2021/6/1 下午1:48
     **/
    public static SocketSession<WaveSocketOperator> getSessionBySessionId(String sessionId) {
        for (List<SocketSession<WaveSocketOperator>> values : socketSessionMap.values()) {
            for (SocketSession<WaveSocketOperator> session : values) {
                if (sessionId.equals(session.getSessionId())) {
                    return session;
                }
            }
        }
        return null;
    }

    /**
     * 设置会话
     *
     * @param socketSession 会话详情
     * @author wave-muly
     * @date 2021/6/1 下午1:49
     **/
    public static void addSocketSession(SocketSession<WaveSocketOperator> socketSession) {
        List<SocketSession<WaveSocketOperator>> socketSessions = socketSessionMap.get(socketSession.getUserId());
        if (ObjectUtil.isEmpty(socketSessions)) {
            socketSessions = Collections.synchronizedList(new ArrayList<>());
            socketSessionMap.put(socketSession.getUserId(), socketSessions);
        }
        socketSessions.add(socketSession);
    }

    /**
     * 连接关闭
     *
     * @param sessionId 会话ID
     * @author wave-muly
     * @date 2021/6/1 下午3:25
     **/
    public static void closed(String sessionId) {
        Set<Map.Entry<String, List<SocketSession<WaveSocketOperator>>>> entrySet = socketSessionMap.entrySet();
        Iterator<Map.Entry<String, List<SocketSession<WaveSocketOperator>>>> iterator = entrySet.iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, List<SocketSession<WaveSocketOperator>>> next = iterator.next();
            List<SocketSession<WaveSocketOperator>> value = next.getValue();
            if (ObjectUtil.isNotEmpty(value)) {
                value.removeIf(GunsSocketOperatorSocketSession -> GunsSocketOperatorSocketSession.getSessionId().equals(sessionId));
            }
        }
    }
}

这个类的主要作用是所有用户会话的维护,存储在支持并发访问的ConcurrentHashMap里,定义的方法主要是用来操作map来实现会话的加入、关闭及查询。

Socket通用操作类

有了操作javax.websocket.Session类及会话中心,现在封装一个服务端通过会话中心拿到Session给客户端发送消息的接口.

定义一个SocketOperatorApi接口:

import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;

/**
 * Socket通用操作类
 * <p>
 * 可通过该类直接发送消息,每一个Socket实现的子模块必须实现该接口,以提供统一的操作API
 *
 * @author wave-muly
 * @date 2021/6/2 上午9:25
 */
public interface SocketOperatorApi {

    /**
     * 发送消息到指定会话
     *
     * @param msgType   消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param sessionId 会话ID
     * @param msg       消息体
     * @author wave-muly
     * @date 2021/6/11 下午2:19
     **/
    void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException;

    /**
     * 发送消息到指定用户的所有会话
     * <p>
     * 如果用户同一个消息类型建立了多个会话,则统一全部发送
     *
     * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param userId  用户ID
     * @param msg     消息体
     * @author wave-muly
     * @date 2021/6/2 上午9:35
     **/
    void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException;

    /**
     * 发送消息到所有会话
     *
     * @param msgType 消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param msg     消息体
     * @author wave-muly
     * @date 2021/6/2 上午9:35
     **/
    void sendMsgOfAllUserSession(String msgType, Object msg);

    /**
     * 根据会话id关闭会话
     *
     * @param socketId 会话id
     * @author wave-muly
     * @date 2021/8/13 16:00
     **/
    void closeSocketBySocketId(String socketId);

    /**
     * 监听指定类型消息
     * <p>
     * 1.该方法每调用一次即注册一个监听,同一个消息类型多次调用只有最后一次生效
     *
     * @param msgType           消息类型可参考{@link com.muly.wave.socket.api.enums}枚举类
     * @param callbackInterface 消息监听器
     * @author wave-muly
     * @date 2021/6/2 上午9:54
     **/
    void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);
}

以下为该接口的实现类,该实现类主要用于服务端向客户端发送消息

import cn.hutool.core.util.ObjectUtil;
import com.muly.wave.socket.api.SocketOperatorApi;
import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.api.exception.enums.SocketExceptionEnum;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.message.SocketMessageCenter;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;
import com.muly.wave.socket.websocket.pojo.WebSocketMessageDTO;
import com.muly.wave.socket.websocket.session.SessionCenter;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.List;

/**
 * WebSocket操作实现类
 * <p>
 * 如果是Spring boot项目,通过注入SocketOperatorApi接口操作socket,需将本来交给Spring管理
 *
 * @author wave-muly
 * @date 2021/6/2 上午10:41
 */
@Component
public class WebSocketOperator implements SocketOperatorApi {

    @Override
    public void sendMsgOfUserSessionBySessionId(String msgType, String sessionId, Object msg) throws SocketException {
        SocketSession<WaveSocketOperator> session = SessionCenter.getSessionBySessionId(sessionId);
        if (ObjectUtil.isEmpty(session)) {
            throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
        }
        WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
        webSocketMessageDTO.setData(msg);
        webSocketMessageDTO.setServerMsgType(msgType);
        session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
    }

    @Override
    public void sendMsgOfUserSession(String msgType, String userId, Object msg) throws SocketException {
        // 根据用户ID获取会话
        List<SocketSession<WaveSocketOperator>> socketSessionList = SessionCenter.getSessionByUserIdAndMsgType(userId);
        if (ObjectUtil.isEmpty(socketSessionList)) {
            throw new SocketException(SocketExceptionEnum.SESSION_NOT_EXIST);
        }
        WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
        webSocketMessageDTO.setData(msg);
        webSocketMessageDTO.setServerMsgType(msgType);
        for (SocketSession<WaveSocketOperator> session : socketSessionList) {
            // 发送内容
            session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
        }
    }

    @Override
    public void sendMsgOfAllUserSession(String msgType, Object msg) {
        Collection<List<SocketSession<WaveSocketOperator>>> values = SessionCenter.getSocketSessionMap().values();
        WebSocketMessageDTO webSocketMessageDTO = new WebSocketMessageDTO();
        webSocketMessageDTO.setData(msg);
        webSocketMessageDTO.setServerMsgType(msgType);
        for (List<SocketSession<WaveSocketOperator>> sessions : values) {
            for (SocketSession<WaveSocketOperator> session : sessions) {
                // 找到该类型的通道
                if (session.getMessageType().equals(msgType)) {
                    session.getSocketOperatorApi().writeAndFlush(webSocketMessageDTO);
                }
            }
        }
    }

    @Override
    public void closeSocketBySocketId(String socketId) {
        SessionCenter.closed(socketId);
    }

    @Override
    public void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface) {
        SocketMessageCenter.setMessageListener(msgType, callbackInterface);
    }
}

该接口涉及到一个WebSocket交互通用对象

import com.muly.wave.socket.api.SocketOperatorApi;
import lombok.Data;

/**
 * WebSocket交互通用对象
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:56
 */
@Data
public class WebSocketMessageDTO {

    /**
     * 服务端发送的消息类型(客户端如果需要监听该消息类型,注册对应的消息处理器即可)
     */
    private String serverMsgType;

    /**
     * 客户端发送的消息类型(服务端需要处理的消息类型)
     */
    private String clientMsgType;

    /**
     * 目标Id
     */
    private String toUserId;

    /**
     * 发送者ID
     */
    private String formUserId;

    /**
     * 具体发送的数据
     */
    private Object data;

}

鉴于以上多次提到了,msgType这个属性,这个类型定义了消息类型,用于区分服务端与客户端之间消息通讯的类型

以下定义了三种类型的枚举方便管理,

  • SystemMessageTypeEnum

    该枚举适用于服务端监听首次连接和断开连接,以S00开头

    import lombok.Getter;
    
    /**
     * 服务端监听器枚举
     * <p>
     * 说明:该枚举适用于服务端监听首次连接和断开连接
     *
     * @author wave-muly
     * @date 2021/6/3 上午9:14
     */
    @Getter
    public enum SystemMessageTypeEnum {
    
        /**
         * 监听首次连接
         */
        SYS_LISTENER_ONOPEN("S00001", "监听首次连接"),
    
        /**
         * 监听断开连接
         */
        SYS_LISTENER_ONCLOSE("S00002", "监听断开连接"),
    
        /**
         * 监听异常信息
         */
        SYS_LISTENER_ONERROR("S00003", "监听异常信息");
    
        private final String code;
    
        private final String name;
    
        SystemMessageTypeEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }
    }
    
  • ServerMessageTypeEnum

    该枚举适用于服务器推送给客户端消息时使用,以100开头

    import lombok.Getter;
    
    /**
     * 服务端消息类型枚举
     * <p>
     * 说明:该枚举适用于服务器推送给客户端消息时使用
     *
     * @author wave-muly
     * @date 2021/6/3 上午9:14
     */
    @Getter
    public enum ServerMessageTypeEnum {
    
        /**
         * 系统通知消息类型
         */
        SYS_NOTICE_MSG_TYPE("100001", "系统通知消息类型"),
    
        /**
         * 连接消息回复
         */
        SYS_REPLY_MSG_TYPE("100002", "连接消息回复");
    
        private final String code;
    
        private final String name;
    
        ServerMessageTypeEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }
    }
    
  • ClientMessageTypeEnum

    该枚举适用于服务器接收到客户端发来的消息,判断消息类型时使用,以200开头

    import lombok.Getter;
    
    /**
     * 客户端消息类型枚举
     * <p>
     * 说明:该枚举适用于服务器接收到客户端发来的消息,判断消息类型时使用
     *
     * @author wave-muly
     * @date 2021/6/3 上午9:14
     */
    @Getter
    public enum ClientMessageTypeEnum {
    
        /**
         * 用户连接鉴权
         */
        USER_CONNECTION_AUTHENTICATION("200000", "用户连接鉴权"),
    
        /**
         * 用户心跳消息类型
         */
        USER_HEART("299999", "用户心跳消息类型");
    
        private final String code;
    
        private final String name;
    
        ClientMessageTypeEnum(String code, String name) {
            this.code = code;
            this.name = name;
        }
    }
    

    特殊说明一下serverMsgTypeclientMsgType的区别
    1.serverMsgType字段是服务端发送给客户端的字段
    2.clientMsgType字段是客户端发送给服务器的字段
    例如:客户端发送给服务器一个心跳消息(type:299999),服务端如果需要处理该消息就注册一个该消息的监听器,那么收到消息服务端会把消息推送给对应的监听器,接口见

上述四个方法中,还涉及到一个方法void msgTypeCallback(String msgType, SocketMsgCallbackInterface callbackInterface);

其中SocketMsgCallbackInterface为Socket消息接收回调接口,根据枚举类型注入相应的消息处理方法,

import com.muly.wave.socket.api.session.pojo.SocketSession;

/**
 * Socket消息接收回调接口
 *
 * @author wave-muly
 * @date 2021/6/2 上午9:53
 */
@FunctionalInterface
public interface SocketMsgCallbackInterface {

    /**
     * 收到消息的回调
     *
     * @param msgType       消息类型
     * @param msg           消息体
     * @param socketSession 本次通信的会话
     * @author wave-muly
     * @date 2021/6/2 上午9:51
     **/
    void callback(String msgType, Object msg, SocketSession socketSession);
}

该接口是一个函数式接口,可以使用java8的lambda表达式注入回调方法。

再次构造个消息回调方法中心,用于存储各种消息类型的消息回调方法处理。messageListenerMap的key即为msgType

import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import java.util.HashMap;
import java.util.Map;

/**
 * 会话消息中心
 * <p>
 * 维护所有消息类型对应的处理器
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:20
 */
public class SocketMessageCenter {

    /**
     * 所有消息监听器维护
     */
    private static Map<String, SocketMsgCallbackInterface> messageListenerMap = new HashMap<>();

    /**
     * 设置消息类型的监听器
     *
     * @param msgType  消息类型
     * @param listener 监听器
     * @author wave-muly
     * @date 2021/6/1 下午2:25
     **/
    public static void setMessageListener(String msgType, SocketMsgCallbackInterface listener) {
        messageListenerMap.put(msgType, listener);
    }

    /**
     * 获取消息监听器
     *
     * @param msgType 消息类型
     * @return {@link SocketMsgCallbackInterface}
     * @author wave-muly
     * @date 2021/6/1 下午2:26
     **/
    public static SocketMsgCallbackInterface getSocketMsgCallbackInterface(String msgType) {
        return messageListenerMap.get(msgType);
    }
}

消息监听处理器WebSocketServer

该类为WebSocket的入口类,用于处理和客户端的连接关闭及接收客户端发送的消息

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.muly.wave.socket.api.enums.ClientMessageTypeEnum;
import com.muly.wave.socket.api.enums.ServerMessageTypeEnum;
import com.muly.wave.socket.api.enums.SystemMessageTypeEnum;
import com.muly.wave.socket.api.message.SocketMsgCallbackInterface;
import com.muly.wave.socket.api.session.pojo.SocketSession;
import com.muly.wave.socket.websocket.message.SocketMessageCenter;
import com.muly.wave.socket.websocket.operator.channel.WaveSocketOperator;
import com.muly.wave.socket.websocket.pojo.WebSocketMessageDTO;
import com.muly.wave.socket.websocket.session.SessionCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

/**
 * 消息监听处理器
 *
 * @author wave-muly
 * @date 2021/6/1 下午2:35
 */
@Slf4j
@ServerEndpoint(value = "/webSocket/{token}")
@Component
public class WebSocketServer {

    /**
     * 连接建立调用的方法
     * <p>
     * 暂时无用,需要在建立连接的时候做一些事情的话可以修改这里
     *
     * @param session 会话信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("token") String token) {
        String userId = token;
        
        // todo 这里可以加上token校验处理获取用户信息
//        try {
//            // 解析用户信息
//            DefaultJwtPayload defaultPayload = JwtContext.me().getDefaultPayload(token);
//            userId = defaultPayload.getUserId().toString();
//        } catch (io.jsonwebtoken.JwtException e) {
//            try {
//                session.close();
//            } catch (IOException ioException) {
//                ioException.printStackTrace();
//            }
//        }

        // 操作api包装
        WaveSocketOperator gunsSocketOperator = new WaveSocketOperator(session);

        // 回复消息
        WebSocketMessageDTO replyMsg = new WebSocketMessageDTO();
        replyMsg.setServerMsgType(ServerMessageTypeEnum.SYS_REPLY_MSG_TYPE.getCode());
        replyMsg.setToUserId(userId);

        // 创建会话对象
        SocketSession<WaveSocketOperator> socketSession = new SocketSession<>();
        try {
            // 设置回复内容
            replyMsg.setData(session.getId());
            socketSession.setSessionId(session.getId());
            socketSession.setUserId(userId);
            socketSession.setSocketOperatorApi(gunsSocketOperator);
            socketSession.setToken(token);
            socketSession.setConnectionTime(System.currentTimeMillis());

            // 维护会话
            SessionCenter.addSocketSession(socketSession);
        } finally {
            // 回复消息
            gunsSocketOperator.writeAndFlush(replyMsg);

            // 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), null, socketSession);
            }
        }

    }

    /**
     * 连接关闭调用的方法
     *
     * @param session 会话信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnClose
    public void onClose(Session session) {
        try {
            SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(session.getId());
            // 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONCLOSE.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONCLOSE.getCode(), null, socketSession);
            }
        } finally {
            SessionCenter.closed(session.getId());
        }
    }

    /**
     * 收到消息调用的方法
     *
     * @param message        接收到的消息
     * @param socketChannel 会话信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnMessage
    public void onMessage(String message, Session socketChannel) {

        // 转换为Java对象
        WebSocketMessageDTO webSocketMessageDTO = JSON.parseObject(message, WebSocketMessageDTO.class);

        // 维护通道是否已初始化
        SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(socketChannel.getId());

        // 心跳包
        if (ObjectUtil.isNotEmpty(socketSession) && ClientMessageTypeEnum.USER_HEART.getCode().equals(webSocketMessageDTO.getClientMsgType())) {
            // 更新会话最后活跃时间
            if (ObjectUtil.isNotEmpty(socketSession)) {
                socketSession.setLastActiveTime(System.currentTimeMillis());
            }
            return;
        }

        // 用户ID为空不处理直接跳过
        if (ObjectUtil.isEmpty(webSocketMessageDTO.getFormUserId())) {
            return;
        }

        // 会话建立成功执行业务逻辑
        if (ObjectUtil.isNotEmpty(socketSession)) {

            // 更新最后会话时间
            socketSession.setLastActiveTime(System.currentTimeMillis());

            // 找到该消息的处理器
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(webSocketMessageDTO.getClientMsgType());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(webSocketMessageDTO.getClientMsgType(), webSocketMessageDTO, socketSession);
            } else {
                socketChannel.getAsyncRemote().sendText("{\"serverMsgType\":\"404\"}");
            }
        }
    }

    /**
     * 会话发送异常调用的方法
     *
     * @param session 会话信息
     * @param error    错误信息
     * @author wave-muly
     * @date 2021/6/21 下午5:14
     **/
    @OnError
    public void one rror(Session session, Throwable error) {
        SocketSession<WaveSocketOperator> socketSession = SessionCenter.getSessionBySessionId(session.getId());
        // 触发首次连接回调
        SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONERROR.getCode());
        if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
            // 触发回调
            socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONERROR.getCode(), error, socketSession);
        }
        log.error("session 发生错误:" + session.getId());
    }
}

上述类中存在处理逻辑,定义了首次连接回调的方法,这里会去找SocketMessageCenter方法操作中心里的SystemMessageTypeEnum.SYS_LISTENER_ONOPEN方法

			// 触发首次连接回调
            SocketMsgCallbackInterface socketMsgCallbackInterface = SocketMessageCenter.getSocketMsgCallbackInterface(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode());
            if (ObjectUtil.isNotEmpty(socketMsgCallbackInterface)) {
                // 触发回调
                socketMsgCallbackInterface.callback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), null, socketSession);
            }

以下为示例,项目启动时注入一个首次连接回调处理方法:在msgType为SystemMessageTypeEnum.SYS_LISTENER_ONOPEN(S00001)时会调用该接口

/**
 * @Author: wave-muly
 * @Date: 2021/10/9 14:09
 */
@Slf4j
@Component
@Order(99)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MessageListener implements CommandLineRunner {
    private final SocketOperatorApi socketOperatorApi;
    @Override
    public void run(String... args) throws Exception {
        // 项目启动默认注册了 监听首次连接的监听器 在客户端连接时会调用该监听器
        socketOperatorApi.msgTypeCallback(SystemMessageTypeEnum.SYS_LISTENER_ONOPEN.getCode(), (msgType, msg, socketSession)->{
            log.info("connection success");
            socketSession.getSocketOperatorApi().writeAndFlush("connection success");
        });
    }
}

再定义一个服务端向客户端发送消息的示例,这里定义一个Controller

import com.muly.wave.socket.api.SocketOperatorApi;
import com.muly.wave.socket.api.enums.ServerMessageTypeEnum;
import com.muly.wave.socket.api.exception.SocketException;
import com.muly.wave.socket.websocket.pojo.SysMessage;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.Date;


/**
 * @Author: wave-muly
 * @Date: 2021/10/9 11:28
 */
@RequestMapping("/api/v1/sys")
@RestController
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SysMessageController {

    private final SocketOperatorApi socketOperatorApi;

    @PostMapping(value = "/notice/{userId}", produces = MediaType.APPLICATION_JSON_VALUE)
    public void notice(@PathVariable(name = "userId") String userId, @RequestBody String messageContent) {
        SysMessage item = new SysMessage();
        item.setReceiveUserId(Long.valueOf(userId));
        item.setMessageContent(messageContent);
        item.setMessageType(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode());
        item.setMessageSendTime(new Date());
        try {
            socketOperatorApi.sendMsgOfUserSession(ServerMessageTypeEnum.SYS_NOTICE_MSG_TYPE.getCode(), item.getReceiveUserId().toString(), item);
        } catch (SocketException socketException) {
            // 该用户不在线
        }
    }

}

以下为浏览器测试 [WebSocket在线测试](WebSocket在线测试_在线模拟websocket请求工具 (jsonin.com))

image-20220424165738022

至此SpringBoot整合WebSocket完成,后续项目集成,只需要定义好msgType消息类型、处理token校验逻辑即可。
代码详见 wave-socket

标签:WebSocket,SpringBoot,实践,wave,会话,muly,import,public,String
From: https://www.cnblogs.com/yunt/p/18318772

相关文章

  • 使用PHP实现悲观锁的最佳实践。里面包含源码
    在数据库编程中,确保数据的一致性和完整性是非常重要的。当多个用户或线程同时访问和修改同一条数据记录时,可能会出现并发问题,比如读写冲突、数据丢失等。为了解决这些问题,我们可以使用并发控制机制,其中一种常见的方法就是悲观锁。什么是悲观锁?悲观锁是一种并发控制策......
  • IT实战课堂计算机毕业设计源码精品基于springboot的线上辅导班系统的开发与设计
    项目功能简介:《[含文档+PPT+源码等]精品基于springboot的线上辅导班系统的开发与设计[包运行成功]》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程、包运行成功以及课程答疑与微信售后交流群、送查重系统不限次数免费查重等福利!软件开发环境及开......
  • SpringBoot如何解决yml明文密码问题
    日常使用过程中,使用明文密码完全没问题,但是到了企业项目中,往往需要进行安全扫描,被安全扫描组扫到后,就必须修改。有两种思路解决安全扫描问题。方式一、密码不放在代码里密码不放在工程代码中,放到配置中心上,比如SpringCloudConfig,比如Nacos,或者apollo上。一般来说安全扫描都......
  • 深入解析 @SpringBootApplication 注解及其源码
    深入解析@SpringBootApplication注解及其源码在SpringBoot开发中,@SpringBootApplication注解几乎是每个项目的起点。它不仅简化了配置,还集成了多个核心注解。今天,我们将深入探讨这个注解的内部机制,并通过代码示例来展示其强大功能。1.@SpringBootApplication是什么?简单......
  • 智能停车场系统--前后端分离(可直接落地)使用数据:vue,springBoot,redis,mybatis,mysql等
    系统首页-统计停车车收费收费数据展示实现代码:对菜单控制代码@AutowiredprivateMenuServicemenuService;@AutowiredprivateRoleMenuServiceroleMenuService;@GetMapping("/list")publicResultlist(){List<MenuRoleVO>menuRoleList=this.menuServi......
  • 计算机毕业设计——基于SpringBoot的高并发选课系统
    基于SpringBoot的高并发选课系统私信获取完整代码项目简介本项目主要解决在高校选课场景下,保证选课系统在大量读写压力下不宕机,以及选课时尽可能提高选课QPS,给学生一个良好的选课体验,完成上述功能同时保证选课安全运行效果图其他效果图请到image文件夹中查看技术......
  • SpringBoot项目打包成war包
    1.项目场景使用SpringBoot开发项目,由于内置了Tomcat,所以项目可以直接启动,部署到服务器的时候,直接打成jar包,就可以运行了。 有时需要把项目打包放入外置的Tomcat或者TongWeb中运行,就需要把项目打包成war包,。2.实现步骤2.1将启动类的pom文件打包方式更改为war<......
  • AI绘画入门实践 | Midjourney:画面权重控制
    在Midjourney中,使用两个连续的英文冒号::来进行分割与权重控制。作为分隔符使用在提示词中添加双冒号::表示让MJ将部分提示词单独考虑2d illustration, french fries, hot dog --v 62d illustration, french fries, hot:: dog --v 6作为权重......
  • 基于springboot的水产养殖系统
    博主介绍:java高级开发,从事互联网行业六年,熟悉各种主流语言,精通java、python、php、爬虫、web开发,已经做了多年的设计程序开发,开发过上千套设计程序,没有什么华丽的语言,只有实实在在的写点程序。......
  • java毕业设计-基于springboot+vue的校园二手交易系统,基于java的校园二手交易系统,基于j
    文章目录前言演示视频项目背景项目架构和内容获取(文末获取)具体实现截图前台功能管理后台技术栈具体功能模块设计系统需求分析可行性分析系统测试为什么我?关于我我自己的网站前言博主介绍:✌️码农一枚,专注于大学生项目实战开发、讲解和毕业......