Dubbo源码阅读分享系列文章,欢迎大家关注点赞 Dubbo的RPC其实是对Protocol的封装,整体的结构与Remoting类似,dubbo-rpc-api是对具体协议、服务暴露、服务引用、代理等的抽象,是整个RPC中的核心,其他模块是对该层具体的实现,每个模块都是Dubbo具体支持的协议。
dubbo-rpc-api整体模块如图所示,整体接口包括了filter、listener、protocol、proxy、support以及核心API,接下来我们先来看下核心的接口介绍。 开始之前我们来先来回顾一下之前介绍RPC请求的过程,
这里为什么要回顾整个过程,这样后面介绍抽象的接口的时候大家会更更容易理解为什么这么抽象。 Invoker接口内部有三个方法,分别是getInterface、invoke、destroyAll,getInterface该方法主要是获取服务接口相关的信息,invoke主要是发起一次调用以及相应信息,destroyAll主要用于销毁调用请求。 Invocation是invoke的参数,内部抽象了RPC调用的目标服务、方法信息、相关参数信息、具体的参数值以及一些附加信息。 Result接口是Invoker.invoke方法的返回值,该返回值包含了被调用方返回值(或是异常)以及附加信息,我们也可以添加回调方法,在 RPC 调用方法结束时会触发这些回调。 Exporter暴露Invoker的实现,就是让Provider能够根据请求的各种信息,找到对应的Invoker的实现。 Protocol接口主要有三个核心方法export、refer以及destroy,export主要是将Invoker服务暴露出去,refer引用一个服务将Invoker对象返回,destroy主要是销毁Invoker,释放Protocol对底层的占用。Protocol接口的实现中,export方法并不是简单地将Invoker对象包装成Exporter对象返回,其中还涉及代理对象的创建、底层Server的启动等操作;refer方法除了根据传入的type类型以及URL参数查询Invoker之外,还涉及相关Client的创建等操作。
此外该接口被SPI修饰,export和refer被Adaptive修饰,因此对于Protocol可以动态选择实现,此外Dubbo也提供多种Protocol实现。 Filter接口用来拦截Dubbo请求,定义了一个invoke方法将请求传递给后续的Invoker进行处理。
ProxyFactory接口主要的功能是用来创建代理对象,此外ProxyFactory也是一个扩展接口,getProxy方法为Invoker创建代理对象,getInvoker方法将代理对象转为Invoker对象,默认采用javassist生成代理对象,Dubbo还提供很多种实现,可以通过SPI配置进行自定义。 欢迎大家点点关注,点点赞!
前言
SPI实现部分
注册中心
通信
RPC
整体介绍
dubbo-rpc-api
核心接口
Invoker
public interface Invoker<T> extends Node {
//获取服务接口
Class<T> getInterface();
//发起调用
Result invoke(Invocation invocation) throws RpcException;
//销毁调用连接
default void destroyAll() {
destroy();
}
}Invocation
public interface Invocation {
//调用Service的唯一标识
String getTargetServiceUniqueName();
String getProtocolServiceKey();
//调用的方法名称
String getMethodName();
//服务名称
String getServiceName();
//参数类型集合
Class<?>[] getParameterTypes();
//参数签名集合
default String[] getCompatibleParamSignatures() {
return Stream.of(getParameterTypes())
.map(Class::getName)
.toArray(String[]::new);
}
//调用具体的参数值
Object[] getArguments();
//调用关联的Invoker对象
Map<String, String> getAttachments();
@Experimental("Experiment api for supporting Object transmission")
Map<String, Object> getObjectAttachments();
void setAttachment(String key, String value);
@Experimental("Experiment api for supporting Object transmission")
void setAttachment(String key, Object value);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachment(String key, Object value);
void setAttachmentIfAbsent(String key, String value);
@Experimental("Experiment api for supporting Object transmission")
void setAttachmentIfAbsent(String key, Object value);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachmentIfAbsent(String key, Object value);
/**
* get attachment by key.
*
* @return attachment value.
* @serial
*/
String getAttachment(String key);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key);
/**
* get attachment by key with default value.
*
* @return attachment value.
* @serial
*/
String getAttachment(String key, String defaultValue);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key, Object defaultValue);
/**
* get the invoker in current context.
*
* @return invoker.
* @transient
*/
Invoker<?> getInvoker();
//Invoker对象可以设置一些KV属性,这些属性并不会传递给Provider
Object put(Object key, Object value);
Object get(Object key);
Map<Object, Object> getAttributes();
}Result
public interface Result extends Serializable {
//调用的返回值
Object getValue();
void setValue(Object value);
//异常处理方法
Throwable getException();
void setException(Throwable t);
boolean hasException();
//复合操作,如果调用发生异常,则直接抛出异常,如果没有异常,则返回结果
Object recreate() throws Throwable;
//携带附加信息
Map<String, String> getAttachments();
@Experimental("Experiment api for supporting Object transmission")
Map<String, Object> getObjectAttachments();
void addAttachments(Map<String, String> map);
@Experimental("Experiment api for supporting Object transmission")
void addObjectAttachments(Map<String, Object> map);
void setAttachments(Map<String, String> map);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachments(Map<String, Object> map);
String getAttachment(String key);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key);
String getAttachment(String key, String defaultValue);
@Experimental("Experiment api for supporting Object transmission")
Object getObjectAttachment(String key, Object defaultValue);
void setAttachment(String key, String value);
@Experimental("Experiment api for supporting Object transmission")
void setAttachment(String key, Object value);
@Experimental("Experiment api for supporting Object transmission")
void setObjectAttachment(String key, Object value);
//添加回调 当RPC调用完成时,会触发回调
Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn);
<U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn);
//阻塞线程,等待此次RPC调用完成
Result get() throws InterruptedException, ExecutionException;
Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}Exporter
public interface Exporter<T> {
//获取Invoker对象
Invoker<T> getInvoker();
//取消Invoker对象
void unexport();
}Protocol
@SPI("dubbo")
public interface Protocol {
//默认端口
int getDefaultPort();
//将一个Invoker暴露,该方法必须是幂等的
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
//引用一个Invoker,返回一个Invoker对象
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
//销毁export方法以及refer方法使用到的Invoker对象,释放当前Protocol对象底层占用的资源
void destroy();
//返回当前Protocol底层的全部ProtocolServer
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}Filter
@SPI
public interface Filter {
//将请求传给后续的Invoker处理
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
//监听响应以及异常
interface Listener {
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}ProxyFactory
@SPI("javassist")
public interface ProxyFactory {
//将Invoker对象转为代理对象
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
//将proxy对象转为Invoker
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}结束