首页 > 其他分享 >WebSocket连接实现实时数据推送

WebSocket连接实现实时数据推送

时间:2023-12-28 10:55:06浏览次数:33  
标签:websocket socket userId 实时 new WebSocket message 推送 public

WebSocket连接实现实时数据推送

1、前端

1-1、webSocket.js

//暴露自定义websocket对象
export const socket = {
    //后台请求路径
    url: "",
    //websocket对象
    websocket: null,
    //websocket状态
    websocketState: false,
    //重新连接次数
    reconnectNum: 0,
    //重连锁状态,保证重连按顺序执行
    lockReconnect: false,
    //定时器信息
    timeout: null,
    clientTimeout: null,
    serverTimeout: null,
    //初始化方法,根据url创建websocket对象封装基本连接方法,并重置心跳检测
    initWebSocket(newUrl) {
      socket.url = newUrl;
      socket.websocket = new WebSocket(socket.url);
      socket.websocket.onopen = socket.websocketOnOpen;
      socket.websocket.onerror = socket.websocketOnError;
      socket.websocket.onclose = socket.websocketOnClose;
      this.resetHeartbeat()
    },
    reconnect() {
      //判断连接状态
      if (socket.lockReconnect) return;
      socket.reconnectNum += 1;
      //重新连接三次还未成功调用连接关闭方法
      if (socket.reconnectNum === 3) {
        socket.reconnectNum = 0;
        socket.websocket.onclose()
        return;
      }
      //等待本次重连完成后再进行下一次
      socket.lockReconnect = true;
      //5s后进行重新连接
      socket.timeout = setTimeout(() => {
        socket.initWebSocket(socket.url);
        socket.lockReconnect = false;
      }, 5000);
    },
    //重置心跳检测
    resetHeartbeat() {
      socket.heartbeat();
    },
    //心跳检测
    heartbeat() {
      socket.clientTimeout = setTimeout(() => {
        if (socket.websocket) {
          //向后台发送消息进行心跳检测
          socket.websocket.send(JSON.stringify({ type: "heartbeat" }));
          socket.websocketState = false;
          //一分钟内服务器不响应则关闭连接
          socket.serverTimeout = setTimeout(() => {
            if (!socket.websocketState) {
              socket.websocket.onclose()
            } else {
              this.resetHeartbeat()
            }
          }, 60 * 1000);
        }
      }, 3 * 1000);
    },
    //发送消息
    sendMsg(message) {
      socket.websocket.send(message);
    },
    websocketOnOpen(event) {
      //连接开启后向后台发送消息进行一次心跳检测
      console.log(event);
      socket.sendMsg(JSON.stringify({ type: "heartbeat" }));
      socket.sendDataRequest();
    },
    websocketOnError(error) {
      console.log(error);
      socket.reconnect();
    },
    websocketOnClose() {
      socket.websocket.close();
    },
  };

1-2、组件

<script>
import { socket } from "@/request/websocket";

export default {
  data() {
        return {
          tableData: [],
          loading: true,
          websocketCount: -1,
          //查询条件
          queryCondition: {
            type: "message",
          },
        }
  },
  created(){
    socket.initWebSocket(
      `ws://localhost:9999/notice/` +"guest"  //要连接的socket服务器地址,以及连接用户名
    );
    //绑定接收消息方法
    socket.websocket.onmessage = this.websocketOnMessage;
  },
  methods:{
    timestampToDate(time){
      console.log(time);
      return new Date(time).toLocaleString();
    },


    init() {
      this.queryCondition.type = "message";
      socket.sendMsg(JSON.stringify(this.queryCondition));
    },


    websocketOnMessage(event) {
      //初始化界面时,主动向后台发送一次消息,获取数据
      this.websocketCount += 1;
      if (this.websocketCount === 0) {
        this.init();
      }
      console.log(event.data);
      let info = JSON.parse(event.data);
      console.log(info);
      switch (info.type) {
        case "heartbeat":
          socket.websocketState = true;
          break;
        case "message":
          this.loading = true;
          this.$nextTick(() => {
            this.consumeMessage(info);
          })
          break;
        case "error":
          this.loading = false;
          break;
      }
    },
    consumeMessage(info) {
      //拿到最新数据重新渲染界面
      var resp = JSON.parse(info.content);
      if(resp == this.tableData){
        return;
      }else{
        this.tableData=[];  
        this.tableData=resp;  
      }
    },
  }
}
</script>

2、后端

2-1、编写配置类

跨域配置(统一设定)

@Configuration
public class WebConfig implements WebMvcConfigurer {
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowedMethods("*")
                .allowedHeaders("*");
    }
}

webSocket配置

/**
 * @author LiuJunDong
 * @ ClassName WebSocketConfig
 * @ description: TODO
 * @ date 2023-10-17
 * @ version: 1.0
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

2-2、编写Socket服务

@Slf4j
@Component
@ServerEndpoint("/notice/{userId}")
public class WebSocketServer {
    /**
     * 解决无法注入bean:定义静态service对象,通过@Autowired在系统启动时为静态变量赋值
     * @ Autowired 注解作用在方法上,如果方法没有参数,spring容器会在类加载完后执行一次这个方法,
     * 如果方法中有参数的话,还会从容器中自动注入这个方法的参数,然后执行一次这个方法。
     */
    public static DataService dataService;

    @Autowired
    public void setXxService(DataService dataService) {
        WebSocketServer.dataService = dataService;
    }

    /**
     * 存储客户端session信息
     */
    public static Map<String, Session> clients = new ConcurrentHashMap<>();

    /**
     * 存储把不同用户的客户端session信息集合
     */
    public static Map<String, Set<String>> connection = new ConcurrentHashMap<>();

    /**
     * 会话id
     */
    private String sid = null;

    /**
     * 建立连接的用户id
     */
    private String userId;

    /**
     * @ description: 当与前端的websocket连接成功时,执行该方法
     * @ PathParam 获取ServerEndpoint路径中的占位符信息类似 控制层的 @PathVariable注解
     **/
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.sid = UUID.randomUUID().toString();
        this.userId = userId;
        clients.put(this.sid, session);
        //判断该用户是否存在会话信息,不存在则添加
        Set<String> clientSet = connection.get(userId);
        if (clientSet == null) {
            clientSet = new HashSet<>();
            connection.put(userId, clientSet);
        }
        clientSet.add(this.sid);
        log.info(this.userId + "用户建立连接," + this.sid + "连接开启!");
    }

    /**
     * @ description: 当连接失败时,执行该方法
     **/
    @OnClose
    public void onClose() {
        clients.remove(this.sid);
        log.info(this.sid + "连接断开");
    }

    /**
     * @ description: 当收到前台发送的消息时,执行该方法
     **/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到来自用户:" + this.userId + "的信息   " + message);
        //自定义消息实体
        ViewQueryInfoDTO viewQueryInfoDTO = JSON.parseObject(message, ViewQueryInfoDTO.class);
        viewQueryInfoDTO.setUserId(this.userId);
        //判断该次请求的消息类型是心跳检测还是获取信息
        if ("heartbeat".equals(viewQueryInfoDTO.getType())) {
            log.info("我在正常运行");
            //立刻向前台发送消息,代表后台正常运行
            sendMessageByUserId(this.userId, new MessageInfo("heartbeat", "ok"));
        }
        if ("message".equals(viewQueryInfoDTO.getType())) {
            log.info("我来取数据了");
            //执行业务逻辑
            MessageInfo messageInfo = dataService.list(viewQueryInfoDTO);
            sendMessageByUserId(this.userId, messageInfo);
        }
    }

    /**
     * @ description: 当连接发生错误时,执行该方法
     **/
    @OnError
    public void one rror(Throwable error) {
        log.info("系统错误");
        error.printStackTrace();
    }

    /**
     * @ description: 通过userId向用户发送信息
     * 该类定义成静态可以配合定时任务实现定时推送
     **/
    public static void sendMessageByUserId(String userId, MessageInfo message) {
        if (!StringUtils.isEmpty(userId)) {
            Set<String> clientSet = connection.get(userId);
            //用户是否存在客户端连接
            if (Objects.nonNull(clientSet)) {
                Iterator<String> iterator = clientSet.iterator();
                while (iterator.hasNext()) {
                    String sid = iterator.next();
                    Session session = clients.get(sid);
                    //向每个会话发送消息
                    if (Objects.nonNull(session)) {
                        try {
                            String jsonString = JSON.toJSONString(message);
                            //同步发送数据,需要等上一个sendText发送完成才执行下一个发送
                            session.getBasicRemote().sendText(jsonString);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

3、方式一:前端定时器

//在websocket.js里添加定时器
     sendDataRequest(){
       socket.sendMsg(JSON.stringify({ type: "message" }));
       // 每隔2秒发送一次数据请求
       setTimeout(() => {
         this.sendDataRequest();
       }, 2000);
     }

//在websocketOnOpen里添加
socket.sendDataRequest();

4、方式二:后端处理后实时推送

这里以定时器模拟将数据处理结束之后,向客户端推送数据。

@Slf4j
@Component
@EnableScheduling
public class SendDataTask {

    @Autowired
    private DataService dataService ;

    @Scheduled(fixedRate = 5000)
    @Async
    public void sendMessageToClient(){
        log.info("定时任务:发送数据");
        try {
            TimeUnit.SECONDS.sleep(5);
            MessageInfo messageInfo = dataService.list(new ViewQueryInfoDTO());
            sendMessageByUserId("guest",messageInfo);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

模拟一个数据集(实际是从数据库取的)

/**
 * @author LiuJunDong
 * @ ClassName DataService
 * @ description: TODO
 * @ date 2023-10-17
 * @ version: 1.0
 */
@Service
public class DataService {
    public static CopyOnWriteArrayList<Device> list = new CopyOnWriteArrayList<>();
    public static AtomicInteger atomicInteger = new AtomicInteger(5);

    static {
        Device device = new Device("1", "博联", System.currentTimeMillis());
        Device device2 = new Device("2", "博联2", System.currentTimeMillis());
        Device device3= new Device("3", "博联3", System.currentTimeMillis());
        Device device4 = new Device("4", "博联4", System.currentTimeMillis());
        list.add(device);
        list.add(device2);
        list.add(device3);
        list.add(device4);
    }
    public MessageInfo list(ViewQueryInfoDTO viewQueryInfoDTO){
        String s = Integer.valueOf(atomicInteger.incrementAndGet()).toString();
        list.add(new Device(s,"博联"+s,System.currentTimeMillis()));
        String jsonString = JSONObject.toJSONString(list);
        return new MessageInfo("message","ok",jsonString);
    }
}

集合最好采用线程安全的,防止多线程环境下出现问题。

标签:websocket,socket,userId,实时,new,WebSocket,message,推送,public
From: https://www.cnblogs.com/jundong2177/p/17932244.html

相关文章

  • 音视频本地推送标题不显示昵称的排查方法
    1、本地推送标题优先选择呼叫发起方设置的pushConfig,所以首先要确定发起方pushConfig的设置是否为昵称;CallKit的默认设置为[RCIMsharedRCIM].currentUserInfo中的name,如果发起方没有设置currentUserInfo对应的内容,则name默认值为user;对应源码位置:2、如果您修改了C......
  • 获得JD商品评论 API 如何实现实时数据获取
    一、背景介绍随着互联网的快速发展,电商平台如雨后春笋般涌现,其中京东(JD)作为中国最大的自营式电商平台之一,拥有庞大的用户群体和丰富的商品资源。为了更好地了解用户对商品的反馈,京东开放了商品评论的API接口,允许开发者实时获取商品评论数据。本文将介绍如何通过JD商品评论API实现实......
  • 实时风控预警平台:架构设计之精髓
    1.背景介绍实时风控预警平台是一种基于大数据技术的应用,主要用于实时监控和预警各种风险事件。在当今的数字化时代,数据量不断增加,风险事件也变得更加复杂和快速。因此,实时风控预警平台的重要性不断提高,成为企业和组织的核心需求。实时风控预警平台的核心功能包括数据收集、数据处理......
  • 【HMS Core】推送问题小集合
    ​【问题描述1】“一个应用订阅的主题数量不能超过2000个”,如果超过了,会出现什么情况,如何解决?【解决方案】主题数量上限是2000,超过后会导致订阅主题失败。可以尝试删除不需要的主题。https://developer.huawei.com/consumer/cn/doc/HMSCore-References/topic-delete-api-00000......
  • 基于开源模型搭建实时人脸识别系统(六):人脸识别(人脸特征提取)
    目录人脸识别的几个发展阶段基于深度学习的人脸识别技术的流程闭集和开集(Openset)识别人脸识别的损失Insightface人脸识别数据集模型选型参考文献结语人脸识别系统项目源码前面我们讲过了人脸检测、人脸质量、人脸关键点、人脸跟踪,接下来就是人脸识别系统里面的重中之重人脸识别......
  • 淘宝/天猫商品API:实时数据获取与安全隐私保护的指南
    一、引言随着电子商务的快速发展,淘宝/天猫等电商平台已成为商家和消费者的重要交易场所。对于电商企业而言,实时掌握店铺商品的销售情况、库存状态等信息至关重要。然而,手动管理和更新商品信息既费时又费力。因此,淘宝/天猫提供的商品API成为商家实时获取商品数据的关键工具。本文将......
  • 记一下在IIS中部署WebSocket服务的经验
    因业务需求需要使用长连接推送数据,这边选择使用.NET框架原生支持的WebSocket技术。 一、版本要求 对于IIS的版本必须是IIS8.0及以上 .NETFramework版本必须为4.5及以上PS:低于上述版本,需要自己实现协议或寻找第三方实现 二、IIS配置在服务器IIS上安装“WebSoc......
  • Qt/C++音视频开发61-多屏渲染/一个解码渲染到多个窗口/画面实时同步
    一、前言多屏渲染就是一个解码线程对应多个渲染界面,通过addrender这种方式添加多个绘制窗体,我们经常可以在展会或者卖电视机的地方可以看到很多电视播放的同一个画面,原理应该类似,一个地方负责打开解码播放,将画面同步传输到多个显示的地方,完全保证了画面的一致性。这样相当于复用......
  • Gitlab仓库推送到Gitee仓库的一种思路
    文章目录Gitlab仓库推送到Gitee仓库的一种思路1、创建Gitee的ssh公钥(默认已有Gitlab的ssh公钥)2、添加Gitlab远程仓库地址3、添加Gitee远程仓库地址4、拉取Gitlab远程仓库指定分支到本地仓库指定分支(以test分支为例)5、推送本地仓库指定分支到Gitee远程仓库指定分支(以test分支为例)6......
  • unigui显示websocket服务端向客户端发送信息【15】
    用WebSocket从服务端直接发送消息给all客户端。1、在ServerModule放TUniThreadTimer 2、timerevent:procedureTUniServerModule.UniThreadTimer1Timer(Sender:TObject);beginBroadcastMessage('update',[......