首页 > 其他分享 >记录springboot的一次使用socketio的经历

记录springboot的一次使用socketio的经历

时间:2023-12-08 10:00:15浏览次数:44  
标签:springboot 记录 socketio String client socket public 客户端

pom中加入依赖

        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>2.0.6</version>
        </dependency>

        <dependency>
            <groupId>io.socket</groupId>
            <artifactId>socket.io-client</artifactId>
            <version>2.1.0</version>
        </dependency>

netty socketio 配置信息

# netty-socketio 配置
socketio:
  host: 127.0.0.1
  port: 8889
  contextPath: /mwapi/ws/spl
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # socket连接数大小(如只监听一个端口boss线程组为1即可)
  bossCount: 1
  workCount: 100
  allowCustomRequests: true
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000

java socketio config配置

@Configuration
public class SocketIOConfig {

    @Value("${socketio.host}")
    private String host;

    @Value("${socketio.port}")
    private Integer port;

    @Value("${socketio.contextPath}")
    private String contextPath;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    @Bean
    public SocketIOServer socketIOServer() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setSoLinger(0);
        socketConfig.setReuseAddress(true);

        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(host);
        config.setPort(port);
        // config.setContext(contextPath);

        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        config.setOrigin("*");
        SocketIOServer socketIOServer = new SocketIOServer(config);
        socketIOServer.addNamespace("/mynamespace");

        return socketIOServer;
    }

    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

java server 通用代码编写

@Component
public class MessageEventHandler {

    private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);

    /**
     * 服务器socket对象
     */
    public SocketIOServer socketIoServer;

    /**
     * 客户端集合
     */
    public List<String> listClient = new CopyOnWriteArrayList<>();

    public SocketInstance socketInstance = SocketInstance.getSocketInstance();

    /**
     * 超时时间
     */
    static final int limitSeconds = 60;

    /**
     * 初始化消息事件处理器
     *
     * @param server 服务器socket对象
     */
    @Autowired
    public MessageEventHandler(SocketIOServer server) {
        logger.info("初始化SOCKET消息事件处理器");
        this.socketIoServer = server;
    }

    /**
     * 客户端发起连接时触发
     *
     * @param client 客户端Socket对象信息
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        logger.info("客户端{}已连接", client.getSessionId());
        String sessionId = getSessionId(client);
        listClient.add(sessionId);
        socketInstance.insertSocketClient(sessionId, client);
        //向前端发送接收数据成功标识
        client.sendEvent("connect_success", "已经成功连接");

    }

    /**
     * 客户端断开连接时触发
     *
     * @param client 客户端Socket对象信息
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        logger.info("客户端{}断开连接", client.getSessionId());
        String sessionId = getSessionId(client);

        listClient.remove(sessionId);
        socketInstance.remoteClient(sessionId);

    }


    /**
     * 客户端发送消息时触发
     *
     * @param client  客户端Socket对象信息
     * @param request AckRequest 回调对象
     * @param data    消息信息实体
     */
    @OnEvent(value = SocketConstants.SocketEvent.MESSAGE)
    public void onEvent(SocketIOClient client, AckRequest request, String data) {
        System.out.println("发来消息:" + data);
        request.sendAckData("服务端已收到");
        client.sendEvent("messageevent", "back data");
        //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
    }

    @OnEvent(value = SocketConstants.SocketEvent.BROADCAST)
    public void onEventByBroadcast(SocketIOClient client, AckRequest request, String data) {
        System.out.println("发来消息:" + data);
        request.sendAckData("服务端-广播事件已收到");
        client.sendEvent(SocketConstants.SocketEvent.BROADCAST, "广播事件 " + DateUtil.now());
        //socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data");
    }


    /**
     * 广播消息 函数可在其他类中调用
     */
    public void sendBroadcast(byte[] data) {
        //向已连接的所有客户端发送数据,map实现客户端的存储
        for (SocketIOClient client : socketInstance.getClientSocketAll().values()) {
            if (client.isChannelOpen()) {
                client.sendEvent("message_event", data);
            }
        }
    }

    /**
     * 获取客户端的session Id
     *
     * @param client: 客户端
     */
    private String getSessionId(SocketIOClient client) {
        return client.getSessionId().toString();

    }

    /**
     * 获取连接的客户端ip地址
     *
     * @param client: 客户端
     * @return 获取连接的客户端ip地址
     */
    private String getIpByClient(SocketIOClient client) {
        String sa = client.getRemoteAddress().toString();
        return sa.substring(1, sa.indexOf(":"));
    }

}

自定义命名空间,事件处理

@Slf4j
@Component
public class MyNamespaceHandler {
    //测试使用
    @OnEvent("message")
    public void testHandler(SocketIOClient client, String data, AckRequest ackRequest) throws JsonProcessingException {

        log.info("SplSearch:{}", data);

        if (ackRequest.isAckRequested()) {
            //返回给客户端,说我接收到了
            ackRequest.sendAckData("SplSearch", data);
        }

    }
}

项目启动加载并且项目关闭时关闭socket io

可以使用springboot方式也可以使用spring的方式,这里方式很多根据自己喜好来定
如果使用spring方式来处理可以实现:ApplicationListener<...>
实现类同时希望拥有启动加载和关闭销毁两个功能,可以这样做

  1. spring 实现应用监听
@Component
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextRefreshedEvent) {
            // 处理应用程序启动事件
            System.out.println("Application Started");
        } else if (event instanceof ContextClosedEvent) {
            // 处理应用程序关闭事件
            System.out.println("Application Closed");
        }
    }
}
  1. springboot方式
@Slf4j
@Component
@Order(value = 1)
public class MyCommandLineRunner implements CommandLineRunner, DisposableBean {

    private final SocketIOServer server;

    private final MyNamespaceHandler myNamespaceHandler;

    @Autowired
    public MyCommandLineRunner(SocketIOServer server, MyNamespaceHandler myNamespaceHandler) {
        this.myNamespaceHandler = myNamespaceHandler;
        this.server = server;
        System.out.println("初始化MyCommandLineRunner");
    }

    @Override
    public void run(String... args) {
        try {
            server.getNamespace("/mynamespace").addListeners(myNamespaceHandler);
            server.start();
            System.out.println("socket.io启动成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void destroy() {
        //如果用kill -9  这个监听是没用的,有可能会导致你服务kill掉了,但是socket服务没有kill掉
        server.stop();
        log.info("SocketIOServer==============================关闭成功");
    }
}

唯一socket实例

public class SocketInstance {

    /**
     * 客户端Socket连接对象容器
     */
    private static Map<String, SocketIOClient> socketClients = null;

    /**
     * 私有构造
     */
    private SocketInstance() {
        //从缓存中获取socketClients
        socketClients = new HashMap<>();
    }

    /**
     * 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为SocketInstanceHolder的类,
     * 只有在SocketInstance.getSocketInstance()中调用,
     * 由于私有的属性,他人无法使用SocketInstanceHolder,不调用SocketInstance.getSocketInstance()就不会创建实例。
     * 优点:达到了lazy loading的效果,即按需创建实例。
     * 无法适用于分布式集群部署
     */
    private static class SocketInstanceHolder {
        /**
         * 创建全局唯一实例
         */
        private final static SocketInstance instance = new SocketInstance();
    }

    /**
     * 获取全局唯一实例
     *
     * @return SocketInstance对象
     */
    public static SocketInstance getSocketInstance() {
        return SocketInstanceHolder.instance;
    }

    /**
     * 新增客户端连接到容器
     *
     * @param encode         设备En号
     * @param socketIOClient 客户端socket对象
     */
    public void insertSocketClient(String encode, SocketIOClient socketIOClient) {
        socketClients.put(encode, socketIOClient);
    }

    /**
     * 获取客户端Socket对象
     *
     * @param encode 设备encode
     * @return 客户端Socket对象
     */
    public SocketIOClient getClientSocket(String encode) {
        return socketClients.get(encode);
    }

    /**
     * 获取所有客户端Socket对象
     *
     * @return 客户端Socket对象
     */
    public Map<String, SocketIOClient> getClientSocketAll() {
        return socketClients;
    }


    /**
     * 删除客户端
     * @param sessionId 客户端的id
     */
    public void remoteClient(String sessionId) {
        SocketIOClient oldSocketIOClient = socketClients.get(sessionId);
        if (oldSocketIOClient != null) {
            try {
                //关闭客户端连接
                oldSocketIOClient.disconnect();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        socketClients.remove(sessionId);
    }
}

public class SocketConstants {

    /**
     * Socket事件类
     */
    public class SocketEvent {

        /**
         * 效验连接状况
         */
        public static final String HEALTH_CHECK = "HEALTH_CHECK";

        /**
         * 消息接收事件名称
         */
        public static final String MESSAGE = "message";

        public static final String BROADCAST = "broadcast";


    }
}

客户端代码

默认没有命名空间的,需要命名空间在url后面加/xxx

public class SocketIOClientLaunch {
    public static void main(String[] args) {
        // 服务端socket.io连接通信地址
        String url = "http://127.0.0.1:8889";
        try {
            IO.Options options = new IO.Options();

            options.transports = new String[]{"websocket"};
            options.reconnectionAttempts = 2;
            // 失败重连的时间间隔
            options.reconnectionDelay = 1000;
            // 连接超时时间(ms)
            options.timeout = 500;
            // userId: 唯一标识 传给服务端存储
            final Socket socket = IO.socket(url + "?userId=1", options);

            socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello..."));

            // 自定义事件`connected` -> 接收服务端成功连接消息
            socket.on(SocketConstant.CONNECTION, objects -> {
                int length = objects.length;
                log.info("服务端自定义事件`connected`:" + objects[0].toString());
            });

            // 自定义事件`push_data_event` -> 接收服务端消息
            socket.on(SocketConstant.PUSH_DATA_EVENT, objects -> log.info("服务端自定义`push_data_event`:" + objects[0].toString()));

            // 自定义事件`myBroadcast` -> 接收服务端广播消息
            socket.on(SocketConstant.BROADCAST, objects -> log.info("服务端广播消息:" + objects[0].toString()));

            socket.connect();

            while (true) {
                Thread.sleep(3000);
                // 自定义事件`push_data_event` -> 向服务端发送消息
                socket.emit(SocketConstant.PUSH_DATA_EVENT, "发送数据 " + DateUtil.now());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

postman 调用方式

回调事件

image

发送事件

image

配置

image

下一篇:k8s中配置socket io,gateway 配置socket io

标签:springboot,记录,socketio,String,client,socket,public,客户端
From: https://www.cnblogs.com/xxsdnol/p/17884549.html

相关文章

  • 模拟赛记录
    每周三场模拟赛,用来记录。2023.11.22计数场。\(100+0+0+0=100\)。C0392B【1109B组】预处理器题意:求有多少个长度为\(n\)的数组\(a\)满足以下条件。条件一:\(l_{i}\lea_{i}\ler_{i}\)。条件二:\(a_{i}\)模\(2\)等于\(p_{i}\)。条件三:\(s\le\suma_{......
  • 记录一下工作遇到的一个小bug,DataGrid的DataGridCheckBoxColumn 问题
    <DataTemplatex:Key="CheckBoxDataTemplate"><Grid><CheckBoxClick="CheckBox_Checked"IsChecked="{BindingIsSelect,Mode=OneWay,UpdateSourceTrigger=PropertyChanged}"><......
  • MySQL服务器8核32G max_connections设置为10000的情况,springboot里面的Druid参数配置
    MySQL服务器8核32Gmax_connections设置为10000的情况,springboot里面的Druid参数配置多少合适啊,MySQL服务器8核32G,max_connections设置为10000,确实是相当大的一个配置啊。对于Druid的参数配置,得看你系统的具体情况。一般来说,你可以考虑以下几个参数:initialSize:连接池的初始大小,你......
  • 【环境配置记录】ubuntu用samba共享文件夹给windows
    中文社区真的不太行,英文社区资源丰富很多转载https://askubuntu.com/questions/1462387/trying-to-samba-share-a-folder-always-gives-errors的答案 Pleaseseethefollowinginstallationguideline.Itcaneffectivelysolvetheoutstandingissueof'netusershare'r......
  • springboot蜗牛兼职网的设计与实现-计算机毕业设计源码+LW文档
    摘 要随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势,蜗牛兼职网当然也不能排除在外。蜗牛兼职网是以实际运用为开发背景,运用软件工程原理和开发方法,采用springboot框架构建的一个管理系统。整个开发过程首先对软件系......
  • 前端问题记录:el-row和el-col出现排版错乱
    el-row和el-col出现排版错乱,如图使用场景:使用了el-row和col配合form使用,不操作时候页面排版是正确的,进行操作就会出现排版错乱。问题原因:因为el-row和el-col的中的span元素之和超过了24的时候,就会出现排版错乱解决方案:.el-row{display:flex;//设置布局flex-wrap:wrap;//......
  • @SpringBootTest与@RunWith注解的区别
    @SpringBootTest与@RunWith注解的区别@SpringBootTest与@RunWith注解的区别在于:@SpringBootTest是spring的注解,用于加载ApplicationContext,启动spring容器;而@RunWith是junit的注解,它指定了junit测试的时候使用的Runner(运行器)类。通常@SpringBootTest与@RunWith这两个是配合使......
  • 记录一个debian11环境下更新源失败问题
    之前用的阿里云源,时间久了不知道为何拉取失败。网上搜教程替换管理其他国内镜像源。配置方法原文件备份sudocp/etc/apt/sources.list/etc/apt/sources.list.bak编辑源列表文件sudovim/etc/apt/sources.list将原来的清单内容删除或注释,并增加镜像源地址,这里推荐......
  • 记录--静态网站 H5 跳小程序,以及踩坑
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助背景我司有智慧功成家APP和对应的小程序,现在已经实现APP分享到微信,微信点击分享链接直接进入小程序。目前有一个问题就是我们APP在网警那边还没有完全审批下来,已经搞了几个月了,还不知道啥时能上线。微信对于这类......
  • springboot018母婴商城-计算机毕业设计源码+LW文档
    一、选题背景以母婴人群和准母婴人群及其家庭群体为目标用户。站在整个社会产业的角度,有些产业为所有用户提供某类基本需求,有些产业为某类用户提供某类特定需求,而母婴产业是最终满足特定人群相关多元化需求的一个宽辐射市场。母婴产品及服务最终以线上与线下为出口抵达用户,从市场......