背景
Netty客户端经常会因为网络等多种原因而断线,这时需要客户端能够自动重连。
核心
就是对connect返回的ChannelFuture进行监听:连接失败时延迟一段时间后重新连接,核心代码如下
// Start the connect WITHOUT calling sync(): sync() rethrows the connect failure,
// so a listener attached after it could never observe an unsuccessful future.
f = b.connect("127.0.0.1", 10004); // (5)
f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (!channelFuture.isSuccess()) {
            System.out.println("重试");
            // Retry after a short delay, scheduled on the event loop of the failed channel.
            channelFuture.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    doReconnect();
                }
            }, 3, TimeUnit.SECONDS);
        }
    }
});
具体代码
nettyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * Demo Netty client that keeps reconnecting to a fixed server address.
 *
 * <p>A background thread periodically sends sample messages through the shared
 * {@link NettyClientHandlerInner}. Whenever a connect attempt fails, a new attempt is
 * scheduled after {@link #RECONNECT_DELAY_SECONDS} seconds.
 */
public class nettyClient {
    /** Server address used for the initial connect and for every reconnect. */
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 10004;
    /** Delay between a failed connect attempt and the next one. */
    private static final long RECONNECT_DELAY_SECONDS = 3;

    private static ChannelFuture f;
    static EventLoopGroup workerGroup;
    static Bootstrap b;
    /** Listener attached to every reconnect attempt started by {@link #doReconnect()}. */
    static ChannelFutureListener channelFutureListener = null;
    static NettyClientHandlerInner nettyClientHandlerInner = new NettyClientHandlerInner();

    /**
     * Starts the sample sender thread, configures the bootstrap and connects to the server.
     *
     * @param args unused
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        new Thread(new Runnable() {
            @Override
            public void run() {
                // One generator for the whole loop instead of a new Random per message.
                final Random random = new Random();
                while (true) {
                    try {
                        Thread.sleep(2000);
                        nettyClientHandlerInner.sendMSG("writeWumingStatus@@" + random.nextInt(20000));
                        Thread.sleep(2000);
                        nettyClientHandlerInner.sendMSG("writePazhanfoStatus@@" + random.nextInt(20000));
                        nettyClientHandlerInner.sendMSG("pkRecord@@" + random.nextInt(20000));
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop; swallowing it would make
                        // the loop spin forever after an interrupt.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                        return;
                    }
                }
            }
        }).start();
        init();
        connectToServer(nettyClientHandlerInner);
    }

    /**
     * Creates the event loop group and the bootstrap, installs the channel pipeline
     * (line-based framing, string decoding, then the shared handler) and builds the
     * listener used for reconnect attempts.
     */
    public static void init() {
        workerGroup = new NioEventLoopGroup();
        b = new Bootstrap(); // (1)
        b.group(workerGroup); // (2)
        b.channel(NioSocketChannel.class); // (3)
        b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(nettyClientHandlerInner);
            }
        });
        channelFutureListener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    scheduleReconnect(channelFuture);
                } else {
                    System.out.println("重连成功");
                }
            }
        };
    }

    /**
     * Performs the initial connect and blocks until that connection is closed.
     *
     * <p>The listener is attached to the un-synced connect future: {@code sync()} rethrows
     * a connect failure, which would both skip the retry logic and let the exception
     * escape this method. If the initial attempt fails, a retry is scheduled and this
     * method returns.
     *
     * @param nettyClientHandler unused; kept so existing callers keep compiling
     */
    public static void connectToServer(NettyClientHandlerInner nettyClientHandler) {
        try {
            // Start the client.
            ChannelFuture connectFuture = b.connect(HOST, PORT); // (5)
            f = connectFuture;
            connectFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (!channelFuture.isSuccess()) {
                        System.out.println("重试");
                        scheduleReconnect(channelFuture);
                    }
                }
            });
            // await() waits for completion without rethrowing a connect failure.
            connectFuture.await();
            if (connectFuture.isSuccess()) {
                // Wait until the connection is closed.
                connectFuture.channel().closeFuture().sync();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Starts a new connect attempt; a failure schedules the next attempt. */
    public static void doReconnect() {
        ChannelFuture future = b.connect(HOST, PORT);
        future.addListener(channelFutureListener);
    }

    /** Schedules {@link #doReconnect()} on the event loop of the channel whose connect failed. */
    private static void scheduleReconnect(ChannelFuture failedFuture) {
        failedFuture.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                doReconnect();
            }
        }, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
    }
}
NettyClientHandlerInner
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
 * Shared inbound handler of the demo client.
 *
 * <p>It remembers the context of the most recently activated channel so that
 * {@link #sendMSG(String)} can be called from outside the pipeline, prints every
 * received message, and schedules a reconnect when the channel becomes inactive.
 */
@ChannelHandler.Sharable
class NettyClientHandlerInner extends ChannelInboundHandlerAdapter {
    // Assigned in channelActive and read by sendMSG, which the demo calls from its own
    // sender thread; volatile makes the latest context visible to that thread.
    volatile ChannelHandlerContext ctxOut;

    /**
     * Channel-active event: remembers the context so messages can be sent from now on.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws IOException, InterruptedException {
        System.out.println("客户端消息,通道激活,可以发送消息了");
        ctxOut = ctx;
    }

    /**
     * Data-read event: prints the message received from the server.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // The cast relies on a StringDecoder placed before this handler in the pipeline.
        String text = (String) msg;
        System.out.println("客户端读取服务返回的数据:" + text);
    }

    /**
     * Prints the error and closes the connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        System.out.println(cause.getMessage());
        ctx.close();
    }

    /**
     * Sends one line of text ({@code msg} followed by CRLF, UTF-8 encoded) to the server.
     *
     * <p>The message is dropped with a console notice when no channel has become active
     * yet or the last known channel is no longer active, instead of failing with a
     * {@link NullPointerException} or writing to a dead channel.
     *
     * @param msg text to send, without the line terminator
     */
    public void sendMSG(String msg) {
        // Read the field once so the null check and the write use the same context.
        ChannelHandlerContext ctx = ctxOut;
        if (ctx == null || !ctx.channel().isActive()) {
            System.out.println("尚未连接到服务器,消息未发送:" + msg);
            return;
        }
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg + "\r\n", CharsetUtil.UTF_8));
    }

    /**
     * Channel-inactive event: schedules a reconnect attempt after 3 seconds.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("与服务器断开");
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                nettyClient.doReconnect();
            }
        }, 3, TimeUnit.SECONDS);
        ctx.close();
    }
}
总结
要实现重连,有三个地方需要注意
- 对connect返回的ChannelFuture进行监听,连接失败时调用doReconnect
- 实现如上的doReconnect
- 在NettyClientHandlerInner中重写channelInactive,再次调用doReconnect
/**
 * Channel-inactive event: forwards the event, reports the disconnect and schedules
 * a reconnect attempt 3 seconds later on the channel's event loop.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);
    System.out.println("与服务器断开");
    // Task that starts a new connect attempt once the delay has elapsed.
    Runnable reconnectTask = new Runnable() {
        @Override
        public void run() {
            nettyClient.doReconnect();
        }
    };
    ctx.channel().eventLoop().schedule(reconnectTask, 3L, TimeUnit.SECONDS);
    ctx.close();
}
标签:netty,java,void,import,new,public,channel,客户端
From: https://blog.csdn.net/m0_60688978/article/details/142641910