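1. Netty issues
TCP is a byte-stream protocol, so it is subject to both sticky packets and half packets; UDP preserves message boundaries and is not.
1. Sticky packets
The sender sends 10 separate messages, and the receiver gets all 10 of them in a single read.
2. Half packets
To reproduce them, shrink the server's receive buffer.
With a half packet, a single read on the server does not return the whole message that the client sent in one write.
3. TCP sliding window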
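Causes of sticky packets:
- Sliding window: suppose one message is 256 bytes. If the receiver does not consume messages promptly and the window is large enough, these 256-byte messages pile up in the receive buffer, and several of them are then read together.
- Nagle's algorithm: small writes are held back and sent as one batch, which also causes sticky packets.
- Application layer: the receiving ByteBuf is large (Netty's default is 1024), so one read can hold several messages.
Causes of half packets:
- Sliding window: the receiver's window has only 128 bytes free and the sender has 256 bytes to send. They do not fit, so the sender transmits the first 128 bytes, waits for the ack, and then sends the rest, producing a half packet.
- MSS: when the data exceeds the MSS (maximum segment size, derived from the link-layer MTU of the network card or device), it is split into several segments, producing half packets.
- Application layer: the receiver's ByteBuf is smaller than the amount of data actually sent.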
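The effect of the receive buffer size can be sketched without any network at all. The following is a minimal simulation, not Netty code: the class StreamBoundaryDemo and its methods are hypothetical names, and the "wire" is just a byte array standing in for a TCP stream that carries no message boundaries.

```java
import java.util.ArrayList;
import java.util.List;

class StreamBoundaryDemo {
    /** Concatenates `count` messages of `size` bytes, the way a byte stream carries them: no boundaries. */
    static byte[] sendAll(int count, int size) {
        return new byte[count * size];
    }

    /** Drains the stream with a receive buffer of `bufSize` bytes and returns the size of each read. */
    static List<Integer> readSizes(byte[] wire, int bufSize) {
        List<Integer> sizes = new ArrayList<>();
        int pos = 0;
        while (pos < wire.length) {
            // a single read returns at most bufSize bytes, regardless of message boundaries
            int n = Math.min(bufSize, wire.length - pos);
            sizes.add(n);
            pos += n;
        }
        return sizes;
    }

    public static void main(String[] args) {
        byte[] wire = sendAll(10, 16);
        // large buffer: all ten 16-byte messages arrive in one read (sticky packets)
        System.out.println(readSizes(wire, 1024));
        // 10-byte buffer: every 16-byte message is cut across reads (half packets)
        System.out.println(readSizes(wire, 10));
    }
}
```

In both runs the receiver sees 160 bytes in total; only the way they are chopped into reads changes, which is why the application has to define its own framing.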
4. Code example
Server:
package com.yikolemon.P91Question;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
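/**
* Sticky packets: leave the server's receive buffer (SO_RCVBUF) at its default
* Half packets: serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
* This sets the size of the underlying receive buffer (i.e. the sliding window). It only determines the minimum unit Netty reads; the amount Netty actually reads each time is usually an integer multiple of it.
*/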
@Slf4j
public class HelloWorldServer {
void start(){
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
// Set the receive buffer to 10 bytes to provoke half packets
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
}
});
final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e)
{
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
Client:
package com.yikolemon.P91Question;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// channelActive() fires once the channel connection is established
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
for (int i = 0; i < 10; i++) {
final ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
ctx.writeAndFlush(buf);
}
}
});
}
});
final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e)
{
log.error("Client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
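2. Solutions
1. Short-lived connections
Each connection carries exactly one message and is closed right after sending, so sticky packets cannot occur.
This does not solve half packets: if the server's receive buffer is small, a single message can still arrive split across several reads.
Client code: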
package com.yikolemon.P91Question;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelloWorldClient {
static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class);
public static void main(String[] args) {
// send 10 times, each over its own short-lived connection
for (int i = 0; i < 10; i++) {
send();
}
}
public static void send() {
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// channelActive() fires once the channel connection is established
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
final ByteBuf buf = ctx.alloc().buffer(16);
buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
ctx.writeAndFlush(buf);
ctx.channel().close();
}
});
}
});
final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e)
{
log.error("Client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
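2. FixedLengthFrameDecoder (fixed-length frame decoder)
FixedLengthFrameDecoder is a decoder provided by Netty. Both sides agree on a fixed message length, and the sender pads every message to that length. If the receiver has fewer bytes than the agreed length, it waits for later data and concatenates it until a full frame is available.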
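The accumulate-then-slice idea behind fixed-length framing can be sketched in plain Java. This is an illustration only, not Netty's implementation: FixedLengthFramer and feed are hypothetical names, and byte arrays stand in for ByteBuf.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FixedLengthFramer {
    private final int frameLength;
    private byte[] pending = new byte[0]; // bytes received so far that do not yet form a full frame

    FixedLengthFramer(int frameLength) {
        this.frameLength = frameLength;
    }

    /** Appends one received chunk and returns every complete frame that is now available. */
    List<byte[]> feed(byte[] chunk) {
        byte[] all = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        List<byte[]> frames = new ArrayList<>();
        int pos = 0;
        while (all.length - pos >= frameLength) {
            frames.add(Arrays.copyOfRange(all, pos, pos + frameLength));
            pos += frameLength;
        }
        // keep the incomplete tail until more data arrives
        pending = Arrays.copyOfRange(all, pos, all.length);
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramer framer = new FixedLengthFramer(10);
        // 12 bytes: one full frame plus 2 bytes of the next one
        System.out.println(framer.feed("0000______11".getBytes()).size());
        // 8 more bytes complete the second frame
        System.out.println(new String(framer.feed("1_______".getBytes()).get(0)));
    }
}
```

The cost of this scheme is visible in the padding: every message occupies the full frame length even when its payload is shorter.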
Client:
package com.yikolemon.P91Question;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Random;
@Slf4j
public class SloveClient {
public static byte[] fill10Bytes(char c, int len) {
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) '_');
for (int i = 0; i < len; i++) {
bytes[i] = (byte) c;
}
System.out.println(new String(bytes));
return bytes;
}
public static void main(String[] args) {
send();
}
public static void send() {
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// channelActive() fires once the channel connection is established
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Write all the data into a single buffer and let the server do the decoding
ByteBuf buf = ctx.alloc().buffer();
char c='0';
for (int i = 0; i < 10; i++) {
byte[] bytes = fill10Bytes(c, new Random().nextInt(10) + 1);
buf.writeBytes(bytes);
c++;
}
ctx.writeAndFlush(buf);
}
});
}
});
final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e)
{
log.error("Client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
Server code:
package com.yikolemon.P91Question;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
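/**
* Sticky packets: leave the server's receive buffer (SO_RCVBUF) at its default
* Half packets: serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
* This sets the size of the underlying receive buffer (i.e. the sliding window). It only determines the minimum unit Netty reads; the amount Netty actually reads each time is usually an integer multiple of it.
*/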
@Slf4j
public class HelloWorldServer {
void start(){
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// agree with the client on a frame length of 10 bytes
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
}
});
final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e)
{
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
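3. Delimiter-based decoders
1) LineBasedFrameDecoder
Splits frames on \n and \r\n. A maximum length must be given; if no delimiter is found within that length, the decoder throws an exception (TooLongFrameException).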
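The delimiter rule can be sketched in plain Java as follows. This is a simplified illustration, not Netty's code: LineFramer and feed are hypothetical names, and IllegalStateException stands in for the exception Netty throws when a line is too long.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineFramer {
    private final int maxLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream(); // bytes of the unfinished line

    LineFramer(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Appends one received chunk and returns every complete line, without its line ending. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                int len = line.length;
                if (len > 0 && line[len - 1] == '\r') {
                    len--; // accept \r\n as well as \n
                }
                lines.add(new String(line, 0, len, StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
                if (pending.size() > maxLength) {
                    // no delimiter within maxLength bytes: give up instead of buffering forever
                    throw new IllegalStateException("line longer than " + maxLength + " bytes");
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer(1024);
        System.out.println(framer.feed("00\r\n111".getBytes())); // "111" has no line ending yet
        System.out.println(framer.feed("1\n".getBytes()));       // the pending line is completed
    }
}
```

The maximum length is what keeps a peer that never sends a delimiter from making the receiver buffer data without bound.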
Client code:
package com.yikolemon.P91Question;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
@Slf4j
public class SolveClient2 {
public static StringBuilder makeString(char c, int len) {
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; i++) {
sb.append(c);
}
sb.append("\n");
return sb;
}
public static void main(String[] args) {
send();
}
public static void send() {
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
// channelActive() fires once the channel connection is established
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Write all the data into a single buffer and let the server do the decoding
ByteBuf buf = ctx.alloc().buffer();
char c='0';
for (int i = 0; i < 10; i++) {
int nextInt = new Random().nextInt(256+1);
StringBuilder str = makeString(c, nextInt);
byte[] bytes = str.toString().getBytes();
buf.writeBytes(bytes);
c++;
}
ctx.writeAndFlush(buf);
}
});
}
});
final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e)
{
log.error("Client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
Server code:
package com.yikolemon.P91Question;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HelloWorldServer {
void start(){
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
try{
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 1024 is the maximum frame length; if no line break is found within it, an exception is thrown
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
}
});
final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e)
{
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new HelloWorldServer().start();
}
}
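2) Others
Other delimiter-based decoders, such as DelimiterBasedFrameDecoder, can be tried out in the same way.
4. LengthFieldBasedFrameDecoder (length-field frame decoder)
Besides maxFrameLength, the constructor takes four parameters:
- lengthFieldOffset: offset of the length field from the start of the frame
- lengthFieldLength: size of the length field itself, in bytes
- lengthAdjustment: correction added to the value of the length field, e.g. the number of extra bytes that sit between the length field and the content
- initialBytesToStrip: number of bytes to strip from the front of the decoded frame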
Example 1:
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+
Example 2: the length field is stripped from the result
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
+--------+----------------+      +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
+--------+----------------+      +----------------+
Example 3:
0xCAFE occupies two bytes. lengthFieldOffset is the offset at which the length field starts, and lengthFieldLength is the size of the length field itself.
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
| Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
|  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+
Example 4:
lengthAdjustment says that after the length field, 2 more bytes (a header, similar to HTTP headers) must be skipped before the actual content begins.
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
|  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
| 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+
Example 5:
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+
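The arithmetic behind these examples is: frame length = lengthFieldOffset + lengthFieldLength + lengthAdjustment + value of the length field, after which initialBytesToStrip bytes are dropped from the front. A minimal plain-Java sketch of that calculation follows; LengthFieldMath and its methods are hypothetical names for illustration, it handles a single frame only, and it is not Netty's implementation.

```java
import java.util.Arrays;

class LengthFieldMath {
    /** Reads an unsigned big-endian length field of 1 to 3 bytes (4 bytes if the value fits in an int). */
    static int readLength(byte[] data, int offset, int fieldLength) {
        int value = 0;
        for (int i = 0; i < fieldLength; i++) {
            value = (value << 8) | (data[offset + i] & 0xFF);
        }
        return value;
    }

    /** Returns the decoded frame, or null if `data` does not yet hold a complete frame (half packet). */
    static byte[] decode(byte[] data, int lengthFieldOffset, int lengthFieldLength,
                         int lengthAdjustment, int initialBytesToStrip) {
        if (data.length < lengthFieldOffset + lengthFieldLength) {
            return null; // the length field itself has not fully arrived
        }
        int lengthValue = readLength(data, lengthFieldOffset, lengthFieldLength);
        int frameLength = lengthFieldOffset + lengthFieldLength + lengthAdjustment + lengthValue;
        if (data.length < frameLength) {
            return null; // wait for the rest of the frame
        }
        return Arrays.copyOfRange(data, initialBytesToStrip, frameLength);
    }
}
```

With the bytes of Example 5 (0xCA, 0x000C, 0xFE, then the 12 content bytes) and the parameters 1, 2, 1, 3, the frame length works out to 1 + 2 + 1 + 12 = 16 and the result is the 13 bytes starting at HDR2.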
Test code:
package com.yikolemon.Decoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class LengthFieldDecoderTest {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
// the length field is an int, i.e. 4 bytes
new LengthFieldBasedFrameDecoder(1024,0,4,4,0),
new LoggingHandler(LogLevel.INFO)
);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
send(buf,"Hello, world");
send(buf,"Hi");
channel.writeInbound(buf);
}
public static void send(ByteBuf buf,String str) {
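// frame layout: 4-byte content length, 4-byte header, then the content itself
byte[] bytes =str.getBytes();
int length = bytes.length;// length of the actual content
buf.writeInt(length);
// pretend a version header was written; lengthAdjustment (4) must compensate for these extra header bytes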
buf.writeBytes("why:".getBytes());
buf.writeBytes(bytes);
}
}
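3. Protocol design and parsing
1. Redis protocol
Following the Redis protocol, use Netty to send a command in the format Redis expects.
Redis wire protocol:
The command set ke vvvv is sent over the network as follows (each line ends with \r\n):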
*3
$3
set
$2
ke
$4
vvvv
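The listing above follows a simple pattern: an array header *N with the number of arguments, then for each argument a $L line with its byte length followed by the argument itself, every line terminated by \r\n. A small plain-Java encoder for that pattern might look like this; RespEncoder is a hypothetical helper for illustration, not part of Netty or of any Redis client.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class RespEncoder {
    private static final byte[] CRLF = {13, 10};

    /** Encodes a command such as ("set", "ke", "vvvv") as an array of bulk strings. */
    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLine(out, ("*" + args.length).getBytes(StandardCharsets.UTF_8)); // number of arguments
        for (String arg : args) {
            byte[] bytes = arg.getBytes(StandardCharsets.UTF_8);
            writeLine(out, ("$" + bytes.length).getBytes(StandardCharsets.UTF_8)); // length in bytes
            writeLine(out, bytes);
        }
        return out.toByteArray();
    }

    private static void writeLine(ByteArrayOutputStream out, byte[] line) {
        out.write(line, 0, line.length);
        out.write(CRLF, 0, CRLF.length);
    }

    public static void main(String[] args) {
        // prints the seven lines shown above
        System.out.print(new String(encode("set", "ke", "vvvv"), StandardCharsets.UTF_8));
    }
}
```

Because every argument is prefixed with its length, the server never has to scan the payload for delimiters, which is the same idea as the length-field decoder above.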
First set up Redis on the local machine, then send the command with Netty.
FakeRedisClient code:
package com.yikolemon.FakeRedis;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
public class FakeRedisClient {
public static void main(String[] args) throws InterruptedException {
final byte[] LINE={13,10};
NioEventLoopGroup worker = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
// called once the connection is established
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$2".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("ke".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("vvvv".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
// this handler consumes the message, so release the buffer to avoid a leak
buf.release();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
}
}
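2. HTTP protocol
Use the HttpServerCodec provided by Netty; this handler decodes HTTP requests and encodes HTTP responses (it is both an inbound and an outbound handler).
After decoding, each request arrives as two kinds of message, HttpRequest and HttpContent. There are two ways to handle them:
- Option 1: use ChannelInboundHandlerAdapter. The handler must check the type of msg with instanceof in an if/else chain, which is tedious.
- Option 2: use SimpleChannelInboundHandler<T>. The type parameter T is the message type this handler processes; a msg of any other type is not handled by it and is passed on to the next handler.
DefaultFullHttpResponse serves as the HttpResponse sent back to the client; write the body into its content and set the body length in the Content-Length header.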
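Content-Length plays the same role for HTTP that the length field plays in the frame decoders above: it tells the peer where the body ends. A rough plain-Java sketch of what such a response looks like on the wire follows; HttpResponseSketch is a hypothetical helper, the status line and single header are simplifying assumptions, and in the Netty code the codec produces these bytes for you.

```java
import java.nio.charset.StandardCharsets;

class HttpResponseSketch {
    /** Builds a minimal HTTP/1.1 200 response whose Content-Length matches the body. */
    static String build(String body) {
        // the header counts bytes, not characters, so encode the body first
        int contentLength = body.getBytes(StandardCharsets.UTF_8).length;
        return "HTTP/1.1 200 OK\r\n"
                + "Content-Length: " + contentLength + "\r\n"
                + "\r\n" // empty line separates headers from body
                + body;
    }

    public static void main(String[] args) {
        System.out.print(build("<h1>hi</h1>"));
    }
}
```

Without the header, a client on a keep-alive connection cannot tell whether more body bytes are still coming, which is why the browser would otherwise keep waiting.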
Code:
package com.yikolemon.FakeProtocol;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
@Slf4j
public class FakeHttp {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boss=new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss,worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
ch.pipeline().addLast(new HttpServerCodec());
/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("{}",msg.getClass());
if (msg instanceof HttpRequest){
// request line and request headers
}
else if (msg instanceof HttpContent){
// request body
}
}
});*/
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
String uri = httpRequest.uri();
log.info(uri);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
response.content().writeBytes(bytes);
// tell the browser the content length so that it does not keep waiting for more data
response.headers().setInt(CONTENT_LENGTH,bytes.length);
channelHandlerContext.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
}
}
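3. Elements of a custom protocol
- Magic number: lets the receiver reject invalid packets right away, e.g. CAFEBABE
- Version: allows the protocol to be upgraded
- Serialization algorithm: which serialization/deserialization scheme the message body uses, so that more can be added later, e.g. JSON (text) or protobuf, hessian, and JDK serialization (binary)
- Command type: login, register, one-to-one chat, group chat, ...
- Sequence ID: supports full-duplex communication and makes asynchronous request/response matching possible
- Body length
- Message body: JSON, XML, or a serialized object stream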
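One possible way to lay these elements out as a fixed 16-byte header is sketched below with java.nio.ByteBuffer. The field widths, the 0xCAFEBABE magic value, the padding byte, and the class name FrameHeaderSketch are all assumptions made for illustration; a real protocol is free to choose differently.

```java
import java.nio.ByteBuffer;

class FrameHeaderSketch {
    static final int MAGIC = 0xCAFEBABE;
    static final int HEADER_LENGTH = 16;

    static byte[] encode(byte version, byte serializer, byte command, int sequenceId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length); // big-endian by default
        buf.putInt(MAGIC);        // bytes 0-3: magic number
        buf.put(version);         // byte 4: protocol version
        buf.put(serializer);      // byte 5: serialization algorithm
        buf.put(command);         // byte 6: command type
        buf.putInt(sequenceId);   // bytes 7-10: sequence ID
        buf.put((byte) 0xFF);     // byte 11: padding, so the length field starts at offset 12
        buf.putInt(body.length);  // bytes 12-15: body length
        buf.put(body);            // message body
        return buf.array();
    }

    /** Validates the magic number and returns the body of one complete frame. */
    static byte[] decodeBody(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        buf.position(12); // skip version, serializer, command, sequence ID and padding
        byte[] body = new byte[buf.getInt()];
        buf.get(body);
        return body;
    }
}
```

Placing the length at a fixed offset is what lets a generic length-field decoder cut the stream into frames before any protocol-specific parsing happens.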
4. Chat room message protocol
MessageCodec:
package com.yikolemon.Codec;
import com.yikolemon.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
* Custom codec
*/
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// magic number (4 bytes)
out.writeBytes(new byte[]{1,2,3,4});
// version
out.writeByte(1);
// serialization algorithm, 0: jdk, 1: json
out.writeByte(0);
// command type
out.writeByte(msg.getMessageType());
// 4-byte sequence ID
out.writeInt(msg.getSequenceId());
// padding byte, so that the header is 16 bytes in total
out.writeByte(0xff);
// serialize the message body into a byte array
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos=new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// body length (4 bytes)
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes=new byte[length];
in.readBytes(bytes,0,length);
if (serializerType==0){
// deserialize with JDK serialization
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
Message message = (Message)objectInputStream.readObject();
log.info("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
log.info("{}",message);
out.add(message);
}
}
}
Test class:
package com.yikolemon.Codec;
import com.yikolemon.message.LoginRequestMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class MessageCodecTest {
public static void main(String[] args) throws Exception {
EmbeddedChannel embeddedChannel = new EmbeddedChannel(
// a frame decoder in front solves both half packets and sticky packets
new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
new LoggingHandler(LogLevel.INFO),
new MessageCodec()
);
//encode
LoginRequestMessage message = new LoginRequestMessage("yiko", "111");
embeddedChannel.writeOutbound(message);
//decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null,message,buf);
// inbound
embeddedChannel.writeInbound(buf);
}
}
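4. Handler thread safety
A handler that is not annotated with @Sharable must not be shared: adding the same instance to more than one pipeline with addLast throws an exception.
- A handler annotated with @Sharable declares that it is safe to share across channels, so a single instance of such an encoder or decoder can be reused by every pipeline.
- Without the annotation, pay attention to thread safety, especially for encoders and decoders that keep per-connection state.
Subclasses of ByteToMessageCodec (and ByteToMessageDecoder) cannot be annotated with @Sharable.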
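The reason a byte-accumulating decoder must not be shared can be shown with a toy example in plain Java. SharedStateDemo and StatefulDecoder are hypothetical names; the decoder keeps leftover input between calls, which is the kind of per-connection state that makes sharing unsafe.

```java
import java.util.ArrayList;
import java.util.List;

class SharedStateDemo {
    /** A decoder that remembers leftover input between calls and emits 4-character frames. */
    static class StatefulDecoder {
        private final StringBuilder pending = new StringBuilder();

        List<String> feed(String chunk) {
            pending.append(chunk);
            List<String> frames = new ArrayList<>();
            while (pending.length() >= 4) {
                frames.add(pending.substring(0, 4));
                pending.delete(0, 4);
            }
            return frames;
        }
    }

    public static void main(String[] args) {
        StatefulDecoder shared = new StatefulDecoder();
        // connection A delivers half of "AAAA", then connection B delivers half of "BBBB"
        System.out.println(shared.feed("AA")); // []
        System.out.println(shared.feed("BB")); // [AABB], a frame mixing bytes of both connections
    }
}
```

A codec that only ever receives complete frames has no such leftover state, which is what makes a sharable variant possible once a frame decoder sits in front of it.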
Rewriting the MessageCodec from section 3 as a class that can be annotated with @Sharable:
package com.zko0.protocol;
import com.zko0.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
* A LengthFieldBasedFrameDecoder must be placed before this handler in the pipeline
* so that every ByteBuf it receives is a complete frame
*/
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> list) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// magic number (4 bytes)
out.writeBytes(new byte[]{1,2,3,4});
// version
out.writeByte(1);
// serialization algorithm, 0: jdk, 1: json
out.writeByte(0);
// command type
out.writeByte(msg.getMessageType());
// 4-byte sequence ID
out.writeInt(msg.getSequenceId());
// padding byte, so that the header is 16 bytes in total
out.writeByte(0xff);
// serialize the message body into a byte array
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos=new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// body length (4 bytes)
out.writeInt(bytes.length);
out.writeBytes(bytes);
list.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes=new byte[length];
in.readBytes(bytes,0,length);
if (serializerType==0){
// deserialize with JDK serialization
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
Message message = (Message)objectInputStream.readObject();
log.info("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
log.info("{}",message);
out.add(message);
}
}
}
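5. Abnormal disconnection
channelInactive and exceptionCaught handle the channel being closed and exceptions being caught, respectively. Overriding these two methods lets the server clean up when a client exits.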
package com.zko0.server.handler;
import com.zko0.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
// fired when the connection is closed (inactive event)
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.info("{} disconnected", ctx.channel());
}
// fired when an exception occurs
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.info("{} disconnected abnormally, cause: {}", ctx.channel(), cause.getMessage());
}
}
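6. Heartbeat mechanism
The dead-connection problem:
- A device fails and the underlying TCP connection is already gone, but the application has not noticed that the connection is broken and keeps holding its resources
- The public network is unstable and drops packets. Sustained packet loss shows up as the client being unable to send data and the server receiving nothing
- The application thread is blocked
Solution: use a heartbeat. If no message is sent or received for a period of time, an event fires and a heartbeat check is performed.
IdleStateHandler is a handler provided by Netty. When a Channel has had no data read or written for a long time, it fires an IdleStateEvent whose state is one of the IdleState values (READER_IDLE, WRITER_IDLE, ALL_IDLE).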
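The bookkeeping behind read-idle detection is small: remember when data was last read and compare that against a threshold. A minimal plain-Java sketch follows; IdleTracker is a hypothetical class, the caller passes the current time in explicitly so the logic is easy to test, and Netty's real handler additionally schedules the check on the channel's event loop.

```java
class IdleTracker {
    private final long readerIdleMillis;
    private long lastReadMillis;

    IdleTracker(long readerIdleMillis, long nowMillis) {
        this.readerIdleMillis = readerIdleMillis;
        this.lastReadMillis = nowMillis;
    }

    /** Call whenever data is read from the connection. */
    void onRead(long nowMillis) {
        lastReadMillis = nowMillis;
    }

    /** True once nothing has been read for at least readerIdleMillis. */
    boolean isReaderIdle(long nowMillis) {
        return nowMillis - lastReadMillis >= readerIdleMillis;
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(5000, 0);
        System.out.println(tracker.isReaderIdle(4999)); // false
        System.out.println(tracker.isReaderIdle(5000)); // true
    }
}
```

For this to work as a heartbeat, the client's write-idle interval has to be shorter than the server's read-idle timeout, so that a healthy client always sends something before the server gives up.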
Server-side detection:
Add the handlers inside initChannel:
// detect whether the read-idle or write-idle time is too long
// if nothing is read from the channel for 5s, an IdleState#READER_IDLE event fires
ch.pipeline().addLast(new IdleStateHandler(5,0,0));
// ChannelDuplexHandler can act as both an inbound and an outbound handler
ch.pipeline().addLast(new ChannelDuplexHandler(){
// handles user events such as IdleStateEvent
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// read-idle event
if (event.state()== IdleState.READER_IDLE){
log.info("no data read for 5s");
ctx.channel().close();
}
}
});
Client-side detection:
ch.pipeline().addLast(new IdleStateHandler(0,3,0));
ch.pipeline().addLast(new ChannelDuplexHandler(){
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// write-idle event: send a heartbeat
if (event.state()== IdleState.WRITER_IDLE){
log.info("no data written for 3s");
ctx.writeAndFlush(new PingMessage());
}
}
});
From: https://www.cnblogs.com/zko0/p/17026291.html