1.RMI简单介绍
Spring除了使用基于HTTP协议的远程调用方案,还为开发者提供了基于RMI机制的远程调用方法,RMI远程调用网络通信实现是基于TCP/IP协议完成的,而不是通过HTTP协议。
在Spring RMI实现中,集成了标准的RMI-JRIM解决方案,该方案是java虚拟机实现的一部分,它使用java序列化来完成对象的传输,是一个java到java环境的分布式处理技术,不涉及异构平台的处理。
2.RMI客户端配置:
和基于HTTP协议的远程调用类似,RMI远程调用客户端也需要进行类似如下的配置:
<bean id=”rmiProxy” class=”org.springframework.remoting.rmi.RmiProxyFactoryBean”>
<property name=”serviceUrl”>
<value>rmi://hostAddress:1099/serviceUrl</value>
</property>
<property name=”serviceInterface”>
<value>远程调用接口</value>
</property>
</bean>
<bean id=”rmiClient” class=”RMI远程调用客户端类全路径”>
<property name=”serviceInterface”>
<ref bean=”rmiProxy”/>
</property>
</bean>
注意:上面的配置中serviceUrl必须和服务端的远程调用提供者的id一致,另外,serviceUrl中使用的是rmi协议,默认端口是1099.
3.RmiProxyFactoryBean
RmiProxyFactoryBean的主要功能是对RMI客户端封装,生成代理对象,查询得到RMI的stub对象,并通过这个stub对象发起相应的RMI远程调用请求。其源码如下:
public class RmiProxyFactoryBean extends RmiClientInterceptor implements FactoryBean<Object>, BeanClassLoaderAware {
//远程调用代理对象
private Object serviceProxy;
//Spring IoC容器完成依赖注入后的回调方法
public void afterPropertiesSet() {
//调用父类RmiClientInterceptor的回调方法
super.afterPropertiesSet();
//获取客户端配置的远程调用接口
if (getServiceInterface() == null) {
throw new IllegalArgumentException("Property 'serviceInterface' is required");
}
//创建远程调用代理对象并为代理对象设置拦截器。注意第二个参数this,因为
//RmiProxyFactoryBean继承RmiClientInterceptor,因此其也是拦截器
this.serviceProxy = new ProxyFactory(getServiceInterface(), this).getProxy(getBeanClassLoader());
}
//Spring IoC容器获取被管理对象的接口方法,获取远程调用代理
public Object getObject() {
return this.serviceProxy;
}
public Class<?> getObjectType() {
return getServiceInterface();
}
public boolean isSingleton() {
return true;
}
}
通过对上面RmiProxyFactoryBean源码分析中,我们看到在创建远程调用代理对象的时候需要设置拦截器,因为我们继续分析远程调用客户端拦截器RmiClientInterceptor。
4.RmiClientInterceptor封装RMI客户端:
RmiClientInterceptor对客户端的远程调用进行拦截,具体的生成远程调用代理对象、查找远程调用stub、以及通过RMI stub向服务端发起远程调用请求的具体实现都由RMI客户端拦截器实现,其源码如下:
public class RmiClientInterceptor extends RemoteInvocationBasedAccessor
implements MethodInterceptor {
//在Spring启动时查找远程调用stub
private boolean lookupStubOnStartup = true;
//对查找到或使用过的远程调用stub进行缓存
private boolean cacheStub = true;
//当连接失败是是否刷新远程调用stub
private boolean refreshStubOnConnectFailure = false;
//RMI客户端socket工厂
private RMIClientSocketFactory registryClientSocketFactory;
//缓存的远程调用stub
private Remote cachedStub;
//创建远程调用stub监控器
private final Object stubMonitor = new Object();
//设置是否启动时查找RMI stub
public void setLookupStubOnStartup(boolean lookupStubOnStartup) {
this.lookupStubOnStartup = lookupStubOnStartup;
}
//设置是否缓存以查找的RMI stub
public void setCacheStub(boolean cacheStub) {
this.cacheStub = cacheStub;
}
//设置当连接失败时,是否刷新RMI stub
public void setRefreshStubOnConnectFailure(boolean refreshStubOnConnectFailure) {
this.refreshStubOnConnectFailure = refreshStubOnConnectFailure;
}
//设置客户端socket工厂
public void setRegistryClientSocketFactory(RMIClientSocketFactory registryClientSocketFactory) {
this.registryClientSocketFactory = registryClientSocketFactory;
}
//Spring IoC容器回调方法,由子类RmiProxyFactoryBean回调方法调用
public void afterPropertiesSet() {
//调用父类RemoteInvocationBasedAccessor的回调方法
super.afterPropertiesSet();
prepare();
}
//初始化RMI客户端
public void prepare() throws RemoteLookupFailureException {
//如果设置了启动时查找RMI stub
if (this.lookupStubOnStartup) {
//查找RMI stub
Remote remoteObj = lookupStub();
if (logger.isDebugEnabled()) {
//如果查找到的RMI stub是RmiInvocationHandler类型
if (remoteObj instanceof RmiInvocationHandler) {
logger.debug("RMI stub [" + getServiceUrl() + "] is an RMI invoker");
}
//如果获取到客户端配置的serviceInterface不为null
else if (getServiceInterface() != null) {
//判断客户端配置的serviceInterface是否是RMI stub实例
boolean isImpl = getServiceInterface().isInstance(remoteObj);
logger.debug("Using service interface [" + getServiceInterface().getName() +
"] for RMI stub [" + getServiceUrl() + "] - " +
(!isImpl ? "not " : "") + "directly implemented");
}
}
//如果设置了缓存RMI stub,将缓存的stub设置为查找到的RMI stub
if (this.cacheStub) {
this.cachedStub = remoteObj;
}
}
}
//查找RMI stub
protected Remote lookupStub() throws RemoteLookupFailureException {
try {
Remote stub = null;
//如果设置了客户端socket工厂
if (this.registryClientSocketFactory != null) {
//获取并解析客户端配置的serviceUrl
URL url = new URL(null, getServiceUrl(), new DummyURLStreamHandler());
//获取客户端配置的serviceUrl协议
String protocol = url.getProtocol();
//如果客户端配置的serviceUrl中协议不为null且不是rmi
if (protocol != null && !"rmi".equals(protocol)) {
throw new MalformedURLException("Invalid URL scheme '" + protocol + "'");
}
//获取客户端配置的serviceUrl中的主机地址
String host = url.getHost();
//获取客户端配置的serviceUrl中的端口
int port = url.getPort();
//获取客户端配置的serviceUrl中请求路径
String name = url.getPath();
//如果请求路径不为null,且请求路径以”/”开头,则去掉”/”
if (name != null && name.startsWith("/")) {
name = name.substring(1);
}
//根据客户端配置的serviceUrl信息和客户端socket工厂创建远程对
//象引用
Registry registry = LocateRegistry.getRegistry(host, port, this.registryClientSocketFactory);
//通过远程对象引用查找指定RMI请求的RMI stub
stub = registry.lookup(name);
}
//如果客户端配置的serviceUrl中协议为null或者是rmi
else {
//直接通过RMI标准API查找客户端配置的serviceUrl的RMI stub
stub = Naming.lookup(getServiceUrl());
}
if (logger.isDebugEnabled()) {
logger.debug("Located RMI stub with URL [" + getServiceUrl() + "]");
}
return stub;
}
//对查找RMI stub过程中异常处理
catch (MalformedURLException ex) {
throw new RemoteLookupFailureException("Service URL [" + getServiceUrl() + "] is invalid", ex);
}
catch (NotBoundException ex) {
throw new RemoteLookupFailureException(
"Could not find RMI service [" + getServiceUrl() + "] in RMI registry", ex);
}
catch (RemoteException ex) {
throw new RemoteLookupFailureException("Lookup of RMI stub failed", ex);
}
}
//获取RMI stub
protected Remote getStub() throws RemoteLookupFailureException {
//如果没有配置缓存RMI stub,或者设置了启动时查找RMI stub或当连接失败时
//不刷新RMI stub
if (!this.cacheStub || (this.lookupStubOnStartup && !this.refreshStubOnConnectFailure)) {
//如果缓存的RMI stub不为null,则直接返回,否则,查找RMI stub
return (this.cachedStub != null ? this.cachedStub : lookupStub());
}
//如果设置了缓存RMI stub,且设置了启动时查找RMI stub或者当连接失败时刷新
//RMI stub
else {
//线程同步
synchronized (this.stubMonitor) {
//如果缓存的RMI stub为null
if (this.cachedStub == null) {
//则将查找的RMI stub缓存
this.cachedStub = lookupStub();
}
//返回缓存的RMI stub
return this.cachedStub;
}
}
}
//拦截器对客户端远程调用方法的拦截入口
public Object invoke(MethodInvocation invocation) throws Throwable {
//获取RMI stub
Remote stub = getStub();
try {
//拦截客户端远程调用方法
return doInvoke(invocation, stub);
}
catch (RemoteConnectFailureException ex) {
return handleRemoteConnectFailure(invocation, ex);
}
catch (RemoteException ex) {
if (isConnectFailure(ex)) {
return handleRemoteConnectFailure(invocation, ex);
}
else {
throw ex;
}
}
}
//判断是否连接失败
protected boolean isConnectFailure(RemoteException ex) {
return RmiClientInterceptorUtils.isConnectFailure(ex);
}
//处理远程连接失败
private Object handleRemoteConnectFailure(MethodInvocation invocation, Exception ex) throws Throwable {
//如果设置了当连接失败时,刷新RMI stub
if (this.refreshStubOnConnectFailure) {
String msg = "Could not connect to RMI service [" + getServiceUrl() + "] - retrying";
if (logger.isDebugEnabled()) {
logger.warn(msg, ex);
}
else if (logger.isWarnEnabled()) {
logger.warn(msg);
}
//刷新查找远程调用stub
return refreshAndRetry(invocation);
}
else {
throw ex;
}
}
//刷新RMI stub
protected Object refreshAndRetry(MethodInvocation invocation) throws Throwable {
Remote freshStub = null;
//线程同步
synchronized (this.stubMonitor) {
this.cachedStub = null;
//查找RMI stub
freshStub = lookupStub();
//如果设置了缓存RMI stub
if (this.cacheStub) {
//将刷新查找的RMI stub缓存
this.cachedStub = freshStub;
}
}
return doInvoke(invocation, freshStub);
}
//具体RMI调用的地方
protected Object doInvoke(MethodInvocation invocation, Remote stub) throws Throwable {
//如果RMI stub是RmiInvocationHandler类型
if (stub instanceof RmiInvocationHandler) {
//调用RmiInvocationHandler的RMI
try {
return doInvoke(invocation, (RmiInvocationHandler) stub);
}
catch (RemoteException ex) {
throw RmiClientInterceptorUtils.convertRmiAccessException(
invocation.getMethod(), ex, isConnectFailure(ex), getServiceUrl());
}
catch (InvocationTargetException ex) {
Throwable exToThrow = ex.getTargetException();
RemoteInvocationUtils.fillInClientStackTraceIfPossible(exToThrow);
throw exToThrow;
}
catch (Throwable ex) {
throw new RemoteInvocationFailureException("Invocation of method [" + invocation.getMethod() +
"] failed in RMI service [" + getServiceUrl() + "]", ex);
}
}
//如果RMI stub不是RmiInvocationHandler类型
else {
//使用传统的RMI调用方式
try {
return RmiClientInterceptorUtils.invokeRemoteMethod(invocation, stub);
}
catch (InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
if (targetEx instanceof RemoteException) {
RemoteException rex = (RemoteException) targetEx;
throw RmiClientInterceptorUtils.convertRmiAccessException(
invocation.getMethod(), rex, isConnectFailure(rex), getServiceUrl());
}
else {
throw targetEx;
}
}
}
}
//调用RmiInvocationHandler的RMI
protected Object doInvoke(MethodInvocation methodInvocation, RmiInvocationHandler invocationHandler)
throws RemoteException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
//如果客户端远程调用请求是toString()方法
if (AopUtils.isToStringMethod(methodInvocation.getMethod())) {
return "RMI invoker proxy for service URL [" + getServiceUrl() + "]";
}
//使用RmiInvocationHandler处理RMI调用
return invocationHandler.invoke(createRemoteInvocation(methodInvocation));
}
}
通过上面对RmiClientInterceptor源码分析,我们看到Spring对RMI远程调用使用以下两种方式:
(1).RMI调用器方式:
这种方式和Spring HTTP调用器非常类似,即使用RemoteInvocation来封装调用目标对象、目标方法、参数类型等信息,RMI服务器端接收到RMI请求之后直接调用目标对象的匹配方法。
(2).传统RMI调用方式:
使用JDK的反射机制,直接调用远程调用stub的方法。
5.RMI的服务端配置:
在Spring RMI服务端需要进行类似如下的配置:
<bean id=”rmiService” class=”org.springframework.remoting.rmi.RmiServiceExporter”>
<property name=”service”>
<ref bean=”RMI服务端对象”/>
</property>
<property name=”serviceInterface”>
<value>RMI服务接口</value>
</property>
<property name=”serviceName”>
<value>RMI服务导出名称</value>
</property>
<property name=”registerPort”>
<value>1099</value>
</property>
</bean>
RMI中,基于TCP/IP协议,而不是HTTP协议来实现底层网络通信,由于RMI的网络通信已由Java RMI实现,所以这里不再使用Spring MVC的DispatcherServlet来转发客户端配置的远程调用请求URL,只需要指定RMI的TCP/.IP监听端口和服务导出的名称即可。
6.RmiServiceExporter导出RMI远程调用对象:
RmiServiceExporter主要功能是将服务端远程对象提供的服务导出供客户端请求调用,同时将导出的远程对象和注册器绑定起来供客户端查询,其主要源码如下:
public class RmiServiceExporter extends RmiBasedExporter implements InitializingBean, DisposableBean {
//导出的RMI服务名称
private String serviceName;
//RMI服务端口
private int servicePort = 0;
//RMI客户端socket工厂
private RMIClientSocketFactory clientSocketFactory;
//RMI服务端socket工厂
private RMIServerSocketFactory serverSocketFactory;
//注册器
private Registry registry;
//注册主机
private String registryHost;
//注册端口
private int registryPort = Registry.REGISTRY_PORT;
//客户端注册socket工厂
private RMIClientSocketFactory registryClientSocketFactory;
//服务端注册socket工厂
private RMIServerSocketFactory registryServerSocketFactory;
//总是创建时注册
private boolean alwaysCreateRegistry = false;
//替换已有的绑定
private boolean replaceExistingBinding = true;
//导出的远程对象
private Remote exportedObject;
//创建注册
private boolean createdRegistry = false;
//注入服务端配置的RMI导出服务名称,格式为:“rmi://host:post/NAME”
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
//设置服务端口
public void setServicePort(int servicePort) {
this.servicePort = servicePort;
}
//设置RMI客户端socket工厂
public void setClientSocketFactory(RMIClientSocketFactory clientSocketFactory) {
this.clientSocketFactory = clientSocketFactory;
}
//设置RMI服务端socket工厂
public void setServerSocketFactory(RMIServerSocketFactory serverSocketFactory) {
this.serverSocketFactory = serverSocketFactory;
}
//设置RMI注册器
public void setRegistry(Registry registry) {
this.registry = registry;
}
//设置RMI注册主机
public void setRegistryHost(String registryHost) {
this.registryHost = registryHost;
}
//设置RMI注册端口
public void setRegistryPort(int registryPort) {
this.registryPort = registryPort;
}
//设置用于注册的RMI客户端socket工厂
public void setRegistryClientSocketFactory(RMIClientSocketFactory registryClientSocketFactory) {
this.registryClientSocketFactory = registryClientSocketFactory;
}
//设置用于注册的RMI服务端socket工厂
public void setRegistryServerSocketFactory(RMIServerSocketFactory registryServerSocketFactory) {
this.registryServerSocketFactory = registryServerSocketFactory;
}
//设置是否总是创建注册,而不是试图查找指定端口上已有的注册
public void setAlwaysCreateRegistry(boolean alwaysCreateRegistry) {
this.alwaysCreateRegistry = alwaysCreateRegistry;
}
//设置是否提供已绑定的RMI注册
public void setReplaceExistingBinding(boolean replaceExistingBinding) {
this.replaceExistingBinding = replaceExistingBinding;
}
//IoC容器依赖注入完成之后的回调方法
public void afterPropertiesSet() throws RemoteException {
prepare();
}
//初始化RMI服务导出器
public void prepare() throws RemoteException {
//调用其父类RmiBasedExporter的方法,检查服务引用是否被设置
checkService();
//如果服务导出名称为null
if (this.serviceName == null) {
throw new IllegalArgumentException("Property 'serviceName' is required");
}
//检查socket工厂
if (this.clientSocketFactory instanceof RMIServerSocketFactory) {
this.serverSocketFactory = (RMIServerSocketFactory) this.clientSocketFactory;
}
if ((this.clientSocketFactory != null && this.serverSocketFactory == null) ||
(this.clientSocketFactory == null && this.serverSocketFactory != null)) {
throw new IllegalArgumentException(
"Both RMIClientSocketFactory and RMIServerSocketFactory or none required");
}
//检查RMI注册的socket工厂
if (this.registryClientSocketFactory instanceof RMIServerSocketFactory) {
this.registryServerSocketFactory = (RMIServerSocketFactory) this.registryClientSocketFactory;
}
if (this.registryClientSocketFactory == null && this.registryServerSocketFactory != null) {
throw new IllegalArgumentException(
"RMIServerSocketFactory without RMIClientSocketFactory for registry not supported");
}
this.createdRegistry = false;
//获取RMI注册器
if (this.registry == null) {
this.registry = getRegistry(this.registryHost, this.registryPort,
this.registryClientSocketFactory, this.registryServerSocketFactory);
this.createdRegistry = true;
}
//获取要被导出的服务端远程对象
this.exportedObject = getObjectToExport();
if (logger.isInfoEnabled()) {
logger.info("Binding service '" + this.serviceName + "' to RMI registry: " + this.registry);
}
//导出远程服务对象
if (this.clientSocketFactory != null) {
UnicastRemoteObject.exportObject(
this.exportedObject, this.servicePort, this.clientSocketFactory, this.serverSocketFactory);
}
else {
UnicastRemoteObject.exportObject(this.exportedObject, this.servicePort);
}
//将RMI对象绑定到注册器
try {
if (this.replaceExistingBinding) {
this.registry.rebind(this.serviceName, this.exportedObject);
}
else {
this.registry.bind(this.serviceName, this.exportedObject);
}
}
//异常处理
catch (AlreadyBoundException ex) {
unexportObjectSilently();
throw new IllegalStateException(
"Already an RMI object bound for name '" + this.serviceName + "': " + ex.toString());
}
catch (RemoteException ex) {
unexportObjectSilently();
throw ex;
}
}
……
}
7.RemoteInvocationBasedExporter处理RMI远程调用请求:
RmiServiceExporter的父类RmiBasedExporter的父类RemoteInvocationBasedExporter负责对RMI远程调用请求进行处理,并将处理的结果封装返回,其源码如下:
public abstract class RemoteInvocationBasedExporter extends RemoteExporter {
//RMI远程调用处理器,RMI远程调用请求是由DefaultRemoteInvocationExecutor处理
private RemoteInvocationExecutor remoteInvocationExecutor = new DefaultRemoteInvocationExecutor();
//设置RMI远程调用处理器
public void setRemoteInvocationExecutor(RemoteInvocationExecutor remoteInvocationExecutor) {
this.remoteInvocationExecutor = remoteInvocationExecutor;
}
//获取RMI远程调用处理器
public RemoteInvocationExecutor getRemoteInvocationExecutor() {
return this.remoteInvocationExecutor;
}
//对RMI远程调用请求进行处理的地方
protected Object invoke(RemoteInvocation invocation, Object targetObject)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
if (logger.isTraceEnabled()) {
logger.trace("Executing " + invocation);
}
try {
//调用DefaultRemoteInvocationExecutor对RMI远程调用请求进行处理
return getRemoteInvocationExecutor().invoke(invocation, targetObject);
}
catch (NoSuchMethodException ex) {
if (logger.isDebugEnabled()) {
logger.warn("Could not find target method for " + invocation, ex);
}
throw ex;
}
catch (IllegalAccessException ex) {
if (logger.isDebugEnabled()) {
logger.warn("Could not access target method for " + invocation, ex);
}
throw ex;
}
catch (InvocationTargetException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Target method failed for " + invocation, ex.getTargetException());
}
throw ex;
}
}
//获取RMI远程调用请求的处理结果
protected RemoteInvocationResult invokeAndCreateResult(RemoteInvocation invocation, Object targetObject) {
try {
Object value = invoke(invocation, targetObject);
return new RemoteInvocationResult(value);
}
catch (Throwable ex) {
return new RemoteInvocationResult(ex);
}
}
}
RMI远程调用请求最终由DefaultRemoteInvocationExecutor处理。
8.DefaultRemoteInvocationExecutor处理RMI远程调用请求:
DefaultRemoteInvocationExecutor用于处理RMI远程调用请求,并返回处理后的结果,其源码如下:
public class DefaultRemoteInvocationExecutor implements RemoteInvocationExecutor {
//处理RMI远程调用请求
public Object invoke(RemoteInvocation invocation, Object targetObject)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException{
Assert.notNull(invocation, "RemoteInvocation must not be null");
Assert.notNull(targetObject, "Target object must not be null");
//调用RemoteInvocation处理RMI远程调用请求
return invocation.invoke(targetObject);
}
}
9.RemoteInvocation处理RMI远程调用请求:
RemoteInvocation的invoke方法处理RMI远程调用请求,并返回远程调用处理后的结果,其源码如下:
public Object invoke(Object targetObject)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
//使用JDK反射机制获取远程调用服务端目标对象的方法
Method method = targetObject.getClass().getMethod(this.methodName, this.parameterTypes);
//使用JDK反射机制调用目标对象的方法
return method.invoke(targetObject, this.arguments);
}