首页 > 其他分享 >Netty 学习笔记

Netty 学习笔记

时间:2024-08-28 17:15:14浏览次数:14  
标签:Netty Reactor 笔记 学习 线程 IO new 客户端

Java 网络编程

早期的 Java API 只支持由本地系统套接字库提供的所谓的阻塞函数,下面的代码展示了一个使用传统 Java API 的服务器代码的普通示例

// 创建一个 ServerSocket 用以监听指定端口上的连接请求
ServerSocket serverSocket = new ServerSocket(5000);
// 对 accept 方法的调用将被阻塞,直到一个连接建立
Socket clientSocket = serverSocket.accept();
// 这些流对象都派生于该套接字的流对象
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
String request, response;
// 客户端发送了 "Done" 则退出循环
while ((request = in.readLine()) != null) {
    if ("Done".equals(request)) {
        break;
    }
    // 请求被传递给服务器的处理方法
    response = processRequest(request);
    // 服务器的响应被发送给客户端
    out.println(response);
}

这段代码只能同时处理一个连接,要管理多个客户端,就要为每个新的客户端 Socket 创建一个新的 Thread,让我们来考虑一下这种方案的影响:

  • 在任何时候都会有大量线程处于休眠状态,造成资源浪费
  • 需要为每个线程的调用栈都分配内存
  • 线程的上下文切换会带来开销

这种并发方案对于中小数量的客户端还算理想,但不能很好地支持更大的并发


Java NIO

NIO(Non-blocking I/O,也称 New I/O),是一种同步非阻塞的 I/O 模型,也是 I/O 多路复用的基础。传统的 IO 流是阻塞的,这意味着,当一个线程调用读或写操作时,线程将被阻塞,直至数据被完全读取或写入。NIO 的非阻塞模式,使一个线程进行读或写操作时,如果目前无数据可用时,就不做操作,而不是保持线程阻塞,所以直至数据就绪以前,线程可以继续做其他事情

class java.nio.channels.Selector 是 Java 非阻塞 IO 实现的关键。它使用事件通知 API 以确定在一组非阻塞套接字中有哪些已经就绪并能进行 IO 相关操作。因为可以在任何时间点任意检查读操作或写操作的完成情况,所以单一线程可以处理多个并发的连接

与阻塞 IO 模型相比,这种模型提供了更好的资源管理:

  • 使用较少的线程便可以处理许多连接,减少内存管理和上下文切换所带来的开销
  • 当没有 IO 操作需要处理时,线程也可以用户其他任务

Reactor 线程模型

Reactor 是一种并发处理客户端请求与响应的事件驱动模型。服务端在接收到客户请求后采用多路复用策略,通过一个非阻塞的线程来异步接收所有的客户端请求,并将这些请求转发到相关的工作线程组进行处理。

Reactor 模型常常基于异步线程实现,常用的 Reactor 线程模型有三种:Reactor 单程模型、Reactor 多线程模型和 Reactor 主备多线程模型

1. Reactor 单线程模型

Reactor 单线程模型指所有的客户端 IO 请求都在同一个线程(Thread)上完成。Reactor 单线程模型的各模块组成及职责如图所示

  • Client:NIO 客户端,向服务端发起 TCP 连接,并发送数据
  • Acceptor:NIO 服务端,通过 Acceptor 接收客户端的 TCP 连接
  • Dispatcher:接收客户端的数据并将数据以 ByteBuffer 的形式发送到对应的编解码器
  • DecoderHandler:解码器,读取客户端的数据并进行数据解码及处理和消息应答
  • EncoderHandler:编码器,将向客户端发送的数据(消息请求或消息应答)进行统一的编码处理,并写入通道

由于 Reactor 模式使用的是异步非阻塞 IO,因此一个线程可以独立处理多个 IO 相关的操作。Reactor 单线程模型将所有 IO 操作都集中在一个线程中处理,其处理流程如下:

  1. Acceptor 接收客户端的 TCP 连接请求消息
  2. 在链路建立成功后通过 Dispatcher 将接收到的消息写入 ByteBuffer,并派发到对应的 DecoderHandler 进行消息解码和处理
  3. 在消息处理完成后调用对应的 EncoderHandler 将该请求对应的响应消息进行编码和下发

2. Reactor 多线程模型

Reactor 多线程模型与单线程模型最大的区别在于,它使用线程池(ThreadPoll)处理客户端的 IO 请求。Reactor 多线程模型如图所示

3. Reactor 主备多线程模型

在 Reactor 主备多线程模型中,服务端用于接收客户端连接的不再是一个 NIO 线程而是一个独立的 NIO 线程池。主线程 Acceptor 在接收到客户端的 TCP 连接请求并建立完成连接后(可能要经过鉴权、登录等过程),将新创建的 SocketChannel 注册到子 I/O 线程池(Sub Reactor Pool)的某个 I/O 线程上,由它负责具体的 SocketChannel 读写、编解码、业务处理工作。这样就将客户端连接的建立和息的响应都以异步线程的方式来实现,大大提高了系统的吞吐量。Reactor 主备多线程模型如图所示


Netty 概述

Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 Java NIO 提供的 API 实现提供了对 TCP、UDP 和文件传输的支持。Netty的所有 IO 操作都是异步非阻塞的,通过 Future-Listener 机制,用户可以主动或者通过通知机制获取 IO 操作结果

Netty 架构设计的主要特性如下:

  1. IO 多路复用模型:Netty 通过在 NioEventLoop(事件轮询机制)内封装 Selector 来实现 IO 的多路复用
  2. 数据零拷贝:Netty 的数据接收和发送均采用直接内存进行 Socket 读写,大大提高了系统的性能
  3. 内存重用机制:直接内存的分配和回收是一种耗时的操作,为了尽量重用缓冲区,Netty 提供了基于内存池的缓冲区重用机制
  4. 无锁化机制:Netty 内部采用串行无锁化设计思想对 IO 进行操作。在具体使用过程中可以调整 NIO 线程池的线程参数,同时启动多个串行化的线程并行运行,这种局部无锁化的串行多线程设计比一个队列结合多个工作线程模型的性能更佳
  5. 高性能序列化框架:Netty 默认基于 ProtoBuf 实现数据的序列化,通过扩展 Netty 的编解码接口,用户可以实现自定义的序列化框架

Netty 核心组件

  1. Bootstrap/ServerBootstrap:Bootstrap 用于客户端服务的启动引导,ServerBootstrap 用于服务端服务的启动引导
  2. NioEventLoop:基于线程队列的方式执行事件操作,具体要执行的事件操作包括连接注册、端口绑定和 IO 数据读写等。每个 NioEventLoop 线程都负责多个 Channel 的事件处理
  3. NioEventLoopGroup:NioEventLoop 生命周期的管理
  4. Future/ChannelFuture:Future 和 ChannelFuture 用于异步通信的实现,基于异步通信方式可以在 IO 操作触发后注册一个监听事件,在 IO 操作完成后自动触发监听事件并完成后续操作
  5. Channel:Channel 是 Netty 中的网络通信组件,用于执行具体的 IO 操作。Nettty 中所有的数据通信都基于 Channel 读取或者将数据写入对应的 Channel。Channel 的主要功能包括网络连接的建立、连接状态的管理(网络连接的打开和关闭)、网络连接参数的配置(每次接收数据的大小)、基于异步 NIO 的网络数据操作(数据读取、数据写出)等
  6. Selector:Selector 用于多路复用中 Channel 的管理。在 Netty中,一个 Selector 可以管理多个 Channel,在 Channel 连接建立后将连接注册到 Selector,Selector 在内部监听每个 Channel 上 IO 事件的变化,当 Channel 有网络 IO 事件发生时通知 ChannelHandler 执行具体的 IO 操作
  7. ChannelHandlerContext:Channel 上下文信息的管理。每个 ChannelHandler 都对应一个 ChannelHandlerContext
  8. ChannelHandler:IO 事件的拦截和处理。其中,ChannelInboundHandler 用于处理数据接收的 IO 操作,ChannelOutboundHandler 用于处理数据发送的 IO 操作
  9. ChannelPipeline:基于拦截器设计模式实现的事件拦截处理和转发。Netty 中的每 Channel 都对应一个 ChannelPipeline,在 ChannelPipeline 中维护了一个由 ChannelHandlerContext 组成的双向链表,每个 ChannelHandlerContext 都对应一个 ChannelHandler,以完成对具体 Channel 事件的拦截和处理。其中,数据入站由 Head 向 Tail 依次传递和处理,数据出站由 Tail 向 Head 依次传递和处理

Netty 原理

1. Netty Server 的初始化过程

  1. 初始化 BossGroup 和 WorkerGroup
  2. 基于 ServerBootstrap 配置 EventLoopGroup,包括连接参数设置、Channel 类型设置、编解码 Handler 设置等
  3. 绑定端口和服务启动
public static void main(String[] args) {
  // 1:创建 BossGroup 和 WorkerGroup
  NioEventLoopGroup bossGroup = new NioEventLoopGroup();
  NioEventLoopGroup workerGroup = new NioEventLoopGroup();

  final ServerBootstrap serverBootstrap = new ServerBootstrap();
  // 2:配置NioEventLoopGroup
  serverBootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketchannel.class) // 设置 channel 的类型为 NIO
    .option(ChannelOption.SO_BACKLOG, 1024)  // 设置 BACKLOG 的大小为 1024
    .childOption(ChannelOption.SO_KEEPALIVE,true)  // 启用心跳检测机制
    .childOption(ChannelOption.TCP_NODELAY,true)  // 设置数据包无延迟
    // 设置 Channel 的类型为 NioSocketChannel
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
      @Override
      protected void initChannel(NioSocketChannel ch) {
        // 配置解码器为 MessageDecoder 类
        ch.pipeline().addLast("decoder", new MessageDecoder());
        // 配置编码器为 MessageEncoder 类
        ch.pipeline().addlast("encoder", new MessageEncoder());
      }
    });
  // 3:绑定端口和服务启动
  int port = 9000;
  serverBootstrap.bind(port).addlistener(future -> {
    if(future.isSuccess()) {
      System.out.printin("server start up on port:" + port);
    } else {
      System.err.printin("server start up failed");
    }
  });
}

2. Netty 工作流程

  1. Netty 抽象出两组线程池 BossGroup 和 WorkerGroup。BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写
  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环线程组,这个组中含有多个事件循环线程,每一个事件循环线程是 NioEventLoop
  4. 每个 NioEventLoop 都有一个 selector,用于监听注册在其上的 socketChannel 的网络通讯
  5. 每个 BossNioEventLoop 线程内部循环执行的步骤:
  • 处理 accept事件,与 client 建立连接 , 生成 NioSocketChannel
  • 将 NioSocketChannel 注册到某个 worker NIOEventLoop 上的 selector
  • 处理任务队列的任务,即 runAllTasks
  1. 每个 worker NIOEventLoop 线程循环执行的步骤
  • 轮询注册到自己 selector 上的所有 NioSocketChannel 的 read/write 事件
  • 处理 I/O 事件,即 read/write 事件,在对应 NioSocketChannel 处理业务
  • runAllTasks 处理任务队列 TaskQueue 的任务 ,一些耗时的业务处理可以放入 TaskQueue 慢慢处理,这样不影响数据在 pipeline 的流动处理
  1. 每个 worker NIOEventLoop 处理 NioSocketChannel 业务时,会使用 pipeline,管道中维护了很多 handler 处理器用来处理 channel 中的数据

Netty 实战

Netty 的使用分为客户端和服务端两部分。客户端用于连接服务端上报数据,并接服务端下发的请求指令等。服务端主要用于接收客户端的数据,并根据协议的规定对客户端的消息进行响应

定义通用消息格式 BaseMessage

public class BaseMessage {
  //消息创建的时间
  private Date createTime;
  //消息接收的时间
  private Date receiveTime;
  //消息内容
  private String messageContent;
  //消息id
  private int messageId;
  //省略get、set、构造方法
}

定义消息处理工具类 MessageUtils

public class MessageUtils {
  //将 BaseMessage 消息写入 ByteBuf
  public static ByteBuf getByteBuf(BaseMessage baseMessage) throws UnsupportedEncodingException {
    byte[] req = JSON.toJSONString(baseMessage).getBytes("UTF-8");
    ByteBuf byteBuf = Unpooled.buffer();
    byteBuf.writeBytes(reg);
    return byteBuf;
  }
  //从ByteBuf中获取信息,使用UTF-8编码后解析为BaseMessage的系统消息格式
  public static BaseMessage getBaseMessage(ByteBuf buf) {
    byte[] con = new byte[buf.readableBytes()];
    buf.readBytes(con);
    try {
      String message = new String(con, "UTF8");
      BaseMessage baseMessage = JSON.parseObject(message, BaseMessage.class);
      baseMessage.setReceiveTime(new Date());
      return baseMessage;
    } catch(UnsupportedEncodingException e) {
      e.printStackTrace();
      return null;
    }
  }
}

定义 NettyServer

public class NettyServer {
  private final static Log logger = LogFactory.getLog(NettyServer.class);
  private int port;
  public NettyServer(int port) {
    this.port = port;
    bind ();
  }

  private void bind() {
    //1:创建BossGroup和WorkerGroup
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
      //2:创建ServerBootstrap
      ServerBootstrap bootstrap = new ServerBootstrap();
      bootstrap.group(boss, worker);
      //3:设置Channel和 Option
      bootstrap.channel(NioServerSocketChannel.class);
      bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
      bootstrap.option(ChannelOption.TCP_NODELAY, true);
      bootstrap.childoption(ChannelOption.SO_KEEPALIVE, true);
      bootstrap.childHandier(new channelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
          ChannelPipeline p = socketChannel.pipeline();
          //定义MessageDecoder,用于解码Server接收的消息并处理
          p.addLast("decoder", new MessageDecoder());
        }
      });
      //4:设置绑定端口号并启动
      ChannelFuture channelFuture = bootstrap.bind(port).sync();
      if (channelFuture.isSuccess()) {
        logger.info("NettyServer start success, port: " + this.port);
      }
      //5:设置异步关闭连接
      channelFuture.channel().closeFuture().sync();
    } catch(Exception e) {
      logger.error("NettyServer start fail, exception:" + e.getMessage());
      e.printStackTrace():
    } finally {
      //6:优雅退出函数设置
      boss.shutdownGracefully();
      worker.shutdownGracefully();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new NettyServer(9000);
  }
}

定义 MessageDecoder 解码器

public class MessageDecoder extends ChannelHandlerAdapter {
  private final static Log logger = LogFactory.getLog(MessageDecoder.class);

  // 覆写channelRead方法并接收客户端发送的消息
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //1:接收到客户端发送的消息并解码
    ByteBuf buf = (ByteBuf) msg;
    BaseMessage message = MessageUtils.getBaseMessage(buf);
    try {
      //2:定义回复消息体
      BaseMessage responseMessage = new BaseMessage(message.getMessageID() + 1, "response from server", new Date());
      logger.info("send response message for client:" + JSON.toJSONString(responseMessage));
      //3:消息编码
      ByteBuf byteBuf = MessageUtils.getByteBuf(responseMessage);
      //4:消息发送,将消息通过ChannelHandlerContext写入Channel
      ctx.writeAndFlush(byteBuf);
    } catch(UnsupportedEncodingException e) {
      e.printStackTrace();
    }
  }

  @Override//连接断开触发事件
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    logger.error("channel removed");
    super.handlerRemoved(ctx);
  }

  @Override//连接异常触发事件
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    logger.error("channel exception");
    super.exceptionCaught(ctx, cause);
  }

  @Override//连接注册触发事件
  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    logger.error("channel registered");
    super.channelRegistered(ctx);
  }
}

定义 NettyClient

public class NettyClient {
  private final static Log logger = LogFactory.getlog(NettyClient.class);
  //服务端的端口号
  private int port = 9000;//服务端的 IP地址
  private String host = "localhost";
  public NettyClient(String host, int port) throws InterruptedException {
    this.port = port;
    this.host = host;
    start();
  }
  private void start() throws InterruptedException {
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
      bootstrap.group(eventLoopGroup);
      bootstrap.removeAddress(host, port);
      bootstrap.handler(new ChannelInitializer<SocketChannel>(){
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
          socketChannel.pipeline().addLast(new NettyClientHandler());
        }
      }); 
    }
  }
}

定义 NettyClientHandler 消息处理器

public class NettyClientHandler extends ChannelHandlerAdapter {

  private final static Log logger = LogFactory.getLog(NettyClientHandler.class);

  @Override//连接创建后,Netty会自动调用channelActive方法
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //创建一条消息,发送给服务端
    BaseMessage message = new BaseMessage(0, "message from client", new Date());
    ByteBuf byteBuf = MessageUtils.getByteBuf(message);
    ctx.writeAndFlush(byteBuf);
    logger.info("send a message for server:" + JSON.toJSONString(message));
  }

  @Override//读取服务端的消息
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    BaseMessage message = MessageUtils.getBaseMessage(buf);
    logger.info("received message form server:" + JSON,toJSONString(message));
  }
}

标签:Netty,Reactor,笔记,学习,线程,IO,new,客户端
From: https://www.cnblogs.com/Yee-Q/p/18370307

相关文章

  • Java基础-学习笔记15
    15泛型1.泛型泛型的好处编译时,检查添加元素的类型,提高了安全性减少了类型转换的次数,提高效率比如:ArrayListarr=newArrayList();在放入时,如果添加Dog类到arr里,编译器发现添加的类型不满足要求,就会报错;在取出时,直接取出Person类,就不用再转型使用。泛型的......
  • MybatisPlus学习笔记
    MyBatisPlus从入门到精通1.概述MybatisPlus是一款Mybatis增强工具,用于简化开发,提高效率。它在MyBatis的基础上只做增强不做改变,为简化开发、提高效率而生。官网:https://baomidou.com/2.快速入门2.0准备工作①准备数据CREATETABLE`user`(`id`bigint(20)NOTNULL......
  • 深度学习-pytorch-basic-003
    1.环境配置1.1anconda配置环境condacreate-nDL_pytorchpython=3.11condaacticvateDL_pytorchcondadeactivatecondaenvlistcondaremove-nDL_pytorch--all1.2torchCPU环境配置pipinstalltorch==1.10.0-ihttps://pypi.tuna.tsinghua.edu.cn/simplecond......
  • 系统架构师考试学习笔记第二篇——架构设计专业知识(6)系统工程基础知识
    本章节考点分析:        第6课时主要学习系统工程和系统性能等内容。根据考试大纲,本课时知识点会涉及单项选择题,约占2~5分。本课时内容侧重于概念知识也会有计算题。根据以往全国计算机技术与软件专业技术资格(水平)考试的出题规律,考查的知识点多来源于教材,扩展内容较......
  • Spring超硬核笔记———全是干货
    为什么用spring?Spring的核心功能IOC(控制反转,依赖注入),AOP(面向切面的编程)IOC:我们在使用过程中不用关注于对象是怎么创建的,只用应用过去,sping自动帮我们完成注入,对象的创建,spring默认创建对象是单例,这样减少了频繁创建对象,让对象重复利用,所有的对象都是放在BeanFactory工厂......
  • Effective Java理解笔记系列-第1条-何时考虑用静态工厂方法替代构造器?
    为什么写这系列博客?在阅读《EffectiveJava》这本书时,我发现有许多地方需要仔细认真地慢慢阅读并且在必要时查阅相关资料才能彻底搞懂,相信有些读者在阅读此书时也有类似感受;同时,在解决疑惑的过程中,还存在着有些内容不容易查找、查找到的解答质量不高等问题,于是我决定把我阅读此书......
  • 《机器学习》—— K-means 聚类算法
    文章目录一、什么是K-means聚类算法?二、聚类效果评价方式——轮廓系数三、示例:代码实现四、聚类算法的优缺点一、什么是K-means聚类算法?K-Means是Python中非常流行的一个聚类算法,它属于无监督学习算法的一种。在scikit-learn(一个广泛使用的机器学习库)中,KMeans......
  • Unity 3D学习资料集合
    本文包含了unity3D游戏开发相关的学习资料,包含了入门、进阶、性能优化、面试和书籍等学习资料,含金量非常高,在这里分享给大家,欢迎收藏。学习社区1.Unity3D开发者Unity3D开发者论坛是一个专注于Unity引擎的开发者社区。在这个论坛上,开发者们可以分享自己的项目经验、技术问......
  • 亦菲喊你来学机器学习(14) --贝叶斯算法
    文章目录贝叶斯一、贝叶斯定理二、贝叶斯算法的核心概念三、贝叶斯算法的优点与局限优点:局限:四、构建模型训练模型测试模型总结贝叶斯贝叶斯算法(Bayesianalgorithm)是一种基于贝叶斯定理的机器学习方法,主要用于估计模型参数和进行概率推断。以下是对贝叶斯算法的......
  • 基于深度学习的游客满意度分析与评论分析【情感分析、主题分析】
    需要本项目的可以私信博主目录1绪论1.1选题背景及研究意义1.1.1选题背景1.1.2研究意义1.2研究内容与方法1.2.1研究内容1.2.2研究方法1.3创新与不足1.3.1创新点1.3.2研究局限性2文献综述2.1相关概念界定2.1.1大数据分析2.1.2游客满意度2.2国内外......