首页 > 其他分享 >Netty 实现 WebSocket 协议

Netty 实现 WebSocket 协议

时间:2024-04-03 15:33:44浏览次数:20  
标签:Netty WebSocket 协议 ruoyi protocol new import com channel

要使用Netty框架实现WebSocket服务端与客户端通信,你需要按照以下步骤进行:

1、添加依赖

        首先,在你的项目中引入Netty和其WebSocket支持库的依赖。如果你使用Maven或Gradle构建项目,可以在pom.xml或build.gradle文件中添加相应的依赖项。
Maven:

<!-- Netty (all-in-one artifact), pinned to 4.1.77.Final; provides the io.netty.* classes used by the server code below. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.77.Final</version>
</dependency>
<!-- Spring Boot WebSocket starter. No <version> is declared here; presumably it is managed by a
     Spring Boot parent POM / BOM in the enclosing project (TODO confirm). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、创建WebSocket服务器端

package com.ruoyi.rests.websocket.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

/**
 * Netty-based WebSocket server.
 *
 * <p>Binds a {@link ServerBootstrap} to the configured port and installs, for every accepted
 * connection, an HTTP codec, a chunked writer, an HTTP aggregator, the WebSocket protocol handler
 * for the {@code /websocket} path and finally {@link MyWebSocketHandler}.
 */
public class NettyServer {
    private final int port;

    /**
     * @param port TCP port the server listens on
     */
    public NettyServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel is closed.
     * Both event loop groups are shut down before this method returns.
     *
     * @throws Exception if binding fails or the calling thread is interrupted while waiting
     */
    public void start() throws Exception {
        // bossGroup is the parent group (accepts connections); workerGroup is the child group
        // (serves accepted connections). The names now match the argument order of group(parent, child).
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(bossGroup, workerGroup) // bind the event loop groups
                    .channel(NioServerSocketChannel.class) // channel implementation to use
                    .localAddress(this.port) // port to listen on
                    .childHandler(new ChannelInitializer<SocketChannel>() { // runs for every accepted connection

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("收到新连接");
                            // NOTE(review): this re-publishes whatever request attributes are bound to the
                            // current (event loop) thread as inheritable. Presumably there are none on a
                            // Netty thread, which would make this a no-op -- confirm before removing.
                            ServletRequestAttributes sra = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
                            RequestContextHolder.setRequestAttributes(sra, true);
                            // The WebSocket handshake is an HTTP request, so an HTTP codec is required.
                            ch.pipeline().addLast(new HttpServerCodec());
                            // Handler that writes large content in chunks.
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            // Aggregates HTTP message parts into full messages (max content length 8192).
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // Arguments: path, subprotocols, allowExtensions, maxFrameSize,
                            // allowMaskMismatch, checkStartsWith.
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket", null, true, 65536 * 10, false, true));
                            ch.pipeline().addLast(new MyWebSocketHandler());
                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // wait until the bind has completed
            System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // block until the server channel is closed
        } finally {
            // Nested try/finally: the worker group is still shut down if waiting on the
            // boss group's shutdown throws (e.g. the thread is interrupted).
            try {
                bossGroup.shutdownGracefully().sync();
            } finally {
                workerGroup.shutdownGracefully().sync();
            }
        }
    }
}
package com.ruoyi.rests.websocket.netty;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.util.TypeUtils;
import com.ruoyi.rests.api.domain.dto.Protocol;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * Static registry of connected WebSocket channels plus helpers for sending {@link Protocol}
 * messages to them as JSON text frames.
 *
 * <p>Keys of the channel map are built by the callers; the helpers here match channels by
 * checking whether a key <em>contains</em> a given alias.
 */
public class NettyConfig {

    private static  final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * Maps a request/connection key to its channel.
     */
    private static final ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();
    private static final AtomicInteger counter = new AtomicInteger();

    /**
     * @return the shared counter
     */
    public static AtomicInteger getCounter() {
        return counter;
    }

    /**
     * @return the shared channel group
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * @return the live (mutable) key-to-channel map
     */
    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        return channelMap;
    }

    /**
     * Writes {@code protocol} to {@code channel} as a JSON text frame. When the target id contains
     * {@code "3DPC"}, the target id is rewritten to {@code senderUserId + "3DDEVPC"} and the
     * message is written a second time to the same channel.
     *
     * <p>NOTE(review): the rewrite mutates the caller's {@code protocol} instance, so later sends
     * of the same object (e.g. in a loop over several channels) see the rewritten target id.
     * Confirm this is intended.
     *
     * @param protocol message to send; a {@code null} target id is treated as "no 3DPC target"
     * @param channel  channel to write to
     */
    public static void sendMessage(Protocol protocol, Channel channel) {
        TypeUtils.compatibleWithJavaBean = true;
        channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(protocol)));
        // Null guard: a message without a target id used to throw here *after* the first frame
        // was written, aborting any caller that was iterating over several channels.
        String targetId = protocol.getTargetId();
        if (targetId != null && targetId.contains("3DPC")) {
            protocol.setTargetId(protocol.getSenderUserId() + "3DDEVPC");
            channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(protocol)));
        }
    }

    /**
     * Writes {@code protocol} to {@code channel} as a single JSON text frame, without the
     * target-id rewrite performed by {@link #sendMessage(Protocol, Channel)}.
     *
     * @param protocol message to send
     * @param channel  channel to write to
     */
    public static void sendMessageAll(Protocol protocol, Channel channel) {
        TypeUtils.compatibleWithJavaBean = true;
        channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(protocol)));
    }

    /**
     * Sends {@code protocol} to every channel whose key contains the protocol's target alias.
     * Nothing is sent while the target alias is {@code null}.
     *
     * @param protocol message to send
     */
    public static void broadcast(Protocol protocol) {
        TypeUtils.compatibleWithJavaBean = true;
        // The alias is read per entry on purpose: sendMessage may rewrite the target id.
        getChannelMap().entrySet().stream()
                .filter(it -> keyContains(it.getKey(), protocol.getTargetAliasUTenant()))
                .forEach(it -> sendMessage(protocol, it.getValue()));
    }

    /**
     * Removes every mapping whose channel is no longer open.
     */
    public static void autoFlush() {
        getChannelMap().values().removeIf(channel -> !channel.isOpen());
    }

    /**
     * Returns the channels whose key contains {@code aliasId}.
     *
     * @param aliasId alias to look for; {@code null} matches nothing
     * @return matching channels, possibly empty, never {@code null}
     */
    public static List<Channel> getChannel(String aliasId) {
        return getChannelMap().entrySet().stream()
                .filter(it -> keyContains(it.getKey(), aliasId))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    /** Null-safe {@code key.contains(alias)}; a {@code null} alias never matches. */
    private static boolean keyContains(String key, String alias) {
        return alias != null && key.contains(alias);
    }
}

3、消息处理

package com.ruoyi.rests.websocket.netty;

import com.alibaba.fastjson.util.TypeUtils;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.core.context.SecurityContextHolder;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.rests.api.domain.dto.Protocol;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/**
 * Inbound handler for WebSocket text frames.
 *
 * <p>Each frame is expected to carry a JSON-encoded {@link Protocol}. A valid message registers
 * the sending channel in {@link NettyConfig#getChannelMap()} and is then dispatched through
 * {@link PPSWokerCenter}; otherwise an error reply is written back on the same channel.
 */
@Component
@ChannelHandler.Sharable
@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Handles one incoming text frame. Text that is not valid JSON is ignored.
     *
     * @param ctx context of the channel the frame arrived on
     * @param msg the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        String text = msg.text();
        log.info("服务器收到消息:{}", text);
        if (!JSON.isValid(text)) {
            return;
        }
        Protocol protocol = JSON.parseObject(text, Protocol.class);
        if (protocol == null) {
            // Valid JSON that does not yield an object (e.g. the literal null): nothing to process.
            return;
        }

        boolean identified = StringUtils.isNotEmpty(protocol.getTenantId())
                && StringUtils.isNotEmpty(protocol.getSenderUserId())
                && StringUtils.isNotEmpty(protocol.getSessionId());
        if (!identified) {
            // NOTE(review): the message names targetId, but the check above is on sessionId.
            protocol.setContentText("{tenantId,sendUserId,targetId} 请求参数其中可能为空");
            reply(ctx, protocol);
            return;
        }

        // Associate the sender (alias + session) with this channel.
        String uniqueKey = String.format("%s:%s", protocol.getSelfAliasUTenant(), protocol.getSessionId());
        NettyConfig.getChannelMap().put(uniqueKey, ctx.channel());

        if (StringUtils.isNotEmpty(protocol.getConversationType()) && PPSWokerCenter.exits(protocol.getConversationType())) {
            // NOTE(review): PPSWokerCenter.handler runs the handler via MGThreadPool.runAsync; whether
            // the tenant id set here is visible there depends on SecurityContextHolder -- confirm.
            SecurityContextHolder.setTenantId(protocol.getTenantId());
            PPSWokerCenter.handler(protocol);
        } else {
            protocol.setContentText("暂不支持该协议");
            reply(ctx, protocol);
        }
    }

    /**
     * Called when this handler is removed from the pipeline; drops the channel's mappings.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端断开: 租户xx:{}", ctx.channel().id().asLongText());
        // No isActive() guard: after a disconnect the channel is no longer active, and guarding on
        // it meant the mapping of a disconnected client was never removed.
        removeUser(ctx);
    }

    /**
     * Forwards the exception down the pipeline, drops the channel's mappings and closes it.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        removeUser(ctx);
        ctx.close();
    }

    /**
     * Writes {@code protocol} back to the sender as a JSON text frame.
     */
    private void reply(ChannelHandlerContext ctx, Protocol protocol) {
        // NOTE(review): TypeUtils is imported from fastjson 1.x while JSON in this class is fastjson2;
        // confirm this flag has any effect on the serialization below.
        TypeUtils.compatibleWithJavaBean = true;
        ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(protocol)));
    }

    /**
     * Removes every mapping in the channel map that points at this context's channel,
     * regardless of whether the channel is still active.
     */
    private void removeUser(ChannelHandlerContext ctx) {
        Channel closing = ctx.channel();
        NettyConfig.getChannelMap().values().removeIf(channel -> Objects.equals(channel, closing));
    }

}
package com.ruoyi.rests.websocket.netty;

import com.ruoyi.common.core.utils.MGThreadPool;
import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.protocol.*;

import java.util.HashMap;
import java.util.Map;

/**
 * Registry that maps a conversation type to its {@link ProtocolHandler} and dispatches incoming
 * {@link Protocol} messages to the matching handler through {@code MGThreadPool.runAsync}.
 *
 * <p>NOTE(review): the registry is a plain {@link HashMap}. It is filled in the static initializer;
 * calling {@link #register} later while messages are being dispatched is not synchronized here.
 */
public class PPSWokerCenter {

    private static final Map<String, ProtocolHandler> protocolHandlerMap = new HashMap<>();

    static {
        register("REGISTER",            new Register());
        register("SYSTEMHEATBIT",       new SystemHeatbit());
        register("ONLINE",              new Online());
        register("MODELBYAREA",         new ModelByArea());
        register("RESET3DCAMERA",       new Reset3DCamera());
        register("SET3DCAMERA",         new Set3DCamera());
        register("EXCEEDAREA",          new ExceedArea());
        register("PERSONPOINTALL",      new PersonPointAll());
        register("PERSONPOINTHIDDEN",   new PersonPointHidden());
        register("PERSONPOINT",         new PersonPoint());
        register("CAMERAPOINTALL",      new CameraPointAll());
        register("CAMERAPOINTHIDDEN",   new CameraPointHidden());
        register("SENSORPOINTALL",      new SensorPointAll());
        register("SENSORPOINTHIDDEN",   new SensorPointHidden());
        register("PERSONSTREAM",        new PersonStream());
        register("OPENVIDEO",           new OpenVideo());
        register("VIDEOSTREAM",         new VideoStream());
        register("HISTORYGET",          new HistoryGet());
        register("HISTORYREADY",        new HistoryReady());
        register("HISTORYPLAY",         new HistoryPlay());
        register("HISTORYSTOP",         new HistoryStop());
        register("HISTORYCLEAR",        new HistoryClear());
        register("SYSTEMALARMWEB",      new SystemAlarmWeb());
        // NOTE(review): same handler class as SYSTEMALARMWEB -- confirm this is intentional.
        register("SYSTEMALARMLIST",     new SystemAlarmWeb());
        register("WEBSOCKETLOGOUT",     new WebSocketLogOut());
    }

    /**
     * Registers (or replaces) the handler for a conversation type.
     *
     * @param type            conversation type key
     * @param protocolHandler handler invoked for messages of that type
     */
    public static void register(String type, ProtocolHandler protocolHandler) {
        protocolHandlerMap.put(type, protocolHandler);
    }

    /**
     * Dispatches {@code protocol} asynchronously to the handler registered for its conversation
     * type. A {@code null} protocol, or a type with no registered handler, is ignored instead of
     * failing with a {@link NullPointerException} inside the asynchronous task.
     *
     * @param protocol message to dispatch
     */
    public static void handler(Protocol protocol) {
        if (protocol == null) {
            return;
        }
        MGThreadPool.runAsync(() -> {
            ProtocolHandler protocolHandler = protocolHandlerMap.get(protocol.getConversationType());
            if (protocolHandler == null) {
                return;
            }
            protocolHandler.handler(protocol);
        });
    }

    /**
     * @param conversationType conversation type key
     * @return {@code true} if a handler is registered for {@code conversationType}
     */
    public static boolean exits(String conversationType) {
        return protocolHandlerMap.containsKey(conversationType);
    }
}
package com.ruoyi.rests.websocket;

import com.ruoyi.rests.api.domain.dto.Protocol;

/**
 * Handler for one kind of {@link Protocol} message.
 */
public interface ProtocolHandler {
    /**
     * Processes the given message.
     *
     * @param protocol the message to handle
     */
    void handler(Protocol protocol);
}

附:ProtocolHandler 实现类示例

package com.ruoyi.rests.websocket.protocol;

import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.netty.NettyConfig;

/**
 * Handler that sends the received message to every channel currently in the channel map.
 */
public class SystemHeatbit implements ProtocolHandler {

    /**
     * Sends {@code protocol} to each registered channel via {@code NettyConfig.sendMessage}.
     *
     * @param protocol the message to send
     */
    @Override
    public void handler(Protocol protocol) {
        NettyConfig.getChannelMap()
                .values()
                .forEach(channel -> NettyConfig.sendMessage(protocol, channel));
    }
}

 

package com.ruoyi.rests.websocket.protocol;

import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.netty.NettyConfig;
import io.netty.channel.Channel;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Handler for registration messages: acknowledges the registering channel and sends a
 * {@code WEBSOCKETLOGOUT} message to that user's other sessions of the same kind.
 */
public class Register implements ProtocolHandler {

    /**
     * @param protocol the registration message; its content is replaced with {@code true}
     *                 before it is sent back as the acknowledgement
     */
    @Override
    public void handler(Protocol protocol) {
        protocol.setContentText(true);

        String selfAlias = protocol.getSelfAliasUTenant();
        String sessionId = protocol.getSessionId();
        String uniqueKey = String.format("%s:%s", selfAlias, sessionId);

        // Acknowledge only the channel registered under this exact alias + session key.
        for (Channel candidate : NettyConfig.getChannel(selfAlias)) {
            if (candidate.equals(NettyConfig.getChannelMap().get(uniqueKey))) {
                NettyConfig.sendMessageAll(protocol, candidate);
            }
        }

        // A sender whose id contains "3D" only affects keys containing "3D";
        // any other sender only affects keys that do not contain "3D".
        boolean senderIs3D = protocol.getSenderUserId().contains("3D");
        List<Channel> staleChannels = NettyConfig.getChannelMap().entrySet().stream()
                .filter(entry -> entry.getKey().contains(selfAlias))
                .filter(entry -> entry.getKey().contains("3D") == senderIs3D)
                .filter(entry -> !entry.getKey().contains(sessionId))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());

        // The other sessions are told to log out; their channels are not closed here.
        Protocol logout = new Protocol();
        logout.setConversationType("WEBSOCKETLOGOUT");
        for (Channel stale : staleChannels) {
            NettyConfig.sendMessageAll(logout, stale);
        }
    }
}
package com.ruoyi.rests.websocket.protocol;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.nacos.shaded.com.google.common.collect.Lists;
import com.ruoyi.basis.api.RemoteAreaService;
import com.ruoyi.basis.api.RemoteBuildingService;
import com.ruoyi.basis.api.RemoteFloorService;
import com.ruoyi.basis.api.domain.area.Area;
import com.ruoyi.basis.api.domain.area.Building;
import com.ruoyi.basis.api.domain.area.Floor;
import com.ruoyi.basis.api.domain.area.type.AreaType;
import com.ruoyi.basis.api.domain.dto.AreaDto;
import com.ruoyi.common.core.constant.Constants;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.common.core.utils.SpringUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.redis.service.RedisService;
import com.ruoyi.rests.api.domain.dto.Protocol;
import com.ruoyi.rests.domain.dto.AreaTypeDto;
import com.ruoyi.rests.websocket.ProtocolHandler;
import com.ruoyi.rests.websocket.netty.NettyConfig;
import com.ruoyi.system.api.RemoteModelService;
import com.ruoyi.system.api.domain.Model;

import java.util.List;

/**
 * Handler that resolves the models bound to an area, building, floor or room and sends the
 * result to the channels whose key contains the message's target alias.
 */
public class ModelByArea implements ProtocolHandler {

    /**
     * Reads {@code areaId} and {@code areaType} from the message content (parsed as JSON). When an
     * area id is present, looks up coordinates and bound models, caches the model list in Redis
     * under {@code tenantId:senderUserId} and replaces the message content with the result.
     * The message is then sent to the matching channels whether or not an area id was present.
     *
     * @param protocol the incoming message; its content text is overwritten with the result
     */
    @Override
    public void handler(Protocol protocol) {

        RemoteModelService modelService = SpringUtils.getBean(RemoteModelService.class);
        RemoteAreaService areaService = SpringUtils.getBean(RemoteAreaService.class);
        RemoteBuildingService buildingService = SpringUtils.getBean(RemoteBuildingService.class);
        RemoteFloorService floorService = SpringUtils.getBean(RemoteFloorService.class);
        RedisService redis = SpringUtils.getBean(RedisService.class);

        JSONObject request = JSON.parseObject(protocol.getContentText().toString(), JSONObject.class);
        String areaId = request.getString("areaId");
        String areaType = request.getString("areaType");

        AreaTypeDto result = new AreaTypeDto();
        result.setAreaId(areaId);

        boolean missingAreaId = areaId == null || StringUtils.isEmpty(areaId);
        if (!missingAreaId) {
            AreaDto query = new AreaDto();
            query.setAreaType(areaType);
            query.setTenantId(protocol.getTenantId());

            if (AreaType.AREA.equals(areaType)) {
                // The area lookup takes the query object, so the id must be set first.
                query.setAreaId(areaId);
                R<Area> areaReply = areaService.queryByAreaId(query);
                if (areaReply.getCode() == Constants.SUCCESS && areaReply.getData() != null) {
                    Area area = areaReply.getData();
                    result.setX(area.getX());
                    result.setY(area.getY());
                    result.setZ(area.getZ());
                }
            } else if (AreaType.BUILDING.equals(areaType)) {
                R<Building> buildingReply = buildingService.selectByBuildingId(areaId);
                if (buildingReply.getCode() == Constants.SUCCESS && buildingReply.getData() != null) {
                    Building building = buildingReply.getData();
                    result.setX(building.getX());
                    result.setY(building.getY());
                    result.setZ(building.getZ());
                }
                query.setBuildingId(areaId);
            } else if (AreaType.FLOOR.equals(areaType)) {
                R<Floor> floorReply = floorService.selectByFloorId(areaId);
                if (floorReply.getCode() == Constants.SUCCESS && floorReply.getData() != null) {
                    Floor floor = floorReply.getData();
                    result.setX(floor.getX());
                    result.setY(floor.getY());
                    result.setZ(floor.getZ());
                }
                query.setFloorId(areaId);
            } else if (AreaType.ROOM.equals(areaType)) {
                // Rooms have no coordinate lookup here; only the query is narrowed.
                query.setRoomId(areaId);
            }

            R<List<Model>> modelReply = modelService.areaBindModel(query);
            boolean modelsFound = modelReply.getCode() == Constants.SUCCESS && modelReply.getData() != null;
            if (!modelsFound) {
                protocol.setContentText(Lists.newArrayList());
            } else {
                List<Model> models = modelReply.getData();
                redis.setCacheObject(protocol.getTenantId() + ":" + protocol.getSenderUserId() ,models);
                result.setAreaType(areaType);
                result.setModelList(models);
                protocol.setContentText(result);
            }
        }

        // The target alias is read again for every entry because sendMessage may change the
        // message's target id between sends.
        NettyConfig.getChannelMap().forEach((key, channel) -> {
            if (key.contains(protocol.getTargetAliasUTenant())) {
                NettyConfig.sendMessage(protocol, channel);
            }
        });
    }

}

标签:Netty,WebSocket,协议,ruoyi,protocol,new,import,com,channel
From: https://blog.csdn.net/m0_69146489/article/details/137346082

相关文章

  • OM6650AM支持蓝牙5.1协议栈与2.4GHz私有协议的双模无线连接SoC芯片
    OM6650AM是一款超低功耗、同时支持蓝牙5.1协议栈与2.4GHz私有协议的双模无线连接SoC芯片,采用4.0mmx4.0mmQFN32封装,具有丰富的资源,极低的功耗,优异的射频性能,可广泛应用于车载数字钥匙模组、胎压检测、PKE钥匙等场景。01主要特性蓝牙规格:全功能BLE5.1;支持数据传输速率:1Mb......
  • 天翼云充值:分布式消息服务RabbitMQ支持AMQP协议,兼容RabbitMQ生态
    天翼云充值:分布式消息服务RabbitMQ支持AMQP协议,兼容RabbitMQ生态简介:飞机@luotuoemo本文由(天翼云代理商:【金推云】www.jintui.cn)撰写天翼云:支持AMQP协议的RabbitMQ服务分布式消息服务RabbitMQ是一种开源的、基于Erlang语言的消息代理和队列服务器。RabbitMQ最大的特性就......
  • 基于Node.js和ws库搭建WebSocket服务并实现消息互通的简单示例
    环境要求:Node.js环境安装npm(Node.js的包管理器)步骤:安装Node.js:如果你还没有安装Node.js,请从Node.js官网下载并安装。创建项目:创建一个新的目录作为项目文件夹,并在该目录下初始化一个新的Node.js项目。        mkdirmy-websocket-server        cdmy......
  • UART协议介绍
    日期版本内容修改2024/04/03V0初版UART协议即UniversalAsynchronousReceiver/Transmitter,是一种串行、异步、全双工通信协议。UART也常常被称作串口。1.信号名称txd:UART发送数据线rxd:UART接收数据线1.1.传输格式UART的传输格式如下图所示(图源[1],有修......
  • SPI传输协议介绍
    日期版本内容修改2023/03/11V0初版1.SPI协议SPI(SerialPeripheralInterface)协议由Motorola公司推出。1.1.端口名称及含义标准SPI有4条信号线。MISOMasterInputSlaveOutput:主机输入,从机输出(数据来自从机)。也可以是SIMO,DOUT,DO,SDO或SO(在主机端)。M......
  • 应用层的http和https协议
    HTTP和HTTPShttp和https是什么?http常用的协议版本http/1.0http/1.1改进http/2.0改进http和https有什么区别?http和https是什么?  HTTP(超文本传输协议)是一种用于在网络上传输超文本数据的协议。它是一种客户端-服务器协议,用于从web服务器传输文本、图像、音频......
  • 【CHI协议-1】CacheLine状态
    从这一章开始就和大家一起分享一下CHI协议中具体的一些事务以及场景。今天主要梳理一下Read事务,但是要讲清楚这些乱七八糟的事务,还需要了解其他很多知识点,不然就是云里雾里的,比如cacheline的状态啊,什么是snoop啊,以及一致性节点啊等等。但是这些太多了,如果先要把这些都讲清楚......
  • js实现websocket断线重连功能
    在项目开发中我们可能经常要使用websocket技术,当连接发生断线后,如果不进行页面刷新将不能正常接收来自服务端的推送消息。为了有效避免这种问题,我们需要在客户端做断线重连处理。当网络或服务出现问题后,客户端会不断检测网络状态,如果服务恢复,客户端则会自动重新连接,并断开......
  • 基于Java NIO 写的一个简单版 Netty 服务端
    ASimpleNettyBasedOnJAVANIO基于JavaNIO写的一个简单版Netty服务端前置知识NIONIO一般指同步非阻塞IO,同样用于**描述程序访问数据方式**的还有BIO(同步阻塞)、AIO(异步非阻塞)同步异步指获取结果的方式,同步为主动去获取结果,不管结果是否准备好,异步为等待结果准......
  • IP(Internet Protocol)是一种网络协议,用于在网络中发送和接收数据包
    IP(InternetProtocol)是一种网络协议,用于在网络中发送和接收数据包。它是一个无连接的、不可靠的数据报协议,负责将数据从源主机传输到目标主机。IP协议的主要功能包括寻址、路由和分段。寻址:IP协议为每个连接到网络的设备分配一个唯一的IP地址,这个地址用于在网络中识别设备......