来源:https://www.jb51.net/article/168212.htm
最近有朋友向我询问一些Netty与SpringBoot整合的相关问题,这里我就总结一下基本的整合流程。也就是说,这篇文章默认大家对Netty与Spring、SpringMVC的整合已经没有什么问题。现在,就进入正题吧。
Server端:
总的来说,服务端还是比较简单的,自己一共写了三个核心类。分别是
- NettyServerListener:服务启动监听器
- ServerChannelHandlerAdapter:通道适配器,主要用于多线程共享
- RequestDispatcher:请求分排器
下面开始集成过程:
在pom.xml中添加以下依赖
1 2 3 4 5 6 7 8 9 10 |
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
|
让SpringBoot的启动类实现CommandLineRunner接口并重写run方法,比如我的启动类是CloudApplication.java
1 2 3 4 5 6 7 8 9 10 11 |
/**
 * Spring Boot entry point of the server-side application.
 *
 * <p>The class also acts as a {@link CommandLineRunner}; its {@code run} callback is
 * currently empty.
 * NOTE(review): nothing here calls NettyServerListener#start() - confirm where the Netty
 * server is expected to be started.
 */
@SpringBootApplication
public class CloudApplication implements CommandLineRunner {

    /**
     * CommandLineRunner callback; performs no work.
     *
     * @param strings command-line arguments forwarded by Spring Boot (unused)
     */
    @Override
    public void run(String... strings) {
        // intentionally left blank
    }

    /**
     * Launches the Spring application context.
     *
     * @param args raw command-line arguments
     */
    public static void main(String[] args) {
        SpringApplication.run(CloudApplication.class, args);
    }
}
|
创建类NettyServerListener.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
// 读取yml的一个配置类
import com.edu.hart.modules.constant.NettyConfig;
// Netty连接信息配置类
import com.edu.hart.modules.constant.NettyConstant;
//
import com.edu.hart.rpc.util.ObjectCodec;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
/**
 * Owns the Netty server bootstrap and its two event-loop groups, and exposes
 * {@link #start()} / {@link #close()} to bring the server up and down.
 *
 * @author 叶云轩
 */
@Component
public class NettyServerListener {
    /** Logger of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);

    /** Server bootstrap configured in {@link #start()}. */
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    /** Event-loop group passed as the parent ("boss") group. */
    EventLoopGroup boss = new NioEventLoopGroup();

    /** Event-loop group passed as the child ("worker") group. */
    EventLoopGroup work = new NioEventLoopGroup();

    /** Shared business handler appended to every child pipeline. */
    @Resource
    private ServerChannelHandlerAdapter channelHandlerAdapter;

    /** Configuration bean supplying the listen port. */
    @Resource
    private NettyConfig nettyConfig;

    /**
     * Shuts both event-loop groups down gracefully when the bean is destroyed.
     */
    @PreDestroy
    public void close() {
        LOGGER.info("关闭服务器....");
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }

    /**
     * Configures the bootstrap, binds to the configured port and then blocks the calling
     * thread until the server channel is closed.
     *
     * <p>The event-loop groups are released in every exit path (normal close, bind failure
     * or interruption), and the interrupt flag is restored when the wait is interrupted.
     */
    public void start() {
        // listen port comes from the NettyConfig bean
        int port = nettyConfig.getPort();
        serverBootstrap.group(boss, work)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // NettyConfig has no max-frame-length getter; the limit is defined
                        // on NettyConstant.
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                                NettyConstant.getMaxFrameLength(), 0, 2, 0, 2));
                        pipeline.addLast(new LengthFieldPrepender(2));
                        pipeline.addLast(new ObjectCodec());
                        pipeline.addLast(channelHandlerAdapter);
                    }
                });
        try {
            LOGGER.info("netty服务器在[{}]端口启动监听", port);
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // keep the interruption visible to the caller
            Thread.currentThread().interrupt();
            LOGGER.info("[出现异常] 释放资源");
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
|
创建类ServerChannelHandlerAdapter.java - 通道适配器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
// 记录调用方法的元信息的类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Inbound handler that receives decoded {@link MethodInvokeMeta} requests and hands them to
 * the {@link RequestDispatcher}. Marked {@code @Sharable} so the single Spring-managed
 * instance can be added to every channel pipeline.
 */
@Component
@Sharable
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /** Logger of this class. */
    private static final Logger logger = LoggerFactory.getLogger(ServerChannelHandlerAdapter.class);

    /** Dispatcher that performs the actual method invocation. */
    @Resource
    private RequestDispatcher dispatcher;

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the error raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("Unhandled exception in server pipeline, closing channel", cause);
        ctx.close();
    }

    /**
     * Validates the inbound message and forwards it to the dispatcher.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded message; expected to be a {@link MethodInvokeMeta}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof MethodInvokeMeta)) {
            // anything else cannot be dispatched; drop the connection instead of
            // failing with a ClassCastException
            logger.warn("Unexpected inbound message type: {}",
                    msg == null ? null : msg.getClass().getName());
            ctx.close();
            return;
        }
        MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
        // Do not log toString() invocations. The previous condition tested for a name
        // ending in "toString()", which never matches, so nothing was ever logged.
        if (!"toString".equals(invokeMeta.getMethodName())) {
            logger.info("客户端传入参数 :{},返回值:{}",
                    invokeMeta.getArgs(), invokeMeta.getReturnType());
        }
        dispatcher.dispatcher(ctx, invokeMeta);
    }
}
|
RequestDispatcher.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
// 封装的返回信息枚举类
import com.edu.hart.modules.communicate.ResponseCodeEnum;
// 封装的返回信息实体类
import com.edu.hart.modules.communicate.ResponseResult;
// 封装的连接常量类
import com.edu.hart.modules.constant.NettyConstant;
// 记录元方法信息的实体类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
// 对于返回值为空的一个处理
import com.edu.hart.rpc.entity.NullWritable;
// 封装的返回信息实体工具类
import com.edu.hart.rpc.util.ResponseResultUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 请求分排器
*/
@Component
public class RequestDispatcher implements ApplicationContextAware {
private ExecutorService executorService = Executors.newFixedThreadPool(NettyConstant.getMaxThreads());
private ApplicationContext app;
/**
* 发送
*
* @param ctx
* @param invokeMeta
*/
public void dispatcher( final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
executorService.submit(() -> {
ChannelFuture f = null ;
try {
Class<?> interfaceClass = invokeMeta.getInterfaceClass();
String name = invokeMeta.getMethodName();
Object[] args = invokeMeta.getArgs();
Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
Object targetObject = app.getBean(interfaceClass);
Method method = targetObject.getClass().getMethod(name, parameterTypes);
Object obj = method.invoke(targetObject, args);
if (obj == null ) {
f = ctx.writeAndFlush(NullWritable.nullWritable());
} else {
f = ctx.writeAndFlush(obj);
}
f.addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
ResponseResult error = ResponseResultUtil.error(ResponseCodeEnum.SERVER_ERROR);
f = ctx.writeAndFlush(error);
} finally {
f.addListener(ChannelFutureListener.CLOSE);
}
});
}
/**
* 加载当前application.xml
*
* @param ctx
* @throws BeansException
*/
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
this .app = ctx;
}
}
|
application.yml文件中对于netty的一个配置
1 2 |
netty:
port: 11111
|
NettyConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Configuration holder bound to properties with the {@code netty} prefix.
 * Created by 叶云轩 on 2017/10/31 - 18:38
 */
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {

    /** Port value bound from the {@code netty.port} property. */
    private int port;

    /**
     * @param port the port value to store
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * @return the stored port value
     */
    public int getPort() {
        return this.port;
    }
}
|
NettyConstant.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import org.springframework.stereotype.Component;
/**
 * Fixed limits used by the Netty server.
 * Created by 叶云轩 on 2017/10/31 - 17:47
 */
@Component
public class NettyConstant {

    /** Upper bound for a single frame, in bytes. */
    private static final int MAX_FRAME_LENGTH = 65535;

    /** Size of the worker thread pool. */
    private static final int MAX_THREADS = 1024;

    /**
     * @return the worker thread pool size (1024)
     */
    public static int getMaxThreads() {
        return MAX_THREADS;
    }

    /**
     * @return the maximum frame length in bytes (65535)
     */
    public static int getMaxFrameLength() {
        return MAX_FRAME_LENGTH;
    }
}
|
至此,netty服务端算是与SpringBoot整合成功。那么看一下启动情况吧。
Client端:
Client我感觉要比Server端要麻烦一点。这里还是先给出核心类吧。
- NettyClient : netty客户端
- ClientChannelHandlerAdapter : 客户端通道适配器
- CustomChannelInitializerClient:自定义通道初始化工具
- RPCProxyFactoryBean:RPC通信代理工厂
在Client端里,SpringBoot的启动类要继承SpringBootServletInitializer这个类,并覆盖configure(SpringApplicationBuilder)方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
/**
 * Spring Boot entry point of the client-side application. Extends
 * {@link SpringBootServletInitializer} and registers this class as a source in
 * {@link #configure(SpringApplicationBuilder)}.
 */
@SpringBootApplication
public class OaApplication extends SpringBootServletInitializer {

    /**
     * Registers this class as the configuration source of the builder.
     *
     * @param builder the application builder supplied by Spring Boot
     * @return the builder with this class added as a source
     */
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        final Class<?> source = OaApplication.class;
        return builder.sources(source);
    }

    /**
     * Launches the Spring application context.
     *
     * @param args raw command-line arguments
     */
    public static void main(String[] args) {
        SpringApplication.run(OaApplication.class, args);
    }
}
|
NettyClient.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
// 记录元方法信息的实体类
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.MBeanServer;
/**
 * Client-side sender: opens one connection per remote call, waits until the channel is
 * closed and returns the response captured by the channel initializer.
 * Created by 叶云轩 on 2017/6/16-16:58
 */
public class NettyClient {
    /** Logger of this class (previously created for an unrelated JMX type). */
    private final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    /** Maximum number of attempts before a call is reported as failed. */
    private static final int MAX_RETRY_TIMES = 10;

    /** Pause between two attempts, in milliseconds. */
    private static final long RETRY_DELAY_MILLIS = 100L;

    /** Template bootstrap; cloned per call so concurrent calls do not share a handler. */
    private Bootstrap bootstrap;
    private EventLoopGroup worker;
    private int port;
    private String url;

    /**
     * @param url  host to connect to
     * @param port port to connect to
     */
    public NettyClient(String url, int port) {
        this.url = url;
        this.port = port;
        bootstrap = new Bootstrap();
        worker = new NioEventLoopGroup();
        bootstrap.group(worker);
        bootstrap.channel(NioSocketChannel.class);
    }

    /**
     * Releases the event-loop group.
     */
    public void close() {
        logger.info("关闭资源");
        worker.shutdownGracefully();
    }

    /**
     * Sends {@code cmd} to the server and blocks until the connection is closed.
     *
     * <p>Failed attempts (for example a refused connection, which {@code sync()} rethrows
     * as an unchecked exception) are retried after a short pause until
     * {@code MAX_RETRY_TIMES} is exceeded. An interruption is not retried: the interrupt
     * flag is restored and the call fails immediately.
     *
     * @param cmd   description of the remote method to invoke
     * @param retry number of attempts already made; callers normally pass {@code 0}
     * @return the response object, or {@code null} when the server answered with the
     *         null placeholder
     * @throws RuntimeException when the call is interrupted or all attempts failed
     */
    public Object remoteCall(final MethodInvokeMeta cmd, int retry) {
        int attempt = retry;
        while (true) {
            try {
                CustomChannelInitializerClient customChannelInitializer = new CustomChannelInitializerClient(cmd);
                // clone() keeps the shared group/channel settings but gives this call its
                // own handler, so parallel calls cannot overwrite each other's initializer
                ChannelFuture sync = bootstrap.clone()
                        .handler(customChannelInitializer)
                        .connect(url, port)
                        .sync();
                sync.channel().closeFuture().sync();
                return customChannelInitializer.getResponse();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("调用Wrong", e);
            } catch (Exception e) {
                attempt++;
                if (attempt > MAX_RETRY_TIMES) {
                    throw new RuntimeException("调用Wrong", e);
                }
                logger.warn("第{}次尝试....失败", attempt, e);
                try {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("调用Wrong", interrupted);
                }
            }
        }
    }
}
|
ClientChannelHandlerAdapter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Client-side handler for a single remote call: sends the request as soon as the channel
 * becomes active and stores the first inbound message as the response.
 * Created by 叶云轩 on 2017/6/16-17:03
 */
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ClientChannelHandlerAdapter.class);

    /** Request to send once the connection is established. */
    private MethodInvokeMeta methodInvokeMeta;

    /** Initializer that collects the response for the caller. */
    private CustomChannelInitializerClient channelInitializerClient;

    /**
     * @param methodInvokeMeta         request to send
     * @param channelInitializerClient holder that receives the response
     */
    public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, CustomChannelInitializerClient channelInitializerClient) {
        this.methodInvokeMeta = methodInvokeMeta;
        this.channelInitializerClient = channelInitializerClient;
    }

    /**
     * Logs the failure (including the stack trace) and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("客户端出异常了,异常信息:{}", cause.getMessage(), cause);
        ctx.close();
    }

    /**
     * Writes the request to the server when the channel becomes active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Log every call except toString(), matching the other logging sites of this
        // client; the previous condition logged only toString-like calls.
        if (!"toString".equals(methodInvokeMeta.getMethodName())) {
            logger.info("客户端发送信息参数:{},信息返回值类型:{}", methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType());
        }
        ctx.writeAndFlush(methodInvokeMeta);
    }

    /**
     * Stores the inbound message as the call's response.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        channelInitializerClient.setResponse(msg);
    }
}
|
CustomChannelInitializerClient.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.entity.NullWritable;
import com.edu.hart.rpc.util.ObjectCodec;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; /**
Created by 叶云轩 on 2017/6/16-15:01
Concat [email protected]
*/
public class CustomChannelInitializerClient extends ChannelInitializer<SocketChannel> {
    // The type argument is required: against the raw ChannelInitializer,
    // initChannel(SocketChannel) does not override the erased initChannel(Channel).

    private static final Logger logger = LoggerFactory.getLogger(CustomChannelInitializerClient.class);

    /** Request that the pipeline's business handler will send. */
    private MethodInvokeMeta methodInvokeMeta;

    /** Response set by the business handler and read by the calling thread. */
    private volatile Object response;

    /**
     * Creates the initializer for one remote call and logs the call (except toString).
     *
     * @param methodInvokeMeta description of the remote method to invoke
     */
    public CustomChannelInitializerClient(MethodInvokeMeta methodInvokeMeta) {
        if (!"toString".equals(methodInvokeMeta.getMethodName())) {
            logger.info("[CustomChannelInitializerClient] 调用方法名:{},入参:{},参数类型:{},返回值类型{}"
                    , methodInvokeMeta.getMethodName()
                    , methodInvokeMeta.getArgs()
                    , methodInvokeMeta.getParameterTypes()
                    , methodInvokeMeta.getReturnType());
        }
        this.methodInvokeMeta = methodInvokeMeta;
    }

    /**
     * @return the stored response, or {@code null} when nothing was stored or the stored
     *         value is the {@link NullWritable} placeholder
     */
    public Object getResponse() {
        if (response instanceof NullWritable) {
            return null;
        }
        return response;
    }

    /**
     * @param response the message received from the server
     */
    public void setResponse(Object response) {
        this.response = response;
    }

    /**
     * Builds the client pipeline: 2-byte length framing, object codec, then the handler
     * that sends the request and captures the response.
     *
     * @param ch the freshly created channel
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldPrepender(2));
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2));
        pipeline.addLast(new ObjectCodec());
        pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this));
    }
}
|
4. RPCProxyFactoryBean.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import com.edu.hart.rpc.util.WrapMethodUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * Factory bean that exposes a JDK dynamic proxy for a remote service interface. Every call
 * on the proxy is converted into a {@link MethodInvokeMeta} and sent through the
 * configured {@link NettyClient}.
 * Created by 叶云轩 on 2017/6/16-17:16
 */
public class RPCProxyFactoryBean extends AbstractFactoryBean<Object> implements InvocationHandler {
    private static final Logger logger = LoggerFactory.getLogger(RPCProxyFactoryBean.class);

    /** Service interface implemented by the generated proxy. */
    private Class<?> interfaceClass;

    /** Client used to perform the remote call. */
    private NettyClient nettyClient;

    /**
     * @return the proxied service interface
     */
    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }

    /**
     * Creates the dynamic proxy with this object as its invocation handler.
     *
     * @return a proxy implementing {@code interfaceClass}
     */
    @Override
    protected Object createInstance() throws Exception {
        logger.info("[代理工厂] 初始化代理Bean : {}", interfaceClass);
        return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, this);
    }

    /**
     * Forwards the intercepted call to the remote side.
     * NOTE(review): equals/hashCode/toString invoked on the proxy are forwarded remotely
     * as well - confirm this is intended.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the invoked interface method
     * @param args   the call arguments
     * @return whatever {@link NettyClient#remoteCall} returns for this call
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        final MethodInvokeMeta methodInvokeMeta = WrapMethodUtils.readMethod(interfaceClass, method, args);
        if (!"toString".equals(methodInvokeMeta.getMethodName())) {
            logger.info("[invoke] 调用接口{},调用方法名:{},入参:{},参数类型:{},返回值类型{}",
                    methodInvokeMeta.getInterfaceClass(), methodInvokeMeta.getMethodName()
                    , methodInvokeMeta.getArgs(), methodInvokeMeta.getParameterTypes(), methodInvokeMeta.getReturnType());
        }
        return nettyClient.remoteCall(methodInvokeMeta, 0);
    }

    /**
     * @param interfaceClass the service interface to proxy
     */
    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    /**
     * @param nettyClient the client used for remote calls
     */
    public void setNettyClient(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }
}
|
至此,netty-client与SpringBoot的集成就算完毕了。同样,在netty-client中也要加入相应的依赖。
不过上面server与client使用了一些公共的类和工具,下面也一并列举出来。
MethodInvokeMeta.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
 * Serializable description of one method invocation: target interface, method name,
 * arguments, parameter types and return type.
 * Created by 叶云轩 on 2017/6/7-15:41
 * NOTE(review): this data holder is also annotated as a Spring component - confirm that a
 * container-managed instance is really needed.
 */
@Component
public class MethodInvokeMeta implements Serializable {
    private static final long serialVersionUID = 8379109667714148890L;

    /** Interface that declares the method. */
    private Class<?> interfaceClass;

    /** Name of the method. */
    private String methodName;

    /** Call arguments. */
    private Object[] args;

    /** Return type of the method. */
    private Class<?> returnType;

    /** Parameter types of the method. */
    private Class<?>[] parameterTypes;

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public Class<?> getInterfaceClass() {
        return interfaceClass;
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    // Accessors below use wildcard types (previously raw Class) to match the fields.
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Class<?> getReturnType() {
        return returnType;
    }

    public void setReturnType(Class<?> returnType) {
        this.returnType = returnType;
    }
}
|
NullWritable.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import java.io.Serializable;
/**
 * Serializable placeholder sent over the wire in place of a {@code null} result.
 * Created by 叶云轩 on 2017/6/16-16:46
 */
public class NullWritable implements Serializable {
    private static final long serialVersionUID = -8191640400484155111L;

    /** The only instance; final so it cannot be reassigned. */
    private static final NullWritable INSTANCE = new NullWritable();

    private NullWritable() {
    }

    /**
     * @return the shared placeholder instance
     */
    public static NullWritable nullWritable() {
        return INSTANCE;
    }

    /**
     * Replaces any deserialized copy with the shared instance, so the singleton property
     * survives a serialization round trip.
     *
     * @return the shared placeholder instance
     */
    private Object readResolve() {
        return INSTANCE;
    }
}
|
ObjectCodec.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;
/**
 * Codec converting between objects and their serialized byte form, delegating to
 * {@link ObjectSerializerUtils}.
 * NOTE(review): inbound bytes are passed to Java native deserialization - only safe when
 * the peer is trusted; confirm.
 */
public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> {

    /**
     * Serializes {@code msg} and emits the bytes as one buffer.
     *
     * @throws IllegalArgumentException when the message cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
        byte[] data = ObjectSerializerUtils.serilizer(msg);
        if (data == null) {
            // the helper returns null for null input or serialization errors
            throw new IllegalArgumentException("Message could not be serialized: "
                    + (msg == null ? "null" : msg.getClass().getName()));
        }
        // wrap instead of allocating a second buffer and copying into it
        out.add(Unpooled.wrappedBuffer(data));
    }

    /**
     * Reads all readable bytes of the frame and emits the deserialized object.
     *
     * @throws IllegalStateException when the frame cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        Object decoded = ObjectSerializerUtils.deSerilizer(bytes);
        if (decoded == null) {
            // the helper returns null for empty input or deserialization errors
            throw new IllegalStateException(
                    "Frame of " + bytes.length + " bytes could not be deserialized");
        }
        out.add(decoded);
    }
}
|
ObjectSerializerUtils.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package com.edu.hart.rpc.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
/**
 * Helpers converting objects to and from Java-serialized byte arrays. Both methods report
 * failure by returning {@code null} rather than throwing.
 */
public class ObjectSerializerUtils {
    private static final Logger logger = LoggerFactory.getLogger(ObjectSerializerUtils.class);

    /**
     * Deserializes an object from {@code data}.
     * NOTE(review): native deserialization must not be fed untrusted bytes; consider an
     * ObjectInputFilter or a different wire format if the peer is not trusted.
     *
     * @param data serialized bytes; may be {@code null} or empty
     * @return the deserialized object, or {@code null} when the input is empty or
     *         deserialization fails
     */
    public static Object deSerilizer(byte[] data) {
        if (data == null || data.length == 0) {
            logger.info("[反序列化] 入参为空");
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (Exception e) {
            // log with the stack trace instead of printing to stderr
            logger.error("[异常信息] {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Serializes {@code obj} into a byte array.
     *
     * @param obj object to serialize; may be {@code null}
     * @return the serialized bytes, or {@code null} when {@code obj} is {@code null} or
     *         serialization fails
     */
    public static byte[] serilizer(Object obj) {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            logger.error("Failed to serialize object of type {}", obj.getClass().getName(), e);
            return null;
        }
        // the stream is closed (and therefore flushed) before the bytes are taken
        return bos.toByteArray();
    }
}
|
下面主要是用于Client端的:
NettyBeanScanner.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
import com.edu.hart.rpc.client.RPCProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import java.util.List;
/**
* 动态加载代理bean到Spring bean工厂
*/
public class NettyBeanScanner implements BeanFactoryPostProcessor {
private DefaultListableBeanFactory beanFactory;
private String basePackage;
private String clientName;
public NettyBeanScanner(String basePackage, String clientName) {
this .basePackage = basePackage;
this .clientName = clientName;
}
/**
* 注册Bean到Spring的bean工厂
*/
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
this .beanFactory = (DefaultListableBeanFactory) beanFactory;
// 加载远程服务的接口
List<String> resolverClass = PackageClassUtils.resolver(basePackage);
for (String clazz : resolverClass) {
String simpleName;
if (clazz.lastIndexOf( '.' ) != - 1 ) {
simpleName = clazz.substring(clazz.lastIndexOf( '.' ) + 1 );
} else {
simpleName = clazz;
}
BeanDefinitionBuilder gd = BeanDefinitionBuilder.genericBeanDefinition(RPCProxyFactoryBean. class );
gd.addPropertyValue( "interfaceClass" , clazz);
gd.addPropertyReference( "nettyClient" , clientName);
this .beanFactory.registerBeanDefinition(simpleName, gd.getRawBeanDefinition());
}
}
}
|
PackageClassUtils.java
这个类要说一下,主要是用来加载Server对应的接口的。因为在Client中RPC接口没有实现类,所以要自己将这些接口加载到Spring工厂里面。但是现在有个问题就是需要使用**
SpringBoot中application.yml
1 |
basePackage: com.edu.hart.rpc.service.login;com.edu.hart.rpc.service.employee;com.edu.hart.rpc.service.authorization;
|
这样的方式来加载,使用通配符的时候会加载不到,这个问题我还没有解决。**
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
/**
 * Resolves fully qualified class names from compiled {@code .class} files found below the
 * classpath root directory.
 */
public class PackageClassUtils {
    private final static Logger LOGGER = LoggerFactory.getLogger(PackageClassUtils.class);

    /** File extension of compiled classes. */
    private static final String CLASS_SUFFIX = ".class";

    /** Marker requesting a recursive scan, e.g. {@code com.yyx.util.*}. */
    private static final String WILDCARD = ".*";

    /**
     * Resolves the class names below one or more packages.
     *
     * <p>Packages are separated by {@code ";"}; blank segments are ignored. A segment
     * containing {@code ".*"} is scanned recursively, any other segment only on its own
     * level. Only files ending in {@code .class} are reported, without that suffix.
     *
     * @param basePackage package specification, e.g. {@code "com.a;com.b.*"}
     * @return the fully qualified class names found; empty when nothing matches
     */
    public static List<String> resolver(String basePackage) {
        List<String> classStrs = new ArrayList<>();
        if (basePackage == null) {
            return classStrs;
        }
        for (String segment : basePackage.split(";")) {
            String s = segment.trim();
            if (s.isEmpty()) {
                continue;
            }
            LOGGER.info("[加载类目录] {}", s);
            boolean recursive = s.contains(WILDCARD);
            // strip the wildcard: com.yyx.util.* -> com.yyx.util
            String packageName = recursive ? s.substring(0, s.lastIndexOf(WILDCARD)) : s;
            File directory = packageDirectory(packageName);
            if (directory == null) {
                continue;
            }
            if (recursive) {
                getAllFile(packageName, directory, classStrs);
            } else {
                getClassReferenceList(classStrs, directory, packageName);
            }
        }
        return classStrs;
    }

    /**
     * Maps a package name to its directory below the classpath root.
     *
     * @param packageName dotted package name
     * @return the directory, or {@code null} when the classpath root is not a plain
     *         directory on disk
     */
    private static File packageDirectory(String packageName) {
        java.net.URL root = PackageClassUtils.class.getResource("/");
        if (root == null) {
            LOGGER.warn("Classpath root is not available; cannot scan package {}", packageName);
            return null;
        }
        try {
            // toURI() decodes escaped characters (such as %20) that URL.getPath() keeps
            File rootDirectory = new File(root.toURI());
            return new File(rootDirectory, packageName.replace('.', File.separatorChar));
        } catch (java.net.URISyntaxException | IllegalArgumentException e) {
            LOGGER.warn("Classpath root {} is not a file-system directory", root, e);
            return null;
        }
    }

    /**
     * Adds the classes located directly in {@code file} (no recursion).
     *
     * @param classStrs collection receiving the class names
     * @param file      directory of the package
     * @param s         dotted name of the package
     * @return {@code classStrs}
     */
    private static List<String> getClassReferenceList(List<String> classStrs, File file, String s) {
        File[] listFiles = file.listFiles();
        if (listFiles != null) {
            for (File candidate : listFiles) {
                if (candidate.isFile()) {
                    addIfClassFile(s, candidate, classStrs);
                }
            }
        }
        return classStrs;
    }

    /**
     * Adds the classes in {@code file} and, recursively, in all of its sub-directories.
     *
     * @param s         dotted package name that corresponds to {@code file}
     * @param file      directory to scan
     * @param classStrs collection receiving the class names
     */
    private static void getAllFile(String s, File file, List<String> classStrs) {
        File[] children = file.listFiles();
        if (children == null) {
            // not a directory (or not readable): nothing to add
            return;
        }
        for (File child : children) {
            if (child.isDirectory()) {
                // building the package name from directory names avoids any dependence
                // on the platform's path separator
                getAllFile(s + "." + child.getName(), child, classStrs);
            } else {
                addIfClassFile(s, child, classStrs);
            }
        }
    }

    /**
     * Adds {@code packageName.SimpleName} for a {@code .class} file; ignores other files.
     */
    private static void addIfClassFile(String packageName, File file, List<String> classStrs) {
        String name = file.getName();
        if (!name.endsWith(CLASS_SUFFIX)) {
            return;
        }
        String fileName = packageName + "." + name.substring(0, name.length() - CLASS_SUFFIX.length());
        LOGGER.info("[加载完成] 类文件:{}", fileName);
        classStrs.add(fileName);
    }
}
|
RemoteMethodInvokeUtil.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Invokes the method described by a {@link MethodInvokeMeta} on the matching Spring bean.
 * Created by 叶云轩 on 2017/6/7-15:49
 */
public class RemoteMethodInvokeUtil implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    /**
     * Looks up the bean for the described interface and invokes the described method.
     *
     * @param methodInvokeMeta description of the invocation
     * @return the value returned by the invoked method
     * @throws InvocationTargetException when the invoked method itself throws
     * @throws IllegalAccessException    when the method is not accessible
     * @throws IllegalArgumentException  when the interface has no matching method
     */
    public Object processMethod(MethodInvokeMeta methodInvokeMeta) throws InvocationTargetException, IllegalAccessException {
        Class<?> interfaceClass = methodInvokeMeta.getInterfaceClass();
        Object bean = applicationContext.getBean(interfaceClass);
        Method method = resolveMethod(interfaceClass, methodInvokeMeta);
        return method.invoke(bean, methodInvokeMeta.getArgs());
    }

    /**
     * Finds the target method. Uses name plus parameter types when the types are
     * available, so overloads are told apart; otherwise falls back to the name-only scan
     * of the declared methods (last match wins, as before).
     */
    private static Method resolveMethod(Class<?> interfaceClass, MethodInvokeMeta meta) {
        String name = meta.getMethodName();
        Class<?>[] parameterTypes = meta.getParameterTypes();
        if (parameterTypes != null) {
            try {
                return interfaceClass.getMethod(name, parameterTypes);
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        "No method " + name + " with the given parameter types on " + interfaceClass.getName(), e);
            }
        }
        Method match = null;
        for (Method declaredMethod : interfaceClass.getDeclaredMethods()) {
            if (declaredMethod.getName().equals(name)) {
                match = declaredMethod;
            }
        }
        if (match == null) {
            // previously this surfaced as a NullPointerException on invoke
            throw new IllegalArgumentException("No method " + name + " on " + interfaceClass.getName());
        }
        return match;
    }

    /**
     * Stores the application context handed in by Spring.
     */
    @Override
    public void setApplicationContext(ApplicationContext app) throws BeansException {
        applicationContext = app;
    }
}
|
WrapMethodUtils.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import com.edu.hart.rpc.entity.MethodInvokeMeta;
import java.lang.reflect.Method;
/**
 * Helper that captures a reflective call as a {@link MethodInvokeMeta}.
 */
public class WrapMethodUtils {

    /**
     * Builds the invocation description for one method call.
     *
     * @param interfaceClass interface the call is made through
     * @param method         the invoked method
     * @param args           the call arguments
     * @return a new description holding the interface, arguments, method name, return
     *         type and parameter types
     */
    public static MethodInvokeMeta readMethod(Class interfaceClass, Method method, Object[] args) {
        final MethodInvokeMeta meta = new MethodInvokeMeta();
        // caller-supplied parts
        meta.setInterfaceClass(interfaceClass);
        meta.setArgs(args);
        // parts derived from the reflective method handle
        final String methodName = method.getName();
        final Class<?> returnType = method.getReturnType();
        meta.setMethodName(methodName);
        meta.setReturnType(returnType);
        meta.setParameterTypes(method.getParameterTypes());
        return meta;
    }
}
|
下面的这些类我也会用在与前台通信时使用:
ResponseCodeEnum.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
import java.io.Serializable;
/**
 * Response codes together with their human-readable messages.
 * Created by 叶云轩 on 2017/6/13-11:53
 */
public enum ResponseCodeEnum implements Serializable {

    /** The request was handled successfully. */
    REQUEST_SUCCESS(10000, "请求成功"),

    /** The server failed internally. */
    SERVER_ERROR(99999, "服务器内部错误");

    /** Numeric response code. */
    private final Integer code;

    /** Message that accompanies the code. */
    private final String msg;

    ResponseCodeEnum(Integer code, String msg) {
        this.msg = msg;
        this.code = code;
    }

    /**
     * @return the message of this constant
     */
    public String getMsg() {
        return this.msg;
    }

    /**
     * @return the numeric code of this constant
     */
    public Integer getCode() {
        return this.code;
    }
}
|
ResponseResult.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
import java.io.Serializable;
/**
* 数据返回实体封装
* <p>
* Created by 叶云轩 on 2017/6/13-11:38
* Concat [email protected]
*
* @param <T> 通用变量
*/
public class ResponseResult<T> implements Serializable {
private static final long serialVersionUID = -3411174924856108156L;
/**
* 服务器响应码
*/
private Integer code;
/**
* 服务器响应说明
*/
private String msg;
/**
* 服务器响应数据
*/
private T data;
public ResponseResult() {
}
@Override
public boolean equals(Object o) {
if ( this == o) return true ;
if (o == null || getClass() != o.getClass()) return false ;
ResponseResult<?> that = (ResponseResult<?>) o;
return (code != null ? code.equals(that.code) : that.code == null ) && (msg != null ? msg.equals(that.msg) : that.msg == null ) && (data != null ? data.equals(that.data) : that.data == null );
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this .code = code;
}
public T getData() {
return data;
}
public void setData(T data) {
this .data = data;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this .msg = msg;
}
@Override
public int hashCode() {
int result = code != null ? code.hashCode() : 0 ;
result = 31 * result
+ (msg != null ? msg.hashCode() : 0 );
result = 31 * result + (data != null ? data.hashCode() : 0 );
return result;
}
@Override
public String toString() {
return "ResponseResult{"
+ "code="
+ code
+ ", msg='"
+ msg
+ '\''
+ ", data="
+ data
+ '}' ;
}
}
|
ResponseResultUtil.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import com.edu.hart.modules.communicate.ResponseCodeEnum;
import com.edu.hart.modules.communicate.ResponseResult;
/**
 * Factory and inspection helpers for {@link ResponseResult}.
 * Created by 叶云轩 on 2017/5/29-10:37
 */
public class ResponseResultUtil {

    /**
     * Builds a failure result from the given code constant, without payload.
     *
     * @param responseCodeEnum constant supplying code and message
     * @return the failure result
     */
    public static ResponseResult error(ResponseCodeEnum responseCodeEnum) {
        // local renamed from "ResponseResult", which shadowed the type name
        ResponseResult<Object> result = new ResponseResult<>();
        result.setMsg(responseCodeEnum.getMsg());
        result.setCode(responseCodeEnum.getCode());
        result.setData(null);
        return result;
    }

    /**
     * Builds a success result without payload.
     *
     * @return the success result
     */
    public static ResponseResult success() {
        return success(null);
    }

    /**
     * Builds a success result carrying {@code o} as payload.
     *
     * @param o payload; may be {@code null}
     * @return the success result
     */
    public static ResponseResult success(Object o) {
        ResponseResult<Object> result = new ResponseResult<>();
        result.setMsg(ResponseCodeEnum.REQUEST_SUCCESS.getMsg());
        result.setCode(ResponseCodeEnum.REQUEST_SUCCESS.getCode());
        result.setData(o);
        return result;
    }

    /**
     * Tells whether a result carries the success code.
     *
     * @param responseResult result to inspect; may be {@code null}
     * @return {@code true} only when the result is non-null and its code equals the
     *         success code; a {@code null} result or code yields {@code false} instead
     *         of a NullPointerException
     */
    public static boolean judgementSuccess(ResponseResult responseResult) {
        if (responseResult == null) {
            return false;
        }
        // compare from the constant side so a null code is handled
        return ResponseCodeEnum.REQUEST_SUCCESS.getCode().equals(responseResult.getCode());
    }
}
|
来,我们测试一下远程通信:
Client调用Server的一个接口。可以看到在hart-oa项目中,RPCEmployeeService没有任何实现类,控制台中打印了方法的调用 以及入参信息
Server断点监听到远程调用,CloudApplication项目为Server端,我们可以看到接收到来自hart-oa的一个请求,参数一致。在CloudApplication中进行相应的处理后,返回到Client(hart-oa)
返回信息到Client,可以看到我们(hart-oa)收到了来自CloudApplication的响应,结果是我们封装好的ResponseResult.
嗯 ~至此整合测试完成。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。
标签:Netty,return,netty,Spring,Boot,private,import,com,public From: https://www.cnblogs.com/wangbin2188/p/17308700.html