首页 > 其他分享 >netty使用

netty使用

时间:2022-10-05 10:45:30浏览次数:53  
标签:info netty String new 使用 import channel

pom.xml

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.3</version>
        </dependency>

 

一、创建连接缓存对象

package com.example.demo.netty;

import cn.hutool.core.date.DateUtil;
import io.netty.channel.socket.SocketChannel;
import lombok.Data;

import java.sql.Timestamp;
import java.util.Date;

/**
 * netty缓存对象
 */
@Data
public class NettyCacheInfo {

    /**
     * ip:port
     */
    private String ipPort;
    /**
     * netty连接对象
     */
    private SocketChannel socketChannel;
    /**
     * 连接状态
     */
    private boolean connected;
    /**
     * 活动时间
     */
    private Timestamp activeTime;
    /**
     * 活动时间格式化
     */
    private String activeTimeFormat;


    public NettyCacheInfo() {
    }

    public void setActiveTime(Timestamp activeTime) {
        this.activeTime = activeTime;
        this.activeTimeFormat= DateUtil.format(new Date(activeTime.getTime()),"yyyy-MM-dd HH:mm:ss");
    }

}

 

二、创建netty客户端

package com.example.demo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

/**
 * netty客户端
 */
@Slf4j
public class NettyClient {

    private NettyClient(){}

    /**
     * Netty连接信息缓存
     * key-> host:port
     * value-> nettyInfo
     */
    public static Map<String,NettyCacheInfo> channels = new HashMap<>();
    /**
     * 失败的连接缓存
     */
    public static Map<String,NettyCacheInfo> failChannels = new HashMap<>();
    /**
     * netty连接池
     */
    static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

    static Bootstrap bootstrap = null;
    /**
     * netty回调类
     */
    static NettyHandler nettyHandler;

    /**
     * 塞入回调处理类
     * @param handler
     */
    public static void setNettyHandler(NettyHandler handler){
        nettyHandler = handler;
    }

    /**
     * 初始化BootStrap
     * @param eventLoopGroup
     * @return
     */
    public static final Bootstrap getBootstrap(EventLoopGroup eventLoopGroup){
        if(bootstrap==null){
            bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(20480,20480,65536))
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5*1000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast("encoder", new RpcDecoder())
                                    .addLast(nettyHandler);
                        }
                    });
        }
        return bootstrap;
    }

    /**
     * 获取连接对象
     * @param ipPort
     */
    public static NettyCacheInfo getNettyCacheInfo(String ipPort){
        NettyCacheInfo nettyInfo = null;
        if(channels.containsKey(ipPort)){
            nettyInfo = channels.get(ipPort);
        }else{
            log.error("地址:【{}】暂无连接",ipPort);
        }
        return nettyInfo;
    }

    /**
     * 单独连接某一个装置
     * @param info
     */
    public static  void connect(NettyCacheInfo info){
        String ipPort = info.getIpPort();
        if(!channels.containsKey(ipPort)){
            //先添加缓存,方便handler中处理
            info.setConnected(false);
            String[] arr = ipPort.split(":");
            bootstrap.remoteAddress(arr[0],Integer.parseInt(arr[1]));
            //连接初始化监听
            bootstrap.connect().addListener((ChannelFutureListener) futureListener -> {
                if (futureListener.isSuccess()) {
                    log.info("监听:{}连接成功", ipPort);
                    info.setActiveTime(new Timestamp(System.currentTimeMillis()));
                    info.setSocketChannel((SocketChannel) futureListener.channel());
                    info.setConnected(true);
                    channels.put(ipPort,info);
                } else {
                    failChannels.put(ipPort,info);
                    log.info("监听:{}连接失败", ipPort);
                }
            });
        }else{
            log.info("地址:{}连接已存在",ipPort);
        }
    }
}

创建数据解码器

package com.example.demo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * Netty消息体解码器
 * 自定义
 */
public class RpcDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        int dataLength = in.readableBytes();
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        String str = bytesToHex(data, dataLength);
        out.add(str);
    }

    /**
     * 16进制转String
     * @param bytes
     * @param counts
     * @return
     */
    private String bytesToHex(byte[] bytes, int counts){
        StringBuffer sbu = new StringBuffer();
        for (int i = 0; i < counts/* bytes.length */; i++) {
            int val = new Byte(bytes[i]).intValue();
            String str = Integer.toHexString(val & 0xff);
            if (str.length() == 1) {
                str = "0" + str;
            }
            sbu.append(str);
            if (i != counts - 1) {
                sbu.append(" ");
            }
        }
        return sbu.toString().toUpperCase();
    }
}

三、创建netty回调类

package com.example.demo.netty;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.sql.Timestamp;

/**
 * Netty回调类
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyHandler extends SimpleChannelInboundHandler<String> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        try {
            //获取连接信息 ip,port
            InetSocketAddress address=(InetSocketAddress)ctx.channel().remoteAddress();
            String host = address.getHostString();
            int port = address.getPort();
            log.info("....客户端连接成功....连接地址为:"+host+":"+port);
            //获取netty缓存
            NettyCacheInfo cacheInfo = NettyClient.getNettyCacheInfo(host + ":" + port);
            if(cacheInfo!=null) {
                cacheInfo.setActiveTime(new Timestamp(System.currentTimeMillis()));
            }
            ctx.fireChannelActive();
        } catch (Exception e) {
            log.error("channelActive:"+e.getMessage());
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            //获取连接信息 ip,port
            InetSocketAddress address=(InetSocketAddress)ctx.channel().remoteAddress();
            String host = address.getHostString();
            int port = address.getPort();
            log.info("....客户端连接断开....连接地址为:"+host+":"+port);
            //获取netty缓存
            NettyCacheInfo cacheInfo = NettyClient.getNettyCacheInfo(host + ":" + port);
            if(cacheInfo!=null) {
                cacheInfo.setActiveTime(new Timestamp(System.currentTimeMillis()));
            }
            NettyClient.channels.remove(host+":"+port);
            NettyClient.failChannels.put(host+":"+port,cacheInfo);
            final EventLoop eventLoop = ctx.channel().eventLoop();
        } catch (Exception e) {
            log.error("channelInactive:"+e.getMessage());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        try {
            //获取连接信息 ip,port
            InetSocketAddress address=(InetSocketAddress)ctx.channel().remoteAddress();
            String host = address.getHostString();
            int port = address.getPort();
            log.info("....客户端连接异常....连接地址为:{}:{}...异常信息为:{}",host,port,ctx.channel().id().asShortText());
            //获取netty缓存
            NettyCacheInfo cacheInfo = NettyClient.getNettyCacheInfo(host + ":" + port);
            if(cacheInfo!=null) {
                cacheInfo.setActiveTime(new Timestamp(System.currentTimeMillis()));
            }
            NettyClient.channels.remove(host + ":" + port);
            NettyClient.failChannels.put(host+":"+port,cacheInfo);
        } catch (Exception e) {
            log.error("exceptionCaught:"+e.getMessage());
        }
    }

    /**
     * 接受到消息后的方法
     * @param ctx
     * @param s
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        try {
            //获取连接信息 ip,port
            InetSocketAddress address=(InetSocketAddress)ctx.channel().remoteAddress();
            String host = address.getHostString();
            int port = address.getPort();
            log.debug("接收到地址:【{}:{}】的数据:{}",host,port,s);
            //获取netty缓存
            NettyCacheInfo cacheInfo = NettyClient.getNettyCacheInfo(host + ":" + port);
            cacheInfo.setActiveTime(new Timestamp(System.currentTimeMillis()));
            //TODO:准备处理数据
        } catch (Exception e) {
            log.error("channelRead0:"+e.getMessage());
        }
    }
}

四、创建配置加载类,在项目启动时进行加载

package com.example.demo.netty;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Map;

/**
 *netty配置类
 */
@Slf4j
@Component
public class NettyConfig {

    @Resource
    NettyHandler nettyHandler;

    /**
     * 初始化加载时注入handler回调类
     */
    @PostConstruct
    public void init(){
        NettyClient.bootstrap =NettyClient.getBootstrap(NettyClient.eventLoopGroup);
        NettyClient.setNettyHandler(nettyHandler);
        log.info("进行Netty初始化加载");
    }

    /**
     * 初始化连接
     */
    @Scheduled(cron = "0 0/5 * * * ?")
    public void connectDevice(){
        log.info("删除长时间不活动的装置终端");
        long curl = System.currentTimeMillis();
        for(Map.Entry<String,NettyCacheInfo> entry:NettyClient.channels.entrySet()){
            String ipPort = entry.getKey();
            NettyCacheInfo info = entry.getValue();
            long actt = info.getActiveTime().getTime();
            log.info("地址信息:{},当前时间:{},活动时间{}",ipPort,curl,actt);
            //大于5分钟未响应
            if(curl-actt>5*60*1000){
                log.info("地址:{}连接5分钟无活动,剔除连接",ipPort);
                info.getSocketChannel().close();
                NettyClient.channels.remove(ipPort);
                info.setConnected(false);
                NettyClient.failChannels.put(ipPort,info);
            }
        }
    }


}

五、创建消息发送工具类

package com.example.demo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * 通过netty对象发送数据
 */
@Slf4j
public class NettyWrite {

    /**
     * 发送数据-十六进制数据
     */
    public static void writeAndFlushBy(NettyCacheInfo cacheInfo, String hexString) {
        //获取连接对象
        SocketChannel channel = cacheInfo.getSocketChannel();
        if (channel == null ||
                !channel.isActive()
                || !channel.isOpen()
                || !channel.isWritable()) {
            // 获取不到串口,将任务停止删掉
            throw new NullPointerException("未获取到网口连接");
        }

        log.info("{} 开始下发报文数据!", cacheInfo.getIpPort());

        try {
            byte[] bytes = hexStrToBytes(hexString);
            ByteBuf buffer = channel.alloc().buffer();
            ByteBuf byteBuf = buffer.writeBytes(bytes);
            channel.writeAndFlush(byteBuf).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    log.info("{} 下发报文成功!", cacheInfo.getIpPort());
                } else {
                    throw new Exception("下发报文失败");
                }
            });
        } catch (Exception e) {
            throw e;
        }
    }

    /**
     * 十六进制字符串转为数组
     * @param hexStr
     * @return
     */
    private static byte[] hexStrToBytes(String hexStr) {
        String[] hexs = splitString(hexStr);
        byte[] hexBytes = new byte[hexs.length];
        for (int i = 0; i < hexs.length; i++) {
            hexBytes[i] = hexToByte(hexs[i]);
        }
        return hexBytes;
    }

    private static byte hexToByte(String arg) {
        int val = Integer.valueOf(arg, 16).intValue();
        byte c = (byte) (val & 0xff);
        return c;
    }

    private static String[] splitString(String messageContent) {
        messageContent = messageContent.replace(" ", "");
        int leng = messageContent.length() / 2;
        String[] messageContentArray = new String[leng];
        int beginIndex = 0;
        for (int i = 0; i < leng; i++) {
            int endIndex = beginIndex + 2;
            messageContentArray[i] = messageContent.substring(beginIndex,
                    endIndex);
            beginIndex += 2;
        }
        return messageContentArray;
    }
}

 

使用方法:

List<String> addressList = new ArrayList<>();
        addressList.add("192.168.0.0.1:111");
        addressList.add("192.168.0.0.2:111");
        addressList.add("192.168.0.0.3:111");
        addressList.add("192.168.0.0.4:111");
        addressList.add("192.168.0.0.5:111");
        addressList.add("192.168.0.0.6:111");
        for (String address:addressList) {
            //创建缓存对象
            NettyCacheInfo nettyCacheInfo = new NettyCacheInfo();
            nettyCacheInfo.setIpPort(address);
            nettyCacheInfo.setConnected(false);
            //进行连接
            NettyClient.connect(nettyCacheInfo);
        }
        //发送数据
        String hexString = "01 A1 00 B2";
        NettyWrite.writeAndFlushBy(NettyClient.getNettyCacheInfo("192.168.0.0.1:111"),hexString);

 

标签:info,netty,String,new,使用,import,channel
From: https://www.cnblogs.com/Sora-L/p/16755185.html

相关文章

  • postman7种断言的使用
    导航:1.postman断言介绍2.状态码断言3.响应正文断言-包含4.响应正文断言-json字段5.响应头断言6.响应时间断言7.环境变量的断言---------分割线--------......
  • Netty 学习(八):新连接接入源码说明
    Netty学习(八):新连接接入源码说明作者:Grey原文地址:博客园:Netty学习(八):新连接接入源码说明CSDN:Netty学习(八):新连接接入源码说明新连接的接入分为3个过程检测到有新......
  • Django使用uwsgi部署教程
     一直想用uwsgi部署Django,因为uwsgi性能更好而且可以多进程跑(pythonmanager.pyrunserver是单进程)。但在windows开发机上实验的时候死活装不了uwsgi(有人说可以......
  • python爬虫使用session保持登录状态
    今天有个客户需求,从网站上下载会员试题,需要在登录状态下载,然后将网页中展示的试题保存在word中。网站上展示的所有试题要保存在一个word文档中,但是每一个试题结束下一个试......
  • SV学习(7)——包的使用
    1.包的定义SV提供了一种在多个module、interface和program中共享parameter、data、type、task、function、class等的方法,即利用package(包)的方法来实现。完整的验证环境......
  • 【服务器管理】禁止指定用户使用密码登录服务器
    前言登录服务器通常会采用“用户名-密码”和“ssh密钥”这两种方案中的一种,而在调研到的大多数观点中,都认为密钥比密码更加安全。在管理服务器时,强制服务器用户均使用密......
  • XX学Python·运算符的使用
    算数运算符加减乘除//取商%取余(取模)**幂次运算()小括号用来提高运算优先级float1=10.2int1=4int2=11#数值型数据(float,int,bool)间可以进行算数运算#......
  • 【微信小程序】button和image组件的基本使用
    ......
  • python中类与对象的命名空间(静态属性的陷阱)、__dict__ 和 dir() 在继承中使用说明
    1.面向对象的概念1)类是一类抽象的事物,对象是一个具体的事物;用类创建对象的过程,称为实例化。2)类就是一个模子,只知道在这个模子里有什么属性、什么方法,但是不知道......
  • 使用Authorization Code保护Interactive Applications
    上一篇介绍了ClientCredentials这一最简单的granttype,通常应用在machine对machine的通信环境有安全保障的场景下。对于WebApplication,SAP以及native/mobileApplication......