Dubbo注册中心-订阅/发布
订阅与发布是整个注册中心的核心功能之一。在传统应用系统中,配置文件中配置需要变更时会修改配置文件,当服务节点数量不断上升时,就会有很多弊端。
我们使用注册中心,解决该问题。当一个已有服务提供者节点下线,或者一个新的服务提供者节点加入微服务环境时,订阅对应接口的消费者和服务治理中心都能及时收到注册中心的通知,并更新本地的配置信息。
ZooKeeper的实现
发布的实现
zkClient.create(toUrlPath(url)),
url.getParameter(Constants.DYNAMIC_KEY,true));
取消发布:
zkClient.delete(toUrlPath(url));
订阅的实现
订阅通常用pull和push两种方式,一种是客户端定时轮询注册中心拉取配置,另一种是注册中心主动推送数据给客户端。Dubbo采用的是第一次启动拉取方式,后续接收事件重新拉取数据。
在服务暴露时,服务端会订阅configurators用于监听动态配置,在消费端启动时,消费端会订阅providers、routers和configurators这三个目录,分别对应服务提供者、路由和动态配置变量通知。
ZooKeeper注册中心采用的是“事件通知”+“客户端拉取”的方式,客户端在第一次连接上注册中心时,会获取对应目录下全量的数据。并在订阅的节点上注册一个watcher,客户端与注册中心之间保持TCP长连接,后续每个节点有任何数据变化的时候,注册中心会根据watcher的回调主动通知客户端,客户端接到通知后,会把对应节点下的全量数据都拉取过来。
ZoooKeeper全量订阅服务代码:(来自ZooKeeperRegistry)
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}