1.简介
异步,基于事件驱动的网络应用框架
TCP/IP=>JDK原生=>NIO===>Netty
《netty实战》《netty权威指南》
2.应用
基于网络的高并发或者网络的通信
-
RPC框架远程服务调用,作为基础框架组件被使用
dubbo默认使用netty进行
-
游戏通信
3.IO模型
java的三种网络编程模型:
- BIO(同步阻塞)
- NIO(同步非阻塞)
- AIO
4.Netty组件
Eventloop(事件循环对象)
目的:来处理channel上的io事件
本质:单线程执行器
接口:
- 继承了线程池中ScheduledExecutorService,包含线程池的所有方法
- 继承netty的OrderedEventExecutor
EventLoopGroup是一组EventLoop,Channel通过EventLoopGroup的register方法绑定其中一个EventLoop,之后这个Channel的所有Io事件都由这个EventLoop来处理(保证线程的安全问题)
对于比较耗时的操作,可以再新建一个DefaultEventLoopGroup来进行特定的比较耗时的ChannelInboundHandlerAdapter方法
package com.yikolemon.EventLoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
public class EventLoopServer {
public static void main(String[] args) {
//细分2,创建一个新的EventLoopGroup,来处理耗时较长的操作
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//boss 和 worker ,把accept和read事件划分的更清晰些
//细分1:work只负责socketChannel的读写,NIO中分为了两个
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel)
throws Exception {
channel.pipeline().addLast(
new ChannelInboundHandlerAdapter(){
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg);//*****让消息传递给下个Handler****
}
}
).addLast(group,"耗时的group处理",
//这个handler不由NioEventLoopGroup中的Worker处理,而是DefaultXXXX来处理了
new ChannelInboundHandlerAdapter(){
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
}
);
}
})
.bind(8080);
}
}
1.EventLoopServer+Client
worker会轮流处理Channel,但是当连接建立后,Channel发送的消息只会有特定worker进行处理
EventLoopServer:
package com.yikolemon.EventLoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.Charset;
public class EventLoopServer {
public static void main(String[] args) {
//细分2,创建一个新的EventLoopGroup,来处理耗时较长的操作
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//boss 和 worker ,把accept和read事件划分的更清晰些
//细分1:work只负责socketChannel的读写,NIO中分为了两个
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast(
new ChannelInboundHandlerAdapter(){
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg);//*****让消息传递给下个Handler****
}
}
).addLast(group,"耗时的group处理",
//这个handler不由NioEventLoopGroup中的Worker处理,而是DefaultXXXX来处理了
new ChannelInboundHandlerAdapter(){
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
}
);
}
})
.bind(8080);
}
}
EventLoopClient:
package com.yikolemon.EventLoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//1.启动器
Channel channel = new Bootstrap()
//2.添加EventLopp
.group(new NioEventLoopGroup())
//3.选择客户端Channel实现
.channel(NioSocketChannel.class)
//4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
//连接建立后会调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//1.添加编码器 String转ByteBuf
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
//6.向服务器发送数
System.out.println(channel);
System.out.println();
}
}
2.Handler如何传递任务
方法invokeChannelRead():
- next.executor()返回下一个Handler的EventExecutor(即EventLoop)
- 检查EventLoop是不是当前的线程,如果为true,直接调下一个Handler
- 如果不是的话,让下一个的Handler去执行next.invokeChannelRead方法
Channel
- close()关闭channel
- closeFuture()处理channel的关闭
- pipeline()添加处理器
- write()方法,写入数据(不一定会发)flush会把write的数据进行发送
- writeAndFlush()把数据写入并刷出
1.connect()
异步非阻塞的方法:
主线程发起了连接,真正执行connect连接操作的是NioEventLoop中的线程
主线程无阻塞的向下执行获取channel,这时的channel是还未建立好的channel
(sout打赢,显示channel是否是建立好的)
如何解决?
答:
channelFuture.sync();//阻塞线程知道Nio线程建立完毕
//使用addListener方法来异步处理结果(回调对象)
channelFuture.addListener(new ChannelFutureListener() {
@Override
//在Nio线程建立完成后,调用此方法
public void operationComplete(ChannelFuture channelFuture)
throws Exception {
//方法在此写
}
});
带有Future,Promise的方法都是和异步方法配套使用,用来处理结果
2.closeFuture()
能够在关闭之后,进行一些善后操作
1.使用阻塞的方式(sync())
package com.yikolemon.EventLoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class FutureClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//1.添加编码器 String转ByteBuf
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String s = scanner.nextLine();
if ("q".equals(s)) {
channel.close();
//log.debug("处理关闭之后的操作");
break;
}
channel.writeAndFlush(s);
}
},"input-thread").start();
//获取CloseFuture对象:1)同步模式关闭 2)异步模式关闭
ChannelFuture closeFuture= channel.closeFuture();
System.out.println("waiting close");
//如果close没有执行,那么就会在这个方法下阻塞
closeFuture.sync();
log.debug("处理关闭之后的操作");
}
}
2.使用Listener(closeFuture.addListener())
package com.yikolemon.EventLoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class FutureClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//1.添加编码器 String转ByteBuf
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String s = scanner.nextLine();
if ("q".equals(s)) {
channel.close();
//log.debug("处理关闭之后的操作");
break;
}
channel.writeAndFlush(s);
}
},"input-thread").start();
//获取CloseFuture对象:1)同步模式关闭 2)异步模式关闭
ChannelFuture closeFuture= channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
//由close执行的线程来执行这个方法
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("关闭之后的操作");
}
});
}
}
3.如何关闭Java的线程
在2中,关闭new Thread不会关闭调EventLoopGroup的线程,所以Java线程不会终止。我们需要在closeFuture之后进行EventLoopGroup的关闭
How:
NioEventLoopGroup group = new NioEventLoopGroup();
//末尾
group.shutdownGracefully();
完整代码:
package com.yikolemon.EventLoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class FutureClient {
public static void main(String[] args) throws InterruptedException {
//关闭client,需要释放调这个group的资源
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//1.添加编码器 String转ByteBuf
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String s = scanner.nextLine();
if ("q".equals(s)) {
channel.close();
//log.debug("处理关闭之后的操作");
break;
}
channel.writeAndFlush(s);
}
},"input-thread").start();
//获取CloseFuture对象:1)同步模式关闭 2)异步模式关闭
ChannelFuture closeFuture= channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
//由close执行的线程来执行这个方法
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("关闭之后的操作");
//优雅的关闭掉group,先拒绝新任务,等待现在的任务完成
group.shutdownGracefully();
}
});
}
}
Future&Promise
netty的Future继承Jdk的Future,而Promise继承了netty的Future
- jdk Future只能同步等待任务结束才能得到结果
- netty Future同步等待得到结果,也能异步获得结果,都需要等待任务结束
- netty Promise不仅由netty的Future的功能,脱离任务独立存在,只作为线程之间传递的容器
jdk Future:
- cancel
- isCanceled
- isDone
- get
package com.yikolemon.FutureAndPromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class JdkFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//2.提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 50;
}
});
//3.主线程获取future获取结果
log.debug("等待结果");
Integer res = future.get();
log.debug("res= {}",res);
}
}
netty Future:jdkFuture+
- getNow:非阻塞,没有结果返回Null
- sync:同步阻塞,等待任务结束,会抛出异常(区别get,sync不获取结果,只等待)
- await:同步阻塞,等待任务结束,不会抛出异常(需要通过isSuccess来判断)
- cause:非阻塞,返回失败信息(或者null)
- addLinstner:添加回调,异步接受结果
package com.yikolemon.FutureAndPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@Slf4j
public class NettyFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
//这里的future是netty接口的Future
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 40;
}
});
log.debug("等待结果");
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("结果是:{}",future.get());
}
});
log.debug("异步添加之后");
}
}
Promise:netty Future+
它的用处?RPC网络框架开发
- setSucces:???
- setFailure:???
package com.yikolemon.FutureAndPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class NettyPromiseTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
EventLoop eventLoop = new NioEventLoopGroup().next();
//1.可以主动创建promise
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{
//3.任何一个线程执行计算
log.debug("计算结果");
try {
Thread.sleep(1000);
promise.setSuccess(30);
} catch (InterruptedException e) {
e.printStackTrace();
promise.setFailure(e);
}
}
);
//4.接受结果
log.debug("等待结果");
log.debug("结果为:{}",promise.get());
}
}
Handler&Pipeline
1.Pipeline
入站ChannelInboundHandlerAdapter(),是从head向tail执行的
而出站ChannelOutboundHandlerAdapter()从tail向head执行
package com.yikolemon.HandlerAndPipeline;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PipelineTest {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class) //Oio,Nio
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch)
throws Exception {
//1.通过channel拿到1pipeline
ChannelPipeline pipeline = ch.pipeline();
//2.添加处理器
//netty会自动添加head和tail Handler,addLast在尾巴前
//head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 ->tail
//Inbound入站,从head到tail,而出站从tail到head
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.debug("3");
super.channelRead(ctx, msg);
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("fuck".getBytes()));
}
});
pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
2.ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter为入站处理器
channelRead会把数据传输给下一个Handler
package com.yikolemon.HandlerAndPipeline;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class PipelineTest {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class) //Oio,Nio
.childHandler(
new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch)
throws Exception {
//1.通过channel拿到1pipeline
ChannelPipeline pipeline = ch.pipeline();
//2.添加处理器
//netty会自动添加head和tail Handler,addLast在尾巴前
//head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 ->tail
//Inbound入站,从head到tail,而出站从tail到head
pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.debug("1");
ByteBuf buf=(ByteBuf)msg;
String str = buf.toString(Charset.defaultCharset());
//super.xxx这个交给下一个Handler(这里吧str传给下一个
super.channelRead(ctx, str);
}
});
pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.debug("2");
String name = (String) msg;
Student student = new Student(name);
//把student传给下一个Handler,交给下一个入站处理器
super.channelRead(ctx, student);
}
});
pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
log.debug("3,结果为:{}",msg.getClass());
//这里不需要channelRead,因为后面没有入站处理器了
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("fuck".getBytes()));
}
});
pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
@Data
@AllArgsConstructor
class Student{
private String name;
}
3.ChannelOutboundHandlerAdapter
ChannelOutboundHandlerAdapter为出站处理器
-
channel.writeAndFlush会触发开始执行出站处理器
channel.writeAndFlush是从tail向head找的,所以必定能找到出站处理器
-
使用ChannelHandlerContext ctx,使用ctx.writeAndFlush也能触发开始执行出站处理器
但是!!!ctx.writeAndFlush是从当前Handler向head找,如果在入站处理器中调用该处理器,会导致部分的出站处理器无法工作。
4.EmbeddedChannel(用于测试Channel入站出站)
我们之前测试都需要启动Server和Client,而使用EmbeddedChannel就不需要启动两个Netty就能实现测试
package com.yikolemon.Channel;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class EmbeddedChannelTest {
//EmbeddedChannel用于测试,不需要去启动Server和Client了
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1= new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2= new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3= new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4= new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel=new EmbeddedChannel(h1,h2,h3,h4);
//模拟入站操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
System.out.println("出站");
//模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
}
}
ByteBuf
区别于Nio中的ByteBuffer,ByteBuf是Nio提供的对ByteBuffer的封装,更容易使用。
对ByteBuf自动扩容机制的测试:
package com.yikolemon.ByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class ByteBUfTest {
//测试ByteBuf的自动扩容机制
public static void main(String[] args) {
//扩容前的ByteBUf
ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 300; i++) {
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
//扩容后的ByteBuf,cap代表容量
System.out.println(buf);
}
}
1.直接内存 和 堆内存
直接内存的读写效率高于堆内存
堆内存还会受到JVM垃圾回收的影响
Netty默认使用直接内存,下面是Netty对内存分配的方法:
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
ByteBuf buf1 = ByteBufAllocator.DEFAULT.directBuffer();
2.池化 和 非池化
Netty支持池化,对于一些创建比较慢的资源,可以用池化来优化,可以重用ByteBuf对象。
如:数据库连接池
优点:
- 直接内存代价昂贵,堆内存会增加GC压力
- 池化可以重用ByteBuf实例,采用jemalloc类似内存分配算法提升分配效率
- 高并发,池化可以节约内存,减少内存移除的可能
默认开启池化功能,通过环境变量设置:
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1之后非Android默认启动池化,Android非池化
- 4.1之前默认非池化
测试:
在Idea运行配置中修改虚拟机参数,能修改ByteBuf的池化类型
package com.yikolemon.ByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
public class DirectHeapTest {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.directBuffer();
System.out.println(buf1.getClass());
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
System.out.println(buf.getClass());
}
}
3.组成
不同于ByteBuffer有一个指针,ByteBuf有读写两个指针
- 容量
- 最大容量
- 写指针
- 读指针
在容量与最大容量之间,即为可扩容部分
在容量和写指针之间,即为可写部分
写指针与读指针之间,即为可读部分
已经读过的部分,即为废弃部分
4.方法
写入
方法 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入boolean | 字节1/0代表true/false |
... | ||
writeBytes(ByteBuffer src) | 写入nio的ByteBuffer | |
int writeCharSequence(CharSequence sequence,Charset charset) | 写入字符串 |
buffer.writeBytes(new byte[]{1,2,3,4});
log(buffer);
扩容
扩容规则:
- 如果写入后没有超过512,写入后大小为16倍数
- 如果写入大小超过512,写入后大小为2^n(16,32,64,128.....)
读取
1.会改变读指针的位置
buffer.readByte();
如何重复读取:
buffer.markReaderIndex();//做reader标记
buffer.resetReaderIndex();//把reader充值到mark的位置
2.不会改变读指针的位置
buffer.getxxx()
5.内存回收
buf.release();
ByteBuf几种实现:
- UnpooledHeapByteBuf:通过JVM来回收
- UnpooledDirectByteBuf:需要特殊方法来回收
- PooledByteBuffer:需要更复杂的规则回收
Netty对每个ByteBuffer实现ReferenceCounted接口
- 每个ByteBuf对象初始计数为1
- 使用release方法计数减1,如果count为0,就被回收了
- 调用retain方法技术加1,不为0,调用release也不会回收
问题:
谁来调用release()方法?
- 由于ByteBuf在Handler中传递,所以应该在最后一个Handler来释放ByteBuf来释放。
- head和tail就像两个门神,在最后传入到他们的时候,会对没有释放的ByteBuf进行释放。
- 虽然Head和Tail进行了处理,但是还是需要我们进行处理的(养成好习惯)
6.头尾Handler内存回收源码
TailContext(实现入站处理器接口)
ChannelRead调用onUnhandledInboundMessage方法,调用ReferenceCountUtil.realse()方法
实现:如果msgh实现RederenceCounted接口,进行强转然后release()
HeadContext(实现出站)
write方法,调用unsafe.write()方法(AbstractUnsafe)
检查buffer缓冲区,如果没有直接release
如果有缓冲区,较为负责xxxx
7.slice()
零拷贝(即不会拷贝一份相同内容)的体现之一
将原始的ByteBuf切片成为多个ByteBuf,切片后没有发生内存复制,仍然使用原始的ByteBuf内存。切片之后的新ByteBuf维护独立的read,write指针
注意点:
-
使用slice切片后,最大容量会出现限制,切片后的ByteBuf无法写入超出容量的数据
-
对原来的ByteBuf进行release()操作,切片后的ByteBuf也会被释放
如何解决release()释放后,新ByteBuf无法使用?buf.retain()使原ByteBuf引用计数+1,这是release()一次不会释放内存
package com.yikolemon;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
@Slf4j
public class SliceTest {
public static void main(String[] args) {
ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
log(buf);
ByteBuf buf1 = buf.slice(0, 5);
ByteBuf buf2 = buf.slice(5, 5);
log(buf1);
log(buf2);
//对buf做修改查看buf1是否也修改了
buf.setByte(0,'s');
log(buf1);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
8.duplicate()
零拷贝(即不会拷贝一份相同内容)的体现之一
截取了原始ByteBuf所有内容,没有max capacity限制,共用同一块内存区域。唯一的区别是:读写指针独立
package com.yikolemon.ZeroCpoy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class DuplicateTest {
public static void main(String[] args) {
ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
ByteBuf newBuf = buf.duplicate();
log(newBuf);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
9.compositeBuffer()
零拷贝(即不会拷贝一份相同内容)的体现之一
将多个ByteBuf合并成为一个整体
注意,需要在方法中带上true参数,表示读写指针能自动增长,才能让数据写入(零拷贝)
package com.yikolemon.ZeroCpoy;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class CompositeBufferTest {
public static void main(String[] args) {
ByteBuf buf1= ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{'a','b','c','d','e'});
ByteBuf buf2= ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{'f','g','h','i','j'});
CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();
bufs.addComponents(true,buf1,buf2);
log(bufs);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
10.Unpooled
Unpooled是一个工具类,提供非池化的ByteBuf创建组合复制等操作
其中wrappedBuffer(零拷贝),可以包装ByteBuf
可以组装两个ByteBuf,也能组装多个byte数组
总结
优势:
- 可以重用池中的ByteBuf实例,减少内存溢出的问题。
- 读写指针分离,不像ByteBuffer一样要切换读写模式
- 自动扩容
- 支持链式调用
- 很多零拷贝