首页 > 其他分享 >WebSocket+心跳机制

WebSocket+心跳机制

时间:2023-04-17 17:48:20浏览次数:36  
标签:WebSocket String idCard private static 心跳 import 机制 public

maven依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocket.class

import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

@Component
@ServerEndpoint("/websocket/{idCard}")
@Slf4j
public class WebSocket {

    private static final Set<WebSocketEntity> connections = new CopyOnWriteArraySet<>();

    static {
        startHeart();
    }

    /**
     * 连接方法
     */
    @OnOpen
    public synchronized void onOpen(@PathParam("idCard") String idCard, Session session) {
        addUser(idCard, session);
    }

    /**
     * 收到消息执行
     */
    @OnMessage
    public synchronized void onMessage(@PathParam("idCard") String idCard, String message) {
        WebSocketMessage msg = JSONUtil.toBean(message, WebSocketMessage.class);
        if (WebSocketMessage.MSG_TYPE_HEART.equals(msg.getMsgType())) {
            log.info("【websocket心跳(接收)】:收到{}的心跳:{}", idCard, message);
            WebSocketEntity entity = getUserEntity(idCard);
            if (entity != null) {
                entity.setHeartbeat(true);
                entity.setLastTime(System.currentTimeMillis());
            }
        }
    }

    /**
     * 链接错误执行
     */
    @OnError
    public synchronized void one rror(@PathParam("idCard") String idCard, Throwable error) {
        log.info("【websocket消息推送(异常)】:{}:发生了错误:{}", idCard, error);
        removeUser(idCard, new CloseReason(CloseCodes.NO_EXTENSION, "客户端异常"));
    }

    @OnClose
    public static synchronized void onClose(@PathParam("idCard") String idCard, CloseReason reason) {
        removeUser(idCard, reason);
    }

    /**
     * 获取在线人数
     */
    private static synchronized int getUserOnlineNum() {
        return connections.size();
    }

    /**
     * 用户上线
     */
    private synchronized void addUser(String idCard, Session session) {
        //查询当前用户是否已在集合
        WebSocketEntity entity = getUserEntity(idCard);
        if (null == entity) {
            WebSocketEntity webSocket = new WebSocketEntity();
            webSocket.setIdCard(idCard);
            webSocket.setSession(session);
            webSocket.setHeartbeat(true);
            webSocket.setLastTime(System.currentTimeMillis());
            connections.add(webSocket);
            log.info("【websocket消息推送(上线)】:\"{}\"用户已上线,当前人数为:{}", idCard, getUserOnlineNum());
        }

    }

    /**
     * 根据idCard获取实体类
     */
    private static WebSocketEntity getUserEntity(String idCard) {
        WebSocketEntity entity = null;
        if (connections.size() == 0) {
            return null;
        }
        for (WebSocketEntity webSocketEntity : connections) {
            if (webSocketEntity.getIdCard().contentEquals(idCard)) {
                entity = webSocketEntity;
                break;
            }
        }
        return entity;
    }

    /**
     * 根据多个idCard获取多个实体类
     */
    private static List<WebSocketEntity> getUserEntities(List<String> idCard) {
        List<WebSocketEntity> entities = new ArrayList<>();
        if (connections.size() == 0) {
            return entities;
        }
        for (WebSocketEntity webSocketEntity : connections) {
            if (idCard.contains(webSocketEntity.getIdCard())) {
                entities.add(webSocketEntity);
            }
        }
        return entities;
    }

    /**
     * 用户下线
     */
    private static void removeUser(String idCard, CloseReason reason) {
        WebSocketEntity entity = getUserEntity(idCard);
        if (null != entity) {
            connections.remove(entity);
            log.info("【websocket消息推送(下线)】:\"{}\"用户因{}已下线,当前人数为:{}", idCard, reason.getReasonPhrase(), getUserOnlineNum());
        }
    }


    /**
     * 发送消息
     */
    public static synchronized void sendMsg(String idCard, String message) {
        WebSocketEntity userEntity = getUserEntity(idCard);
        if (userEntity != null) {
            Session session = userEntity.getSession();
            if (session != null) {
                synchronized (session) {
                    try {
                        session.getBasicRemote().sendText(message);
                        log.info("【websocket消息推送(发送)】:发送给\"{}\"用户消息:{}", userEntity.getIdCard(), message);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * 批量发送消息
     */
    public static synchronized void sendMsgToUsers(List<String> idCard, String message) {
        List<WebSocketEntity> userEntities = getUserEntities(idCard);
        if (userEntities.size() != 0) {
            userEntities.forEach(userEntity -> {
                Session session = userEntity.getSession();
                if (session != null) {
                    synchronized (session) {
                        try {
                            session.getBasicRemote().sendText(message);
                            log.info("【websocket消息推送(发送)】:发送给\"{}\"用户消息:{}", userEntity.getIdCard(), message);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    }


    /**
     * 启动心跳包
     */
    private static synchronized void startHeart() {
        ExamineHeartThread examineHeart = new ExamineHeartThread();
        Thread examineThread = new Thread(examineHeart);

        KeepHeartThread keepHeart = new KeepHeartThread();
        Thread keepThread = new Thread(keepHeart);


        keepThread.start();
        examineThread.start();

    }

    /**
     * 发送心跳包
     */
    public static synchronized void sendPing(String message) {
        if (connections.size() <= 0) {
            return;
        }
        log.info("【websocket心跳】:发送心跳包当前人数为:" + getUserOnlineNum());
        for (WebSocketEntity webSocketEntity : connections) {
            synchronized (webSocketEntity) {
                webSocketEntity.setLastTime(System.currentTimeMillis());
                webSocketEntity.setHeartbeat(false);
                try {
                    webSocketEntity.getSession().getBasicRemote().sendText(message);
                    log.info("【websocket心跳(发送)】:发送给\"{}\"用户消息:{}", webSocketEntity.getIdCard(), message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 发送心跳包 10秒一次
     */
    private static class KeepHeartThread implements Runnable {
        @SneakyThrows
        @Override
        public void run() {
            WebSocketMessage msg = new WebSocketMessage();
            msg.setMsgType(WebSocketMessage.MSG_TYPE_HEART);
            String message = JSONUtil.toJsonStr(msg);
            while (true) {
                try {
                    sendPing(message);
                    Thread.sleep(10000);
                } catch (Exception e) {
                    log.error("【websocket心跳(异常)】:发送心跳包异常:", e);
                }
            }
        }

    }

    /**
     * 检测是否收到client心跳 每秒一次
     */
    private static class ExamineHeartThread implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 心跳过期时间
                    long now = System.currentTimeMillis();
                    for (WebSocketEntity entity : connections) {
                        // 心跳过期时间
                        long heartbeatExpirationTime = entity.getLastTime() + 3000;
                        if (!entity.isHeartbeat() && entity.getLastTime() != 0 && now > heartbeatExpirationTime) {
                            onClose(entity.getIdCard(), new CloseReason(CloseCodes.NORMAL_CLOSURE, "没有收到心跳"));
                        }
                    }
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.error("【websocket心跳(异常)】:检测心跳异常:", e);
                }
            }
        }
    }

}

WebSocketEntity.class

import lombok.Data;

import javax.websocket.Session;

@Data
public class WebSocketEntity {

    private String idCard;

    private Session session;

    private boolean heartbeat;

    private long lastTime;
}

WebSocketMessage.class


import lombok.Data;

@Data
public class WebSocketMessage {
    // 心跳
    public static final Integer MSG_TYPE_HEART = 0;
    // PC端消息
    public static final Integer MSG_TYPE_PC_MESSAGE = 1;
    // APP端消息
    public static final Integer MSG_TYPE_APP_MESSAGE = 2;
    // 刷新页面
    public static final Integer MSG_TYPE_REFRESH_PAGE = 3;

    // 请求成功
    public static final String SUCCESS_CODE = "200";
    // 请求失败
    public static final String FAILURE_CODE = "201";

    // 消息类型
    private Integer msgType;
    // 请求结果码
    private String code;
    // token
    private String authorization;
    // 消息
    private String message;
}

标签:WebSocket,String,idCard,private,static,心跳,import,机制,public
From: https://www.cnblogs.com/zhimi/p/17326565.html

相关文章

  • js EventLoop事件循环机制
    1、js是单线程语言,其事件分为:同步任务和异步任务,异步任务分为宏任务与微任务;2、执行顺序为:同步任务(主流程任务)--->微任务 ---->宏任务先执行同步任务,遇到异步任务则放入任务队列中,等同步任务执行完毕,执行任务队列中的微任务,再执行宏任务...主线程从"任务队列"中读取事件,这......
  • MFC运行机制
    1.创建一个窗口代码2.设断点  3.启动调试 调用堆栈窗口:上面的函数被下面的函数所调用 4.双击函数   设断点  F5让程序执行到断点处  F11进入AfxWinMain函数 CWinThread*pThread=AfxGetThread()以Afx打头的是全局函数 F10让程序运行几行把鼠标放在pThread和pApp上......
  • 关于中国软件业如何建立信任机制和如何做事的讨论
    今天和一个朋友聊天提到了过去,她说:“……人漂泊都不知道漂泊到什么时候,未来太遥远了,其实我可以在家过得很好,但总是不甘心这样平淡就过完一辈子,出来漂泊的感觉也还不错”。我想到了刚毕业的时候曾经写过的那篇文字。我对我自己的总结是:“我后来是不甘于平淡和贫穷才抗争,小......
  • [人事管理]评论蜀中为何无大将,看企业人才管理培养机制
    原文引用如下:“蜀中无大将,廖化作先锋”,这句成语所揭示的原本是三国后期蜀汉人才奇缺的历史事实,后来则引伸为泛指因为没有杰出人才,平庸之辈也能侥幸成名,与“山中无老虎,猴子称霸王”意义相近。唐朝的魏元忠说:“何代而不生才,何才而不生代?”进而他说:“士有不用,未有无士之时。”魏元......
  • 碳交易机制下考虑需求响应的综合能源系统优化运行综合能源系统是实现&ldquo;双碳&rdqu
    碳交易机制下考虑需求响应的综合能源系统优化运行综合能源系统是实现“双碳”目标的有效途径,为进一步挖掘其需求侧可调节潜力对碳减排的作用,提出了一种碳交易机制下考虑需求响应的综合能源系统优化运行模型。首先,根据负荷响应特性将需求响应分为价格型和替代型2类,分别建立了基......
  • 综合上述文献搭建了冷热电联供型综合能源系统,系统结构如图2所示,通过引入需求响应机制
    关键词:冷热电联供;CHP机组;热泵;冰储冷空调;需求响应参考文献:《基于综合需求响应和奖惩阶梯型碳交易的综合能源系统优化调度》《计及需求响应和阶梯型碳交易机制的区域综合能源系统优化运行》《碳交易机制下考虑需求响应的综合能源系统优化运行》《考虑综合需求侧响应的区域综合能源......
  • 提出了一种碳交易机制下考虑需求响应的综合能源系统优化运行模型
    [1]关键词:碳交易机制;需求响应;综合能源系统;[2]文献:《碳交易机制下考虑需求响应的综合能源系统优化运行》[3]主要内容:提出了一种碳交易机制下考虑需求响应的综合能源系统优化运行模型。首先,根据负荷响应特性将需求响应分为价格型和替代型2类,分别建立了基于价格弹性矩阵的......
  • 首先考虑IES参与到碳交易市场,引入阶梯式碳交易机制引导IES控制碳排放;接着细化电转气(P2
    [1]关键词:氢能;阶梯式碳交易机制;热电比可调;综合能源系统;低碳经济;Matlab程序[2]参考文献:《考虑阶梯式碳交易机制与电制氢的综合能源系统热电优化》[3]主要内容:首先考虑IES参与到碳交易市场,引入阶梯式碳交易机制引导IES控制碳排放;接着细化电转气(P2G)的两阶段运行过程,引入电解槽、甲......
  • 聊聊Redis sentinel 机制
    Redis的哨兵机制自动完成了以下三大功能,从而实现了主从库的自动切换,可以降低Redis集群的运维开销:监控主库运行状态,并判断主库是否客观下线;在主库客观下线后,选取新主库;选出新主库后,通知从库和客户端。 一、为什么需要哨兵主从模式下,如果主库发生故障了,那就直接会影响到......
  • Cypress运行机制
    Cypress运行机制运行机制运行测试时Cypress会使用网络包把所有的测试代码放到一个.js文件当中运行浏览器,将测试代码注入一个空白页,通过空白页在浏览器中运行测试代码运行过程实际加载Cypress时,Cypressweb应用程序先把自己托管到本地一个随机端口(如:`http://localhost:65......