1. Background
A year ago I wrote an article (https://blog.51cto.com/u_15327484/5046768) covering the ResourceManager startup process. It also touched on the ResourceManager election flow, but the writing there was rather muddled. Building on that article, this post explains the ResourceManager high-availability mechanism more clearly, aiming to be accessible even to complete beginners.
2. ZooKeeper Background
2.1 Watcher Notifications
Use case: a client registers a watcher; when the state of a given ZooKeeper path changes, the server sends a watcher event to the client, which handles the notification in Watcher#process.
Using a watcher:
Option 1: specify it when creating the ZooKeeper client; it then serves as the default watcher for the paths being monitored:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
    this(connectString, sessionTimeout, watcher, false);
}
Option 2: request a watch when calling a ZooKeeper client API (the boolean form below uses the default watcher; overloads that take a Watcher object also exist):
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
public Stat exists(String path, boolean watch)
public List<String> getChildren(String path, boolean watch)
ZooKeeper client threading model:
When the client-side ZooKeeper object is constructed, it creates and starts the connection-related threads:
cnxn = createConnection(
    connectStringParser.getChrootPath(),
    hostProvider,
    sessionTimeout,
    this,
    watchManager,
    getClientCnxnSocket(),
    canBeReadOnly);
cnxn.start();
Specifically, it creates one thread for sending requests and one for processing responses:
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
The ZooKeeper client's main thread wraps each request in a Packet and hands it to the sending thread, sendThread:
public Packet queuePacket(...){
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    ...
    packet.watchDeregistration = watchDeregistration;
    ...
    outgoingQueue.add(packet);
    ...
    sendThread.getClientCnxnSocket().packetAdded();
}
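The hand-off above can be sketched with a plain BlockingQueue. MiniSendThread below is an invented stand-in that keeps only the queue-and-drain structure; there is no Packet, socket I/O, or response handling:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Invented stand-in for the main-thread -> sendThread hand-off:
// callers enqueue "packets" and a dedicated thread drains the queue.
public class MiniSendThread {
    private final BlockingQueue<String> outgoingQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger sent = new AtomicInteger();
    private final Thread sendThread = new Thread(this::drainLoop);

    public MiniSendThread() {
        sendThread.start();
    }

    private void drainLoop() {
        try {
            while (true) {
                String packet = outgoingQueue.take(); // blocks until a packet arrives
                if (packet.equals("CLOSE")) break;    // shutdown marker
                sent.incrementAndGet();               // "send" the packet
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // analogous to queuePacket(): enqueue and let sendThread do the work
    public void queuePacket(String packet) {
        outgoingQueue.add(packet);
    }

    public int sentCount() {
        return sent.get();
    }

    public void close() throws InterruptedException {
        outgoingQueue.add("CLOSE");
        sendThread.join(); // after join, all earlier packets were processed
    }
}
```

As in the real client, queuePacket returns immediately and the caller never performs I/O itself; the CLOSE marker plays the same role as the eventOfDeath sentinel seen in EventThread below.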
When the ZooKeeper server triggers an event and returns it to the client, sendThread receives the response and hands it off to eventThread:
public void run() {
    try {
        isRunning = true;
        // loop forever
        while (true) {
            // take the next event off the queue
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // process the event
                processEvent(event);
            }
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    ...
}
eventThread then invokes the corresponding watcher's process method to handle the notification:
private void processEvent(Object event) {
    ...
    if (event instanceof WatcherSetEventPair) {
        WatcherSetEventPair pair = (WatcherSetEventPair) event;
        for (Watcher watcher : pair.watchers) {
            try {
                // invoke the registered watcher's callback
                watcher.process(pair.event);
            } catch (Throwable t) {
                LOG.error("Error while calling watcher ", t);
            }
        }
    }
}
Note that once the client receives a watcher event, that watcher is consumed: a registered watch fires at most once for a given event and cannot be triggered again. If further notifications are needed, the handler can re-register the watch by calling a ZooKeeper API inside Watcher#process. The watcher handling flow across client and server is illustrated in the diagram below:
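A minimal sketch of this one-shot behavior, and of the re-register-inside-process idiom, without a real ZooKeeper server (MiniZk and its methods are invented for illustration):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy stand-in for ZooKeeper's per-path watch table: a watch fires
// once and is then forgotten, so the handler must re-register.
public class MiniZk {
    public interface Watcher { void process(String event); }

    private final Map<String, List<Watcher>> watches = new HashMap<>();

    // register a one-shot watch on a path (like exists/getData with a watcher)
    public void exists(String path, Watcher w) {
        watches.computeIfAbsent(path, k -> new ArrayList<>()).add(w);
    }

    // deliver an event: the watch list is removed (consumed) on delivery
    public void fire(String path, String eventType) {
        List<Watcher> fired = watches.remove(path);
        if (fired != null) {
            for (Watcher w : fired) w.process(eventType);
        }
    }

    public static void main(String[] args) {
        MiniZk zk = new MiniZk();
        // A watcher that re-registers itself inside process(), so it keeps
        // receiving events instead of firing only once.
        Watcher persistent = new Watcher() {
            @Override public void process(String event) {
                System.out.println("event: " + event);
                zk.exists("/rm-lock", this); // re-register for the next event
            }
        };
        zk.exists("/rm-lock", persistent);
        zk.fire("/rm-lock", "NodeDataChanged");
        zk.fire("/rm-lock", "NodeDeleted"); // still delivered, thanks to re-registration
    }
}
```

A watcher that does not re-register would see only the first fire; ActiveStandbyElector relies on exactly this idiom to keep monitoring the lock path.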
2.2 Callbacks
Use case: the client handles an API's result asynchronously, in a processResult callback it implements. Callbacks are asynchronous, as opposed to synchronous calls: when the client does not want to block waiting for the server's response, it processes the response in a callback running on a separate thread.
Using callbacks: when calling the server, the ZooKeeper client can supply a callback function, which processes the response asynchronously. For comparison, the synchronous variant below returns a byte[]:
byte[] getData(String path, boolean watch, Stat stat)
The asynchronous variant returns void, because the response is actually handled by the logic supplied in the AsyncCallback.DataCallback parameter:
void getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
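The contrast can be sketched in plain Java (SyncVsAsync is invented; the fake "server round-trip" simply builds a byte array, and CompletableFuture stands in for the client's event thread):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

public class SyncVsAsync {
    // Synchronous style: the caller blocks until the "server" responds
    // and receives the result as the return value.
    static byte[] getDataSync(String path) {
        return ("data@" + path).getBytes(); // pretend server round-trip
    }

    // Asynchronous style: returns void immediately; the callback runs later
    // with the result, like AsyncCallback.DataCallback#processResult.
    static void getDataAsync(String path, Consumer<byte[]> callback) {
        CompletableFuture
            .supplyAsync(() -> ("data@" + path).getBytes())
            .thenAccept(callback);
    }

    public static void main(String[] args) {
        byte[] sync = getDataSync("/rm-lock");  // blocks for the result
        System.out.println("sync got " + sync.length + " bytes");

        CountDownLatch done = new CountDownLatch(1);
        getDataAsync("/rm-lock", data -> {      // returns immediately
            System.out.println("callback got " + data.length + " bytes");
            done.countDown();
        });
        // only needed here so the demo process waits for the callback
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```

The key difference mirrors the two getData signatures above: the synchronous call's result is its return value, while the asynchronous call's "result" only ever appears inside the callback.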
2.3 Watcher vs. Callback
Although watchers and callbacks look similar in function, in practice they are complementary. Take ResourceManager as an example:
- When multiple ResourceManager instances start, each calls zkClient.create to create an EPHEMERAL node at the same ZooKeeper path; only one instance can succeed.
- After calling zkClient.create, each instance learns from the server's return value whether it created the EPHEMERAL node: the one that succeeded becomes the active node, and the ones that failed become standby. A callback is sufficient for this; there is no need for a watcher to monitor the event.
- However, a standby ResourceManager may later become active, and an active one may fall back to standby (for example after an OOM). Since there is no way to know when these events will happen, a callback is powerless here; a watcher must be registered so the event can be handled when it fires.
In short: a callback only handles the response to the client's own ZooKeeper API call, while a watcher lets the client react to events initiated by the ZooKeeper server.
3. ResourceManager Callback Flow
At startup, ResourceManager creates and starts an EmbeddedElector election service. By default, the election service implementation is ActiveStandbyElectorBasedElectorService.
If yarn.resourcemanager.ha.curator-leader-elector.enabled is set to true in yarn-site.xml, the implementation is CuratorBasedElectorService instead, which builds on the Curator framework and wraps the ZooKeeper API to make it easier to use.
Since our production clusters currently use ActiveStandbyElectorBasedElectorService, which is based on the native ZooKeeper API, this article describes the ResourceManager HA flow in terms of that class:
protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
        this.zkManager = createAndStartZKManager(conf);
        elector = new CuratorBasedElectorService(this);
    } else {
        elector = new ActiveStandbyElectorBasedElectorService(this);
    }
    return elector;
}
When ActiveStandbyElectorBasedElectorService starts, the call chain is:
ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync, which uses zkClient.create to create an EPHEMERAL path:
public class ActiveStandbyElector implements StatCallback, StringCallback {
    private void createLockNodeAsync() {
        zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
    }
}
The callback is implemented by ActiveStandbyElector#processResult:
- If the EPHEMERAL node was created by this ResourceManager, it goes directly into the active state.
- If the EPHEMERAL node was created by someone else, this ResourceManager goes into the standby state.
- If the EPHEMERAL node does not exist yet, the ResourceManager retries creating it.
public class ActiveStandbyElector implements StatCallback, StringCallback {
    public synchronized void processResult(int rc, String path, Object ctx,
        String name) {
        // elided
        if (isSuccess(code)) {
            // we successfully created the znode. we are the leader. start monitoring
            // try to become active
            if (becomeActive()) {
                // verify
                monitorActiveStatus();
            } else {
                // otherwise retry creating the znode to regain active state
                reJoinElectionAfterFailureToBecomeActive();
            }
            return;
        }
        // creation failed because the node already exists: become standby
        if (isNodeExists(code)) {
            if (createRetryCount == 0) {
                becomeStandby();
            }
            monitorActiveStatus();
            return;
        }
        // creation failed and the node does not exist yet: retry creating it
        if (shouldRetry(code)) {
            if (createRetryCount < maxRetryNum) {
                ++createRetryCount;
                createLockNodeAsync();
                return;
            }
            // elided
        }
    }
}
4. ResourceManager Watcher Flow
Through the callback handling above, each ResourceManager has successfully settled into its state. But afterwards, a standby ResourceManager may become active, and an active one may fall back to standby because of, say, an OOM. These state transitions require listening for and handling ZooKeeper events.
A default watcher is specified when the ZooKeeper client is created:
protected synchronized ZooKeeper connectToZooKeeper() throws IOException, KeeperException {
    watcher = new WatcherWithClientRef();
    // register the watcher with ZooKeeper
    ZooKeeper zk = createZooKeeper();
    watcher.setZooKeeperRef(zk);
    // elided
    watcher.waitForZKConnectionEvent(zkSessionTimeout);
    // elided
    return zk;
}
The watcher's process implementation is shown below; it hands the event to ActiveStandbyElector for handling:
private final class WatcherWithClientRef implements Watcher {
    private ZooKeeper zk;
    // other events may only be processed after the server's connection event arrives
    private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    // events may only be processed once the ZooKeeper reference has been set
    private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
    // ordinary methods elided
    // process is how the watcher handles ZooKeeper events
    @Override
    public void process(WatchedEvent event) {
        // elided
        ActiveStandbyElector.this.processWatchEvent(zk, event);
        // elided
    }
}
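The two latches implement a small handshake: process() must not touch the ZooKeeper reference before it has been published, and the creating thread waits for the first connection event. A minimal sketch of that pattern (LatchHandshake is invented; the real class carries more state and error handling):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of WatcherWithClientRef's latch handshake: the event thread
// blocks until the main thread publishes the client reference, and the
// main thread blocks until the first (connection) event arrives.
public class LatchHandshake {
    private final CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
    private final CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    private volatile Object zk; // stand-in for the ZooKeeper handle

    void setZooKeeperRef(Object zk) {
        this.zk = zk;
        hasSetZooKeeper.countDown(); // events may now be processed
    }

    // called on the event thread, like Watcher#process
    void process(String event) throws InterruptedException {
        hasSetZooKeeper.await();      // never touch zk before it is published
        System.out.println("handling " + event + " with " + this.zk);
        hasReceivedEvent.countDown(); // unblock waitForZKConnectionEvent
    }

    boolean waitForZKConnectionEvent(long timeoutMs) throws InterruptedException {
        return hasReceivedEvent.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        LatchHandshake h = new LatchHandshake();
        Thread eventThread = new Thread(() -> {
            try { h.process("SyncConnected"); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        eventThread.start();            // the event may arrive "early"...
        h.setZooKeeperRef("zk-handle"); // ...but it waits for the reference
        System.out.println("connected: " + h.waitForZKConnectionEvent(5000));
        eventThread.join();
    }
}
```

This ordering matters because the ZooKeeper constructor can deliver the connection event before the constructor itself has returned a usable reference to the caller.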
First, we need to know which events the ZooKeeper server triggers under each state, as the table below shows. For example:
- While the client and server are connected (SyncConnected), successfully establishing the session causes the server to send the client a None-type event.
- While connected (SyncConnected), a change to a node's data causes the server to send a NodeDataChanged event.
The main state/event combinations are:
- SyncConnected + None: the session was established
- SyncConnected + NodeCreated / NodeDeleted / NodeDataChanged / NodeChildrenChanged: the watched node was created / deleted / had its data changed / had its children changed
- Disconnected + None: the client lost its connection to the server
- Expired + None: the session timed out and expired
As the table shows, session establishment, session expiry, and disconnection all deliver a None event to the client.
ActiveStandbyElector#processWatchEvent defines how the ResourceManager handles each event. The handling falls roughly into these categories:
- When the session is established, the connection drops, or the session expires, the ResourceManager enters neutral mode (in practice, the standby state) and registers a watcher to await re-election.
- When the EPHEMERAL node is deleted, a new election is run.
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    if (isStaleClient(zk)) return;
    if (LOG.isDebugEnabled()) {
        LOG.debug("Watcher event type: " + eventType + " with state:"
            + event.getState() + " for path:" + event.getPath()
            + " connectionState: " + zkConnectionState
            + " for " + this);
    }
    // None events signal connection-state changes
    if (eventType == Event.EventType.None) {
        // the connection state has changed
        switch (event.getState()) {
        // on (re)connection, update the connection state; if we were
        // previously disconnected, re-check which state to move to
        case SyncConnected:
            LOG.info("Session connected.");
            // if the listener was asked to move to safe state then it needs to
            // be undone
            ConnectionState prevConnectionState = zkConnectionState;
            zkConnectionState = ConnectionState.CONNECTED;
            if (prevConnectionState == ConnectionState.DISCONNECTED &&
                wantToBeInElection) {
                monitorActiveStatus();
            }
            break;
        // on disconnect, enter neutral mode, i.e. become standby
        case Disconnected:
            LOG.info("Session disconnected. Entering neutral mode...");
            // ask the app to move to safe state because zookeeper connection
            // is not active and we dont know our state
            zkConnectionState = ConnectionState.DISCONNECTED;
            enterNeutralMode();
            break;
        // on session expiry, reconnect, enter neutral mode (standby),
        // and rejoin the election
        case Expired:
            // the connection got terminated because of session timeout
            // call listener to reconnect
            LOG.info("Session expired. Entering neutral mode and rejoining...");
            enterNeutralMode();
            reJoinElection(0);
            break;
        case SaslAuthenticated:
            LOG.info("Successfully authenticated to ZooKeeper using SASL.");
            break;
        default:
            fatalError("Unexpected Zookeeper watch event state: "
                + event.getState());
            break;
        }
        return;
    }
    // a watch on lock path in zookeeper has fired. so something has changed on
    // the lock. ideally we should check that the path is the same as the lock
    // path but trusting zookeeper for now
    String path = event.getPath();
    if (path != null) {
        switch (eventType) {
        // the lock node was deleted: re-run the election
        case NodeDeleted:
            if (state == State.ACTIVE) {
                enterNeutralMode();
            }
            joinElectionInternal();
            break;
        // node data changed: re-check our status
        case NodeDataChanged:
            monitorActiveStatus();
            break;
        default:
            if (LOG.isDebugEnabled()) {
                LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
            }
            monitorActiveStatus();
        }
        return;
    }
    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
}
The most important path above is the NodeDeleted event, which runs joinElectionInternal; that method calls createLockNodeAsync, i.e. zkClient.create, to try to create the EPHEMERAL node again:
private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
        if (!reEstablishSession()) {
            fatalError("Failed to reEstablish connection with ZooKeeper");
            return;
        }
    }
    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
}
5. Related Configuration
<property>
  <name>yarn.resourcemanager.recovery.enabled</name>
  <value>true</value>
  <description>Enable RM to recover state after starting. If true, then yarn.resourcemanager.store.class must be specified.</description>
</property>
<property>
  <name>yarn.resourcemanager.store.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  <description>The class to use as the persistent store.</description>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>hadoop34:2181,hadoop39:2181,hadoop40:2181</value>
  <description>Comma separated list of Host:Port pairs. Each corresponds to a ZooKeeper server (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class.</description>
</property>
6. ResourceManager Split-Brain
On the ZooKeeper server, an EPHEMERAL node's lifetime is tied to its session: once the session ends, the node is removed. So a full GC on the ZooKeeper client, or heavy load on the machine, can cause a ZooKeeper session timeout; the session expires and the EPHEMERAL node is deleted. The other standby machines are then sent an event and elect a new leader, but the machine that was stuck in full GC still believes it is the leader. At that point two ResourceManagers are both active: this is the split-brain phenomenon.
There is no official fix for split-brain yet. The official explanation is that, because only one ResourceManager can modify ZooKeeper, even a split-brain cannot corrupt the data already in ZooKeeper; the remaining risk is that newly submitted requests may go unprocessed.
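The failure sequence can be simulated in a few lines (a deliberately simplified, single-threaded model; SplitBrainDemo and its methods are invented and stand in for the real session/ephemeral-node machinery):

```java
import java.util.HashMap;
import java.util.Map;

// Single-threaded simulation of the split-brain sequence: the active
// RM's session expires during a GC pause, the standby wins the new
// election, and the paused RM still believes it is active.
public class SplitBrainDemo {
    // path -> owning session, emulating EPHEMERAL node ownership
    static Map<String, String> ephemerals = new HashMap<>();

    static boolean tryCreateLock(String session) {
        // only one session can own the lock path, like zkClient.create
        return ephemerals.putIfAbsent("/rm-lock", session) == null;
    }

    static void expireSession(String session) {
        // EPHEMERAL nodes die with their session
        ephemerals.values().removeIf(owner -> owner.equals(session));
    }

    public static void main(String[] args) {
        boolean rm1Active = tryCreateLock("rm1-session"); // rm1 wins the election
        boolean rm2Active = tryCreateLock("rm2-session"); // lock exists: rm2 is standby

        // rm1 stalls in a long full GC; its session times out on the server
        expireSession("rm1-session");
        // standby rm2 is notified of NodeDeleted and re-runs the election
        rm2Active = tryCreateLock("rm2-session");

        // rm1 has not yet observed the expiry, so both believe they are active
        System.out.println("rm1 believes active: " + rm1Active
                + ", rm2 is active: " + rm2Active);
    }
}
```

In the simulation, both flags end up true at the same time, which is exactly the window the ZKRMStateStore fencing described below is designed to make harmless.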
The ZooKeeper-based state store (ZKRMStateStore) allows only a single ResourceManager to make changes to the stored state, implicitly fencing the other ResourceManager. This is accomplished by the ResourceManager claiming exclusive create-delete permissions on the root znode. The ACLs on the root znode are automatically created based on the ACLs configured for the store; in case of secure clusters, Cloudera recommends that you set ACLs for the root host such that both ResourceManagers share read-write-admin access, but have exclusive create-delete access. The fencing is implicit and does not require explicit configuration (as fencing in HDFS does). You can plug in a custom "Fencer" if you choose to – for example, to use a different implementation of the state store.
7. Summary
- When each ResourceManager instance starts, it calls zkClient.create to try to create an EPHEMERAL node on the ZooKeeper server.
- zkClient.create handles the server's return value asynchronously, ultimately in ActiveStandbyElector#processResult. The ResourceManager that successfully created the EPHEMERAL node becomes active; the others become standby.
- The zkClient call also registers a watcher. When a ResourceManager shuts down, its EPHEMERAL node is deleted and ZooKeeper generates a NodeDeleted event for the standby ResourceManagers to handle. ActiveStandbyElector#processWatchEvent dispatches the various events; when handling NodeDeleted, it again calls zkClient.create to try to create the EPHEMERAL node.