首页 > 其他分享 >SpringBoot中使用Netty开发WebSocket服务-netty-websocket-spring-boot-starter开源项目使用与改造多线程群发消息

SpringBoot中使用Netty开发WebSocket服务-netty-websocket-spring-boot-starter开源项目使用与改造多线程群发消息

时间:2023-07-12 14:56:03浏览次数:88  
标签:Netty websocket Session spring session import 多线程 public 群发

场景

SpringBoot+Vue整合WebSocket实现前后端消息推送:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/114392573

SpringCloud(若依微服务版为例)集成WebSocket实现前后端的消息推送:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/114480731

若依前后端分离版手把手教你本地搭建环境并运行项目:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108465662

在上面的基础上,使用websocket仍有不足,比如可能出现如下问题

Nginx代理websocket配置(解决websocket异常断开连接tcp连接不断问题):

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/126082838

另外如果业务场景中需要高频定时任务通过websocket给多个客户端发动消息,

则短时间内需要使用多线程/自定义线程池实现群发消息功能。

关于自定义线程池相关可参考如下

Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/130068794

Java中线程的常用操作-后台线程、自定义线程工厂ThreadFactpry、join加入一个线程、线程异常捕获:

https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/130197910

注:

博客:
https://blog.csdn.net/badao_liumang_qizhi

实现

1、netty-websocket-spring-boot-starter与若依集成websocket

本项目帮助你在spring-boot中使用Netty来开发WebSocket服务器,并像spring-websocket的注解开发一样简单。

仓库地址:

https://api.gitee.com/Yeauty/netty-websocket-spring-boot-starter

https://github.com/YeautyYE/netty-websocket-spring-boot-starter/blob/master/README_zh.md

除了此开源项目之外,这里还基于若依开源项目框架中基于websocket的插件集成

http://doc.ruoyi.vip/ruoyi-vue/document/cjjc.html#%E9%9B%86%E6%88%90websocket%E5%AE%9E%E7%8E%B0%E5%AE%9E%E6%97%B6%E9%80%9A%E4%BF%A1

 

2、项目中添加依赖

        <dependency>
            <groupId>org.yeauty</groupId>
            <artifactId>netty-websocket-spring-boot-starter</artifactId>
            <version>0.12.0</version>
        </dependency>

然后集合以上两个开源框架的示例代码新建如下类

 

3、其中WebSocketServer类为端点类,实现代码

package com.badao.demo.websocket;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.yeauty.annotation.*;
import org.yeauty.pojo.Session;

import java.io.IOException;
import java.util.concurrent.Semaphore;

@ServerEndpoint(path = "/websocket/{userName}", port = "${ws.port}", readerIdleTimeSeconds = "${ws.readerIdleTimeSeconds}", writerIdleTimeSeconds = "${ws.writerIdleTimeSeconds}", allIdleTimeSeconds = "${ws.allIdleTimeSeconds}")
@Slf4j
public class WebSocketServer {

    /**
     * 默认最多允许同时在线人数 200
     */
    public static int socketMaxOnlineCount = 200;

    private static final Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);

    @OnOpen
    public void onOpen(Session session, HttpHeaders headers, @PathVariable String userName) {
        boolean semaphoreFlag = false;
        // 尝试获取信号量
        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
        if (!semaphoreFlag) {
            // 未获取到信号量
            log.error("\n 当前人数 - {} , 限制人数:{} ", WebSocketUsers.getUsers().size(), socketMaxOnlineCount);
            WebSocketUsers.sendMessageToUserByText(session, "当前人数:" + WebSocketUsers.getUsers().size() + " 限制人数:" + socketMaxOnlineCount);
            session.close();
        } else {
            // 添加用户
            WebSocketUsers.sendMessageToUserByText(session, "连接成功");
            WebSocketUsers.put(userName, session);
            log.warn("\n 用户:{} 连接后 , 当前人数 : {}", userName, WebSocketUsers.getUsers().size());
        }
    }

    @OnClose
    public void onClose(Session session, @PathVariable String userName) throws IOException {
        // 移除用户
        WebSocketUsers.remove(userName);
        log.warn("\n 用户:{} 关闭后 , 当前人数 : {}", userName, WebSocketUsers.getUsers().size());
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    @OnError
    public void one rror(Session session, @PathVariable String userName, Throwable exception) {
        if (session.isOpen()) {
            // 关闭连接
            session.close();
        }
        log.warn("\n 连接异常 - {}", userName);
        log.warn("\n 异常信息 - {}", exception);
        // 移出用户
        WebSocketUsers.remove(userName);
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        WebSocketUsers.sendMessageToUserByText(session, message);
    }

    @OnBinary
    public void onBinary(Session session, byte[] bytes) {
        for (byte b : bytes) {
            System.out.println(b);
        }
        session.sendBinary(bytes);
    }

    @OnEvent
    public void onEvent(@PathVariable String userName, Session session, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    log.error("user-{} Read timeout!", userName);
                    session.close();
                    break;
                case WRITER_IDLE:
                    log.error("user-{} Write timeout!", userName);
                    session.close();
                    break;
                case ALL_IDLE:
                    log.error("user-{} All timeout!", userName);
                    session.close();
                    break;
                default:
                    break;
            }
        }
    }
}

注意这里的注解@ServerEndpoint是org.yeauty路径下的,别引用错了包。

在端点类上加上@ServerEndpoint注解,并在相应的方法上加上

@BeforeHandshake、@OnOpen、@OnClose、@OnError、@OnMessage、@OnBinary、@OnEvent注解。

注解说明:

@ServerEndpoint

当ServerEndpointExporter类通过Spring配置进行声明并被使用,它将会去扫描带有@ServerEndpoint注解的类

被注解的类将被注册成为一个WebSocket端点 所有的配置项都在这个注解的属性中 ( 如:@ServerEndpoint("/ws") )

@BeforeHandshake

当有新的连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...

@OnOpen

当有新的WebSocket连接完成时,对该方法进行回调 注入参数的类型:Session、HttpHeaders...

@OnClose

当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session

@OnError

当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable

@OnMessage

当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String

@OnBinary

当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[]

@OnEvent

当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object

配置说明

所有的配置项都在这个注解的属性中。

属性 默认值 说明
path "/" WebSocket的path,也可以用value来设置
host "0.0.0.0" WebSocket的host,"0.0.0.0"即是所有本地地址
port 80 WebSocket绑定端口号。如果为0,则使用随机端口(端口获取可见 多端点服务)
bossLoopGroupThreads 0 bossEventLoopGroup的线程数
workerLoopGroupThreads 0 workerEventLoopGroup的线程数
useCompressionHandler false 是否添加WebSocketServerCompressionHandler到pipeline
optionConnectTimeoutMillis 30000 与Netty的ChannelOption.CONNECT_TIMEOUT_MILLIS一致
optionSoBacklog 128 与Netty的ChannelOption.SO_BACKLOG一致
childOptionWriteSpinCount 16 与Netty的ChannelOption.WRITE_SPIN_COUNT一致
childOptionWriteBufferHighWaterMark 64*1024 与Netty的ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK一致,但实际上是使用ChannelOption.WRITE_BUFFER_WATER_MARK
childOptionWriteBufferLowWaterMark 32*1024 与Netty的ChannelOption.WRITE_BUFFER_LOW_WATER_MARK一致,但实际上是使用 ChannelOption.WRITE_BUFFER_WATER_MARK
childOptionSoRcvbuf -1(即未设置) 与Netty的ChannelOption.SO_RCVBUF一致
childOptionSoSndbuf -1(即未设置) 与Netty的ChannelOption.SO_SNDBUF一致
childOptionTcpNodelay true 与Netty的ChannelOption.TCP_NODELAY一致
childOptionSoKeepalive false 与Netty的ChannelOption.SO_KEEPALIVE一致
childOptionSoLinger -1 与Netty的ChannelOption.SO_LINGER一致
childOptionAllowHalfClosure false 与Netty的ChannelOption.ALLOW_HALF_CLOSURE一致
readerIdleTimeSeconds 0 与IdleStateHandler中的readerIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
writerIdleTimeSeconds 0 与IdleStateHandler中的writerIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
allIdleTimeSeconds 0 与IdleStateHandler中的allIdleTimeSeconds一致,并且当它不为0时,将在pipeline中添加IdleStateHandler
maxFramePayloadLength 65536 最大允许帧载荷长度
useEventExecutorGroup true 是否使用另一个线程池来执行耗时的同步业务逻辑
eventExecutorGroupThreads 16 eventExecutorGroup的线程数
sslKeyPassword ""(即未设置) 与spring-boot的server.ssl.key-password一致
sslKeyStore ""(即未设置) 与spring-boot的server.ssl.key-store一致
sslKeyStorePassword ""(即未设置) 与spring-boot的server.ssl.key-store-password一致
sslKeyStoreType ""(即未设置) 与spring-boot的server.ssl.key-store-type一致
sslTrustStore ""(即未设置) 与spring-boot的server.ssl.trust-store一致
sslTrustStorePassword ""(即未设置) 与spring-boot的server.ssl.trust-store-password一致
sslTrustStoreType ""(即未设置) 与spring-boot的server.ssl.trust-store-type一致
corsOrigins {}(即未设置) 与spring-boot的@CrossOrigin#origins一致
corsAllowCredentials ""(即未设置) 与spring-boot的@CrossOrigin#allowCredentials一致

且这里的配置可以在yml中进行配置。

需要在application.yml中添加如下配置项

ws:
  # websocket 端口
  port: 8071
  # websocket 读超时
  readerIdleTimeSeconds: 10
  # websocket 写超时
  writerIdleTimeSeconds: 10
  # websocket 所有信道超时
  allIdleTimeSeconds: 15

4、注意上面的信号量相关处理封装的工具类

package com.badao.demo.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Semaphore;

/**
 * 信号量相关处理
 */
public class SemaphoreUtils {
    /**
     * SemaphoreUtils 日志控制器
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);

    /**
     * 获取信号量
     *
     * @param semaphore
     * @return
     */
    public static boolean tryAcquire(Semaphore semaphore) {
        boolean flag = false;

        try {
            flag = semaphore.tryAcquire(1);
        } catch (Exception e) {
            LOGGER.error("获取信号量异常", e);
        }

        return flag;
    }

    /**
     * 释放信号量
     *
     * @param semaphore
     */
    public static void release(Semaphore semaphore) {

        try {
            semaphore.release();
        } catch (Exception e) {
            LOGGER.error("释放信号量异常", e);
        }
    }
}

5、存储用户信息的WebSocketUsers类实现

package com.badao.demo.websocket;

import com.badao.demo.config.MyThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.yeauty.pojo.Session;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;


@Slf4j
public class WebSocketUsers {

    private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(10,WebSocketServer.socketMaxOnlineCount,10, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new MyThreadFactory("websocket-"));


    /**
     * 用户集
     */
    private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();

    /**
     * 存储用户
     *
     * @param key     唯一键
     * @param session 用户信息
     */
    public static void put(String key, Session session) {
        USERS.put(key, session);
    }

    /**
     * 移除用户
     *
     * @param session 用户信息
     * @return 移除结果
     */
    public static boolean remove(Session session) {
        String key = null;
        boolean flag = USERS.containsValue(session);
        if (flag) {
            Set<Map.Entry<String, Session>> entries = USERS.entrySet();
            for (Map.Entry<String, Session> entry : entries) {
                Session value = entry.getValue();
                if (value.equals(session)) {
                    key = entry.getKey();
                    break;
                }
            }
        } else {
            return true;
        }
        return remove(key);
    }

    /**
     * 移出用户
     *
     * @param key 键
     */
    public static boolean remove(String key) {
        Session remove = USERS.remove(key);
        if (remove != null) {
            boolean containsValue = USERS.containsValue(remove);
            log.warn("\n 移出结果 - {}", containsValue ? "失败" : "成功");
            return containsValue;
        } else {
            return true;
        }
    }

    /**
     * 获取在线用户列表
     *
     * @return 返回用户集合
     */
    public static Map<String, Session> getUsers() {
        return USERS;
    }

    /**
     * 群发消息文本消息
     *
     * @param message 消息内容
     */
    public static void sendMessageToUsersByText(String message) {
        Collection<Session> values = USERS.values();
        for (Session value : values) {
            pool.submit(() -> {
                synchronized (value) {
                    value.sendText(message);
                }
            });
        }
    }

    /**
     * 发送文本消息
     *
     * @param session 自己的用户名
     * @param message 消息内容
     */
    public static void sendMessageToUserByText(Session session, String message) {
        if (session != null) {
            session.sendText(message);
        } else {
            log.info("\n[你已离线]");
        }
    }
}

这里用的线程池相关概念参考上面博客,附自定义线程工厂MyThreadFactory实现

package com.badao.demo.config;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class MyThreadFactory implements ThreadFactory {

    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public MyThreadFactory(String threadName) {
        SecurityManager s = System.getSecurityManager();
        group = (s !=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
        if(threadName == null || threadName.isEmpty()){
            threadName = "pool";
        }
        namePrefix = threadName + poolNumber.getAndIncrement()+"-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
        if(t.isDaemon()){
            t.setDaemon(false);
        }
        if(t.getPriority()!= Thread.NORM_PRIORITY){
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}

这里定义了线程名前缀。

6、又新建了一个Controller目的是为了获取当前所有的用户,因为前面限制了只能允许最多200用户

package com.badao.demo.websocket;


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/websocket")
public class WebsocketController  {

    @GetMapping("/total")
    public String websocketTotal(){
        return WebSocketUsers.getUsers().keySet().toString();
    }

}

7、启动项目并使用websocket客户端工具进行测试

 

 

标签:Netty,websocket,Session,spring,session,import,多线程,public,群发
From: https://www.cnblogs.com/badaoliumangqizhi/p/17547483.html

相关文章

  • Java入门12(多线程)
    多线程线程的实现方式继承Thread类:一旦继承了Thread类,就不能再继承其他类了,可拓展性差实现Runnable接口:仍然可以继承其他类,可拓展性较好使用线程池继承Thread类​ 不能通过线程对象调用run()方法,需要通过t1.start()方法,使线程进入到就绪状态,只要进入到就绪状态......
  • 【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程
    前言本篇博文是《从0到1学习Netty》中源码系列的第三篇博文,主要内容是深入分析连接超时的实现原理,包括了connect方法的源码解析和ChannelFuture.sync()执行过程的解析。,往期系列文章请访问博主的Netty专栏,博文中的所有代码全部收集在博主的GitHub仓库中;介绍在实际应用中,当......
  • Java之多线程的同步和死锁
    设计模式中的单例模式的懒汉方式会存在多线程的安全问题;通过以下测试代码可以看到两个线程中得到的并不是同一个单例对象;@TestpublicvoidunsafeSingleInstanceTest()throwsInterruptedException{AtomicReference<UnSafeSingleInstance>s1=newAtomicRe......
  • imessages群发,苹果imessages短信,苹果imessages推信,完美实现 - 电脑升级版
    一、PC电脑版苹果系统(MacOS)上实现imessages群发总结为以下几种方式:/*MacOS苹果系统,正常情况下,只能安装到苹果公司自己出品的Mac电脑,俗称白苹果,不能安装到各种组装机或者其他品牌的品牌机上,黑苹果的的原理,就是通过一些“破解补丁”工具欺骗macOS系统,让苹果系统认为你的电......
  • Java-Day-31( 多用户即时通信系统 —— 无异常退出 + 私聊 + 群发 )
    Java-Day-31多用户即时通信系统无异常退出问题指出:客户端输入9退出的是输出在控制台的主线程,退出的是主菜单,并没有真正的退出因为客户端启动后,相当于是开启了一个进程,在这个进程中启动了一个主线程(main线程),在main主线程中又启动了一个客户端的线程(ClientConn......
  • [Raspberry Pi]树莓派多线程下串口收发数据
    [RaspberryPi]树莓派多线程下串口收发数据鼠鼠用的是python开发树莓派,因为python是最优美的语言!少废话,直接上代码:importthreadingimportserialimportcv2ser=serial.Serial("/dev/ttyAMA0",115200)ser.timeout=5ifnotser.isOpen:ser.open()#打开串口......
  • 多线程part2-多线程的两个概念
    并发同一时刻,有多个指令在单个CPU上交替执行理解:打游戏的时候,右手操作鼠标,同时用右手喝可乐,手速特别快交替执行,这时,你的右手相当于CPU,执行的对象相当于线程并行同一时刻,有多个指令在多个CPU上同时执行理解:关键点在于同时 ......
  • 多线程
    importtime,threadingfromthreadingimportLockdefwork(timer):#如果在函数内部找不到变量,就会向上寻找,main函数属于向上的领域withlock:print('操作文件')print('只要在withlock里,就是单线程')print(f'线程名称名:{threading......
  • C++之原子操作:实现高效、安全的多线程编程
    背景在多线程编程中,线程间的同步和数据竞争问题是无法避免的。传统的同步方法,如互斥锁(mutex)和条件变量(conditionvariable),可能导致性能下降和死锁等问题。C++11引入了原子操作,提供了一种更高效、安全的多线程编程方式。本文将介绍C++中的原子操作概念、使用方法及示例。C++中的......
  • 多线程知识:三个线程如何交替打印ABC循环100次
    本文博主给大家讲解一道网上非常经典的多线程面试题目。关于三个线程如何交替打印ABC循环100次的问题。下文实现代码都基于Java代码在单个JVM内实现。问题描述给定三个线程,分别命名为A、B、C,要求这三个线程按照顺序交替打印ABC,每个字母打印100次,最终输出结果为:ABCABC.......