前言
说实话,java netty方面的资料不算多,尤其是自定义报文格式的,少之又少
自己写了个简单的收发,报文格式为:6位报文长度(十进制数字字符串)+报文内容
发送的话,没有写自动组装格式,自己看需求吧,需要的话,自己完善
服务端启动
可以直接用类文件启动,也可以通过Spring Boot启动。我这里写的是用Spring Boot启动的,可以按照自己的需求修改
代码
入口
package cn.daenx.demo.example.socket;
import cn.daenx.demo.example.service.BankServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * Application entry point for the socket server.
 *
 * <p>Registered as a Spring component; once its dependencies are injected it
 * launches a {@link SocketServerThread}.
 *
 * @author DaenMax
 */
@Slf4j
@Component
public class SocketServer {

    /** Message-handling service; injected here only so it can be handed to the server thread. */
    @Resource
    private BankServerService bankServerService;

    /**
     * Creates the server thread with the injected service and starts it.
     * Invoked by the container after dependency injection ({@code @PostConstruct}).
     */
    @PostConstruct
    public void start() {
        new SocketServerThread(bankServerService).start();
    }
}
服务端
package cn.daenx.demo.example.socket;
import cn.daenx.demo.example.service.BankServerService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* 服务端
*
* @author DaenMax
*/
@Slf4j
@Component
public class SocketServerThread extends Thread {
private static BankServerService bankServerService;
public SocketServerThread(BankServerService bankServerService) {
this.bankServerService = bankServerService;
}
//监听IP
private static final String ip = "127.0.0.1";
//监听端口
private static final int port = 6666;
//发送的数据帧最大长度
private static final int maxFrameLength = Integer.MAX_VALUE;
//定义长度域位于发送的字节数组中的下标
private static final int lengthFieldOffset = 0;
//用于描述定义的长度域的长度默认只支持 1, 2, 3, 4, or 8(原版)
//我这里重写了解码器,所以支持传入任意长度
private static final int lengthFieldLength = 6;
//偏移位,即:长度字节和内容中间隔了几个字节
private static final int lengthAdjustment = 0;
//表示获取完一个完整的数据包之后,忽略前面的几个字节
private static final int initialBytesToStrip = 6;
private static final EventLoopGroup bossGroup = new NioEventLoopGroup();
private static final EventLoopGroup workerGroup = new NioEventLoopGroup();
@Override
public void run() {
log.info("SocketServer启动中...");
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
ch.pipeline().addLast(new CustomLengthFieldDecoder(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip));
pipeline.addLast(new SocketHandler(bankServerService));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
log.info("SocketServer启动完成,端口:" + port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void preDestroy() {
log.info("springboot项目即将停止运行");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
log.info("SocketServer已停止,端口:" + port);
}
}
事件处理器
package cn.daenx.demo.example.socket;
import cn.daenx.demo.example.service.BankServerService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 事件处理器
*
* @author DaenMax
*/
@Slf4j
public class SocketHandler extends ChannelInboundHandlerAdapter {
private final BankServerService bankServerService;
public SocketHandler(BankServerService bankServerService) {
this.bankServerService = bankServerService;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
}
/**
* 接收到的报文
* 注意,因为用了LengthFieldBasedFrameDecoder,所以这里不会分包,而是会一次性接收完成后才会调用此方法
*
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf message = (ByteBuf) msg;
byte[] response = new byte[message.readableBytes()];
message.readBytes(response);
String reqStr = new String(response);
log.info("接收socket->>>" + reqStr);
String resStr = bankServerService.handleReqXml(reqStr);
ByteBuf sendMsg = Unpooled.buffer(resStr.length());
sendMsg.writeBytes(resStr.getBytes());
ctx.writeAndFlush(sendMsg);
log.info("响应socket->>>" + resStr);
}
/**
* 发生异常时
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
String resStr = "出现错误,请联系运维人员";
ByteBuf sendMsg = Unpooled.buffer(resStr.length());
sendMsg.writeBytes(resStr.getBytes());
ctx.writeAndFlush(sendMsg);
log.info("响应socket->>>" + resStr);
ctx.close();
}
}
自定义长度域解码器
package cn.daenx.demo.example.socket;
import cn.hutool.core.util.CharsetUtil;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.TooLongFrameException;
import java.nio.ByteOrder;
/**
 * Length-field frame decoder whose length field is a run of decimal digit characters
 * rather than a binary integer.
 *
 * <p>The stock {@link LengthFieldBasedFrameDecoder} only supports length fields of
 * 1, 2, 3, 4 or 8 bytes. This subclass overrides {@link #getUnadjustedFrameLength}
 * to parse the field as text, so any width (e.g. 6) can be used.
 *
 * @author DaenMax
 */
public class CustomLengthFieldDecoder extends LengthFieldBasedFrameDecoder {

    /**
     * Creates a new instance.
     *
     * @param maxFrameLength      the maximum length of the frame. If the length of the frame is
     *                            greater than this value, {@link TooLongFrameException} will be
     *                            thrown.
     * @param lengthFieldOffset   the offset of the length field
     * @param lengthFieldLength   the length of the length field
     * @param lengthAdjustment    the compensation value to add to the value of the length field
     * @param initialBytesToStrip the number of first bytes to strip out from the decoded frame
     */
    public CustomLengthFieldDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }

    /**
     * Parses the length field as a decimal number written in text.
     *
     * @param buf    buffer holding the accumulated inbound bytes
     * @param offset index in {@code buf} at which the length field starts
     * @param length width of the length field in bytes
     * @param order  byte order configured on the decoder; unused because the field is text
     * @return the frame length encoded in the field
     * @throws NumberFormatException if the field does not contain a valid decimal number
     */
    @Override
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        // Read at `offset`, not at index 0: when several frames sit in the same buffer,
        // the second frame's length field does not start at the beginning of the buffer.
        String digits = buf.getCharSequence(offset, length, CharsetUtil.CHARSET_UTF_8).toString();
        return Long.parseLong(digits);
    }
}
service实现类(接口BankServerService未贴出,只需声明handleReqXml方法)
package cn.daenx.demo.example.service.impl;
import cn.daenx.demo.example.service.BankServerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
 * Default {@link BankServerService} implementation used to answer socket requests.
 *
 * @author DaenMax
 */
@Slf4j
@Service
public class BankServerServiceImpl implements BankServerService {

    /** Fixed acknowledgement returned for every request. */
    private static final String ACK_MESSAGE = "我已成功收到你的请求";

    /**
     * Handles one socket request.
     *
     * @param str the request text; not inspected by this implementation
     * @return the fixed acknowledgement message
     */
    @Override
    public String handleReqXml(String str) {
        return ACK_MESSAGE;
    }
}
发送接收方法testSocketSend
package cn.daenx.demo.example.socket;
import java.io.*;
import java.net.Socket;
/**
 * Manual test client: sends one length-prefixed message to the socket server
 * and prints whatever the server sends back.
 *
 * @author DaenMax
 */
public class testSocketSend {

    /**
     * Charset used on the wire. Fixed to UTF-8 so it does not vary with the JVM's
     * platform default. Fully qualified to avoid touching the import list.
     */
    private static final java.nio.charset.Charset WIRE_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    /** Width of the decimal length header that precedes every payload. */
    private static final int LENGTH_FIELD_DIGITS = 6;

    /** 64 hex characters of sample data; repeated to build a multi-kilobyte test payload. */
    private static final String SAMPLE_CHUNK = "3f478ff4846dbdc1cbd32b44cee63658c9fc7535104bac402b8b3b997def5a24";

    /** Number of repetitions of {@link #SAMPLE_CHUNK}: 64 * 80 = 5120 payload bytes. */
    private static final int SAMPLE_REPEAT = 80;

    /**
     * Sends a 5120-byte sample payload to 127.0.0.1:6666 and prints the reply.
     * The length header is computed by {@link #frame(String)} instead of being
     * typed by hand into the literal.
     */
    public static void main(String[] args) throws Exception {
        StringBuilder payload = new StringBuilder(SAMPLE_CHUNK.length() * SAMPLE_REPEAT);
        for (int i = 0; i < SAMPLE_REPEAT; i++) {
            payload.append(SAMPLE_CHUNK);
        }
        System.out.println(send("127.0.0.1", "6666", frame(payload.toString())));
    }

    /**
     * Prefixes {@code payload} with its UTF-8 byte length as a zero-padded
     * 6-digit decimal number, e.g. {@code "abc"} becomes {@code "000003abc"}.
     *
     * @param payload message content
     * @return header followed by the unchanged payload
     * @throws IllegalArgumentException if the byte length does not fit into 6 digits
     */
    static String frame(String payload) {
        String digits = Integer.toString(payload.getBytes(WIRE_CHARSET).length);
        if (digits.length() > LENGTH_FIELD_DIGITS) {
            throw new IllegalArgumentException(
                    "payload is " + digits + " bytes, which does not fit into " + LENGTH_FIELD_DIGITS + " digits");
        }
        StringBuilder framed = new StringBuilder(LENGTH_FIELD_DIGITS + payload.length());
        for (int i = digits.length(); i < LENGTH_FIELD_DIGITS; i++) {
            framed.append('0');
        }
        return framed.append(digits).append(payload).toString();
    }

    /**
     * Sends {@code msg} to the given server and returns everything the server
     * writes back before it closes the connection.
     *
     * @param host server host name or address
     * @param port server port as a decimal string
     * @param msg  text to send as-is (the caller supplies any length header)
     * @return the response, each received line followed by {@code "\n"}
     * @throws Exception if connecting, sending or receiving fails; the underlying
     *                   {@link IOException} is attached as the cause
     */
    public static String send(String host, String port, String msg) throws Exception {
        // The socket's streams are closed together with the socket by try-with-resources.
        try (Socket socket = new Socket(host, Integer.parseInt(port))) {
            OutputStream outputStream = socket.getOutputStream();
            // Written directly: PrintWriter would swallow I/O errors and use the platform charset.
            outputStream.write(msg.getBytes(WIRE_CHARSET));
            outputStream.flush();
            // Half-close so the server sees end-of-stream while we can still read the reply.
            socket.shutdownOutput();
            System.out.println("socket发送,host=" + host + ",port=" + port + ",msg=" + msg);

            BufferedReader bufferedReader =
                    new BufferedReader(new InputStreamReader(socket.getInputStream(), WIRE_CHARSET));
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                sb.append(line).append("\n");
            }
            String ret = sb.toString();
            System.out.println("socket接收,host=" + host + ",port=" + port + ",msg=" + ret);
            return ret;
        } catch (IOException e) {
            throw new Exception("连接服务失败", e);
        }
    }
}
测试
先启动springboot
然后直接运行testSocketSend即可看到效果