参考:https://www.jianshu.com/p/f8f99f20d7f4
WebSocketClient.java
package com.flash.client;
import com.flash.handler.WebSocketClientHandler;
import com.flash.log.Log;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.URI;
/**
 * Minimal Netty-based WebSocket client.
 *
 * <p>Connects to the server named by a {@code ws://} URL, performs the WebSocket
 * (version 13) opening handshake through the {@link WebSocketClientHandler}
 * installed by {@link WebSocketClientInit}, and then blocks until the channel closes.
 *
 * <p>NOTE(review): no TLS handler is added to the pipeline in this file, so a
 * {@code wss://} URL will resolve to port 443 but is not expected to complete a
 * handshake unless the initializer is extended with an SSL handler.
 */
public class WebSocketClient {

    /**
     * Runs {@link #ConnWebSocketServer(String)} on a newly started thread and returns
     * immediately.
     *
     * @param svrUrl WebSocket URL, e.g. {@code ws://localhost:8899/ws}
     * @return always {@code true}; the outcome of the connection attempt is not reported here
     */
    public boolean ConnWebSocketServerAsync(String svrUrl) {
        Thread thd = new Thread() {
            @Override
            public void run() {
                ConnWebSocketServer(svrUrl);
            }
        };
        thd.start();
        return true;
    }

    /**
     * Connects to the given WebSocket server, starts the handshake, and blocks the
     * calling thread until the channel is closed.
     *
     * @param svrUrl WebSocket URL, e.g. {@code ws://localhost:8899/ws}; when the URL has
     *               no explicit port, 80 is used for {@code ws} and 443 for {@code wss}
     * @return {@code true} if the connection was established and later closed without an
     *         exception; {@code false} if any exception occurred (it is logged, not rethrown)
     */
    public boolean ConnWebSocketServer(String svrUrl) {
        EventLoopGroup client = new NioEventLoopGroup();
        try {
            URI wsURI = new URI(svrUrl); // e.g. new URI("ws://localhost:8899/ws")
            String host = wsURI.getHost();
            if (host == null) {
                throw new IllegalArgumentException("WebSocket URL has no host: " + svrUrl);
            }
            // URI.getPort() is -1 when the URL omits the port; connect() rejects that value.
            int port = resolvePort(wsURI);

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(client);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new WebSocketClientInit());

            ChannelFuture cf = bootstrap.connect(host, port).sync();
            cf.addListener(new GenericFutureListener<ChannelFuture>() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    String log = String.format("连接websocket服务器: %s isSuccess=%s", svrUrl, channelFuture.isSuccess());
                    System.out.println(log);
                    if (!channelFuture.isSuccess()) {
                        return;
                    }
                    // Start the WebSocket opening handshake.
                    Channel channel = channelFuture.channel();
                    WebSocketClientHandler handler =
                            (WebSocketClientHandler) channel.pipeline().get("WebSocketClientHandler");
                    if (handler == null) {
                        // Without the handler nobody can finish the handshake; close instead of NPE.
                        Log.ErrorLog(new IllegalStateException(
                                "WebSocketClientHandler is missing from the channel pipeline"));
                        channel.close();
                        return;
                    }
                    HttpHeaders httpHeaders = new DefaultHttpHeaders();
                    WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                            wsURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
                    handler.setHandshaker(handshaker);
                    // If the handshake request cannot be written, close the channel so the
                    // closeFuture().sync() below does not wait on a half-initialised connection.
                    handshaker.handshake(channel).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    // To block until the handshake finishes, call handler.handshakeFuture().sync()
                    // from a thread that is NOT the channel's event loop.
                }
            });
            cf.channel().closeFuture().sync();
            return true;
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            Log.ErrorLog(ex);
        } finally {
            client.shutdownGracefully();
        }
        return false;
    } // ConnWebSocketServer end

    /**
     * Returns the port to connect to: the explicit port of {@code uri} if present,
     * otherwise 443 for the {@code wss} scheme and 80 for anything else.
     */
    private static int resolvePort(URI uri) {
        int port = uri.getPort();
        if (port != -1) {
            return port;
        }
        return "wss".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
    }
}
WebSocketClientInit.java
package com.flash.client;
import com.flash.handler.WebSocketClientHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
/**
 * Channel initializer for the WebSocket client: installs the HTTP client codec,
 * an HTTP message aggregator, and the {@link WebSocketClientHandler}, in that order.
 */
public class WebSocketClientInit extends ChannelInitializer<SocketChannel> {

    /**
     * Populates the pipeline of a newly created channel.
     *
     * @param channel the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Upper bound, in bytes, passed to the HTTP aggregator (10 KiB).
        final int maxContentLength = 1024 * 10;

        channel.pipeline()
                .addLast("HttpClientCodec", new HttpClientCodec())
                .addLast("HttpObjectAggregator", new HttpObjectAggregator(maxContentLength))
                .addLast("WebSocketClientHandler", new WebSocketClientHandler());
    }
}
WebSocketClientHandler.java
package com.flash.handler;
import com.flash.log.Log;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
/**
 * Inbound handler for the WebSocket client.
 *
 * <p>Until the handshaker reports the handshake as complete, the first inbound message is
 * treated as the server's HTTP handshake response and used to finish the handshake.
 * Afterwards text and close frames are logged; a close frame also closes the channel.
 *
 * <p>The handshake outcome is published through {@link #handshakeFuture()}: it succeeds
 * when the handshake finishes and fails if an exception is caught or the channel becomes
 * inactive first.
 */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private WebSocketClientHandshaker handshaker = null;
    private ChannelPromise handshakeFuture = null;

    /** Creates the promise that tracks the handshake as soon as the handler joins a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    /**
     * Finishes the handshake on the first response, then logs incoming frames.
     *
     * @throws IllegalStateException if a message arrives before a handshaker was set, or if
     *                               the message received during the handshake is not a
     *                               {@link FullHttpResponse}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Log.DebugLog("WebSocketClientHandler::channelRead0: ");
        if (this.handshaker == null) {
            throw new IllegalStateException("Message received before setHandshaker() was called");
        }
        // Handshake response received: complete the handshake.
        if (!this.handshaker.isHandshakeComplete()) {
            if (!(msg instanceof FullHttpResponse)) {
                throw new IllegalStateException(
                        "Expected FullHttpResponse during handshake but got " + msg.getClass().getName());
            }
            // Any exception thrown here reaches exceptionCaught(), which fails the promise.
            this.handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
            if (this.handshakeFuture != null) {
                this.handshakeFuture.trySuccess();
            }
            Log.DebugLog("WebSocketClientHandler::channelRead0 HandshakeComplete...");
            return;
        }
        if (msg instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) msg;
            Log.DebugLog("WebSocketClientHandler::channelRead0 textFrame: " + textFrame.text());
        }
        if (msg instanceof CloseWebSocketFrame) {
            Log.DebugLog("WebSocketClientHandler::channelRead0 CloseWebSocketFrame");
            // The peer is closing the WebSocket; release the connection on our side too.
            ctx.channel().close();
        }
    }

    /** Fails a still-pending handshake when the connection goes away. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        failHandshake(new java.nio.channels.ClosedChannelException());
    }

    /** Prints the error, fails a still-pending handshake, and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("WebSocketClientHandler::exceptionCaught");
        cause.printStackTrace();
        failHandshake(cause);
        ctx.channel().close();
    }

    /** Marks the handshake promise as failed unless it has already completed. */
    private void failHandshake(Throwable cause) {
        if (this.handshakeFuture != null) {
            // tryFailure is a no-op on an already completed promise.
            this.handshakeFuture.tryFailure(cause);
        }
    }

    /** Sets the handshaker used to validate the server's handshake response. */
    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * @return the future tracking the handshake; {@code null} until the handler has been
     *         added to a pipeline (or a promise has been set explicitly)
     */
    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }

    /** @return the same object as {@link #handshakeFuture()}, typed as a writable promise */
    public ChannelPromise getHandshakeFuture() {
        return handshakeFuture;
    }

    /** Replaces the promise used to report the handshake outcome. */
    public void setHandshakeFuture(ChannelPromise handshakeFuture) {
        this.handshakeFuture = handshakeFuture;
    }
}
标签:netty,WebSocket,Netty,WebSocketClientHandler,handler,io,import,channel,客户端 From: https://blog.51cto.com/softo/6022189