首页 > 其他分享 >websocket2.0 适用于发送的数据体很大

websocket2.0 适用于发送的数据体很大

时间:2022-10-03 11:33:22浏览次数:45  
标签:websocket WebSocketCache 适用 userId websocket2.0 发送 session import public

当websocket发送的数据体积很大,需要的传输时间很长,并且传输频率较高的情况下使用

pom配置

      <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>5.0.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

 

一、创建内部的缓存对象

package com.example.demo.websocket;

import jakarta.websocket.Session;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;

/**
 * @author luwl
 */
public class WebSocketCache {

    /**
     * 记录连接
     */
    public static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<>();
    /**
     * 记录在线对象
     */
    public static ConcurrentHashMap<String, Session> livingSessions = new ConcurrentHashMap<>();
    /**
     * 记录session的使用情况
     */
    public static ConcurrentHashMap<Session, Future<Void>> sessionBusy = new ConcurrentHashMap<>();

    /**
     * 当前在线人数
     */
    public static volatile  int livingCount = 0;

    public static synchronized int getLivingCount(){
        return livingCount;
    }

    public static synchronized void addLivingCount(){
        WebSocketCache.livingCount++;
    }

    public static synchronized void subLivingCount(){
        if(WebSocketCache.livingCount>0) {
            WebSocketCache.livingCount--;
        }
    }

}

二、创建websocket对象

package com.example.demo.websocket;

import jakarta.websocket.*;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * WebSocket
 * @author luwl
 */
@Slf4j
@Component
@EqualsAndHashCode
@ServerEndpoint("/webSocket/{userId}")
public class WebSocketServer {

    private Session session;
    /**
     * 当前通道标识符
     */
    public volatile String userId = "";

    /**
     * 客户端进入连接
     * @param session
     * @param userId
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId")String userId){
        if(!WebSocketCache.livingSessions.containsKey(userId)){
            WebSocketCache.webSocketSet.add(this);
            WebSocketCache.livingSessions.put(userId,session);
            this.session=session;
            this.userId = userId;
            WebSocketCache.addLivingCount();
            log.info("{}进入连接,当前链接数量为:"+ WebSocketCache.getLivingCount(),userId);
        }else{
            log.info("{}重新进入连接,当前链接数量为:"+ WebSocketCache.getLivingCount(),userId);
        }
    }

    /**
     * 接收到客户端消息
     * @param message
     * @param session
     * @param userId
     */
    @OnMessage
    public  void onMessage(String message,Session session,@PathParam("userId")String userId){
        log.info("接收到客户端消息【"+userId+":"+message+"】");
    }

    @OnError
    public void one rror(Session session,Throwable err){
        log.error("发生错误");
        log.info(err.getStackTrace()+"");
    }

    @OnClose
    public void onClose(Session session,@PathParam("userId")String userId) throws InterruptedException {
        WebSocketCache.sessionBusy.remove(session);
        WebSocketCache.livingSessions.remove(userId);
        WebSocketCache.subLivingCount();
        log.info(userId+"关闭连接,当前链接数量为:"+ WebSocketCache.getLivingCount());
        //关闭用户线程
        WebSocketCache.webSocketSet.remove(this);
    }

    /**
     * 发送消息到单个对象
     * @param userId
     * @param message
     */
    public void sendMessage(String userId,String message){
        try {
            if(WebSocketCache.getLivingCount()!=0){
                WebSocketCache.sessionBusy.put(session,session.getAsyncRemote().sendText(message));
                log.info("用户:{}发送消息成功",userId);
                if(WebSocketCache.livingSessions.containsKey(userId)){
                    WebSocketCache.livingSessions.remove(userId,session);
                }
            }else{
                log.warn("当前无连接对象,关闭连接");
                onClose(null,userId);
            }
        } catch (Exception e) {
            log.error("webSocket发送消息失败",e);
        }
    }

    synchronized public Session getMySession(){
        return this.session;
    }
}

三、创建定时任务进行状态刷新

package com.example.demo.websocket;

import jakarta.websocket.Session;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Random;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * 定时任务(定时全员发送消息、定时刷新session使用情况)
 * @author luwl
 */
@Slf4j
@Component
public class SendMessageTask {
    /**
     * 定时全员推送
     * @throws Exception
     */
    @Scheduled(cron = "0/5 * * * * ?")
    public void run()throws Exception{
        if(WebSocketCache.webSocketSet.size()>0){
            CopyOnWriteArraySet<WebSocketServer> serverList =  new CopyOnWriteArraySet();
            serverList.addAll(WebSocketCache.webSocketSet);
            log.info("=====开始推送数据,当前用户数量为:{}=====",serverList.size());
            Random random = new Random();
            serverList.parallelStream().forEach(e->{
                e.sendMessage(e.userId, String.valueOf(random.nextInt(999)));
            });
        }
    }

    /**
     * 定时扫描session
     * @throws Exception
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void sendMessageListener()throws Exception{
        WebSocketCache.webSocketSet.parallelStream().forEach(e->{
            String userId = e.userId;
            Session session = e.getMySession();
            if(session==null){
                return;
            }
            if(!WebSocketCache.livingSessions.containsKey(userId)) {
                synchronized (WebSocketCache.sessionBusy) {
                    if (WebSocketCache.sessionBusy.containsKey(session) && WebSocketCache.sessionBusy.get(session).isDone()) {
                        WebSocketCache.sessionBusy.remove(session);
                        WebSocketCache.livingSessions.put(userId, session);
                    }
                }
            }
        });
    }

}

 

标签:websocket,WebSocketCache,适用,userId,websocket2.0,发送,session,import,public
From: https://www.cnblogs.com/Sora-L/p/16750207.html

相关文章