首页 > 编程语言 >java-netty客户端断线重启

java-netty客户端断线重启

时间:2024-09-29 19:22:17浏览次数:16  
标签:netty java void import new public channel 客户端

背景

经常会遇到netty客户端,因为网络等多种原因而断线,需要自动重连

核心

就是对连接服务端成功后,对ChannelFuture进行监听,核心代码如下

            f = b.connect("127.0.0.1", 10004).sync(); // (5)
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(!channelFuture.isSuccess()){
                        System.out.println("重试");
                        channelFuture.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doReconnect();
                            }
                        },3,TimeUnit.SECONDS);
                    }else{

                    }
                }
            });

具体代码

nettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class nettyClient {
    private static ChannelFuture f;
    static EventLoopGroup workerGroup;
    static Bootstrap b;
    static ChannelFutureListener channelFutureListener=null;
    static NettyClientHandlerInner nettyClientHandlerInner = new NettyClientHandlerInner();

    public static void main(String[] args) throws Exception {


        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(2000);
                        nettyClientHandlerInner.sendMSG("writeWumingStatus@@" + new Random().nextInt(20000));
                        Thread.sleep(2000);
                        nettyClientHandlerInner.sendMSG("writePazhanfoStatus@@" + new Random().nextInt(20000));

                        nettyClientHandlerInner.sendMSG("pkRecord@@" + new Random().nextInt(20000));

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        init();
        connectToServer(nettyClientHandlerInner);
    }

    public static void init() {
        workerGroup = new NioEventLoopGroup();

        b = new Bootstrap(); // (1)
        b.group(workerGroup); // (2)
        b.channel(NioSocketChannel.class); // (3)
        b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(nettyClientHandlerInner);
            }
        });

        channelFutureListener=new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if(!channelFuture.isSuccess()){
                    channelFuture.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                           doReconnect();
                        }
                    },3,TimeUnit.SECONDS);
                }else{
                    System.out.println("重连成功");
                }
            }
        };

    }

    public static void connectToServer(NettyClientHandlerInner nettyClientHandler) {
        try {
            // Start the client.
            f = b.connect("127.0.0.1", 10004).sync(); // (5)
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(!channelFuture.isSuccess()){
                        System.out.println("重试");
                        channelFuture.channel().eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                doReconnect();
                            }
                        },3,TimeUnit.SECONDS);
                    }else{

                    }
                }
            });
            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void doReconnect(){
        ChannelFuture future=b.connect("127.0.0.1", 10004);
        future.addListener(channelFutureListener);
    }
}

NettyClientHandlerInner

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

@ChannelHandler.Sharable
class NettyClientHandlerInner extends ChannelInboundHandlerAdapter {
    ChannelHandlerContext ctxOut;
    //通道就绪事件(就是在bootstrap启动助手配置中addlast了handler之后就会触发此事件)
    //但我觉得也可能是当有客户端连接上后才为一次通道就绪
    public void channelActive(ChannelHandlerContext ctx) throws IOException, InterruptedException {
        System.out.println("客户端消息,通道激活,可以发送消息了");
        ctxOut=ctx;

    }

    //数据读取事件
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //传来的消息包装成字节缓冲区
        String byteBuf = (String) msg;
//        ByteBuf byteBuf = (ByteBuf) msg;
        //Netty提供了字节缓冲区的toString方法,并且可以设置参数为编码格式:CharsetUtil.UTF_8
        System.out.println("客户端读取服务返回的数据:" + byteBuf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        System.out.println(cause.getMessage());
        ctx.close();
    }

    public void  sendMSG(String msg){
        ctxOut.writeAndFlush(Unpooled.copiedBuffer(msg+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);

        System.out.println("与服务器断开");

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                nettyClient.doReconnect();
            }
        }, 3, TimeUnit.SECONDS);

        ctx.close();
    }
}

总结

要实现重连,有三个地方需要注意

  1. 对连接成功的ChannelFuture进行监听,调用doReconnect
  2. 实现如上的doReconnect
  3. 在NettyClientHandlerInner中重写channelInactive,再次调用doReconnect
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);

        System.out.println("与服务器断开");

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                nettyClient.doReconnect();
            }
        }, 3, TimeUnit.SECONDS);

        ctx.close();
    }

标签:netty,java,void,import,new,public,channel,客户端
From: https://blog.csdn.net/m0_60688978/article/details/142641910

相关文章

  • java-快速将普通main类变为javafx类,并加载自定义fxml
    java-快速将普通main类变为javafx类,并加载自定义fxml前提步骤1.普通类继承Application2.实现main方法3.写一个controller4.写一个fxml文件5.写start方法加载fxml6.具体代码7.运行即可前提使用自带javafx的jdk,这里使用的是jdk1.834,当然你可以使用其他的可行......
  • Java必修课——Spring框架
    目录一、Spring框架概述二、IOC概念和原理2.1、什么是IOC2.2、IOC接口三、深入理解Java基础中的集合框架3.1、Collection3.2、Map3.3、集合工具类四、练习写一个SpringMVC框架1、介绍2、程序实践3、总结五、Java开发者必备10大数据工具和框架一、Spring框架概述Sp......
  • java+vue计算机毕设餐厅点餐订餐系统【源码+程序+论文+开题】
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着科技的飞速发展和互联网普及率的不断提高,餐饮业正经历着前所未有的变革。传统餐厅的点餐方式已难以满足现代消费者对于便捷性、个性化及高效服务......
  • java+vue计算机毕设不动产信息管理系统【源码+程序+论文+开题】
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着城市化进程的加速和房地产市场的蓬勃发展,不动产作为重要的经济资产和社会资源,其管理效率与信息化水平直接影响到政府监管、市场交易及民众权益保......
  • 【JavaEE】——CAS指令和ABA问题
    阿华代码,不是逆风,就是我疯你们的点赞收藏是我前进最大的动力!!希望本文内容能够帮助到你!!目录一:CAS指令1:概念2:伪代码例子说明3:优点二:原子类1:引入2:代码示例3:与volatile的区别4:标准库源码三:CAS是如何避免线程安全问题四:CAS中ABA问题1:引入2:极端情况3:解决方案......