摘要
本地缓存机制:作为一个 RPC 框架,在网络抖动等原因而导致订阅失败时,Consumer能够调用本地缓存提供的可用的URL消费者,保证了服务的可靠性。
重试机制”就是在请求失败时,客户端重新发起一个一模一样的请求,尝试调用相同或不同的服务端,完成相应的业务操作。
一、本地缓存原理
Dubbo 架构图
- Provider 从容器启动后的初始化阶段便会向注册中心完成注册操作;
- Consumer 启动初始化阶段会完成对所需 Prov·ider 的订阅操作;
- 另外,在 Provider 发生变化时,需要通知监听的 Consumer。
Registry 只是 Consumer 和 Provider 感知彼此状态变化的一种便捷途径而已,它们彼此的实际通讯交互过程是直接进行的,对于 Registry 来说是透明无感的。Provider 状态发生变化了,会由 Registry 主动推送订阅了该 Provider 的所有 Consumer,这保证了 Consumer 感知 Provider 状态变化的及时性,也将和具体业务需求逻辑交互解耦,提升了系统的稳定性。
1. 1 本地缓存
作为一个 RPC 框架,Dubbo 在微服务架构中解决了各个服务间协作的难题;作为 Provider 和 Consumer 的底层依赖,它会与服务一起打包部署。dubbo-registry 也仅仅是其中一个依赖包,负责完成与 ZooKeeper、etcd、Consul 等服务发现组件的交互。
当 Provider 端暴露的 URL 发生变化时,ZooKeeper 等服务发现组件会通知 Consumer 端的 Registry 组件,Registry 组件会调用 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表并写入 properties 集合中。
// 注意入参,第一个URL参数表示的是Consumer,第二个NotifyListener是第一个参数对应的监听器,第三个参数是Provider端暴露的URL的全量数据
protected void notify(URL url, NotifyListener listener,
List<URL> urls) {
... // 省略一系列边界条件的检查
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
// 需要Consumer URL与Provider URL匹配,具体匹配规则后面详述
if (UrlUtils.isMatch(url, u)) {
// 根据Provider URL中的category参数进行分类
String category = u.getParameter("category", "providers");
List<URL> categoryList = result.computeIfAbsent(category,
k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified =
notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList); // 更新notified
listener.notify(categoryList); // 调用NotifyListener
// 更新properties集合以及底层的文件缓存
saveProperties(url);
}
}
在 saveProperties() 方法中会取出 Consumer 订阅的各个分类的 URL 连接起来(中间以空格分隔),然后以 Consumer 的 ServiceKey 为键值写到 properties 中,同时 lastCacheChanged 版本号会自增。完成 properties 字段的更新之后,会根据 syncSaveFile 字段值来决定是在当前线程同步更新 file 文件,还是向 registryCacheExecutor 线程池提交任务,异步完成 file 文件的同步。本地缓存文件的具体路径是:
/.dubbo/dubbo-registry-[当前应用名]-[当前Registry所在的IP地址].cache
AbstractRegistry 的核心是本地文件缓存的功能。 在 AbstractRegistry 的构造方法中,会调用 loadProperties() 方法将上面写入的本地缓存文件,加载到 properties 对象中。
在网络抖动等原因而导致订阅失败时,Consumer 端的 Registry 就可以调用 getCacheUrls() 方法获取本地缓存,从而得到最近注册的 Provider URL。可见,AbstractRegistry 通过本地缓存提供了一种容错机制,保证了服务的可靠性。
1.2 服务的注册/订阅
AbstractRegistry 实现了 Registry 接口,它实现的 registry() 方法会将当前节点要注册的 URL 缓存到 registered 集合,而 unregistry() 方法会从 registered 集合删除指定的 URL,例如当前节点下线的时候。
subscribe() 方法会将当前节点作为 Consumer 的 URL 以及相关的 NotifyListener 记录到 subscribed 集合,unsubscribe() 方法会将当前节点的 URL 以及关联的 NotifyListener 从 subscribed 集合删除。
单看 AbstractRegistry 的实现,上述四个基础的注册、订阅方法都是内存操作,但是 Java 有继承和多态的特性,AbstractRegistry 的子类会覆盖上述四个基础的注册、订阅方法进行增强。
1.3 服务的恢复/销毁
AbstractRegistry 中还有另外两个需要关注的方法:recover() 方法和destroy() 方法。
在 Provider 因为网络问题与注册中心断开连接之后,会进行重连,重新连接成功之后,会调用 recover() 方法将 registered 集合中的全部 URL 重新走一遍 register() 方法,恢复注册数据。同样,recover() 方法也会将 subscribed 集合中的 URL 重新走一遍 subscribe() 方法,恢复订阅监听器。
在当前节点下线的时候,会调用 Node.destroy() 方法释放底层资源。AbstractRegistry 实现的 destroy() 方法会调用 unregister() 方法和 unsubscribe() 方法将当前节点注册的 URL 以及订阅的监听全部清理掉,其中不会清理非动态注册的 URL(即 dynamic 参数明确指定为 false)。
二、重试机制原理
在真实的微服务系统中, ZooKeeper、etcd 等服务发现组件一般会独立部署成一个集群,业务服务通过网络连接这些服务发现节点,完成注册和订阅操作。但即使是机房内部的稳定网络,也无法保证两个节点之间的请求一定成功,因此 Dubbo 这类 RPC 框架在稳定性和容错性方面,就受到了比较大的挑战。为了保证服务的可靠性,重试机制就变得必不可少了。
所谓的 “重试机制”就是在请求失败时,客户端重新发起一个一模一样的请求,尝试调用相同或不同的服务端,完成相应的业务操作。能够使用重试机制的业务接口得是“幂等”的,也就是无论请求发送多少次,得到的结果都是一样的,例如查询操作。
2.1 FailbackRegistry原理解析
dubbo-registry 将重试机制的相关实现放到了 AbstractRegistry 的子类—— FailbackRegistry 中。如下图所示,接入 ZooKeeper、etcd 等开源服务发现组件的 Registry 实现,都继承了 FailbackRegistry,也就都拥有了失败重试的能力。
FailbackRegistry 设计核心是:覆盖了 AbstractRegistry 中 register()/unregister()、subscribe()/unsubscribe() 以及 notify() 这五个核心方法,结合前面介绍的时间轮,实现失败重试的能力;真正与服务发现组件的交互能力则是放到了 doRegister()/doUnregister()、doSubscribe()/doUnsubscribe() 以及 doNotify() 这五个抽象方法中,由具体子类实现。这是典型的模板方法模式的应用。
retryTimer(HashedWheelTimer 类型):用于定时执行失败重试操作的时间轮。
retryPeriod(int 类型):重试操作的时间间隔。
failedRegistered(ConcurrentMap<URL, FailedRegisteredTask>类型):注册失败的 URL 集合,其中 Key 是注册失败的 URL,Value 是对应的重试任务。
failedUnregistered(ConcurrentMap<URL, FailedUnregisteredTask>类型):取消注册失败的 URL 集合,其中 Key 是取消注册失败的 URL,Value 是对应的重试任务。
failedSubscribed(ConcurrentMap<Holder, FailedSubscribedTask>类型):订阅失败 URL 集合,其中 Key 是订阅失败的 URL + Listener 集合,Value 是相应的重试任务。
failedUnsubscribed(ConcurrentMap<URL, Set>类型):取消订阅失败的 URL 集合,其中 Key 是取消订阅失败的 URL + Listener 集合,Value 是相应的重试任务。
failedNotified(ConcurrentMap<Holder, FailedNotifiedTask>类型):通知失败的 URL 集合,其中 Key 是通知失败的 URL + Listener 集合,Value 是相应的重试任务。
在 FailbackRegistry 的构造方法中,首先会调用父类 AbstractRegistry 的构造方法完成本地缓存相关的初始化操作,然后从传入的 URL 参数中获取重试操作的时间间隔(即retry.period 参数)来初始化 retryPeriod 字段,最后初始化 retryTimer****时间轮。
2.2 核心方法源码分析
根据 registryUrl 中 accepts 参数指定的匹配模式,决定是否接受当前要注册的 Provider URL。
调用父类 AbstractRegistry 的 register() 方法,将 Provider URL 写入 registered 集合中。
调用 removeFailedRegistered() 方法和 removeFailedUnregistered() 方法,将该 Provider URL 从 failedRegistered 集合和 failedUnregistered 集合中删除,并停止相关的重试任务。
调用 doRegister() 方法,与服务发现组件进行交互。该方法由子类实现,每个子类只负责接入一个特定的服务发现组件。
在 doRegister() 方法出现异常的时候,会根据 URL 参数以及异常的类型,进行分类处理:待注册 URL 的 check 参数为 true(默认值为 true);待注册的 URL 不是 consumer 协议;registryUrl 的 check 参数也为 true(默认值为 true)。若满足这三个条件或者抛出的异常为 SkipFailbackWrapperException,则直接抛出异常。否则,就会创建重试任务并添加到 failedRegistered 集合中。
2.2.1 Registry()方法
public void register(URL url) {
if (!acceptable(url)) {
logger.info("..."); // 打印相关的提示日志
return;
}
super.register(url); // 完成本地文件缓存的初始化
// 清理failedRegistered集合和failedUnregistered集合,并取消相关任务
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
doRegister(url); // 与服务发现组件进行交互,具体由子类实现
} catch (Exception e) {
Throwable t = e;
// 检测check参数,决定是否直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY,
true) && url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof
SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
}
// 如果不抛出异常,则创建失败重试的任务,并添加到failedRegistered集合中
addFailedRegistered(url);
}
}
当 Provider 向 Registry 注册 URL 的时候,如果注册失败,且未设置 check 属性,则创建一个定时任务,添加到时间轮中。
2.2.2 addFailedRegistered() 方法
private void addFailedRegistered(URL url) {
FailedRegisteredTask oldOne = failedRegistered.get(url);
if (oldOne != null) { // 已经存在重试任务,则无须创建,直接返回
return;
}
FailedRegisteredTask newTask = new FailedRegisteredTask(url,
this);
oldOne = failedRegistered.putIfAbsent(url, newTask);
if (oldOne == null) {
// 如果是新建的重试任务,则提交到时间轮中,等待retryPeriod毫秒后执行
retryTimer.newTimeout(newTask, retryPeriod,
TimeUnit.MILLISECONDS);
}
}
2.3 重试任务
FailbackRegistry.addFailedRegistered() 方法中创建的 FailedRegisteredTask 任务以及其他的重试任务,都继承了 AbstractRetryTask 抽象类,
在 AbstractRetryTask 中维护了当前任务关联的 URL、当前重试的次数等信息,在其 run() 方法中,会根据重试 URL 中指定的重试次数(retry.times 参数,默认值为 3)、任务是否被取消以及时间轮的状态,决定此次任务的 doRetry() 方法是否正常执行。
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
// 检测定时任务状态和时间轮状态
return;
}
if (times > retryTimes) { // 检查重试次数
logger.warn("...");
return;
}
try {
doRetry(url, registry, timeout); // 执行重试
} catch (Throwable t) {
reput(timeout, retryPeriod); // 重新添加定时任务,等待重试
}
}
如果任务的 doRetry() 方法执行出现异常,AbstractRetryTask 会通过 reput() 方法将当前任务重新放入时间轮中,并递增当前任务的执行次数。
protected void reput(Timeout timeout, long tick) {
if (timeout == null) { // 边界检查
throw new IllegalArgumentException();
}
Timer timer = timeout.timer(); // 检查定时任务
if (timer.isStop() || timeout.isCancelled() || isCancel()) {
return;
}
times++; // 递增times
// 添加定时任务
timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}
AbstractRetryTask 将 doRetry() 方法作为抽象方法,留给子类实现具体的重试逻辑,这也是模板方法的使用。
在子类 FailedRegisteredTask 的 doRetry() 方法实现中,会再次执行关联 Registry 的 doRegister() 方法,完成与服务发现组件交互。如果注册成功,则会调用 removeFailedRegisteredTask() 方法将当前关联的 URL 以及当前重试任务从 failedRegistered 集合中删除。如果注册失败,则会抛出异常,执行上文介绍的 reput ()方法重试。
protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
registry.doRegister(url); // 重新注册
registry.removeFailedRegisteredTask(url); // 删除重试任务
}
public void removeFailedRegisteredTask(URL url) {
failedRegistered.remove(url);
}
另外,在 register() 方法入口处,会主动调用 removeFailedRegistered() 方法和RemoveFailedUnregistered() 方法来清理指定 URL 关联的定时任务:
public void register(URL url) {
super.register(url);
removeFailedRegistered(url); // 清理FailedRegisteredTask定时任务
removeFailedUnregistered(url); // 清理FailedUnregisteredTask定时任务
try {
doRegister(url);
} catch (Exception e) {
addFailedRegistered(url);
}
}
2.3.1 AbstractRegistry.notify()
protected void notify(URL url, NotifyListener listener,
List<URL> urls) {
... // 检查url和listener不为空(略)
try {
// FailbackRegistry.doNotify()方法实际上就是调用父类
// AbstractRegistry.notify()方法,没有其他逻辑
doNotify(url, listener, urls);
} catch (Exception t) {
// doNotify()方法出现异常,则会添加一个定时任务
addFailedNotified(url, listener, urls);
}
}
addFailedNotified() 方法会创建相应的 FailedNotifiedTask 任务,添加到 failedNotified 集合中,同时也会添加到时间轮中等待执行。如果已存在相应的 FailedNotifiedTask 重试任务,则会更新任务需要处理的 URL 集合。
在 FailedNotifiedTask 中维护了一个 URL 集合,用来记录当前任务一次运行需要通知的 URL,每执行完一次任务,就会清空该集合,具体实现如下:
protected void doRetry(URL url, FailbackRegistry registry,
Timeout timeout) {
// 如果urls集合为空,则会通知所有Listener,该任务也就啥都不做了
if (CollectionUtils.isNotEmpty(urls)) {
listener.notify(urls);
urls.clear();
}
reput(timeout, retryPeriod); // 将任务重新添加到时间轮中等待执行
}
从上面的代码可以看出,FailedNotifiedTask 重试任务一旦被添加,就会一直运行下去,但真的是这样吗?在 FailbackRegistry 的 subscribe()、unsubscribe() 方法中,可以看到 removeFailedNotified() 方法的调用,这里就是清理 FailedNotifiedTask 任务的地方。我们以 FailbackRegistry.subscribe() 方法为例进行介绍:
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener); // 关注这个方法
try {
doSubscribe(url, listener);
} catch (Exception e) {
addFailedSubscribed(url, listener);
}
}
// removeFailedSubscribed()方法中会清理FailedSubscribedTask、FailedUnsubscribedTask、FailedNotifiedTask三类定时任务
private void removeFailedSubscribed(URL url, NotifyListener listener) {
Holder h = new Holder(url, listener); // 清理FailedSubscribedTask
FailedSubscribedTask f = failedSubscribed.remove(h);
if (f != null) {
f.cancel();
}
removeFailedUnsubscribed(url, listener);// 清理FailedUnsubscribedTask
removeFailedNotified(url, listener); // 清理FailedNotifiedTask
}
介绍完 FailbackRegistry 中最核心的注册/订阅实现之后,我们再来关注其实现的恢复功能,也就是 recover() 方法。该方法会直接通过 FailedRegisteredTask 任务处理 registered 集合中的全部 URL,通过 FailedSubscribedTask 任务处理 subscribed 集合中的 URL 以及关联的 NotifyListener。
FailbackRegistry 在生命周期结束时,会调用自身的 destroy() 方法,其中除了调用父类的 destroy() 方法之外,还会调用时间轮(即 retryTimer 字段)的 stop() 方法,释放时间轮相关的资源。