SpringBoot Netty socket使用
Netty是由JBOSS提供的一个java开源框架,现为Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
这里将 Netty 与 Spring Boot 整合起来进行测试;性能方面没有做深入了解,但至少可以正常使用。
maven引用依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
配置
这里测试同时启动两个 socket server,所以配置了两个端口:
nettysocket:
  port: 32768
  port2: 32769
Netty 服务
MySoketListener
package com.ld.test.socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
 * Servlet context listener that starts two Netty socket servers when the web
 * application context is initialized and stops them when it is destroyed.
 *
 * <p>Each server runs on its own thread because the server's listen method
 * blocks until its channel is closed. The ports are read from the
 * {@code nettysocket.port} and {@code nettysocket.port2} properties.
 */
@Component
//@WebListener
public class MySoketListener implements ServletContextListener {
    private static final Logger log = LoggerFactory.getLogger(MySoketListener.class);

    @Value("${nettysocket.port}")
    private Integer port;

    @Value("${nettysocket.port2}")
    private Integer port2;

    /** First server instance; non-null once the servers have been started. */
    private MyTestNettyServer myTestNettyServer;

    /** Threads running the two servers, kept so they can be interrupted on shutdown. */
    private Thread serverThread;
    private Thread serverThread2;

    /**
     * Starts both socket servers, unless they have already been started by
     * this listener instance.
     *
     * @param sce the servlet context event (unused)
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        log.info("1.启动时,开启监听========================");
        if (myTestNettyServer == null) {
            log.info("2.启动时,MyTestNettyServer为null,启动Netty socket服务========================");
            log.info("=====MyTestNettyServer 端口为:{}", port);
            // Remember the instance so the guard above prevents a second start.
            myTestNettyServer = new MyTestNettyServer(port);
            serverThread = startServerThread(myTestNettyServer, port);

            // A second server can be started as well. Both servers share the static
            // ChannelGroup in ServerHandler, so messages can be forwarded to clients
            // of either port. Note: the port must not be a static field of
            // MyTestNettyServer, otherwise the second instance would overwrite it.
            log.info("=====MyTestNettyServer 端口为:{}", port2);
            serverThread2 = startServerThread(new MyTestNettyServer(port2), port2);
        }
    }

    /**
     * Called when the application shuts down. Interrupts the server threads so
     * that their blocking wait ends and the servers release their event loops.
     *
     * @param sce the servlet context event (unused)
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        log.info("23========================");
        stopServerThread(serverThread);
        stopServerThread(serverThread2);
        serverThread = null;
        serverThread2 = null;
        myTestNettyServer = null;
    }

    /** Runs the given server on a new, named thread and returns that thread. */
    private Thread startServerThread(MyTestNettyServer server, Integer serverPort) {
        Thread thread = new Thread(server, "netty-socket-" + serverPort);
        thread.start();
        return thread;
    }

    /** Interrupts the given server thread if it exists and is still running. */
    private void stopServerThread(Thread thread) {
        if (thread != null && thread.isAlive()) {
            thread.interrupt();
        }
    }
}
MyTestNettyServer
package com.ld.test.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A Netty TCP server bound to a single port, runnable on its own thread.
 *
 * <p>The port is an instance field (not static) so that several servers with
 * different ports can run side by side.
 */
public class MyTestNettyServer implements Runnable {
    /** Logger for server lifecycle and error messages. */
    private static final Logger log = LoggerFactory.getLogger(MyTestNettyServer.class);

    /** Port used when none is supplied. */
    private static final Integer DEFAULT_PORT = 58765;

    private final Integer port;
    private final String serverName;

    /** Creates a server listening on the default port. */
    public MyTestNettyServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server listening on the given port.
     *
     * @param port the port to bind; {@code null} falls back to the default port
     */
    public MyTestNettyServer(Integer port) {
        // Guard against null so bind() cannot fail on unboxing.
        this.port = (port != null) ? port : DEFAULT_PORT;
        this.serverName = "srv(" + this.port + ")";
    }

    /**
     * Binds the server socket and blocks until the server channel is closed or
     * the calling thread is interrupted. The event loop groups are always shut
     * down before this method returns.
     */
    public void soketListener() {
        log.info("{}当前SOCKET NettyServer 端口为: port={}", serverName, port);
        int portNow = port;
        // One boss thread is enough: a single bind is served by a single event loop.
        EventLoopGroup bossExecutors = new NioEventLoopGroup(1);
        EventLoopGroup workExecutors = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossExecutors, workExecutors)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR,
                            new AdaptiveRecvByteBufAllocator(64, 65535, 65535))
                    .childHandler(new ServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(portNow).sync();
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            log.info("NettyServer 端口为:{} 线程被中断========================", portNow);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            log.error("NettyServer 端口为:{} 启动出现异常, 异常内容为:{}========================",
                    portNow, e.getMessage(), e);
        } finally {
            log.error("NettyServer 服务关闭========================");
            bossExecutors.shutdownGracefully();
            workExecutors.shutdownGracefully();
        }
    }

    /**
     * Thread entry point; delegates to {@link #soketListener()}.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        log.info("多线程启动Netty sserver========================");
        this.soketListener();
    }
}
ServerHandler
package com.ld.test.socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
 * Inbound handler for decoded text frames. It tracks every connected channel
 * in a static {@link ChannelGroup} shared by all handler instances and prints
 * lifecycle events and received messages to standard output.
 */
public class ServerHandler extends SimpleChannelInboundHandler<String> {
    /** Logger for errors raised while handling a channel. */
    private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);

    /** All channels handled by any instance of this class. */
    private static final ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Per-channel data keyed by the channel's long id; nothing in this class adds entries yet. */
    private static final ConcurrentMap<String, String> channelMap = new ConcurrentHashMap<>();

    /**
     * Prints the current channel map and the received message.
     *
     * @param ctx the handler context of the receiving channel
     * @param msg the decoded text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("====");
        // Printed with Map.toString(); the previous JSON helper was never imported.
        System.out.println("channelMap客户端:" + channelMap);
        System.out.println("====");
        System.out.println(msg);
        System.out.println("====");
    }

    /**
     * Entry point intended for HTTP controllers that want to forward a message
     * to socket clients. Currently it only prints the message.
     * TODO(review): forwarding through the channel group is not implemented here.
     *
     * @param msg the message received from the caller
     * @throws Exception declared for caller compatibility; not thrown by the current body
     */
    public static void sendMessage(String msg) throws Exception {
        System.out.println("===================");
        System.out.println("socet sendMessage收到报文");
        System.out.println(msg);
        System.out.println("===================");
    }

    /** Registers the channel in the shared group and prints the group size. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "channelActive");
        // The channel was already added in handlerAdded; this second add is redundant.
        channelGroup.add(channel);
        System.out.println("当前socket连接数 channelGroup.size():" + channelGroup.size());
    }

    /** Prints a marker when the channel is registered. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
    }

    /** Prints the remote address when the channel becomes inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "-----channelInactive");
    }

    /** Prints a marker when the channel is unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
    }

    /** Adds the channel to the shared group as soon as this handler is installed. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "handlerAdded");
        channelGroup.add(channel);
    }

    /**
     * Drops the channel's entry from the channel map and prints the group size.
     * The channel is not removed from the group here; per the Netty ChannelGroup
     * documentation, closed channels are removed from a group automatically.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "handlerRemoved");
        channelMap.remove(channel.id().asLongText());
        System.out.println("当前socket连接数channelGroup.size():" + channelGroup.size());
    }

    /** Logs the failure with its stack trace and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Closing channel {} after unhandled exception", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }
}
ServerInitializer
package com.ld.test.socket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * Builds the handler pipeline for every accepted connection: line-delimited
 * framing, UTF-8 string decoding and encoding, then the business handler.
 */
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    /** Maximum length of a single line-delimited frame. */
    private static final int MAX_FRAME_LENGTH = 4096;

    /**
     * Installs the handlers on the new channel's pipeline, in processing order.
     *
     * @param ch the newly accepted socket channel
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, Delimiters.lineDelimiter()))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new ServerHandler());
    }
}
SpringBoot服务
package com.ld.test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
/**
 * Spring Boot entry point for the Netty socket demo application.
 */
@SpringBootApplication
@ServletComponentScan
public class NettySocketApplication {

    /**
     * Boots the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(NettySocketApplication.class);
        application.run(args);
    }
}
启动应用
测试
package com.ld.test.test;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.Socket;
import java.util.UUID;
/**
 * Minimal blocking socket client for manually testing the socket server: it
 * connects, sends one line of UTF-8 text, then logs every line it receives
 * until the server closes the connection.
 */
public class SocketClient1 {
    private static final Logger log = LoggerFactory.getLogger(SocketClient1.class);

    /** Charset used in both directions on the socket. */
    private static final String CHARSET = "UTF-8";

    /**
     * Runs the test client.
     *
     * @param args unused
     * @throws IOException if connecting, writing or reading fails
     */
    public static void main(String[] args) throws IOException {
        // Host and port of the server under test; change them to target another server.
        String host = "127.0.0.1";
        int port = 32768;

        // try-with-resources closes the socket, and with it both streams.
        try (Socket socket = new Socket(host, port)) {
            socket.setOOBInline(true);

            BufferedWriter bw = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream(), CHARSET));
            BufferedReader br = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), CHARSET));

            // The trailing line break terminates the message as one line.
            String uuid = "用户1" + "\r\n";
            log.info("uuid: {}", uuid);
            // Write through the UTF-8 writer so the bytes do not depend on the platform charset.
            bw.write(uuid);
            bw.flush();

            // Receive loop: block on the socket instead of spinning the CPU.
            String line;
            while ((line = br.readLine()) != null) {
                log.info("received: {}", line);
            }
        }
    }
}
标签:Netty,SpringBoot,netty,io,new,import,public,channel,socket
From: https://blog.51cto.com/u_12668715/6992717