一、概述
TCP
是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket
,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle
算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的。
由于TCP
无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题
通常的解决方案是:发送端每发送一次消息,就需要在消息的内容之前携带消息的长度,这样,接收方每次先接受消息的长度,再根据长度去读取该消息剩余的内容。如果socket
中还有没有读取的内容,也只能放在下一次读取事件中进行。
假设客户端分别发送了两个数据包D1
和D2
给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
- 服务端分两次读取到了两个独立的数据包,分别是
D1
和D2
,没有粘包和拆包 - 服务端一次接受到了两个数据包,
D1
和D2
粘合在一起,称之为TCP
粘包 - 服务端分两次读取到了数据包,第一次读取到了完整的
D1
包和D2
包的部分内容,第二次读取到了D2
包的剩余内容,这称之为TCP
拆包 - 服务端分两次读取到了数据包,第一次读取到了
D1
包的部分内容D1_1
,第二次读取到了D1
包的剩余部分内容D1_2
和完整的D2
包。
二、案例
在编写Netty
程序时,如果没有做处理,就会发生粘包和拆包的问题
客户端:
package com.test.netty.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 hello,server编号
for (int i = 0; i < 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("客户端接收到消息=" + message);
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端:
package com.test.netty.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//将buffer转成字符串
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("服务器接收到数据 " + message);
System.out.println("服务器接收到消息量=" + (++this.count));
//服务器回送数据给客户端,回送一个随机id
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString()
+ " ", Charset.forName("utf-8"));
ctx.writeAndFlush(responseByteBuf);
}
}
- 服务端直接一次就把我们客户端
10
次发送的内容读取完成了。 - 当数据量小且发送间隔短,如果我们客户端每次发送的都是不同的结果,这种情况下我们就不知道客户端返回了多少次结果以及每次结果究竟是什么。
三、编解码器
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。codec
(编解码器)的组成部分有两个:decoder
(解码器)和encoder
(编码器)。encoder
负责把业务数据转换成字节码数据,decoder
负责把字节码数据转换成业务数据。
Netty本身的编码解码的机制和问题分析
Netty
自身提供了一些codec
(编解码器)Netty
提供的编码器StringEncoder
:对字符串数据进行编码ObjectEncoder
,对Java
对象进行编码
Netty
提供的解码器StringDecoder
:对字符串数据进行解码ObjectDecoder
:对Java
对象进行解码
Netty
本身自带的ObjectDecoder
和ObjectEncoder
可以用来实现POJO
对象或各种业务对象的编码和解码,底层使用的仍是Java
序列化技术,而Java
序列化技术本身效率就不高,存在如下问题- 无法跨语言
- 序列化后的体积太大,是二进制编码的5倍多。
- 序列化性能太低
- 引出新的解决方案[
Google
的Protobuf
]
3.1 基本说明
Netty
的组件设计:Netty
的主要组件有Channel
、EventLoop
、ChannelFuture
、ChannelHandler
、ChannelPipe
等ChannelHandler
充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现ChannelInboundHandler
接口(或ChannelInboundHandlerAdapter
),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler
冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler
中。ChannelOutboundHandler
原理一样,只不过它是用来处理出站数据的ChannelPipeline
提供了ChannelHandler
链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline
中的一系列ChannelOutboundHandler
,并被这些Handler
处理,反之则称为入站的。以服务器端为例:接受数据的过程就是入站,发送数据的过程就是出站。客户端也一样。
来看看我们常用的Handler
的关系图:Inbound
处理入站,Outbound
处理出站
一般来说,在我们接收数据时将数据解码后,就进行业务的相关处理,所以上图的入站的常用类更多。在数据出站时,一般我们只需要将数据编码后直接发出。
3.2 handler链式调用机制
Pipeline
中的Handler
可以当作一个双向链表。但是,Handler
却又存在着入站和出站之分。那么Netty
是如何将两种类型的Handler
保存在一个链表中,却又能够入站的时候调用InboundHandler
,出栈的时候调用OutBoundHandler
呢?看下图,黄色的表示入站,以及入站的Handler
,绿色的表示出站,以及出站的Handler
。
当我们调用如下代码时,我们就会获得一个上图所示的Handler
链表。下面代码时在ChannelInitializer
类中添加Handler
的部分代码。
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LongToByteEncoder()); //out
pipeline.addLast(new ByteToLongDecoder()); //in
pipeline.addLast(new OutBoundHandler()); //out
pipeline.addLast(new InBoundHandler()); //in
}
当一个请求来了的时候,首先会将请求发给pipeline
中位于链表首部的Handler
。如图所示,首先由LongToByteEncoder
(这个东西不管,就是个出站的)接受到入站请求,但是这个东西是个OutBound
。所以它收到入站请求时就不做处理,直接转发给它的下一个ByteToLongDecoder
(这个东西也不管,它是入站的)。这个东西接受到了入站请求了,一看它自己也是一个Inbound
,所以它就将请求的数据进行处理,然后转发给下一个。之后又是一个Outbound
,然后再进行转发,到了最后的InBoundHandler
,在这里我们可以进行业务的处理等等操作。
然后如果需要返回数据,我们就调用writeAndFlush
方法,这个方法可不简单,当他一调用,就会触发出站的请求,然后就由当前所在的Handler
节点往回调用。往回调用的途中,如果遇到InBound
就直接转发给下一个Handler
,直到最后将消息返回。
通过上面的描述,我们可以总结添加Handler
的以下节点总结:
- 调用
InboundHandler
的顺序和添加的顺序是一致的。 - 调用
OutboundHandler
的顺序和添加它的顺序是相反的。 - 链表的末尾不能有
OutHandler
,因为如果最后是OutHandler
的话,当他前面的InHandler
处理完数据返回消息调用writeflush
时,它直接在前面进行反向调用了,就调用不到最后的这个Out
了。所以我们平常可以将OutHandler
写在前面,InHandler
写在后面。 InHandler
一旦进行writeAndFlush
,只有这个InHandler
之前添加的OutHandler
能够处理他
3.3 编码解码器
- 当
Netty
发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java
对象);如果是出站消息,它会被编码成字节。 Netty
提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler
或者ChannelOutboundHandler
接口。在这些类中,channelRead
方法已经被重写了。以入站为例,对于每个从入站Channel
读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()
方法进行解码,并将已经解码的字节转发给ChannelPipeline
中的下一个ChannelInboundHandler
。
3.3.1 解码器-ByteToMessageDecoder
- 关系继承图
- 由于不可能知道远程节点是否会一次性发送一个完整的信息,
tcp
有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理. - 一个关于
ByteToMessageDecoder
实例分析
3.3.2 解码器-ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
扩展了ByteToMessageDecoder
类,使用这个类,我们不必调用readableBytes()
方法。参数S
指定了用户状态管理的类型,其中Void
代表不需要状态管理- 应用实例:使用
ReplayingDecoder
编写解码器,对前面的案例进行简化[案例演示]
package com.test.netty.inboundhandlerandoutboundhandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
ReplayingDecoder
使用方便,但它也有一些局限性:- 并不是所有的
ByteBuf
操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException
。 ReplayingDecoder
在某些情况下可能稍慢于ByteToMessageDecoder
,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢
- 并不是所有的
3.3.3 其它编解码器
其它解码器
- 行解码器(
LineBasedFrameDecoder
):这个类在Netty
内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。 - 分隔符解码器(
DelimiterBasedFrameDecoder
):使用自定义的特殊字符作为消息的分隔符。 - 固定长度解码器(
FixedLengthFrameDecoder
):消息长度固定,累积读取到长度总和为定长LEN
的报文后,就认为读取到了一个完整的消息,再将计数器置位,重新读取下一个数据报。例如可以让每个报文的大小为固定长度1024
字节,如果消息长度不够,则使用空位填补空缺,这样读取到了之后,只需要trim
去掉空格即可。 - 自定义长度解码器(
LengthFieldBasedFrameDecoder
):通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。 HttpObjectDecoder
:一个HTTP
数据的解码器。
3.4 示例
使用自定义的编码器和解码器来说明Netty
的handler
调用机制,
- 客户端发送
long
-> 服务器 - 服务端发送
long
-> 客户端
Decoder:
public class ByteToLongDecoder extends ByteToMessageDecoder {
/**
* decode方法会根据接收到的数据,被调用多次,直到确定没有新元素被添加到list,或者ByteBuf没有更多的可读字节为止
* 如果list out不为空,就会将list的内容传递给下一个Handler进行处理,该处理器的方法也会被调用多次。
*
* @param ctx 上下文对象
* @param in 入栈的 ByteBuf
* @param out list集合,将解码后的数据传给下一个Handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//因为long为8个字节,所以需要8个字节才能读取成一个long类型的数据
System.out.println("ByteToLongDecoder:入栈数据被解码");
if (in.readableBytes() >= 8){
out.add(in.readLong());
}
}
}
Encoder:
public class LongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("LongToByteEncoder: 出栈数据,msg = " + msg);
out.writeLong(msg);
}
}
服务端添加Handler:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LongToByteEncoder()); //编码器,出站
pipeline.addLast(new ByteToLongDecoder()); //解码器,入站
pipeline.addLast(new ServerInBoundHandler()); //业务处理,入站
}
客户端添加Handler:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LongToByteEncoder()); //编码器,出站
pipeline.addLast(new ByteToLongDecoder()); //解码器,入站
pipeline.addLast(new ClientInBoundHandler()); //业务处理,入站。
}
这里当客户端或者服务端接受消息的时候,首先会调用入站的解码器,然后业务处理,然后调用出站的编码器返回消息。后面可以在业务处理类中,增加发送消息的代码,此处省略。
结论
- 不论解码器
handler
还是编码器handler
即接收的消息类型必须与待处理的消息类型一致,否则该handler
不会被执行 - 在解码器进行数据解码时,需要判断缓存区(
ByteBuf
)的数据是否足够,否则接收到的结果会期望结果可能不一致
四、Google Protobuf
Protobuf
是Google
发布的开源项目,全称Google Protocol Buffers
,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或RPC
[远程过程调用remote procedure call
]数据交换格式。目前很多公司http + json =》 tcp + protobuf
。
Protobuf
是以message
的方式来管理数据的- 支持跨平台、跨语言,即客户端和服务器端可以是不同的语言编写的
- 高性能,高可靠性
- 使用
protobuf
编译器能自动生成代码,Protobuf
是将类的定义使用.proto
文件进行描述。说明,在idea
中编写.proto
文件时,会自动提示是否下载.ptoto
编写插件.可以让语法高亮。 - 然后通过
protoc.exe
编译器根据.proto
自动生成.java
文件
4.1 proto文件格式
首先我们需要在.proto
文件中定义好实体及他们的属性,再进行编译成java
对象为我们所用。下面将介绍proto
文件的写法。
文件头
就想我们写java
需要写package
包名一样,.proto
文件也要写一些文件的全局属性,主要用于将.proto
文件编译成Java
文件。
实例 | 介绍 |
---|---|
syntax="proto3"; |
声明使用到的protobuf 的版本 |
optimize_for=SPEED; |
表示 |
java_package="com.mical.netty.pojo"; |
表示生成Java 对象所在包名 |
java_outer_classname="MyWorker"; |
表示生成的Java对象的外部类名 |
一般将这些代码写在proto
文件的开头,以表明生成Java
对象的相关文件属性。
定义类和属性
syntax = "proto3"; //版本
option optimize_for = SPEED; //加快解析
option java_outer_classname = "MyDataInfo"; //生成的外部类名,同时也是文件名
message Student { //会在StudentPojo外部类生成一个内部类Student,他是真正发送的pojo对象
int32 id = 1; //Student类中有一个属性名字为ID,类型为int32(protobuf类型),1表示序号,不是值
string name = 2;
}
enum DateType {
StudentType = 0; //在proto3中,要求enum的编号从0开始
WorkerType = 1;
}
文件中不但声明了protobuf
的版本,还声明了生成java
对象的类名。当生成java
对象后,MyDataInfo
将是对象的类名,同时,它使用message
声明了Student
这个内部类,使用enum
声明了DataType
这个内部枚举类。就像下面这个样子
public final class MyDataInfo {
public static final class Student { }
public enum DataType { }
}
然后需要注意的是,protobuf
中的变量类型和其他语言的声明有所不同。下面是类型的对照表。
.proto 类型 |
java 类型 |
C++ 类型 |
备注 |
---|---|---|---|
double |
double |
double |
|
float |
float |
float |
|
int32 |
int |
int32 |
使用可变长编码方式。编码负数时不够高效——如果你的字段可能含有负数,那么请使用sint32 。 |
int64 |
long |
int64 |
使用可变长编码方式。编码负数时不够高效——如果你的字段可能含有负数,那么请使用sint64 。 |
unit32 |
int[1] |
unit32 |
总是4 个字节。如果数值总是比总是比228 大的话,这个类型会比uint32 高效。 |
unit64 |
long[1] |
unit64 |
总是8 个字节。如果数值总是比总是比256 大的话,这个类型会比uint64 高效。 |
sint32 |
int |
int32 |
使用可变长编码方式。有符号的整型值。编码时比通常的int32 高效。 |
sint64 |
long |
int64 |
使用可变长编码方式。有符号的整型值。编码时比通常的int64 高效。 |
fixed32 |
int[1] |
unit32 |
|
fixed64 |
long[1] |
unit64 |
总是8 个字节。如果数值总是比总是比256 大的话,这个类型会比uint64 高效。 |
sfixed32 |
int |
int32 |
总是4 个字节。 |
sfixed64 |
long |
int64 |
总是8 个字节。 |
bool |
boolean |
bool |
|
string |
String |
string |
一个字符串必须是UTF-8 编码或者7-bit ASCII 编码的文本。 |
bytes |
ByteString |
string |
可能包含任意顺序的字节数据 |
类型关注之后,我们看到代码中string name = 2
,它并不是给name
这个变量赋值,而是给它标号。每个类都需要给其中的变量标号,且需要注意的是类的标号是从1
开始的,枚举的标号是从0
开始的。
复杂对象
当我们需要统一发送对象和接受对象时,就需要使用一个对象将其他所有对象进行包装,再获取里面的某一类对象。
syntax = "proto3"; //版本
option optimize_for = SPEED; //加快解析
option java_outer_classname = "MyDataInfo"; //生成的外部类名,同时也是文件名
message MyMessage {
//定义一个枚举类型
enum DateType {
StudentType = 0; //在proto3中,要求enum的编号从0开始
WorkerType = 1;
}
//用data_type来标识传的是哪一个枚举类型
DateType data_type = 1;
//标识每次枚举类型最多只能出现其中的一个类型,节省空间
oneof dataBody {
Student stuent = 2;
Worker worker = 4;
}
}
message Student { //会在StudentPojo外部类生成一个内部类Student,他是真正发送的pojo对象
int32 id = 1; //Student类中有一个属性名字为ID,类型为int32(protobuf类型),1表示序号,不是值
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
这里面我们定义了MyMessage
、Student
、Worker
三个对象,MyMessage
里面持有了一个枚举类DataType
和,Student
、Worker
这两个类对象中的其中一个。这样设计的目的是什么呢?当我们在发送对象时,设置MyMessage
里面的对象的同时就可以给枚举赋值,这样当我们接收对象时,就可以根据枚举判断我们接受到哪个实例类了。
4.2 Netty中使用Protobuf
需要给发送端的pipeline
添加编码器:ProtobufEncoder
。
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new ProtoClientHandler());
}
});
需要在接收端添加解码器:ProtobufDecoder
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//需要指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new ProtoServerHandler());
}
})
在发送时,如何构造一个具体对象呢?以上面复杂对象为例,我们主要构造的是MyMessage
对象,设置里面的枚举属性,和对应的对象。
MyDataInfo.MyMessage build = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DateType.StudentType)
.setStuent(MyDataInfo.Student.newBuilder()
.setId(5)
.setName("王五")
.build())
.build();
在接收对象时,我们就可以根据枚举变量去获取实例对象了。
MyDataInfo.MyMessage message = (MyDataInfo.MyMessage) msg;
MyDataInfo.MyMessage.DateType dataType = message.getDataType();
switch (dataType) {
case StudentType:
MyDataInfo.Student student = message.getStuent();
System.out.println("学生Id = " + student.getId() + student.getName());
case WorkerType:
MyDataInfo.Worker worker = message.getWorker();
System.out.println("工人:name = " + worker.getName() + worker.getAge());
case UNRECOGNIZED:
System.out.println("输入的类型不正确");
}
五、 解决方案示例
- 使用自定义协议+编解码器来解决
- 关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的
TCP
粘包、拆包。
package com.test.netty.protocoltcp;
//协议包
@Data
public class MessageProtocol {
private int len; //关键
private byte[] content;
}
package com.test.netty.protocoltcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
//客户端连续发送Message对象
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
for (int i = 0; i < 5; i++) {
String mes = "今天天气冷,吃火锅";
byte[] content = mes.getBytes(Charset.forName("utf-8"));
int length = mes.getBytes(Charset.forName("utf-8")).length;
//创建协议包对象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
//@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("客户端接收到消息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常消息=" + cause.getMessage());
ctx.close();
}
}
package com.test.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
//将Message转换为ByteBuf的编码器
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out)
throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
package com.test.netty.protocoltcp;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
//将ByteBuf转换成Message的解码器
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码-> MessageProtocol数据包(对象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封装成MessageProtocol对象,放入out, 传递下一个handler业务处理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
}
package com.test.netty.protocoltcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
import java.util.UUID;
//处理业务的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到数据,并处理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println();
System.out.println("服务器接收到信息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("服务器接收到消息包数量=" + (++this.count));
//回复消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes("utf-8").length;
byte[] responseContent2 = responseContent.getBytes("utf-8");
//构建一个协议包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent2);
ctx.writeAndFlush(messageProtocol);
}
}
- 客户端发送
5
个Message
对象,客户端每次发送一个Message
对象。 - 服务器端每次接收一个
Message
,调用MessageDecoder
分5
次进行解码,将ByteBuf
类型的数据转换成MessageProtocol
,然后再进入进行读取的Handler
中读取消息。最后返回给客户端消息,调用MessageEncoder
将MessageProtocol
转换成Byte
然后发送出去。