Netty测试客户端
package com.coremain;
import com.coremain.handler.ServerListenerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.CharsetUtil;
import java.util.Scanner;
/**
* @description: 测试
* @author: GuoTong
* @createTime: 2023-05-14 16:46
* @since JDK 1.8 OR 11
**/
public class NettyClientTest {
public static void main(String[] args) throws InterruptedException {
// 客户端的线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建Netty客户端端的启动对象
Bootstrap bootstrap = new Bootstrap();
// 使用链式编程来配置参数
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ChannelPipeline pipeline = ch.pipeline();
// 对于通道加入解码器
pipeline.addLast("decoder", new StringDecoder());
// 对于通道加入加码器
pipeline.addLast("encoder", new StringDecoder());
// 加入事件回调处理器
pipeline.addLast(new ServerListenerHandler());
}
});
System.out.println("基于Netty的客户端接入启动完成....");
ChannelFuture cf = bootstrap.connect("127.0.0.1", 18023).sync();
// 获取连接通道
Channel channel = cf.channel();
System.out.println("+++++++" + channel.localAddress() + "=======");
// 客户端输入扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String next = scanner.next();
// 发送到服务端
channel.writeAndFlush(Unpooled.buffer().writeBytes(next.getBytes(CharsetUtil.UTF_8)));
}
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
Netty的Server启动类
package com.coremain.netty;
import com.coremain.handler.NettyServerHTTPHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @description: Netty服务启动器
* netty服务端会跟随一起启动。 同时,在springboot关闭前,会先销毁netty服务。
* @author: GuoTong
* @createTime: 2023-05-14 15:13
* @since JDK 1.8 OR 11
**/
@Component
public class NettyServerHTTPRunning {
// log4j2的AsyncLogger本身的逻辑采用了缓冲区思想,使用的是disruptor框架来实现一个环形无锁队列。
private static final Logger log = LoggerFactory.getLogger(NettyServerHTTPRunning.class);
/**
* 主线程组
*/
private NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
/**
* 工作线程组
*/
private NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);
/**
* (http)主要服务端口
*/
@Value("${iot.port1:18024}")
private int iot1;
/**
* (http)备用服务端口
*/
@Value("${iot.port2:18025}")
private int iot2;
/**
* 启动 netty 服务
*/
@PostConstruct
public void startServer() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)
.option(ChannelOption.SO_BACKLOG, 1024)
// 解决端口占用问题, 可以共用服务器端口(即使该端口已被其他端口占用)
.option(ChannelOption.SO_REUSEADDR, true)
// 接收消息缓冲区大小
.option(ChannelOption.SO_RCVBUF, 2048)
// 发送消息缓冲区大小
.option(ChannelOption.SO_SNDBUF, 2048)
// 用于启用或关于Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true关闭Nagle算法;
// 如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送
.option(ChannelOption.TCP_NODELAY, true)
// 用于检测长时间没有数据传输的连接状态,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.option(ChannelOption.SO_KEEPALIVE, true)
// 当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证全部发送成功
// 使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送
.option(ChannelOption.SO_LINGER, 2000)
.childHandler(new NettyServerHTTPHandler());
ChannelFuture service1 = serverBootstrap.bind(iot1).sync();
ChannelFuture service2 = serverBootstrap.bind(iot2).sync();
if (service1.isSuccess()) {
log.info("服务1启动成功, port: {}", iot1);
}
if (service2.isSuccess()) {
log.info("服务2启动成功, port: {}", iot2);
}
}
/**
* 销毁
*/
@PreDestroy
public void destroy() {
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("关闭 Netty 成功");
}
}
Netty的服务端核心类:ServerBootstrap
package com.coremain.config;
import com.coremain.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description: 配置NettyServer
* @author: GuoTong
* @createTime: 2023-05-14 14:54
* @since JDK 1.8 OR 11
**/
@Configuration
@EnableConfigurationProperties
public class NettyConfig {
private final NettyProperties nettyProperties;
public NettyConfig(NettyProperties nettyProperties) {
this.nettyProperties = nettyProperties;
}
/**
* boss线程池-进行客户端连接
*
* @return
*/
@Bean
public NioEventLoopGroup boosGroup() {
return new NioEventLoopGroup(nettyProperties.getBoss());
}
/**
* worker线程池-进行业务处理
*
* @return
*/
@Bean
public NioEventLoopGroup workerGroup() {
return new NioEventLoopGroup(nettyProperties.getWorker());
}
/**
* 服务端启动器,监听客户端连接
*
* @return
*/
@Bean
public ServerBootstrap serverBootstrap() {
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 指定使用的线程组
.group(boosGroup(), workerGroup())
// 指定使用的通道
.channel(NioServerSocketChannel.class)
// 三次握手中的A、B队列总和最大值(第二次握手加入A, 第三次握手从A移动到B, accept 后从B取出)
.option(ChannelOption.SO_BACKLOG, 1024)
// 指定连接超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyProperties.getTimeout())
// 支持长连接
.option(ChannelOption.SO_KEEPALIVE, true)
// 接收消息缓冲区大小
.option(ChannelOption.SO_RCVBUF, 2048)
// 发送消息缓冲区大小
.option(ChannelOption.SO_SNDBUF, 2048)
// 指定worker处理器
.childHandler(new NettyServerHandler());
return serverBootstrap;
}
}
Netty的通道处理器 ChannelInitializer
package com.coremain.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.springframework.stereotype.Component;
/**
* @description: Netty服务端回调处理
* @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。
* @author: GuoTong
* @createTime: 2023-05-14 14:57
* @since JDK 1.8 OR 11
**/
@ChannelHandler.Sharable
@Component
public class NettyServerHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 数据分割符
String delimiterStr = "##@##";
ByteBuf delimiter = Unpooled.copiedBuffer(delimiterStr.getBytes());
ChannelPipeline pipeline = socketChannel.pipeline();
// 使用自定义处理拆包/沾包,并且每次查找的最大长度为1024字节
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// 将上一步解码后的数据转码为Message实例
pipeline.addLast(new MessageUTF8DecodeHandler());
// 对发送客户端的数据进行编码,并添加数据分隔符
pipeline.addLast(new MessageUTF8EncodeHandler(delimiterStr));
// 对数据进行最终处理
pipeline.addLast(new ServerListenerHandler());
}
}
Netty回调处理器SimpleChannelInboundHandler
package com.coremain.handler;
import com.coremain.handler.bean.MessageEnum;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @description: 数据处理器,针对不同类型数据分类处理 在处理不同接收数据时使用了枚举类型
* @ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享。
* @author: GuoTong
* @createTime: 2023-05-14 15:07
* @since JDK 1.8 OR 11
**/
@ChannelHandler.Sharable
@Component
public class ServerListenerHandler extends SimpleChannelInboundHandler<MessageStrUTF8> {
private static final Logger log = LoggerFactory.getLogger(ServerListenerHandler.class);
/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter);
//通知客户端链接建立成功
String str = "通知客户端链接建立成功" + " " + currentData + " " + channel.localAddress().getHostString() + "\r\n";
ctx.writeAndFlush(str);
}
/**
* 设备接入连接时处理
*
* @param ctx
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("有新的连接:[{}]", ctx.channel().id().asLongText());
}
/**
* 数据处理
*
* @param ctx
* @param msg
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageStrUTF8 msg) {
// 获取消息实例中的消息体
String content = msg.getContent();
// 对不同消息类型进行处理
MessageEnum type = MessageEnum.getStructureEnum(msg);
String currentData = LocalDateTime.now().format(CommonUtilHandler.dateTimeFormatter);
switch (type) {
case CONNECT:
// TODO 心跳消息处理
case STATE:
// TODO 设备状态
default:
log.info(currentData + type.content + " 消息内容" + content);
}
}
/**
* 设备下线处理
*
* @param ctx
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
log.info("设备下线了:{}", ctx.channel().id().asLongText());
}
/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端断开链接{}", ctx.channel().localAddress().toString());
}
/**
* 设备连接异常处理
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
// 打印异常
log.info("异常:{}", cause.getMessage());
// 关闭连接
ctx.close();
}
}
maven依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--整合web模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!--导入log4j2日志依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<!--og4j2的AsyncLogger本身的逻辑采用了缓冲区思想,使用的是disruptor框架来实现一个环形无锁队列。-->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor-version}</version>
</dependency>
<!--Netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.68.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastJson-version}</version>
</dependency>
</dependencies>
项目结构
日志配置文件 log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--status:Log4j2内部日志的输出级别,设置为TRACE对学习Log4j2非常有用 -->
<!--monitorInterval:定时检测配置文件的修改,有变化则自动重新加载配置,时间单位为秒,最小间隔为5s -->
<Configuration status="debug" name="MyApp" packages="" monitorInterval="600">
<!--properties:设置全局变量 -->
<properties>
<!--LOG_HOME:指定当前日志存放的目录 -->
<property name="LOG_HOME">./NettyStudy/log4j2</property>
<!--FILE_NAME:指定日志文件的名称 -->
<property name="FILE_NAME">application</property>
<property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level [%L] - %msg%n</property>
<property name="max_single_file_size">20MB</property>
</properties>
<!--Appenders:定义日志输出目的地,内容和格式等 -->
<Appenders>
<!--Console:日志输出到控制台标准输出 -->
<Console name="Console" target="SYSTEM_OUT">
<!--pattern:日期,线程名,日志级别,日志名称,日志信息,换行 -->
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
<!--RollingFile:日志输出到文件,下面的文件都使用相对路径 -->
<!--fileName:当前日志输出的文件名称 -->
<!--filePattern:备份日志文件名称,备份目录为logs下面以年月命名的目录,备份时使用gz格式压缩 -->
<RollingFile name="RollingFile" fileName="${LOG_HOME}/${FILE_NAME}.log"
filePattern="${LOG_HOME}/$${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd}-%i.log.gz">
<!--pattern:日期,线程名,日志级别,日志名称,日志信息,换行 -->
<PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5level [%L] - %msg%n"/>
<Policies>
<!--SizeBasedTriggeringPolicy:日志文件按照大小备份 -->
<!--size:指定日志文件最大为100MB,单位可以为KB、MB或GB -->
<SizeBasedTriggeringPolicy size="20MB"/>
</Policies>
</RollingFile>
<RollingFile name="InfoLogRollingFile" fileName="${LOG_HOME}/my_app_info.log"
filePattern="${LOG_HOME}/$${date:yyyy_MM_dd}/my_app_info_%d{yyyy_MM_dd_HH}_%i.log.gz">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${log_pattern}"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="${max_single_file_size}"/>
</Policies>
<DefaultRolloverStrategy fileIndex="nomax">
<Delete basePath="${LOG_HOME}" maxDepth="2">
<IfFileName glob="*/my_app_info_*.log.gz">
<!-- 这里表示匹配“*/my_app_info_*.log.gz”模式的日志文件的删除策略如下:
- 只要日志文件总数量超过5个就删除按时间顺序最早的日志文件
- 只要日志文件总大小超过10MB就会删除按时间顺序最早的日志文件
- 只要日志文件最近修改时间为9分钟前或更早就会删除按时间顺序最早的日志文件 -->
<IfAny>
<IfAccumulatedFileSize exceeds="8MB"/>
<IfAccumulatedFileCount exceeds="5"/>
<IfLastModified age="9m"/>
</IfAny>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
<RollingFile name="WarnLogRollingFile" fileName="${LOG_HOME}/my_app_warn.log"
filePattern="${LOG_HOME}/$${date:yyyy_MM_dd}/my_app_warn_%d{yyyy_MM_dd_HH}_%i.log.gz">
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${log_pattern}"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="${max_single_file_size}"/>
</Policies>
<DefaultRolloverStrategy fileIndex="nomax">
<Delete basePath="${LOG_HOME}" maxDepth="2">
<IfFileName glob="*/my_app_warn_*.log.gz">
<IfAny>
<IfAccumulatedFileSize exceeds="3GB"/>
<IfAccumulatedFileCount exceeds="3000"/>
<IfLastModified age="30d"/>
</IfAny>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
<RollingFile name="ErrorLogRollingFile" fileName="${LOG_HOME}/my_app_error.log"
filePattern="${LOG_HOME}/$${date:yyyy_MM_dd}/my_app_error_%d{yyyy_MM_dd_HH}_%i.log.gz">
<ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${log_pattern}"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="${max_single_file_size}"/>
</Policies>
<DefaultRolloverStrategy fileIndex="nomax">
<Delete basePath="${LOG_HOME}" maxDepth="2">
<IfFileName glob="*/my_app_error_*.log.gz">
<IfAny>
<IfAccumulatedFileSize exceeds="3GB"/>
<IfAccumulatedFileCount exceeds="3000"/>
<IfLastModified age="30d"/>
</IfAny>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingFile>
</Appenders>
<Loggers>
<AsyncLogger name="com.meituan.Main" level="trace" additivity="false">
<appender-ref ref="RollingFile"/>
</AsyncLogger>
<AsyncLogger name="RollingFile2" level="trace" additivity="false">
<appender-ref ref="RollingFile2"/>
</AsyncLogger>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="InfoLogRollingFile"/>
</Root>
</Loggers>
</Configuration>
配置文件 application.yml
# netty 配置
netty:
# boss线程数量
boss: 4
# worker线程数量
worker: 2
# 连接超时时间
timeout: 6000
# 服务器主端口
port: 18023
# 服务器备用端口
portSalve: 18026
# 服务器地址
host: 127.0.0.1
spring:
application:
name: netty-server
mvc:
servlet:
load-on-startup: 1 #项目启动时执行初始化即可解决。
server:
port: 15026
undertow:
accesslog:
enabled: false
direct-buffers: true # 是否分配的直接内存(NIO直接分配的堆外内存)
buffer-size: 1024 #每块buffer的空间大小,越小的空间被利用越充分
threads:
worker: 20 # 阻塞任务线程池, 它的值设置取决于系统线程执行任务的阻塞系数,默认值是IO线程数*8
io: 4 # CPU有几核,就填写几。
servlet:
context-path: /undertow