首页 > 其他分享 >DotNetty 封装的TcpClient

DotNetty 封装的TcpClient

时间:2024-02-04 15:37:47浏览次数:33  
标签:TcpClient null return NettyChannel new NettyDataHandler 封装 DotNetty public

.net 里 Netty 资料不多,做个记录

public class NetworkCommunicator : ICommunicator
{
    #region Netty 本来想用静态,后来觉得多个client公用一个netty可能分不清返回的数据, 先这样,后期要是吃资源再优化
    Bootstrap _NettyBoot;
    IEventLoopGroup _NettyEventLoop;
    IChannel _NettyChannel;
    private TaskCompletionSource<byte[]> _ResponseCompletionSource;
    CancellationTokenSource _TokenSource;
    NettyDataHandler _NettyDataHandler;
    object sendLock = new object();
    void NettyServerInit()
    {
        // Loop可以指定线程数量来提升负载,如果不设定则是默认cpu核心数为线程数,我不确定系统 tcp 的使用情况,没法确定这里的线程
        _NettyEventLoop ??= new MultithreadEventLoopGroup();
        if (_NettyBoot is null)
        {
            _NettyDataHandler = new NettyDataHandler();
            _NettyBoot = new Bootstrap();
            _NettyBoot.Group(_NettyEventLoop)
                .Channel<TcpSocketChannel>()
                .Option(ChannelOption.TcpNodelay, true)
                .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast(_NettyDataHandler); // Add your handler here
                }));
        }
    }
    class NettyDataHandler : ChannelHandlerAdapter
    {
        public event Func<byte[], Task>? OnBytesReceived;
        public event Action? OnDisconnected;
        public override void ChannelRead(IChannelHandlerContext context, object message)
        {
            if (message is IByteBuffer byteBuffer)
            {
                byte[] bytes = new byte[byteBuffer.ReadableBytes];
                byteBuffer.GetBytes(byteBuffer.ReaderIndex, bytes);
                OnBytesReceived?.Invoke(bytes);
            }
        }
        public override void ChannelReadComplete(IChannelHandlerContext context)
        {
            context.Flush();
        }

        public override void ExceptionCaught(IChannelHandlerContext context, Exception e)
        {
            context.CloseAsync();
            OnDisconnected?.Invoke();
        }
    }
    #endregion
    public IPEndPoint? LocalEndPoint { get; private set; }

    public IPEndPoint? RemoteEndPoint { get; private set; }

    public bool IsConnected
    {
        get
        {
            if (_NettyChannel is null) return false;
            return _NettyChannel.Open && _NettyChannel.Registered;
        }
    }

    public event Action<byte[]> OnDataReceive;
    internal NetworkCommunicator()
    {
        LocalEndPoint = new IPEndPoint(IPAddress.Any, 0);
        NettyServerInit();
        _NettyDataHandler!.OnBytesReceived += receiveByte => Task.Run(() =>
        {
            OnDataReceive?.Invoke(receiveByte);
            _ResponseCompletionSource?.TrySetResult(receiveByte);
        });


    }
    internal NetworkCommunicator(IPEndPoint localEndPoint)
    {
        LocalEndPoint = localEndPoint;
        NettyServerInit();
        _NettyDataHandler!.OnBytesReceived += receiveByte => Task.Run(() =>
        {
            OnDataReceive?.Invoke(receiveByte);
            _ResponseCompletionSource?.TrySetResult(receiveByte);
        });
    }

    public bool Connect(IPEndPoint remoteEndPoint)
    {
        try
        {
            this.RemoteEndPoint = remoteEndPoint;
            if (!IsConnected)
            {
                _NettyChannel = _NettyBoot.ConnectAsync(RemoteEndPoint).Result;
                return Connect(remoteEndPoint);
            }
            return true;
        }
        catch (AggregateException ex)
        {
            Log4Helper.Communication.Error(ex);
            return false;
        }

    }

    public bool DisConnect()
    {
        _NettyChannel?.DisconnectAsync();
        return true;

    }

    public void Dispose()
    {

        _NettyChannel?.CloseAsync();
        _NettyEventLoop?.ShutdownGracefullyAsync();
        _NettyEventLoop = null;
        _NettyChannel = null;
    }

    public byte[]? Send(byte[]? content)
    {
        lock (sendLock)
        {
            _ResponseCompletionSource = new TaskCompletionSource<byte[]>();
            using (_TokenSource = new CancellationTokenSource(1000))
            {
                _TokenSource.Token.Register(() => _ResponseCompletionSource.TrySetCanceled(), useSynchronizationContext: false);
                SendAsync(content).Wait();
                try
                {
                    return _ResponseCompletionSource.Task.Result;
                }
                catch (TaskCanceledException)
                {
                    return null;
                }
                catch (AggregateException)
                {
                    return null;
                }
            }
        }
    }

    public async Task<bool> SendAsync(byte[]? content)
    {
        if (content is null) return false;
        if (RemoteEndPoint is null) return false;
        if (_NettyChannel is null)
        {
            Connect(RemoteEndPoint);
            return await (SendAsync(content));
        }
        var buffer = Unpooled.WrappedBuffer(content);
        await _NettyChannel.WriteAndFlushAsync(buffer);
        return true;
    }
}

 

标签:TcpClient,null,return,NettyChannel,new,NettyDataHandler,封装,DotNetty,public
From: https://www.cnblogs.com/cchong005/p/18006320

相关文章

  • DotNetty 封装的 UdpClient
    DotNetty资料较少,UdpClient和TcpClient略有不同publicclassUdpCommunicator:ICommunicator{privateIChannel?_ClientChannel;privateBootstrap?_Bootstrap;IEventLoopGroup?_LoopGroup;privateTaskCompletionSource<byte[]>_ResponseComp......
  • 用VB6.0封装DLL组件并在EXCEL中调用
    使用程序:1、MicrosoftOfficeExcel20032、MicrosoftVisualBasic6.0 案例:在工作表的C1单元格得出A1单元格+B1单元格的值。设计的VBA代码:SubTest()  OnErrorResumeNext  Range("C1")=Cells(1,1)+Cells(1,2)EndSub  第一部分、使用VB6.0制作DL......
  • JDBC的简单封装
    1、dataSource.propertiesdriver=oracle.jdbc.driver.OracleDriverurl=jdbc:oracle:thin:@localhost:1521:orclusername=usernamepassword=password2、代码示例packagecom.example;importjava.io.IOException;importjava.io.InputStream;importjava.sql.Connection;imp......
  • uniapp sqlite方法封装
    vardbName='xxx'//数据库名称vardbPath='_doc/xxx.db'//数据库地址,推荐以下划线为开头_doc/xxx.db//判断数据库是否打开constisOpen=(plus:any)=>{//数据库打开了就返回true,否则返回falsevaropen=plus.sqlite.isOpenDatabase({name:......
  • Vue中如何对Axios进行二次封装
    作为一个前端开发者,你一定对Axios这个强大的HTTP库非常熟悉。它不仅简化了与后端API的通信,而且还提供了许多强大的功能,如拦截器、取消请求等。但是在实际开发中,我们经常需要对Axios进行二次封装,以便更好地适应我们的业务需求。今天,我将为大家分享一下如何在Vue中对Axios进行二次封......
  • vue3,封装检测元素大小变化的自定义指令
    1//resizeObserver.ts2//监听元素大小变化的指令3constmap=newWeakMap()4constob=newResizeObserver((entries)=>{5for(constentryofentries){6//获取dom元素的回调7consthandler=map.get(entry.target)8//存在回调函......
  • tcp 远程服务器,C#编程学习之使用TcpClient / BeginConnect测试远程服务器tcp端口连接
    原文链接:hhttps://blog.csdn.net/weixin_36286567/article/details/119265325有时候经常需要对一些服务器的端口进行tcp连接测试,通常使用“telnetIP地址端口号”的方式即可,不能说这种方式不可取,只是使用起来比较麻烦,本着简单好用的目的,不如我们自己动手写一个测试tcp端口连接的......
  • wpf 数据绑定 INotifyPropertyChanged封装
    BindableBase.cspublicabstractclassBindableBase:INotifyPropertyChanged{publiceventPropertyChangedEventHandlerPropertyChanged;//调用方法:publicstringName{get=>name;set{SetProperty<string>(refname,value);}}......
  • D20XB100-ASEMI整流桥D20XB100参数、封装、规格
    编辑:llD20XB100-ASEMI整流桥D20XB100参数、封装、规格型号:D20XB100品牌:ASEMI正向电流(Id):20A反向耐压(VRRM):1000V正向浪涌电流:300A正向电压(VF):1.05V引脚数量:5芯片个数:4芯片尺寸:MIL功率(Pd):大功率设备封装:GBJ-5工作温度:-40°C~150°C类型:插件、整流桥D20XB100描述:ASEMI品牌D20XB100是采......
  • D20XB100-ASEMI整流桥D20XB100参数、封装、规格
    编辑:llD20XB100-ASEMI整流桥D20XB100参数、封装、规格型号:D20XB100品牌:ASEMI正向电流(Id):20A反向耐压(VRRM):1000V正向浪涌电流:300A正向电压(VF):1.05V引脚数量:5芯片个数:4芯片尺寸:MIL功率(Pd):大功率设备封装:GBJ-5工作温度:-40°C~150°C类型:插件、整流桥D20XB100描述:ASEMI......