首页 > 其他分享 >Netty入门

Netty入门

时间:2022-12-21 22:45:24浏览次数:50  
标签:Netty 入门 netty ByteBuf io import new channel

1.简介

异步,基于事件驱动的网络应用框架

TCP/IP=>JDK原生=>NIO===>Netty

《netty实战》《netty权威指南》

2.应用

基于网络的高并发或者网络的通信

  1. RPC框架远程服务调用,作为基础框架组件被使用

    dubbo默认使用netty进行

  2. 游戏通信

3.IO模型

java的三种网络编程模型:

  1. BIO(同步阻塞)
  2. NIO(同步非阻塞)
  3. AIO

4.Netty组件

Eventloop(事件循环对象)

目的:来处理channel上的io事件

本质:单线程执行器

接口:

  • 继承了线程池中ScheduledExecutorService,包含线程池的所有方法
  • 继承netty的OrderedEventExecutor

EventLoopGroup是一组EventLoop,Channel通过EventLoopGroup的register方法绑定其中一个EventLoop,之后这个Channel的所有Io事件都由这个EventLoop来处理(保证线程的安全问题)

对于比较耗时的操作,可以再新建一个DefaultEventLoopGroup来进行特定的比较耗时的ChannelInboundHandlerAdapter方法

package com.yikolemon.EventLoop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

public class EventLoopServer {
    public static void main(String[] args) {
        //细分2,创建一个新的EventLoopGroup,来处理耗时较长的操作
        DefaultEventLoopGroup group = new DefaultEventLoopGroup();

        new ServerBootstrap()
                //boss 和 worker ,把accept和read事件划分的更清晰些
                //细分1:work只负责socketChannel的读写,NIO中分为了两个
                .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(
                    new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel)
                            throws Exception {
                        channel.pipeline().addLast(
                                new ChannelInboundHandlerAdapter(){
                                    @Override           //ByteBuf
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) 
                                            throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println(buf.toString(Charset.defaultCharset()));
                                        ctx.fireChannelRead(msg);//*****让消息传递给下个Handler****
                                    }
                                }
                        ).addLast(group,"耗时的group处理",
                                //这个handler不由NioEventLoopGroup中的Worker处理,而是DefaultXXXX来处理了
                                new ChannelInboundHandlerAdapter(){
                                    @Override           //ByteBuf
                                    public void channelRead(ChannelHandlerContext ctx, Object msg)
                                            throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println(buf.toString(Charset.defaultCharset()));
                                    }
                                }
                        );
                    }
                })
                .bind(8080);
    }
}

1.EventLoopServer+Client

worker会轮流处理Channel,但是当连接建立后,Channel发送的消息只会有特定worker进行处理

EventLoopServer:

package com.yikolemon.EventLoop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.charset.Charset;

public class EventLoopServer {
    public static void main(String[] args) {
        //细分2,创建一个新的EventLoopGroup,来处理耗时较长的操作
        DefaultEventLoopGroup group = new DefaultEventLoopGroup();

        new ServerBootstrap()
                //boss 和 worker ,把accept和read事件划分的更清晰些
                //细分1:work只负责socketChannel的读写,NIO中分为了两个
                .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(
                    new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel channel) throws Exception {
                        channel.pipeline().addLast(
                                new ChannelInboundHandlerAdapter(){
                                    @Override           //ByteBuf
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println(buf.toString(Charset.defaultCharset()));
                                        ctx.fireChannelRead(msg);//*****让消息传递给下个Handler****
                                    }
                                }
                        ).addLast(group,"耗时的group处理",
                                //这个handler不由NioEventLoopGroup中的Worker处理,而是DefaultXXXX来处理了
                                new ChannelInboundHandlerAdapter(){
                                    @Override           //ByteBuf
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println(buf.toString(Charset.defaultCharset()));
                                    }
                                }
                        );
                    }
                })
                .bind(8080);
    }
}

EventLoopClient:

package com.yikolemon.EventLoop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //1.启动器
        Channel channel = new Bootstrap()
                //2.添加EventLopp
                .group(new NioEventLoopGroup())
                //3.选择客户端Channel实现
                .channel(NioSocketChannel.class)
                //4.添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    //连接建立后会调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //1.添加编码器 String转ByteBuf
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        //6.向服务器发送数
        System.out.println(channel);
        System.out.println();
    }
}

2.Handler如何传递任务

方法invokeChannelRead():

  1. next.executor()返回下一个Handler的EventExecutor(即EventLoop)
  2. 检查EventLoop是不是当前的线程,如果为true,直接调下一个Handler
  3. 如果不是的话,让下一个的Handler去执行next.invokeChannelRead方法

Channel

  • close()关闭channel
  • closeFuture()处理channel的关闭
  • pipeline()添加处理器
  • write()方法,写入数据(不一定会发)flush会把write的数据进行发送
  • writeAndFlush()把数据写入并刷出

1.connect()

异步非阻塞的方法:

主线程发起了连接,真正执行connect连接操作的是NioEventLoop中的线程

主线程无阻塞的向下执行获取channel,这时的channel是还未建立好的channel

(sout打赢,显示channel是否是建立好的)

如何解决?

答:

channelFuture.sync();//阻塞线程知道Nio线程建立完毕
//使用addListener方法来异步处理结果(回调对象)
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    //在Nio线程建立完成后,调用此方法
    public void operationComplete(ChannelFuture channelFuture)
            throws Exception {
		//方法在此写
    }
});

带有Future,Promise的方法都是和异步方法配套使用,用来处理结果

2.closeFuture()

能够在关闭之后,进行一些善后操作

1.使用阻塞的方式(sync())

package com.yikolemon.EventLoop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class FutureClient {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //1.添加编码器 String转ByteBuf
                        nioSocketChannel.pipeline().addLast(new LoggingHandler());
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String s = scanner.nextLine();
                if ("q".equals(s)) {
                    channel.close();
                    //log.debug("处理关闭之后的操作");
                    break;
                }
                channel.writeAndFlush(s);
            }
        },"input-thread").start();
        //获取CloseFuture对象:1)同步模式关闭 2)异步模式关闭
        ChannelFuture closeFuture= channel.closeFuture();
        System.out.println("waiting close");
        //如果close没有执行,那么就会在这个方法下阻塞
        closeFuture.sync();
        log.debug("处理关闭之后的操作");

    }
}

2.使用Listener(closeFuture.addListener())

package com.yikolemon.EventLoop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class FutureClient {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //1.添加编码器 String转ByteBuf
                        nioSocketChannel.pipeline().addLast(new LoggingHandler());
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String s = scanner.nextLine();
                if ("q".equals(s)) {
                    channel.close();
                    //log.debug("处理关闭之后的操作");
                    break;
                }
                channel.writeAndFlush(s);
            }
        },"input-thread").start();
        //获取CloseFuture对象:1)同步模式关闭 2)异步模式关闭
        ChannelFuture closeFuture= channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            //由close执行的线程来执行这个方法
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("关闭之后的操作");
            }
        });
    }
}

3.如何关闭Java的线程

在2中,关闭new Thread不会关闭调EventLoopGroup的线程,所以Java线程不会终止。我们需要在closeFuture之后进行EventLoopGroup的关闭

How:

NioEventLoopGroup group = new NioEventLoopGroup();
//末尾
group.shutdownGracefully();

完整代码:

package com.yikolemon.EventLoop;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class FutureClient {
    public static void main(String[] args) throws InterruptedException {
        //关闭client,需要释放调这个group的资源
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        //1.添加编码器 String转ByteBuf
                        nioSocketChannel.pipeline().addLast(new LoggingHandler());
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        Channel channel = channelFuture.sync().channel();
        new Thread(()->{
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String s = scanner.nextLine();
                if ("q".equals(s)) {
                    channel.close();
                    //log.debug("处理关闭之后的操作");
                    break;
                }
                channel.writeAndFlush(s);
            }
        },"input-thread").start();
        //获取CloseFuture对象:1)同步模式关闭 2)异步模式关闭
        ChannelFuture closeFuture= channel.closeFuture();
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            //由close执行的线程来执行这个方法
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("关闭之后的操作");
                //优雅的关闭掉group,先拒绝新任务,等待现在的任务完成
                group.shutdownGracefully();
            }
        });
    }
}

Future&Promise

netty的Future继承Jdk的Future,而Promise继承了netty的Future

  • jdk Future只能同步等待任务结束才能得到结果
  • netty Future同步等待得到结果,也能异步获得结果,都需要等待任务结束
  • netty Promise不仅由netty的Future的功能,脱离任务独立存在,只作为线程之间传递的容器

jdk Future:

  • cancel
  • isCanceled
  • isDone
  • get
package com.yikolemon.FutureAndPromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class JdkFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.线程池
        ExecutorService service = Executors.newFixedThreadPool(2);
        //2.提交任务
        Future<Integer> future = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(1000);
                return 50;
            }
        });
        //3.主线程获取future获取结果
        log.debug("等待结果");
        Integer res = future.get();
        log.debug("res= {}",res);
    }
}

netty Future:jdkFuture+

  • getNow:非阻塞,没有结果返回Null
  • sync:同步阻塞,等待任务结束,会抛出异常(区别get,sync不获取结果,只等待)
  • await:同步阻塞,等待任务结束,不会抛出异常(需要通过isSuccess来判断)
  • cause:非阻塞,返回失败信息(或者null)
  • addLinstner:添加回调,异步接受结果
package com.yikolemon.FutureAndPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

@Slf4j
public class NettyFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        //这里的future是netty接口的Future
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(1000);
                return 40;
            }
        });
        log.debug("等待结果");
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                log.debug("结果是:{}",future.get());
            }
        });
        log.debug("异步添加之后");
    }
}

Promise:netty Future+
它的用处?RPC网络框架开发

  • setSucces:???
  • setFailure:???
package com.yikolemon.FutureAndPromise;

import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;

@Slf4j
public class NettyPromiseTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        EventLoop eventLoop = new NioEventLoopGroup().next();
        //1.可以主动创建promise
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

        new Thread(()->{
                //3.任何一个线程执行计算
                log.debug("计算结果");
                try {
                    Thread.sleep(1000);
                    promise.setSuccess(30);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                    promise.setFailure(e);
                }
            }
        );
        //4.接受结果
        log.debug("等待结果");
        log.debug("结果为:{}",promise.get());

    }
}

Handler&Pipeline

1.Pipeline

入站ChannelInboundHandlerAdapter(),是从head向tail执行的

而出站ChannelOutboundHandlerAdapter()从tail向head执行

package com.yikolemon.HandlerAndPipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PipelineTest {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class) //Oio,Nio
                .childHandler(
                    new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch) 
                                throws Exception {
                            //1.通过channel拿到1pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //2.添加处理器
                            //netty会自动添加head和tail Handler,addLast在尾巴前
                            //head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 ->tail
                            //Inbound入站,从head到tail,而出站从tail到head
                            pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) 
                                        throws Exception {
                                    log.debug("1");
                                    super.channelRead(ctx, msg);
                                }
                            });
                            pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) 
                                        throws Exception {
                                    log.debug("2");
                                    super.channelRead(ctx, msg);
                                }
                            });
                            pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) 
                                        throws Exception {
                                    log.debug("3");
                                    super.channelRead(ctx, msg);
                                    ch.writeAndFlush(ctx.alloc().buffer().writeBytes("fuck".getBytes()));
                                }
                            });
                            pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 
                                        throws Exception {
                                    log.debug("4");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 
                                        throws Exception {
                                    log.debug("5");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 
                                        throws Exception {
                                    log.debug("6");
                                    super.write(ctx, msg, promise);
                                }
                            });
                        }
                    })
                .bind(8080);
    }
}

2.ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter为入站处理器

channelRead会把数据传输给下一个Handler

package com.yikolemon.HandlerAndPipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class PipelineTest {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class) //Oio,Nio
                .childHandler(
                    new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch)
                                throws Exception {
                            //1.通过channel拿到1pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //2.添加处理器
                            //netty会自动添加head和tail Handler,addLast在尾巴前
                            //head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 ->tail
                            //Inbound入站,从head到tail,而出站从tail到head
                            pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg)
                                        throws Exception {
                                    log.debug("1");
                                    ByteBuf buf=(ByteBuf)msg;
                                    String str = buf.toString(Charset.defaultCharset());
                                    //super.xxx这个交给下一个Handler(这里吧str传给下一个
                                    super.channelRead(ctx, str);
                                }
                            });
                            pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg)
                                        throws Exception {

                                    log.debug("2");
                                    String name = (String) msg;
                                    Student student = new Student(name);
                                    //把student传给下一个Handler,交给下一个入站处理器
                                    super.channelRead(ctx, student);
                                }
                            });
                            pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg)
                                        throws Exception {
                                    log.debug("3,结果为:{}",msg.getClass());
                                    //这里不需要channelRead,因为后面没有入站处理器了
                                    ch.writeAndFlush(ctx.alloc().buffer().writeBytes("fuck".getBytes()));
                                }
                            });
                            pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                        throws Exception {
                                    log.debug("4");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                        throws Exception {
                                    log.debug("5");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                        throws Exception {
                                    log.debug("6");
                                    super.write(ctx, msg, promise);
                                }
                            });
                        }
                    })
                .bind(8080);



    }
}
@Data
@AllArgsConstructor
class Student{
    private String name;
}

3.ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter为出站处理器

  • channel.writeAndFlush会触发开始执行出站处理器

    channel.writeAndFlush是从tail向head找的,所以必定能找到出站处理器

  • 使用ChannelHandlerContext ctx,使用ctx.writeAndFlush也能触发开始执行出站处理器

    但是!!!ctx.writeAndFlush是从当前Handler向head找,如果在入站处理器中调用该处理器,会导致部分的出站处理器无法工作。

4.EmbeddedChannel(用于测试Channel入站出站)

我们之前测试都需要启动Server和Client,而使用EmbeddedChannel就不需要启动两个Netty就能实现测试

package com.yikolemon.Channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EmbeddedChannelTest {
    //EmbeddedChannel用于测试,不需要去启动Server和Client了
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1= new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2= new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3= new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4= new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel=new EmbeddedChannel(h1,h2,h3,h4);
        //模拟入站操作
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        System.out.println("出站");
        //模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));

    }
}

ByteBuf

区别于Nio中的ByteBuffer,ByteBuf是Nio提供的对ByteBuffer的封装,更容易使用。

对ByteBuf自动扩容机制的测试:

package com.yikolemon.ByteBuf;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class ByteBUfTest {
    //测试ByteBuf的自动扩容机制
    public static void main(String[] args) {
        //扩容前的ByteBUf
        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        System.out.println(buf);
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 300; i++) {
            sb.append("a");
        }
        buf.writeBytes(sb.toString().getBytes());
        //扩容后的ByteBuf,cap代表容量
        System.out.println(buf);
    }
}

1.直接内存 和 堆内存

直接内存的读写效率高于堆内存

堆内存还会受到JVM垃圾回收的影响

Netty默认使用直接内存,下面是Netty对内存分配的方法:

ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
ByteBuf buf1 = ByteBufAllocator.DEFAULT.directBuffer();

2.池化 和 非池化

Netty支持池化,对于一些创建比较慢的资源,可以用池化来优化,可以重用ByteBuf对象。

如:数据库连接池

优点:

  • 直接内存代价昂贵,堆内存会增加GC压力
  • 池化可以重用ByteBuf实例,采用jemalloc类似内存分配算法提升分配效率
  • 高并发,池化可以节约内存,减少内存移除的可能

默认开启池化功能,通过环境变量设置:

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1之后非Android默认启动池化,Android非池化
  • 4.1之前默认非池化

测试:

在Idea运行配置中修改虚拟机参数,能修改ByteBuf的池化类型

package com.yikolemon.ByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class DirectHeapTest {
    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.directBuffer();
        System.out.println(buf1.getClass());
        ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
        System.out.println(buf.getClass());
    }
}

3.组成

不同于ByteBuffer有一个指针,ByteBuf有读写两个指针

  • 容量
  • 最大容量
  • 写指针
  • 读指针

在容量与最大容量之间,即为可扩容部分

在容量和写指针之间,即为可写部分

写指针与读指针之间,即为可读部分

已经读过的部分,即为废弃部分

4.方法

写入

方法 含义 备注
writeBoolean(boolean value) 写入boolean 字节1/0代表true/false
...
writeBytes(ByteBuffer src) 写入nio的ByteBuffer
int writeCharSequence(CharSequence sequence,Charset charset) 写入字符串
buffer.writeBytes(new byte[]{1,2,3,4});
log(buffer);

扩容

扩容规则:

  • 如果写入后没有超过512,写入后大小为16倍数
  • 如果写入大小超过512,写入后大小为2^n(16,32,64,128.....)

读取

1.会改变读指针的位置

buffer.readByte();

如何重复读取:

buffer.markReaderIndex();//做reader标记
buffer.resetReaderIndex();//把reader充值到mark的位置

2.不会改变读指针的位置

buffer.getxxx()

5.内存回收

buf.release();

ByteBuf几种实现:

  • UnpooledHeapByteBuf:通过JVM来回收
  • UnpooledDirectByteBuf:需要特殊方法来回收
  • PooledByteBuffer:需要更复杂的规则回收

Netty对每个ByteBuffer实现ReferenceCounted接口

  • 每个ByteBuf对象初始计数为1
  • 使用release方法计数减1,如果count为0,就被回收了
  • 调用retain方法技术加1,不为0,调用release也不会回收

问题:

谁来调用release()方法?

  • 由于ByteBuf在Handler中传递,所以应该在最后一个Handler来释放ByteBuf来释放。
  • head和tail就像两个门神,在最后传入到他们的时候,会对没有释放的ByteBuf进行释放。
  • 虽然Head和Tail进行了处理,但是还是需要我们进行处理的(养成好习惯)

6.头尾Handler内存回收源码

TailContext(实现入站处理器接口)

ChannelRead调用onUnhandledInboundMessage方法,调用ReferenceCountUtil.realse()方法

实现:如果msgh实现RederenceCounted接口,进行强转然后release()

HeadContext(实现出站)

write方法,调用unsafe.write()方法(AbstractUnsafe)

检查buffer缓冲区,如果没有直接release

如果有缓冲区,较为负责xxxx

7.slice()

零拷贝(即不会拷贝一份相同内容)的体现之一

将原始的ByteBuf切片成为多个ByteBuf,切片后没有发生内存复制,仍然使用原始的ByteBuf内存。切片之后的新ByteBuf维护独立的read,write指针

注意点:

  • 使用slice切片后,最大容量会出现限制,切片后的ByteBuf无法写入超出容量的数据

  • 对原来的ByteBuf进行release()操作,切片后的ByteBuf也会被释放

    如何解决release()释放后,新ByteBuf无法使用?buf.retain()使原ByteBuf引用计数+1,这是release()一次不会释放内存

package com.yikolemon;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

@Slf4j
public class SliceTest {
    public static void main(String[] args) {
        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(10);
        buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
        log(buf);
        ByteBuf buf1 = buf.slice(0, 5);
        ByteBuf buf2 = buf.slice(5, 5);
        log(buf1);
        log(buf2);
        //对buf做修改查看buf1是否也修改了
        buf.setByte(0,'s');
        log(buf1);
    }

    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

8.duplicate()

零拷贝(即不会拷贝一份相同内容)的体现之一

截取了原始ByteBuf所有内容,没有max capacity限制,共用同一块内存区域。唯一的区别是:读写指针独立

package com.yikolemon.ZeroCpoy;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class DuplicateTest {
    public static void main(String[] args) {
        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer(10);
        buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
        ByteBuf newBuf = buf.duplicate();
        log(newBuf);
    }
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

9.compositeBuffer()

零拷贝(即不会拷贝一份相同内容)的体现之一

将多个ByteBuf合并成为一个整体

注意,需要在方法中带上true参数,表示读写指针能自动增长,才能让数据写入(零拷贝)

package com.yikolemon.ZeroCpoy;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class CompositeBufferTest {
    public static void main(String[] args) {
        ByteBuf buf1= ByteBufAllocator.DEFAULT.buffer(5);
        buf1.writeBytes(new byte[]{'a','b','c','d','e'});
        ByteBuf buf2= ByteBufAllocator.DEFAULT.buffer(5);
        buf2.writeBytes(new byte[]{'f','g','h','i','j'});
        CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();
        bufs.addComponents(true,buf1,buf2);
        log(bufs);
    }
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

10.Unpooled

Unpooled是一个工具类,提供非池化的ByteBuf创建组合复制等操作

其中wrappedBuffer(零拷贝),可以包装ByteBuf

可以组装两个ByteBuf,也能组装多个byte数组

总结

优势:

  • 可以重用池中的ByteBuf实例,减少内存溢出的问题。
  • 读写指针分离,不像ByteBuffer一样要切换读写模式
  • 自动扩容
  • 支持链式调用
  • 很多零拷贝

标签:Netty,入门,netty,ByteBuf,io,import,new,channel
From: https://www.cnblogs.com/zko0/p/16997398.html

相关文章

  • 普通人或者门外汉该怎样入门编程?
    继我关注很久一位编程的大牛发布了他的那篇文章——<探究:普通人都是怎么入门编程>https://www.cnblogs.com/liuyangfirst/p/16991386.html我先去再相关评论里看到了有人说......
  • 仿文:普通人或者门外汉该怎样入门编程?
    继我关注很久一位编程的大牛发布了他的那篇文章——<探究:普通人都是怎么入门编程>https://www.cnblogs.com/liuyangfirst/p/16991386.html我先去再相关评论里看到了有......
  • Day24.1.数组入门
    Day24.1.数组入门1.定义数组是相同数据的有序集合每个数据为一个数组元素,每个数组元素可以通过一个下标来访问他们2.数组的声明创建声明: //方法一 dataTyp......
  • QT 开发快速入门
    本人qt业余,但有的时候要用到qt,而又没有系统的学习,用到哪里看哪里。环境:vs2012+qt-vsaddins+qt5.5  qt的按钮点击事件出发的基本要素:1.按钮触发函数为public......
  • Netty-介绍-1
    Netty介绍和应用场景要求已经掌握了主要技术构成:JavaOOP编程、Java 多线程编程、JavaIO编程、Java 网络编程、常用的Java设计模式(比如 观察......
  • django模型层之models入门篇(Relationship fields)
    先看下models结构:#tournament/models.pyfromdjango.dbimportmodelsclassClub(models.Model):region_choices=[('E','East'),('W','W......
  • 难说| 看来我还没有入门 CPP
    难说|看来我还没有入门CPP原cpp新标准博览Incredibuildppt前言​​5​​月​​18​​日参加了​​Incredibuild​​与博览网联合举办的主题为《深入解析20个c++......
  • 使用whistle使用和入门
      这里注意、是手机下载抓取web接口时、需要在电脑浏览器设置代理 ......
  • Python全栈工程师之从网页搭建入门到Flask全栈项目实战(7) - 在线问答系统
    1.项目源码/业务逻辑百度网盘链接:链接:https://pan.baidu.com/s/13VNfrSJE6vcL3HP1J5T8ew 提取码:00s0,项目业务逻辑自行阅读 2.项目搭建......
  • SQL 十分钟入门教程
    SQL快速入门教程[文/张赐荣]SQL(StructuredQueryLanguage)是一种用于访问和处理数据库的标准化语言。它由ANSI(美国国家标准协会)制定,并被广泛使用于各种不同的数......