首页 > 其他分享 >websocket的基本使用,与消息推送的一般设计

websocket的基本使用,与消息推送的一般设计

时间:2024-07-15 23:27:47浏览次数:17  
标签:重连 websocket socket private ws action 设计 推送 data

本文章需要先阅读前面写的Bus事件,传送门:https://editor.csdn.net/md/?articleId=139666035

// Bus事件,传送门:https://editor.csdn.net/md/?articleId=139666035
import Bus from "@/utils/Bus";
// 全局config的ws异常事件名
import {websocketErrorEventName} from "@/config";
// 获取用户token
import {getToken} from "@/utils/auth";

// 本文章需要先阅读前面写的Bus事件,传送门:https://editor.csdn.net/md/?articleId=139666035

// socket基地址
const socketUrl = import.meta.env.VITE_API_SOCKET_BASE;
class Socket {
    // 是否断线自动重连
    private autoConnect: boolean = true;
    // 自动重连计数器,连接上websocket时自动归置为0,以负数作为连接计数
    public isActive: number = 0;
    // 贮存的socket对象
    private socket: WebSocket | undefined;
    // 心跳时间间隔(单位:秒)
    private heartInterval: number = 35;
    // 心跳重连定时器
    private reconnectTimeOut: NodeJS.Timeout | null = null;
    // 重连时间间隔,即每次websocket error(错误)/close(关闭时)都会自动重连,当重
    private reconnectInterval: number = 10;
    // 最大重连次数(当自动重连N次后,默认服务器异常,终止自动连接)
    private reconnectMax: number = 10;
    // 心跳发送检测回应计时器
    private heartbeatTimer: NodeJS.Timeout | null = null;
    // 自动装箱错误处理事件集
    private autoEmitEvents: string[] = [];
    // 心跳发送计数器时递增,当服务端返回心跳回应时则递减
    private noHeartBackNum: number = 0;
    // 是否通过Bus事件中心向上推送ws心跳检查无返回异常
    private isExport: boolean = false;

    constructor(
        private url: string = socketUrl,
        private protocol: any[] = [],
    ) {
        this.initSocket();
    }
    // 初始化websocket
    private initSocket() {
        if(this.socket) {
            this.socket.close();
            this.socket = undefined;
        }
        if (typeof window.WebSocket === "function") {
            // getToken() 为jwt认证中的token,可加可不加,如果是通过,也可以在请求头中加入token
            // 依据个人喜好,如果在请求头中加入token,则在new Socket时传递第二个参数
            const URL = this.url + (getToken? "/" + getToken() : '');
            this.socket = new WebSocket(this.url, this.protocol);
            console.log('连接websocket 的地址为: ' + this.url);
        } else {
            console.log("WebSocket 不兼容的浏览器!");
            return;
        }
        const _this = this;
        // 为websocket各种事件定制自己的响应函数
        this.socket.onopen = function () {
            _this.open();
        };
        this.socket.onclose = function () {
            _this.close();
        };
        this.socket.onerror = function () {
            _this.error();
        };
        this.socket.onmessage = function (ev) {
            _this.dealMessage(ev);
        };
    }

    // websocket打开函数
    private open() {
        console.log("socket已打开");
        // 重连开始计数
        this.isActive = 1;
        // 如果开启了自动重连,意味着随时要检测心跳
        if (this.autoConnect) {
            this.detectHeartbeat();
        }
    }
    // websocket发生错误时的处理
    private error() {
        console.error('socket发生错误');

        this.socketClose();
    }
    // websocket关闭时的处理
    private close() {
        console.log('socket关闭');
        this.socketClose()
    }
    // 处理websocket心跳
    private detectHeartbeat() {
        // 当存在之前的心跳定时器时则先清理掉之前的心跳定时器
        if (this.heartbeatTimer) {
            // 清除原来的
            clearInterval(this.heartbeatTimer as NodeJS.Timeout);
            this.heartbeatTimer = null;
        }
        // 解决this指向问题,因为setInterval并不是使用的箭头函数
        const _this = this;
        _this.heartbeatTimer = setInterval(function () {
            // 当检查到websocket的状态是open的状态时,则发送心跳包
            if (_this.examOpen()) {
                // 发送心跳包
                _this.send<string>("SocketHeart", "ping");
                // 如果发送心跳包超过5次没有回应默认为服务器异常
                if(_this.noHeartBackNum > 5) {
                    clearInterval(_this.heartbeatTimer as NodeJS.Timeout);
                    _this.heartbeatTimer = null;
                    // 向上传递服务器异常,websocketErrorEventName为全局config的ws异常事件名
                    // 这里可以将websocketErrorEventName更改为自己的全局ws异常事件名,也可注释掉该行代码
                    Bus.on(websocketErrorEventName);
                }
            }
            // 否则默认ws异常关闭websocket
            else _this.socketClose();
        },
            // 心跳间隔时长
            _this.heartInterval * 1000
        );
    }

    // 格式化ws数据包
    /**
     * 本人常以
     * {
     *     action: 'ws处理的事件名称', //是添加好友,群消息,单聊消息,系统消息的区分
     *     data: 相应事件的数据包,
     * }
     * 为ws的数据格式,同样服务端的数据回执也是以本格式的上添加一些校验而已
      */
    /**
     * 格式化ws数据包
     * @param action ws处理的事件名称
     * @param data 相应事件的数据包
     * @private
     */
    private stringifyData(action: string, data: any) {
        const backData = {
            action,
            data: {
                // callback为了区分多个相同的action,但是返回的data数据包不同,自动增加的字段
                callback: action + "$--$" + parseInt((Math.random() * 1000).toString()),
                ...data,
            },
        };
        return JSON.stringify(backData);
    }

    /**
     * 发送ws消息
     * @param action ws处理的事件名称
     * @param data 相应事件的数据包
     * @param errorBack 发送数据后服务器返回错误的状态码时的处理函数,可不传递,此处需要前置的Bus事件中心的知识
     */
    public send<T = any>(action: string, data?: T, errorBack: any = undefined) {
        return new Promise<{action: string, callback: number}>((resolve, reject) => {
            // 先检查ws的状态
            if (this.examOpen()) {
                // 格式化数据包
                const sendData = this.stringifyData(action, data);
                // 解析数据包为下面的errorBack作准备
                const parseData = JSON.parse(sendData);
                // 发送数据包
                this.socket?.send(sendData);
                // 非心跳检查数据包时,打印发送的消息
                if(action !== 'SocketHeart') {
                    console.log('客户端发送的socket消息为: ', sendData);
                }
                // 是心跳检测试,暂时认为服务无回应,无回应次数递增,当服务有回应时则递减
                else this.noHeartBackNum ++;

                // 当传递了errorBack,且errorBack为函数时,说明希望自动处理状态码错误的返回
                if(errorBack && typeof errorBack === 'function') {
                    // 检查Bus事件中心中是否存在,防止之前已有的事件处理的干扰
                    const initiative = Bus.hasEvent(parseData.data.callback);
                    // 不存在是直接监听事件
                    if(!initiative) Bus.emit(parseData.data.callback, errorBack);
                    // 当本对象中不存在该事件的记录时,则加入自动装箱记录
                    if(!initiative && !this.isAutoEmitEvent(parseData.data.callback))
                        this.autoEmitEvents.push(parseData.data.callback);
                }
                // 当第三个参数传递错误时示警
                else if(errorBack) console.error("send方法的第三个参数必须是个函数");

                resolve({
                    action,
                    callback: parseData.data.callback
                });
            } else {
                console.error("socket 不在连接状态");
                // 关闭ws
                this.socketClose();
                reject();
            }
        });
    }
    // 检测ws状态是否正常
    public examOpen() {
        const active = this.socket && this.socket.readyState === WebSocket.OPEN;
        // 连接状态为打开状态时,则计为1
        if (active) this.isActive = 1;

        return active;
    }
    // ws发送管理时的处理
    private socketClose() {
        // 关闭心跳检查
        if (this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer as NodeJS.Timeout);
            this.heartbeatTimer = null;
        }

        // 如果为自动重连且重连次数为超过最大重连次数时,继续执行重连
        if(this.autoConnect && this.isActive >= -1 * this.reconnectMax) {
            this.reconnect();
        }
        // 重连次数超过最大重连次数时,向上传递ws异常
        if(this.isActive < -1 * this.reconnectMax) {
            console.error("socket重连"+ this.reconnectMax +"次失败,自动关闭重连");
            // 向上提醒全局服务异常
            this.isExport && Bus.on(websocketErrorEventName);
            this.isExport = true;
        }
    }
    // 主动关闭ws
    public disconnect() {
        // 主动关闭ws意味着不需要自动重连,也不需要向上传递消息
        this.autoConnect = false;
        this.isActive = 0;
        this.examOpen() && this.socket?.close();
        this.socketClose();
        console.log('socket主动已断开');
    }
    // 重连处理函数
    private reconnect() {
        // 先关闭心跳检测
        if(this.heartbeatTimer) {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
        // 当当前不处于ws重连时
        if (!this.reconnectTimeOut) {
            const _this = this;
            // 自动重连定时器打开
            _this.reconnectTimeOut = setTimeout(function () {
                // 到了重连时间间隔ws还未open时,再次自动初始化ws
                if (!_this.examOpen()) {
                    if(_this.isActive > 0) _this.isActive = 0;
                    else _this.isActive--;

                    console.log("sokcet正在尝试重新连接");
                    // 自动重连几初始化ws
                    _this.initSocket();
                }
                // 关闭当前重连定时器
                clearTimeout(_this.reconnectTimeOut as NodeJS.Timeout);
                _this.reconnectTimeOut = null;
            }, this.reconnectInterval * 1000);
        }
    }

    /**
     * 消息处理,此本作者认为是最经典的东西
     * 即如何将ws推送过来的消息进行自动推送,延长消息的作用域范围,即为回调的作用域放大方式
     * 此处极其依赖于Bus事件中心的推送处理,如果ws推送的action比较少,则可以以switch,if...else if...else丰方式实现
     * @param event MessageEvent
     */
    public dealMessage(
        event: MessageEvent<{
            code: number;
            data: string;
            callback: string;
            action: string;
            msg: string
        }>
    ) {
        // 解析返回的消息
        /**
         * 此处依据http的设计方式一样,只不过多了callback,
         * callback是对应上次action的动作,如果多次调用相同的action,
         * 则有多个相同的action消息,不同的data返回,为了区分是那次的action,特地增加的callback
         * 一般前端传递啥callback,服务端即返回啥callback
         */
        const { code, data, callback, action, msg } = JSON.parse(event.data.toString());
        if(action === 'SocketHeart') {
            this.isActive = 1;
            // 心跳有返回,则直接归零计数
            this.noHeartBackNum = 0;
        }

        // 成功返回则向上推送相应的处理函数
        if (code == 200) {
            // 当存在事件时直接卸载事件
            this.autoUninstallEvent(callback, false);
            Bus.emit(action, data);
            return data;
        }
        // 失败返回则推送相应的错误处理函数
        else {
            Bus.emit(callback, msg);
            this.autoUninstallEvent(callback);
            return null;
        }
    }

    // 判断是否存在装入返回处理函数
    private isAutoEmitEvent(eventName: string) {
        const isAuto = this.autoEmitEvents.indexOf(eventName);
        return isAuto !== -1;
    }

    // 自动卸载返回处理函数
    private autoUninstallEvent(eventName: string, delay: boolean = true) {
        let timer: NodeJS.Timeout | null = setTimeout(() => {
            // Bus事件中心存在相应的处理事件,且暂存的自动装箱错误处理事件集存在
            if(Bus.hasEvent(eventName) && this.isAutoEmitEvent(eventName)) {
                // 销毁相应错误处理事件
                Bus.off(eventName);
                // 将本事件剔除出自动装箱错误处理事件集
                let newEventArr: string[] = [];
                this.autoEmitEvents.forEach(action => {
                    if(action !== eventName) newEventArr.push(action);
                })
                this.autoEmitEvents = newEventArr;
            }
            // 延时消除相应的错误处理事件
            if(timer) {
                clearTimeout(timer);
                timer = null;
            }
        }, delay? 300 : 0);

    }
}

export default Socket;

在vue3中使用

1、加入到vue原型链中

import { createApp } from 'vue';
import App from './App.vue';

socket = new Socket();
const app = createApp(App);
app.config.globalProperties.$Socket = socket;

2、在组件中使用

import {
  getCurrentInstance,
  reactive
} from 'vue';
const { proxy } = getCurrentInstance() as any;

const message = reactive<{
  uid: string,
  username: string,
  avatar: '',
  list: Array<MessageType>
}>({
  uid: '1',
  username: '1231',
  avatar: '/xxx/xxx.png',
  list: []
})

const SendUserMessageName = "sendUserMessage";
/* 普通不处理错误返回的函数调用 */
proxy.$Socket.send(SendUserMessageName , {
	uid: '1',
	message: '你好',
	type: 'text'
});
// 结合Bus事件中心一起使用,这里是正确返回的处理
proxy.$Bus.on(SendUserMessageName, (data: MessageType) => {
	// 相应的数据处理
	// 例如
	message.list.push(data);
})

/* 处理错误返回的函数调用 */
const SendSystemMessageName = "sendSystemMessage";
// 普通不处理错误返回的函数调用
proxy.$Socket.send(SendSystemMessageName , {
	money: 1.68,
	type: 'money',
	inOut: 1
}, (msg: string) => {
	// 错误的处理函数
});
// 结合Bus事件中心一起使用,这里是正确返回的处理
proxy.$Bus.on(SendSystemMessageName, (data: MessageType) => {
	// 相应的数据处理
	// 例如
	message.list.push(data);
})

3、主动关闭websocket

import {
  getCurrentInstance,
  reactive
} from 'vue';
const { proxy } = getCurrentInstance() as any;

proxy.$Socket.disconnect();

标签:重连,websocket,socket,private,ws,action,设计,推送,data
From: https://blog.csdn.net/weixin_47871072/article/details/140425128

相关文章

  • 领域驱动设计(DDD)的概述与应用
    个人名片......
  • 数据库课程设计——火车票售票系统
    数据库课程设计——火车票售票系统私信获取完整代码项目结构:具体功能:用户登录注册用户注册登陆后可以使用系统的所有功能,如添加乘客,购买车票,查询订单等等系统需要提供基础的列车信息查询:根据车次查询列车是否正常运行,以及查看列车的基本信息(如列车类型,始发站,终点站,开......
  • Day3 设计哈希集合
    classMyHashSet{  private:    vector<list<int>>data;    staticconstintbase=769;    staticinthash(intkey){      returnkey%base;    }public:  MyHashSet():data(base){}    voidadd(......
  • 基于SpringBoot+Vue+uniapp的邮件过滤系统的详细设计和实现(源码+lw+部署文档+讲解等)
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • 基于SpringBoot+Vue+uniapp的美食推荐小程序的详细设计和实现(源码+lw+部署文档+讲解
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • 基于SpringBoot+Vue+uniapp的生鲜食品订购的详细设计和实现(源码+lw+部署文档+讲解等)
    文章目录前言详细视频演示具体实现截图技术栈后端框架SpringBoot前端框架Vue持久层框架MyBaitsPlus系统测试系统测试目的系统功能测试系统测试结论为什么选择我代码参考数据库参考源码获取前言......
  • 设计模式之装饰模式(学习笔记)
    定义装饰模式(DecoratorPattern),又称为包装模式,是一种结构型设计模式。它允许在不改变现有对象结构的情况下,动态地添加新的功能。通过将每个功能封装在单独的装饰器类中,并且这些装饰器类通过引用原始对象来实现功能的组合,从而提供了灵活性和可扩展性的优势。装饰模式避免了通过继......
  • Java语言程序设计——篇四(1)
    类和对象面向对象概述面向过程与面向对象面向对象基本概念面向对象的基本特征面向对象的优势及应用为对象定义类类的修饰符成员变量成员变量-修饰符构造方法⭐️成员方法成员方法-修饰符例题讲解⚠️理解栈和堆面向对象概述两种程序设计方法结构化程序设计,典型代表......
  • Java语言程序设计——篇四(2)
    类和对象方法设计定义和使用方法访问方法和修改方法方法的调用方法参数的传递✨方法重载✨构造方法(构造器)......
  • 电商设计与AI技术的融合:美间的创新之路
    近年来,随着电商行业的飞速发展,商家们纷纷寻找新的方式来提高竞争力,降低成本,提高效率。在这一背景下,群核科技推出的美间·AI创意商拍工具应运而生,成为了电商商家们的新宠。从拼流量到拼AI美间·AI创意商拍工具的推出,使得电商商家只需一张白底图,通过一键换背景,即可生成超真实......