首页 > 其他分享 >SpringBoot——整合WebSocket(基于STOMP协议)

SpringBoot——整合WebSocket(基于STOMP协议)

时间:2023-08-14 18:35:23浏览次数:48  
标签:WebSocket SpringBoot STOMP chatMessage param user message public String

参考链接

  • Spring 官文: https://docs.spring.io/spring-framework/docs/6.0.0-SNAPSHOT/reference/html/web.html#websocket-stomp-benefits

  • 前端页面: https://github.com/callicoder/spring-boot-websocket-chat-demo

STOMP

定义

STOMP 中文为“面向消息的简单文本协议”,STOMP 提供了能够协作的报文格 式,以至于 STOMP 客户端可以与任何 STOMP 消息代理Brokers进行通信,从而为多语言,多平台和 Brokers 集群提供简单且普遍的消息协作。STOMP 协议可 以建立在 WebSocket 之上,也可以建立在其他应用层协议之上。通过 Websocket 建立 STOMP 连接,也就是说在 Websocket 连接的基础上再建立 STOMP 连接。

常见的 STOMP 的服务器/客户端的开源实现

  • STOMP 服务器:ActiveMQ、RabbitMQ、StompServer、…
  • STOMP 客户端库:stomp.js(javascript)、stomp.py(python)、Gozirra(java)、…

STOMP Over WebSocket

即 WebSocket 结合 Stomp 的实现。WebSocket 协议是基于 TCP 的一种新的网络协议,实现双工通讯,但是 websocket 没有规范payload (除控制信息外的有效载体)格式,可以是文本数据,也可以发送二进制数据,需要我们自己定义。而我们可以使用 stomp 协议去规范传输数据格式标准。

Stomp 帧格式示例

STOMP的客户端和服务器之间的通信是通过“帧”(Frame) 实现的,每个帧由多“行”(Line)组成。

  • 第一行包含了命令,然后紧跟键值对形式的Header内容。
  • 第二行必须是空行。
  • 第三行开始就是Body内容,末尾都以空字符结尾。
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

参考:http://stomp.github.io/stomp-specification-1.1.html

STOMP Over WebSocket 优点

image.png

image.png

运行流程图

Stomp 本身

image.png

  • channel:即客户端与服务端连接的通道
  • /app:由程序配置定义的缀,这种前缀指该消息需要经过一些自定义处理(在Controller中)再发到 Stomp 代理(后续详解)
  • /topic:同上,不过这类是不需要经过自定义处理的,直接发到 Stomp 代理(后续详解)

结合其他消息中间件

image.png

即相对于上述增加了一步 Stomp 代理与消息中间件之间的交互。

SocketJs 是什么

SockJS 是一个浏览器的 JavaScript 库,它提供了一个类似于网络的对象,SockJS 提供了一个连贯的,跨浏览器的 JavaScriptAPI,它在浏览器和 Web 服务器之间创建了一个低延迟、全双工、跨域 通信通道。SockJS 的一大好处在于提供了浏览器兼容性。即优先使用原生 WebSocket,如果浏览器不支持 WebSocket,会自动降为轮询的方式。

依赖引入

<dependencies>

    <!-- web-socket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>compile</scope>
    </dependency>

    <!-- redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- security -->
    <!--        <dependency>-->
    <!--            <groupId>org.springframework.boot</groupId>-->
    <!--            <artifactId>spring-boot-starter-security</artifactId>-->
    <!--        </dependency>-->

    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>

</dependencies>

群发实现

服务端代码
  • 1、WebSocketCofig 配置类
/**
 * @EnableWebSocketMessageBroker 开启 WebSocket Over Stomp
 * @author 17697
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 注册Stomp服务端点
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // addEndpoint 设置与客户端建立连接的url
        registry.addEndpoint("/ws")
                // 设置允许跨域
                .setAllowedOriginPatterns("*")
                // 允许SocketJs使用,是为了防止某些浏览器客户端不支持websocket协议的降级策略
                .withSockJS();
    }
}
  • 2、实体类
/**
 * 消息模型类
 */
@Data
public class ChatMessage {

    /**
     * 消息类型
     */
    private MessageType type;

    /**
     * 消息正文
     */
    private String content;

    /**
     * 消息发送者
     */
    private String sender;

    /**
     * 消息接收者
     */
    private String toUser;

    public enum MessageType {
        CHAT,
        JOIN,
        LEAVE
    }
}
  • 3、Controller层代码
@RestController
public class ChatController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * 客户端发送消息入口,群发消息
     * @param chatMessage
     * @return
     */
    @MessageMapping("/chat/sendMessage")
    @SendTo({"/topic/public"})
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
        return chatMessage;
    }

    /**
     * 客户端新增用户消息入口,用于群发显示:新进入xx用户
     * @param chatMessage
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/chat/addUser")
    @SendTo({"/topic/public"})
    public ChatMessage addUser(@Payload ChatMessage chatMessage,
                               SimpMessageHeaderAccessor headerAccessor) {
        // Add username in web socket session
        headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
        return chatMessage;
    }
    
}

注解介绍

  • @MessageMapping:和 @RequestMapping 注解功能类似,不过该注解用于接收 Stomp 客户端向服务端发送的url地址 注意:

    • @MessageMapping 使用若未搭配 @SendTo 或 @SendToUser 则会默认发送同 @MessageMapping 中 url 的主题消息
    • 使用该注解,则方法下尽量不要使用 SimpMessagingTemplate 的转发方法,如果非要使用,请把方法返回值改为 void ,否则会出现同时向主题发送两次消息。
  • @SendTo:定义方法~返回数据向其定义的 url 发送; 等同于 SimpMessagingTemplate.convertAndSendTo("/message", "新消息") 例: @SendTo({"/topic/public"}) 将消息发送到 /topic/public 主题下

  • @SendToUser:同上,不过是向单一用户发送消息; 等同于 SimpMessagingTemplate.convertAndSendToUser(Key,"/message", "新消息")

SimpMessagingTemplate 主要方法介绍

推荐使用 SimpMessagingTemplate 处理消息,这种相对于上注解更易理解。

  • void convertAndSend(D destination, Object payload) 群发消息:arg1: 目的地址,arg2: 消息内容
  • void convertAndSendToUser(String user, String destination, Object payload) 单发消息,arg1: 向谁发送,arg2: 目的地址,arg3: 消息内容。
@Override
public void convertAndSendToUser(String user, String destination, Object payload,
                                 @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException {
    Assert.notNull(user, "User must not be null");
    Assert.isTrue(!user.contains("%2F"), "Invalid sequence \"%2F\" in user name: " + user);
    user = StringUtils.replace(user, "/", "%2F");
    destination = destination.startsWith("/") ? destination : "/" + destination;
    // 实际还是调用 convertAndSend,destinationPrefix 默认值是 "/user/"(可配置修改)
    super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
  • 4、事件监听类编写
@Component
@Slf4j
public class WebSocketEventListener {

    @Autowired
    private SimpMessageSendingOperations messagingTemplate;

    /**
     * 连接建立事件
     * @param event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectEvent event) {
        log.info("建立一个新的连接");
    }


    /**
     * 连接断开事件
     * @param event
     */
    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String username = (String) headerAccessor.getSessionAttributes().get("username");
        if(username != null) {
            log.info("用户断开连接 : " + username);

            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setType(ChatMessage.MessageType.LEAVE);
            chatMessage.setSender(username);

            messagingTemplate.convertAndSend("/topic/public", chatMessage);
        }
    }
}
  • 5、前端代码
function connect(event) {
    username = document.querySelector('#name').value.trim();

    if(username) {
        usernamePage.classList.add('hidden');
        chatPage.classList.remove('hidden');

        // 建立服务端 websocket 连接,/ws 是后端服务器配置端点路径
        var socket = new SockJS('/ws');
        stompClient = Stomp.over(socket);

        stompClient.connect({}, onConnected, one rror);

    }
    event.preventDefault();
}

function onConnected() {
    // 订阅群发主题
    stompClient.subscribe('/topic/public', onMessageReceived);

    stompClient.send("/chat/addUser",
        {},
        JSON.stringify({sender: username, type: 'JOIN'})
    )

    connectingElement.classList.add('hidden');
}


function one rror(error) {
    connectingElement.textContent = 'Could not connect to WebSocket server. Please refresh this page to try again!';
    connectingElement.style.color = 'red';
}


function sendMessage(event) {
    var messageContent = messageInput.value.trim();

    if(messageContent && stompClient) {
        var chatMessage = {
            sender: username,
            content: messageInput.value,
            type: 'CHAT'
        };
        // arg1: 消息发送url,arg2: 消息头信息(例:加入用户认证头信息),arg3: 消息体
        stompClient.send("/chat/sendMessage", {}, JSON.stringify(chatMessage));
        messageInput.value = '';
    }
    event.preventDefault();
}

单发消息(一对一)

服务端代码 其他不变,在 Controller 层新增一对一方法;

使用 @RequestMapping 的方式通常用于其他服务端发送消息的入口

这里由于本人不会前端,所以采用下述通过 Postman 发送消息的形式(菜狗),会前端的大佬可以按之前的群发自行改造

/**
 * 一对一消息发送
 * @param chatMessage
*/
@PostMapping("/chat/single")
public void sendSingleMessage(@RequestBody ChatMessage chatMessage) {
    messagingTemplate.convertAndSendToUser(chatMessage.getToUser(),"/single",chatMessage);
}
前端代码

在连接方法中新增订阅个人主题

function onConnected() {
    // 订阅群发主题
    stompClient.subscribe('/topic/public', onMessageReceived);

    // 新增订阅一对一主题,即通过用户名等唯一性标识拼接到订阅主题地址
    stompClient.subscribe('/user/'+username+'/single', onMessageReceived);
    
    stompClient.send("/chat/addUser",
        {},
        JSON.stringify({sender: username, type: 'JOIN'})
    )

    connectingElement.classList.add('hidden');
}

拦截器配置

  • 1、编写自定义拦截器实现 ChannelInterceptor
/**
 * Socket拦截器
 * @author 17697
 */
@Component
public class SocketChannelInterceptor implements ChannelInterceptor {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final static String SOCKET_TOKEN_PREFIX = "webSocket:";

    private final static String SOCKET_AUTH = "socket_auth:";
    /**
     * 发送消息到通道前
     * @param message
     * @param channel
     * @return
     */
    @SneakyThrows
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // 获取连接头信息
        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        // 连接验证token合法性(简单模拟)
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // 获取头中的token
            String token = accessor.getFirstNativeHeader("token");
            if (StringUtils.hasText(token)) {
                String redisToken = redisTemplate.opsForValue().get(SOCKET_TOKEN_PREFIX);
                if (token.equals(redisToken)) {
                    /* 这里可以结合 Security
                    UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(userDetails, null, userDetails.getAuthorities());
                   SecurityContextHolder.getContext().setAuthentication(authentication);
                    accessor.setUser(authentication);
*/
                    // 简单处理设置对应权限。完整的应该根据用户的权限得出是否有发送/订阅到某个目的路径的权限
                    accessor.setUser(new UserPrincipal() {
                        @Override
                        public String getName() {
                            // 模拟权限类,仅有属性可发送/订阅
                            Permission permission = new Permission();
                            permission.setIsSend(true);
                            permission.setIsSubscribe(true);
                            String s = JSON.toJSONString(permission);
                            return s;
                        }
                    });

                } else {
                    throw new IllegalAccessException("未授权!!!");
                }
            } else {
                throw new IllegalAccessException("未授权!!!");
            }
            // 订阅权限认证
        } else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            UserPrincipal user = ((UserPrincipal) accessor.getUser());
            String value = user.getName();
            if (StringUtils.hasText(value)) {
                JSONObject jsonObject = JSONObject.parseObject(value);
                Boolean flag = ((Boolean) jsonObject.get("isSubscribe"));
                if (!flag) {
                    throw new IllegalAccessException("无权限订阅!!!");
                }
            } else {
                throw new IllegalAccessException("无权限订阅!!!");
            }
            // 发送权限验证
        } else if (StompCommand.SEND.equals(accessor.getCommand())) {
            UserPrincipal user = ((UserPrincipal) accessor.getUser());
            String value = user.getName();
            if (StringUtils.hasText(value)) {
                JSONObject jsonObject = JSONObject.parseObject(value);
                Boolean flag = ((Boolean) jsonObject.get("isSend"));
                if (!flag) {
                    throw new IllegalAccessException("无权限发送!!!");
                }
            } else {
                throw new IllegalAccessException("无权限发送!!!");
            }
        }
        return message;
    }


    /**
     * 发送消息到通道后
     * @param message
     * @param channel
     * @return
     */
    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        ChannelInterceptor.super.postSend(message, channel, sent);
    }

    /**
     * 发送完成后
     * @param message
     * @param channel
     * @return
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
    }

    @Override
    public boolean preReceive(MessageChannel channel) {
        return ChannelInterceptor.super.preReceive(channel);
    }

    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return ChannelInterceptor.super.postReceive(message, channel);
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
        ChannelInterceptor.super.afterReceiveCompletion(message, channel, ex);
    }
}

为什么保存认证信息使用 setUser 方法? 该方法表示会话的拥有者,即存储该会话拥有者信息。 每次建立连接都会创建一个 WebSocketSession 会话信息类,在该会话进行消息传递每次都会把 SessionId ,SessionAttributes 和 Principal(即我们setUser()保存的信息) 赋值到 Message 中,而 Principal 就是专门存储身份认证信息的。

  • SessionId: 初始随机分配的,用于确定唯一的会话
  • SessionAttributes: 用于给 WebSocketSession 设置一些额外记录属性,结构是 Map
  • Principal: 用于设置 WebSocketSession 的身份认证信息

额外配置项

根据需求添加

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private SocketChannelInterceptor socketChanelInterceptor;
    
    /**
     * 注册Stomp服务端点
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // addEndpoint 设置与客户端建立连接的url
        registry.addEndpoint("/ws")
                // 设置允许跨域
                .setAllowedOriginPatterns("*")
                // 允许SocketJs使用,是为了防止某些浏览器客户端不支持websocket协议的降级策略
                .withSockJS();
    }
    
    /**
     * 自定义拦截器配置
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(socketChanelInterceptor);
    }


    /**
     * 配置消息代理的路由规则
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 定义服务端应用目标前缀;客户端只有以这个前缀才能进入服务端方法 @MessageMapping
        registry.setApplicationDestinationPrefixes("/app/");
        // 定义SimpleBroker处理的消息前缀;只有消息以这个为前缀才会被SimpleBroker处理转发
        registry.enableSimpleBroker("/topic/","/user/");
        // 设置一对一消息前缀,默认的是"/user/",可通过该方法修改
        registry.setUserDestinationPrefix("/user/");

    }
}

项目代码

最后附上所有的代码地址:https://github.com/jjhyb/websocket-master

参考: https://blog.csdn.net/AhangA/article/details/125470930

标签:WebSocket,SpringBoot,STOMP,chatMessage,param,user,message,public,String
From: https://blog.51cto.com/u_14014612/7079837

相关文章

  • springboot整合nacos和dubbo
    0.源码源码:gitee1.版本java:1.8.0_281nacos:2.1.22.创建项目创建一个简单的springboot或者maven项目,或者代码库(gitee/github/其他代码库管理平台)创建一个空白的拉下来,最后只保留一个pom.xml即可.2.1根项目依赖版本控制参考:版本说明其中有一句话Spring......
  • SpringBoot有几种获取Request对象的方法?
    HttpServletRequest简称Request,它是一个ServletAPI提供的对象,用于获取客户端发起的HTTP请求信息。例如:获取请求参数、获取请求头、获取Session会话信息、获取请求的IP地址等信息。那么问题来了,在SpringBoot中,获取Request对象的方法有哪些?常见的获取Request对......
  • springboot开启prometheus可采集的指标配置
    1、引包<!--实现对Actuator的自动化配置--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency>......
  • SpringBoot实现大文件上传
    ​ 对于大文件的处理,无论是用户端还是服务端,如果一次性进行读取发送、接收都是不可取,很容易导致内存问题。所以对于大文件上传,采用切块分段上传,从上传的效率来看,利用多线程并发上传能够达到最大效率。 本文是基于springboot+vue实现的文件上传,本文主要介绍服务端实现文件......
  • WebSocket
    在搭建聊天室时,选择使用TCP请求而不是HTTP请求是因为TCP(传输控制协议)和HTTP(超文本传输协议)具有不同的特性,适用于不同的场景。以下是选择TCP请求而不是HTTP请求的一些原因:即时性:TCP连接可以保持长时间,使得聊天室能够实时地传输消息,而不需要每次都建立新的连接,从而减少了延迟。......
  • 使用Spring initializr快速创建一个springboot项目
     第一步首先new一个新的project选择Springinitializr配置好相关信息后下一步编辑在左上角我们可以选择SprinBoot的版本,在这里直接加入web依赖springweb和Template的Thymeleaf依赖,点上对勾后可以在最右边里看到你选择的依赖,然后点击create编辑可以看到我们的项目结构,很多sprin......
  • 手摸手3-springboot整合swagger-ui,实现自动文档
    (目录)手摸手3-springboot整合swagger-ui,实现自动文档修改pom.xml<!--解决FluentIterable.class找不到问题--><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>26.0-jre</version>......
  • springboot中tomcat线程池
    一、Tomcat中的默认配置线程任务就是一个连接的请求,每个请求都会尝试创建线程来处理。最大工作线程数,默认200。server.tomcat.max-threads=200最大连接数默认是10000,同时支持的并发连接数server.tomcat.max-connections=10000等待队列长度,默认100。server.tomcat.acce......
  • 自定义springboot-starter包
    https://www.cnblogs.com/yuansc/p/9088212.html 前言我们都知道可以使用SpringBoot快速的开发基于Spring框架的项目。由于围绕SpringBoot存在很多开箱即用的Starter依赖,使得我们在开发业务代码时能够非常方便的、不需要过多关注框架的配置,而只需要关注业务即可。例如我想......
  • springboot综合案例第四课
    day04_springboot综合案例用户管理查询用户查询所有用户执行流程编写UserMapper接口publicinterfaceUserMapper{//查询所有用户List<UserInfo>findAllUsers();}编写UserServicepublicinterfaceUserServiceextendsUserDetailsService{/**......