首页 > 编程语言 >java整合sse

java整合sse

时间:2024-11-11 17:18:33浏览次数:6  
标签:java SSE 服务器 整合 messageId sse import 连接 客户端

目录

sse工作原理

 SSE 的优缺点

优点:

缺点:

Servlet 方式的基本逻辑

SSE 与 WebSocket 的对比

示例代码

总结


sse工作原理

SSE 的工作原理基于 HTTP 协议,它通过以下几个步骤完成:

  1. 客户端请求:客户端通过 GET 请求来开启 SSE 通道。客户端的请求必须设置适当的请求头,告诉服务器它希望接收 SSE 数据流。

  2. 服务器响应:服务器响应时设置适当的 HTTP 头信息,告诉客户端它将发送一个事件流。然后,服务器保持这个连接,定期推送数据(事件)给客户端。

  3. 数据流传输:服务器通过持续的 HTTP 连接,将数据以事件流的形式发送给客户端。每个事件数据块会以 data: 开头,之后是数据内容。事件之间通常以 \n\n(两个换行符)分隔。

  4. 客户端处理:客户端通过 JavaScript 监听 SSE 事件流,并在收到数据时进行相应的处理。

 SSE 的优缺点

优点:
  • 简单:相比 WebSocket,SSE 实现起来相对简单,尤其适合只需要服务器推送数据的场景。
  • 基于 HTTP:使用标准的 HTTP 协议,不需要额外的协议或库支持,因此很容易部署和维护。
  • 自动重连:如果连接断开,客户端会自动尝试重新连接。
缺点:
  • 单向通信:只支持服务器到客户端的数据传输,无法实现双向通信(如 WebSocket)。
  • 浏览器支持:虽然现代浏览器都支持 SSE,但 Internet Explorer 和早期版本的 Edge 不支持。

Servlet 方式的基本逻辑

  1. 客户端通过 HTTP 请求访问服务器的一个特定 URL(如 /sse)。
  2. 服务器响应时设置 Content-Type 为 text/event-stream,并保持连接不关闭。
  3. 服务器通过该连接持续推送数据,每个数据块以 data: 开头,并用两个换行符分隔。
  4. 客户端通过 JavaScript 监听这些数据并更新页面。

SSE 与 WebSocket 的对比

虽然 SSEWebSocket 都可以用于实时通信,但它们各自的特点适用于不同的场景:

特性SSEWebSocket
连接方式单向通信,客户端从服务器接收数据双向通信,客户端和服务器都可以发送和接收数据
协议基于 HTTP(默认端口 80 或 443)独立协议,基于 TCP(默认端口 80 或 443)
客户端支持大多数现代浏览器支持,IE 不支持大多数现代浏览器支持
传输延迟通常较低,适合推送消息延迟通常较低,适合实时互动
应用场景服务器推送数据到客户端,如实时通知、股票行情等双向实时互动,如多人游戏、聊天室等

示例代码

maven依赖

<dependencies>
    <!-- Spring Boot Starter Web 依赖(包含了Servlet API、Tomcat等) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

controller层

package com.ybk.common.controller;

import com.ybk.common.api.ApiResult;
import com.ybk.common.api.Result;
import com.ybk.common.core.sse.SseServer;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
 * @author 
 * @date 
 */
@RestController
@RequestMapping("/pos/sse")
public class SseController {
    /**
     * 用户SSE连接
     * 它返回一个SseEmitter实例,这时候连接就已经创建了.
     *
     * @return
     */
    @GetMapping(value = "/connect",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect(@RequestParam String snNo) {
        return SseServer.createConnect(snNo);
    }

    @GetMapping("/sendMessage")
    public Result<?> sendMessage(@RequestParam String snNo,@RequestParam String message) {
        /**
         * 一般取登录用户账号作为 messageId。分组的话需要约定 messageId的格式。
         * 这里模拟创建一个用户连接
         */
        SseServer.sendMessage(snNo,message);
        return ApiResult.success();
    }
}
SseServer
package com.ybk.common.core.sse;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * @author 
 * @date 
 */
@Slf4j
public class SseServer {
    /**
     * 当前连接总数
     */
    private static AtomicInteger currentConnectTotal = new AtomicInteger(0);

    /**
     * messageId的 SseEmitter对象映射集
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();


    public static SseEmitter createConnect(String messageId) {
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(completionCallBack(messageId));
        sseEmitter.onTimeout(timeOutCallBack(messageId));
        sseEmitter.onError(errorCallBack(messageId));
        sseEmitterMap.put(messageId, sseEmitter);
        currentConnectTotal.incrementAndGet();
        return sseEmitter;
    }


    public static void sendMessage(String messageId, String message) {
        if (sseEmitterMap.containsKey(messageId)) {
            try {
                sseEmitterMap.get(messageId).send(message);
            } catch (IOException e) {
                log.error("发送消息异常 ==> messageId={}, 异常信息:", messageId, e.getMessage());
                throw new RuntimeException(e);
            }
        } else {
            throw new RuntimeException("连接不存在或者超时, messageId=" + messageId);
        }
    }

    /**
     * 移除 MessageId
     *
     * @param messageId
     */
    public static void removeMessageId(String messageId) {
        sseEmitterMap.remove(messageId);
        //数量-1
        currentConnectTotal.getAndDecrement();
        log.info("remove messageId={}", messageId);
    }

    /**
     * 断开SSE连接时的回调
     *
     * @param messageId
     * @return
     */
    private static Runnable completionCallBack(String messageId) {
        return () -> {
            log.info("结束连接 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }

    /**
     * 连接超时时回调触发
     *
     * @param messageId
     * @return
     */
    private static Runnable timeOutCallBack(String messageId) {
        return () -> {
            log.info("连接超时 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }

    /**
     * 连接报错时回调触发。
     *
     * @param messageId
     * @return
     */
    private static Consumer<Throwable> errorCallBack(String messageId) {
        return throwable -> {
            log.error("连接异常 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
}

总结

  1. SSE 是一种服务器推送技术,通过 HTTP 协议将数据流传输到客户端,适合用于实时更新场景。
  2. Java Servlet 和 Spring Boot 都可以很方便地实现 SSE,分别使用标准的 HttpServlet 和 SseEmitter 类。
  3. SSE 的优点是实现简单,且基于 HTTP 协议,但不支持双向通信;相比之下,WebSocket 支持双向通信,适用于更复杂的实时互动场景。

标签:java,SSE,服务器,整合,messageId,sse,import,连接,客户端
From: https://blog.csdn.net/m0_67852515/article/details/143690575

相关文章

  • 深入理解Java对象结构
    一、Java对象结构实例化一个Java对象之后,该对象在内存中的结构是怎么样的?Java对象(Object实例)结构包括三部分:对象头、对象体和对齐字节,具体下图所示1、Java对象的三部分(1)对象头对象头包括三个字段,第一个字段叫作MarkWord(标记字),用于存储自身运行时的数据,例如GC标志位......
  • Java-关于final关键字不得不知道的几大特点
    final-最终的修饰类:表示类不可被继承修饰方法:表示方法不可被子类覆盖,但是可以重载修饰变量:表示变量一旦被赋值就不可以更改它的值。(1)修饰成员变量如果final修饰的是类变量,只能在静态初始化块中指定初始值或者声明该类变量时指定初始值。如果final修饰的是成员变量,可......
  • Java流程控制-break,continue,goto
    breakcontinuegotobreakbreak在任何循环语句的主体部分,均可用break控制循环的流程。break用于强行退出循环,不执行循环中剩余的语句(break语句也在switch语句中使用)代码:publicstaticvoidmain(String[]args){inti=0;while(i<100){i++;......
  • Java概述与第一个程序,及JDK环境的安装
    开始学习java,我们需要对java有一个基本的了解,以及设置相关的开发环境一.为什么选择JavaJava有一个显著的优势就是用途广泛服务器后端:银行,证券交易平台,电子商务后台....Android应用:安卓手机,各种移动设备,智能家电…大数据技术:Hadoop以及其他大数据处理技术都是......
  • Java集合基础——针对实习面试
    目录Java集合基础什么是Java集合?说说List,Set,Queue,Map的区别?说说List?说说Set?说说Map?说说Queue?为什么要用集合?如何选用集合?Java集合基础什么是Java集合?Java集合(JavaCollections)是Java中提供的一种容器,用于存储和管理一组对象。Java集合框架(JavaCollections......
  • 【老白学 Java】人以群分,物以类聚
    人以群分,物以类聚文章来源:《HeadFirstJava》修炼感悟。上一章,Guess想必让师兄们留下「深刻」印象,也见到了OOP应有的样子。本章老白继续学习Java变量相关的知识,感兴趣的师兄可以继续往下看。一、变量的分类Java的数据类型可分为两大类:基本数据类型引用数据类......
  • visualvm远程连接Docker容器中部署的java应用并监控
    visualvm远程连接Docker容器中部署的java应用前言jdk1.8中自带了,java11中需要单独下载下载地址visualvm下载地址简介java虚拟机监控,故障排查及性能分析工具。网络配置局域网与docker内网打通,请参考:办公网络与Docker内网网络互通服务于网络服务名称节点IPj......
  • 将学习型索引ALEX的cmake项目在虚拟机上用java运行
    一、环境配置虚拟机:Centos7gcc-v:11.2.1java-version:1.8.0 二、ALEX实现步骤   1、安装c++输入命令sudoapt-getinstallg++出错sudo:apt-get:找不到命令原因:Centos7中用yum命令下载再次输入命令sudoyuminstallg++再次报错已加载插件:fastestmirror,l......
  • Java 日期 API
    JDK8之前日期与时间戳之间的转换publicclassTest{publicstaticvoidmain(String[]args){Datedate=newDate();System.out.println("date="+date);//date=SunSep2614:48:52CST2021Datedate1=newDate(1632638970000L......
  • Java爬虫:京东商品SKU信息的“偷心盗贼”
    在这个信息爆炸的时代,数据就像是藏在深山里的宝藏,等待着我们这些“数据探险家”去发掘。今天,我们要化身为一名“偷心盗贼”,用Java这把锋利的“剑”,精准地从京东的海洋中窃取商品的SKU信息。别担心,我们这是合法的“盗窃”,因为我们只是在学习如何用代码与网页对话。环境准备:装......