5.1、Redis协议
/**
 * Redis protocol test: connects to a Redis server, sends a hand-encoded
 * {@code set name zhangsan} command as soon as the connection is active, and
 * prints whatever the server sends back.
 */
public class TestRedis {
    public static void main(String[] args) {
        // CR LF, written after every element of the command.
        final byte[] line = {13, 10};
        // Explicit charset so the encoded/decoded bytes never depend on the platform default.
        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast(new LoggingHandler())
                                    .addLast(new ChannelInboundHandlerAdapter() {
                                        @Override
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            // "*3" announces three elements; each "$n" gives the byte
                                            // length of the element that follows it.
                                            String[] parts = {"*3", "$3", "set", "$4", "name", "$8", "zhangsan"};
                                            ByteBuf buf = ctx.alloc().buffer();
                                            for (String part : parts) {
                                                buf.writeBytes(part.getBytes(utf8));
                                                buf.writeBytes(line);
                                            }
                                            ctx.writeAndFlush(buf);
                                        }

                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            if (msg instanceof ByteBuf) {
                                                ByteBuf reply = (ByteBuf) msg;
                                                try {
                                                    // Print the reply text rather than the buffer's toString().
                                                    System.out.println("redis 回复:" + reply.toString(utf8));
                                                } finally {
                                                    // This handler consumes the message and does not pass it on,
                                                    // so it is responsible for releasing the buffer.
                                                    reply.release();
                                                }
                                            } else {
                                                System.out.println("redis 回复:" + msg);
                                            }
                                        }
                                    });
                        }
                    }).connect("124.220.70.93", 6379).sync();
            // Block until the channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            group.shutdownGracefully();
        }
    }
}
5.2、Http协议
/**
 * Minimal HTTP server demo: listens on port 8080, logs the URI of every
 * request and answers each one with a fixed {@code <h1>hello world</h1>} body.
 */
@Slf4j
public class TestHttp {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new ServerBootstrap()
                    .group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new LoggingHandler())
                                    // HTTP codec (request decoder + response encoder).
                                    .addLast(new HttpServerCodec())
                                    // Only handles the request line and headers; to handle the
                                    // request body, use HttpContent as the type parameter instead.
                                    .addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                                        @Override
                                        protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
                                            // Log the request path.
                                            log.info(request.uri());
                                            // Build the response.
                                            DefaultFullHttpResponse response =
                                                    new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
                                            // Explicit charset: Content-Length must match the bytes actually
                                            // written, independent of the platform default encoding.
                                            byte[] bytes = "<h1>hello world</h1>"
                                                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
                                            response.headers().setInt(CONTENT_LENGTH, bytes.length);
                                            response.content().writeBytes(bytes);
                                            ctx.writeAndFlush(response);
                                        }
                                    });
                        }
                    }).bind(8080).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
5.3、自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊... 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
/**
 * Codec for the custom protocol. Frame layout, in write order:
 * <pre>
 *   4 bytes  magic number (01 02 03 04)
 *   1 byte   protocol version
 *   1 byte   serialization method (0 = jdk, 1 = json)
 *   1 byte   message (command) type
 *   4 bytes  sequence id
 *   1 byte   padding (0xff), no meaning
 *   4 bytes  body length
 *   N bytes  body (the serialized Message)
 * </pre>
 * {@link #decode} reads a whole frame without checking how many bytes are
 * readable, so it expects to receive complete frames.
 */
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
    /** The magic bytes 01 02 03 04 read as one big-endian int. */
    private static final int MAGIC_NUMBER = 0x01020304;

    /**
     * Serializes {@code msg} with JDK serialization and writes one frame to {@code out}.
     *
     * @param ctx not used by this method
     * @param msg message to encode
     * @param out buffer that receives the header and body
     * @throws Exception if the message cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // Serialize first, so a serialization failure cannot leave a half-written header in out.
        byte[] bytes;
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);
            oos.flush();
            bytes = bos.toByteArray();
        }
        // 1. 4-byte magic number
        out.writeInt(MAGIC_NUMBER);
        // 2. 1-byte version
        out.writeByte(1);
        // 3. 1-byte serialization method: jdk 0, json 1
        out.writeByte(0);
        // 4. 1-byte command type
        out.writeByte(msg.getMessageType());
        // 5. 4-byte sequence id
        out.writeInt(msg.getSequenceId());
        // Meaningless padding byte
        out.writeByte(0xff);
        // 6. body length
        out.writeInt(bytes.length);
        // 7. body
        out.writeBytes(bytes);
    }

    /**
     * Reads one frame from {@code in}, deserializes its body and adds the
     * resulting {@link Message} to {@code out}.
     *
     * @param ctx not used by this method
     * @param in  buffer positioned at the start of a frame
     * @param out receives the decoded message
     * @throws java.io.StreamCorruptedException if the frame does not start with the magic number
     * @throws Exception if the body cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        if (magicNum != MAGIC_NUMBER) {
            // Not our protocol: drop what was received so the stale bytes are not re-parsed.
            in.skipBytes(in.readableBytes());
            throw new java.io.StreamCorruptedException(
                    "Unexpected magic number: 0x" + Integer.toHexString(magicNum));
        }
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        // Skip the padding byte.
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        // NOTE(review): JDK-native deserialization of data received from the network is
        // unsafe for untrusted peers; consider an ObjectInputFilter or a JSON serializer.
        Message message;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            message = (Message) ois.readObject();
        }
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
测试
/**
 * Manual check of {@code MessageCodec} on an {@link EmbeddedChannel}:
 * one outbound write (object to ByteBuf) and one inbound write (ByteBuf to object).
 */
public class Test {
    public static void main(String[] args) throws Exception {
        // Length-field frame decoder against sticky/half packets. The last argument
        // (bytes to strip) is 0 because the whole header is parsed by hand in the codec.
        LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0);
        LoggingHandler loggingHandler = new LoggingHandler();
        MessageCodec pipelineCodec = new MessageCodec();
        EmbeddedChannel channel = new EmbeddedChannel(frameDecoder, loggingHandler, pipelineCodec);

        LoginRequestMessage message = new LoginRequestMessage("zhangsang", "123456", "张三");

        // Encoding test: object --> ByteBuf.
        channel.writeOutbound(message);

        // Decoding test: ByteBuf --> object.
        // Encode into a fresh buffer first so there is something to feed inbound.
        ByteBuf encoded = ByteBufAllocator.DEFAULT.buffer();
        MessageCodec standaloneCodec = new MessageCodec();
        standaloneCodec.encode(null, message, encoded);
        channel.writeInbound(encoded);
    }
}
结果
15:54:44.259 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 257B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2c 63 6f 6d 2e 6a 70 79 2e |....sr.,com.jpy.|
|00000020| 63 68 61 74 64 65 6d 6f 2e 6d 65 73 73 61 67 65 |chatdemo.message|
|00000030| 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 |.LoginRequestMes|
|00000040| 73 61 67 65 ec 28 6f bb c5 26 64 b8 02 00 03 4c |sage.(o..&d....L|
|00000050| 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c 6a 61 |..nicknamet..Lja|
|00000060| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c |va/lang/String;L|
|00000070| 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 01 4c |..passwordq.~..L|
|00000080| 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 |..usernameq.~..x|
|00000090| 72 00 20 63 6f 6d 2e 6a 70 79 2e 63 68 61 74 64 |r. com.jpy.chatd|
|000000a0| 65 6d 6f 2e 6d 65 73 73 61 67 65 2e 4d 65 73 73 |emo.message.Mess|
|000000b0| 61 67 65 9e 0b 0e cd d6 f3 a9 f5 02 00 02 49 00 |age...........I.|
|000000c0| 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 0a 73 |.messageTypeI..s|
|000000d0| 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 00 00 |equenceIdxp.....|
|000000e0| 00 00 00 74 00 06 e5 bc a0 e4 b8 89 74 00 06 31 |...t........t..1|
|000000f0| 32 33 34 35 36 74 00 09 7a 68 61 6e 67 73 61 6e |23456t..zhangsan|
|00000100| 67 |g |
+--------+-------------------------------------------------+----------------+
15:54:44.259 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
15:54:44.265 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 257B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2c 63 6f 6d 2e 6a 70 79 2e |....sr.,com.jpy.|
|00000020| 63 68 61 74 64 65 6d 6f 2e 6d 65 73 73 61 67 65 |chatdemo.message|
|00000030| 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d 65 73 |.LoginRequestMes|
|00000040| 73 61 67 65 ec 28 6f bb c5 26 64 b8 02 00 03 4c |sage.(o..&d....L|
|00000050| 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c 6a 61 |..nicknamet..Lja|
|00000060| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c |va/lang/String;L|
|00000070| 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 01 4c |..passwordq.~..L|
|00000080| 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 |..usernameq.~..x|
|00000090| 72 00 20 63 6f 6d 2e 6a 70 79 2e 63 68 61 74 64 |r. com.jpy.chatd|
|000000a0| 65 6d 6f 2e 6d 65 73 73 61 67 65 2e 4d 65 73 73 |emo.message.Mess|
|000000b0| 61 67 65 9e 0b 0e cd d6 f3 a9 f5 02 00 02 49 00 |age...........I.|
|000000c0| 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 0a 73 |.messageTypeI..s|
|000000d0| 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 00 00 |equenceIdxp.....|
|000000e0| 00 00 00 74 00 06 e5 bc a0 e4 b8 89 74 00 06 31 |...t........t..1|
|000000f0| 32 33 34 35 36 74 00 09 7a 68 61 6e 67 73 61 6e |23456t..zhangsan|
|00000100| 67 |g |
+--------+-------------------------------------------------+----------------+
15:54:44.296 [main] INFO com.jpy.chatdemo.protocol.MessageCodec - 16909060,1,0,0,0,241
15:54:44.296 [main] INFO com.jpy.chatdemo.protocol.MessageCodec - ==========
15:54:44.296 [main] INFO com.jpy.chatdemo.protocol.MessageCodec - LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsang, password=123456, nickname=张三)
5.4、@Sharable
思考1:上述代码中,LoggingHandler和LengthFieldBasedFrameDecoder是否可以提取出来供多个channel使用。
LoggingHandler只是用来打印的,不保存信息,无状态的,可以多个线程channel共用。
LengthFieldBasedFrameDecoder是有状态的:它会缓存尚未凑齐的半包数据,如果多个channel共用同一个实例,不同channel的数据可能被拼接到一起,因此不可共用。
结论:@Sharable 注解的类可以共用,否则不可共用
思考2:自定义的MessageCodec可否共享?查看源码可知,它继承了ByteToMessageCodec,而ByteToMessageCodec的构造方法会检查并禁止子类使用@Sharable注解,因此不能共享。
解决:继承MessageToMessageCodec<ByteBuf, Message>
/**
 * Sharable variant of the custom-protocol codec.
 *
 * <p>Must be used together with LengthFieldBasedFrameDecoder, so that every
 * ByteBuf handed to {@link #decode} is a complete frame.
 *
 * <p>Frame layout, in write order: 4-byte magic number (01 02 03 04), 1-byte
 * version, 1-byte serialization method (0 = jdk, 1 = json), 1-byte command
 * type, 4-byte sequence id, 1 padding byte (0xff), 4-byte body length, body.
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    /** The magic bytes 01 02 03 04 read as one big-endian int. */
    private static final int MAGIC_NUMBER = 0x01020304;

    /**
     * Encodes a Message into a newly allocated ByteBuf and adds it to {@code out}.
     *
     * @param ctx used to allocate the output buffer
     * @param msg message to encode
     * @param out receives the encoded buffer
     * @throws Exception if the message cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
        // Serialize before allocating: if serialization fails, no buffer has been
        // allocated yet, so nothing can leak.
        byte[] bytes;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(msg);
            oos.flush();
            bytes = baos.toByteArray();
        }
        ByteBuf buf = ctx.alloc().buffer();
        // 1. magic number: 4 bytes
        buf.writeInt(MAGIC_NUMBER);
        // 2. version: 1 byte
        buf.writeByte(1);
        // 3. serialization method: 1 byte (0: jdk, 1: json)
        buf.writeByte(0);
        // 4. command type: 1 byte
        buf.writeByte(msg.getMessageType());
        // 5. sequence id: 4 bytes
        buf.writeInt(msg.getSequenceId());
        // Meaningless padding byte
        buf.writeByte(0xff);
        // 6. body length: 4 bytes
        buf.writeInt(bytes.length);
        // 7. body
        buf.writeBytes(bytes);
        out.add(buf);
    }

    /**
     * Decodes one complete frame (ByteBuf) into a Message and adds it to {@code out}.
     *
     * @param ctx not used by this method
     * @param in  buffer holding one frame
     * @param out receives the decoded message
     * @throws java.io.StreamCorruptedException if the frame does not start with the magic number
     * @throws Exception if the body cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        if (magicNum != MAGIC_NUMBER) {
            // Reject frames that do not belong to this protocol before parsing anything else.
            throw new java.io.StreamCorruptedException(
                    "Unexpected magic number: 0x" + Integer.toHexString(magicNum));
        }
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        // Skip the padding byte.
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        // NOTE(review): JDK-native deserialization of data received from the network is
        // unsafe for untrusted peers; consider an ObjectInputFilter or a JSON serializer.
        Message message;
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            message = (Message) ois.readObject();
        }
        log.info("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.info("==========");
        log.info("{}", message);
        out.add(message);
    }
}
标签:Netty,00,进阶,协议,61,73,65,new,buf
From: https://www.cnblogs.com/jpymll/p/16821516.html