1. Background
Before Hadoop 2.0, the NameNode was a single point of failure, which made the service unreliable. Starting with Hadoop 2.0, an HA mechanism was introduced: the active node is elected through ZooKeeper and then serves requests.
The article https://blog.51cto.com/u_15327484/7850359 described the ResourceManager HA process. NameNode HA uses the same election flow as the ResourceManager, but to reduce complexity, and to keep the election process from affecting NameNode service stability, HDFS moves the NameNode election logic into a separate zkfc process. The NameNode HA architecture is shown below:
2. NameNode state at startup
When the NameNode starts, it creates its initial HA state. As shown below, the default is STANDBY_STATE, and the NameNode then enters that state:
HAState state = createHAState(getStartupOption(conf));

protected HAState createHAState(StartupOption startOpt) {
  if (!haEnabled || startOpt == StartupOption.UPGRADE
      || startOpt == StartupOption.UPGRADEONLY) {
    return ACTIVE_STATE;
  } else if (startOpt == StartupOption.OBSERVER) {
    return OBSERVER_STATE;
  } else {
    return STANDBY_STATE;
  }
}

state.enterState(haContext);
Finally, NameNode.startStandbyServices is executed to start the service threads for the Standby state, such as the checkpoint-related threads. The previous article explains this logic in detail: https://blog.51cto.com/u_15327484/8122340
public void startStandbyServices() throws IOException {
  try {
    namesystem.startStandbyServices(getConf(),
        state == NameNode.OBSERVER_STATE);
  } catch (Throwable t) {
    doImmediateShutdown(t);
  }
}
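To make the state.enterState(haContext) dispatch above concrete, here is a minimal sketch of the underlying state pattern, with made-up class names (HaContextSketch, HaStateSketch) rather than the real HAState/HAContext source: each state object only decides which group of services the context should start.

import java.io.IOException;

// Illustrative sketch of the state-pattern dispatch; names are simplified, not the Hadoop classes.
interface HaContextSketch {
  void startActiveServices() throws IOException;
  void startStandbyServices() throws IOException;
}

abstract class HaStateSketch {
  abstract void enterState(HaContextSketch context) throws IOException;
}

class StandbyStateSketch extends HaStateSketch {
  @Override
  void enterState(HaContextSketch context) throws IOException {
    // entering STANDBY starts the standby-only work (tailing edits, checkpointing, ...)
    context.startStandbyServices();
  }
}

class ActiveStateSketch extends HaStateSketch {
  @Override
  void enterState(HaContextSketch context) throws IOException {
    // entering ACTIVE starts the services that serve client requests
    context.startActiveServices();
  }
}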
3. ZKFC initialization
After both NameNodes have started, both are in the Standby state. The zkfc process is then started on each NameNode host:
hadoop/sbin/hadoop-daemon.sh start zkfc
Inside the ~/hadoop/bin/hdfs script you can see that it launches the DFSZKFailoverController class:
zkfc)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
;;
DFSZKFailoverController.main creates a DFSZKFailoverController object and calls its run method:
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
parser.getConfiguration());
System.exit(zkfc.run(parser.getRemainingArgs()));
Ultimately it executes ZKFailoverController.doRun; this method, together with the main loop inside it, is what actually controls the NameNode active/standby state:
private int doRun(String[] args)
    throws Exception {
  try {
    initZK();
  } catch (KeeperException ke) {
    LOG.error("Unable to start failover controller. Unable to connect "
        + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
        + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
        + "ZooKeeper is running.", ke);
    return ERR_CODE_NO_ZK;
  }
  // omitted
  try {
    // initialize the RPC server
    initRPC();
    // start the HealthMonitor thread
    initHM();
    startRPC();
    mainLoop();
  } catch (Exception e) {
    LOG.error("The failover controller encounters runtime error: ", e);
    throw e;
  } finally {
    rpcServer.stopAndJoin();
    elector.quitElection(true);
    healthMonitor.shutdown();
    healthMonitor.join();
  }
  return 0;
}
initZK mainly reads the ZooKeeper quorum address ha.zookeeper.quorum from core-site.xml and uses it to create an ActiveStandbyElector object:
// read ha.zookeeper.quorum
zkQuorum = conf.get(ZK_QUORUM_KEY);
int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
    ZK_SESSION_TIMEOUT_DEFAULT);
// Parse ACLs from configuration.
String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
if (zkAcls.isEmpty()) {
  zkAcls = Ids.CREATOR_ALL_ACL;
}
// create the ActiveStandbyElector object
elector = new ActiveStandbyElector(zkQuorum,
    zkTimeout, getParentZnode(), zkAcls, zkAuths,
    new ElectorCallbacks(), maxRetryNum);
ActiveStandbyElector plays the same role as it does in the ResourceManager; see https://blog.51cto.com/u_15327484/7850359.
Creating the ActiveStandbyElector also establishes the ZooKeeper connection:
// establish the ZK Connection for future API calls
if (failFast) {
  createConnection();
} else {
  reEstablishSession();
}
This eventually calls connectToZooKeeper, which registers a WatcherWithClientRef with the ZooKeeper client:
protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
    KeeperException {
  watcher = new WatcherWithClientRef();
  ZooKeeper zk = createZooKeeper();
  watcher.setZooKeeperRef(zk);

  // Wait for the asynchronous success/failure. This may throw an exception
  // if we don't connect within the session timeout.
  watcher.waitForZKConnectionEvent(zkSessionTimeout);

  for (ZKAuthInfo auth : zkAuthInfo) {
    zk.addAuthInfo(auth.getScheme(), auth.getAuth());
  }
  return zk;
}
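For reference, the ZooKeeper client API this builds on looks roughly like the minimal, self-contained sketch below; the quorum address and class name are placeholders, and the real WatcherWithClientRef is of course more elaborate:

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ZkConnectSketch {
  public static void main(String[] args) throws Exception {
    final CountDownLatch connected = new CountDownLatch(1);
    // The watcher receives connection-state events (SyncConnected, Disconnected,
    // Expired, ...), just like WatcherWithClientRef in the ZKFC code above.
    Watcher watcher = new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
          connected.countDown();
        }
      }
    };
    // "zk1:2181,zk2:2181,zk3:2181" is a placeholder quorum address.
    ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 5000, watcher);
    connected.await();
    System.out.println("Connected, session id: 0x" + Long.toHexString(zk.getSessionId()));
    zk.close();
  }
}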
WatcherWithClientRef.processWatchEvent reacts to the connection state reported by ZooKeeper and decides what to do next (keep monitoring the active lock, enter neutral mode, or rejoin the election), which ultimately determines whether this NameNode ends up Active or Standby:
if (eventType == Event.EventType.None) {
  // the connection state has changed
  switch (event.getState()) {
  case SyncConnected:
    LOG.info("Session connected.");
    // if the listener was asked to move to safe state then it needs to
    // be undone
    ConnectionState prevConnectionState = zkConnectionState;
    zkConnectionState = ConnectionState.CONNECTED;
    if (prevConnectionState == ConnectionState.DISCONNECTED &&
        wantToBeInElection) {
      monitorActiveStatus();
    }
    break;
  case Disconnected:
    LOG.info("Session disconnected. Entering neutral mode...");
    // ask the app to move to safe state because zookeeper connection
    // is not active and we dont know our state
    zkConnectionState = ConnectionState.DISCONNECTED;
    enterNeutralMode();
    break;
  case Expired:
    // the connection got terminated because of session timeout
    // call listener to reconnect
    LOG.info("Session expired. Entering neutral mode and rejoining...");
    enterNeutralMode();
    reJoinElection(0);
    break;
  case SaslAuthenticated:
    LOG.info("Successfully authenticated to ZooKeeper using SASL.");
    break;
  default:
    fatalError("Unexpected Zookeeper watch event state: "
        + event.getState());
    break;
  }
}
After the ZooKeeper client has been initialized, ZKFailoverController calls initHM to start the HealthMonitor thread:
private void initHM() {
  healthMonitor = new HealthMonitor(conf, localTarget);
  healthMonitor.addCallback(new HealthCallbacks());
  healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
  healthMonitor.start();
}
4. ZKFC monitors NameNode health and switches NameNode state
HealthMonitor.start() launches the MonitorDaemon daemon thread, which first connects to the NameNode and then checks its status via RPC requests:
public void run() {
  while (shouldRun) {
    try {
      // connect to the NameNode
      loopUntilConnected();
      // then check the NameNode's health
      doHealthChecks();
    } catch (InterruptedException ie) {
      Preconditions.checkState(!shouldRun,
          "Interrupted but still supposed to run");
    }
  }
}
Whether the check reports a healthy or an unhealthy state, enterState is called. enterState compares the new state with the previous one, and only if they differ does it invoke the corresponding callbacks. For example, if the previous state was healthy and the NameNode then dies, the new state is unhealthy, and the callback into ZKFailoverController triggers an active/standby election. The detailed handling is not explored further here:
private void doHealthChecks() throws InterruptedException {
  while (shouldRun) {
    HAServiceStatus status = null;
    boolean healthy = false;
    try {
      status = proxy.getServiceStatus();
      // call monitorHealth of HAServiceProtocol (the protocol used to talk to the
      // NameNode) to issue a remote RPC that checks the NameNode's health; the
      // concrete implementation is the method of the same name in NameNodeRpcServer
      proxy.monitorHealth();
      healthy = true;
    } catch (Throwable t) {
      if (isHealthCheckFailedException(t)) {
        // the service is running but unhealthy
        LOG.warn("Service health check failed for " + targetToMonitor
            + ": " + t.getMessage());
        enterState(State.SERVICE_UNHEALTHY);
      } else {
        // no response at all
        LOG.warn("Transport-level exception trying to monitor health of " +
            targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
        RPC.stopProxy(proxy);
        proxy = null;
        enterState(State.SERVICE_NOT_RESPONDING);
        Thread.sleep(sleepAfterDisconnectMillis);
        return;
      }
    }
    if (status != null) {
      setLastServiceStatus(status);
    }
    // healthy
    if (healthy) {
      enterState(State.SERVICE_HEALTHY);
    }
    Thread.sleep(checkIntervalMillis);
  }
}
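To make the enterState behaviour described above concrete, here is a small self-contained sketch (illustrative names only, not the Hadoop HealthMonitor itself) of a tracker that fires callbacks only when the health state actually changes:

import java.util.ArrayList;
import java.util.List;

class HealthStateTracker {
  enum State { INITIALIZING, SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

  interface Callback { void stateChanged(State newState); }

  private State state = State.INITIALIZING;
  private final List<Callback> callbacks = new ArrayList<>();

  void addCallback(Callback cb) { callbacks.add(cb); }

  // Only notify listeners when the state really changed, mirroring the idea of enterState.
  synchronized void enterState(State newState) {
    if (newState != state) {
      state = newState;
      for (Callback cb : callbacks) {
        cb.stateChanged(newState);
      }
    }
  }
}

In the real HealthMonitor, ZKFC registers HealthCallbacks through addCallback (see initHM above), so a transition such as SERVICE_HEALTHY to SERVICE_NOT_RESPONDING is what triggers joining or quitting the election.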
When an active/standby election happens, it essentially calls zkClient.create to create an ephemeral znode, and the result is handled by ActiveStandbyElector.processResult:
public synchronized void processResult(int rc, String path, Object ctx,
    String name) {
  if (isStaleClient(ctx)) return;
  if (LOG.isDebugEnabled()) {
    LOG.debug("CreateNode result: " + rc + " for path: " + path
        + " connectionState: " + zkConnectionState +
        " for " + this);
  }
  // if the ephemeral znode was created successfully, become active
  Code code = Code.get(rc);
  if (isSuccess(code)) {
    // we successfully created the znode. we are the leader. start monitoring
    if (becomeActive()) {
      monitorActiveStatus();
    } else {
      reJoinElectionAfterFailureToBecomeActive();
    }
    return;
  }
  // if the ephemeral znode already exists, stay in (or become) standby
  if (isNodeExists(code)) {
    if (createRetryCount == 0) {
      // znode exists and we did not retry the operation. so a different
      // instance has created it. become standby and monitor lock.
      becomeStandby();
    }
    // if we had retried then the znode could have been created by our first
    // attempt to the server (that we lost) and this node exists response is
    // for the second attempt. verify this case via ephemeral node owner. this
    // will happen on the callback for monitoring the lock.
    monitorActiveStatus();
    return;
  }
  // omitted
}
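For reference, creating the ephemeral lock znode uses ZooKeeper's asynchronous create API, roughly as in the sketch below; the path and class name are placeholders ("mycluster" stands for ${dfs.nameservices}), and in the real code the StringCallback is the processResult shown above:

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

class LockNodeSketch {
  // zk is assumed to be an already-connected client; the callback plays the role of processResult.
  static void tryBecomeActive(ZooKeeper zk, byte[] myData, AsyncCallback.StringCallback onCreate) {
    String lockPath = "/hadoop-ha/mycluster/ActiveStandbyElectorLock"; // placeholder nameservice
    // EPHEMERAL: the znode disappears when the ZooKeeper session dies, which releases the "lock"
    zk.create(lockPath, myData, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, onCreate, null);
  }
}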
The becomeActive() and becomeStandby() calls here are the key part:
private boolean becomeActive() {
  assert wantToBeInElection;
  if (state == State.ACTIVE) {
    // already active
    return true;
  }
  try {
    // fence the previous active and write the BreadCrumb znode to ZooKeeper
    Stat oldBreadcrumbStat = fenceOldActive();
    writeBreadCrumbNode(oldBreadcrumbStat);
    LOG.debug("Becoming active for {}", this);
    // ask the application to transition to the active state
    appClient.becomeActive();
    state = State.ACTIVE;
    return true;
  } catch (Exception e) {
    LOG.warn("Exception handling the winning of election", e);
    // Caller will handle quitting and rejoining the election.
    return false;
  }
}
The ActiveStandbyElectorCallback appClient has two implementations: in ResourceManager HA it is ActiveStandbyElectorBasedElectorService, while in ZKFC it is ElectorCallbacks. ElectorCallbacks ultimately calls HAServiceProtocol.transitionToActive to send an RPC that tells the NameNode to transition to the active state; the Standby transition is handled in the same way:
public static void transitionToActive(HAServiceProtocol svc,
    StateChangeRequestInfo reqInfo)
    throws IOException {
  try {
    svc.transitionToActive(reqInfo);
  } catch (RemoteException e) {
    throw e.unwrapRemoteException(ServiceFailedException.class);
  }
}
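As a rough, hypothetical sketch (simplified names, not the actual ElectorCallbacks inner class), the bridge between the elector callback and the NameNode RPC looks like this: winning the election translates into a transitionToActive call on the local NameNode's HAServiceProtocol proxy, and losing it into transitionToStandby:

import java.io.IOException;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;

// Illustrative sketch only; the real ZKFC callback goes through more bookkeeping.
class ElectorCallbacksSketch {
  private final HAServiceProtocol localNameNode; // RPC proxy to the co-located NameNode

  ElectorCallbacksSketch(HAServiceProtocol localNameNode) {
    this.localNameNode = localNameNode;
  }

  public void becomeActive() throws IOException {
    localNameNode.transitionToActive(
        new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC));
  }

  public void becomeStandby() {
    try {
      localNameNode.transitionToStandby(
          new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC));
    } catch (IOException e) {
      // in the real ZKFC this is logged; the elector retries or rejoins as needed
    }
  }
}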
5. NameNode split-brain prevention
5.1 Split-brain handling logic
When the network flaps, zkfc can no longer reach the Active NameNode and assumes it has died, so the Standby NameNode is switched to Active. Meanwhile the old Active NameNode, cut off by the same network problem, never receives zkfc's command to step down. Both NameNodes are now active at the same time: this is split brain.
In NameNode HA, if zkClient.create successfully creates the ephemeral znode /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock, then during the transition to the Active state:
- the zkfc of the newly promoted NameNode performs fencing, for example killing the old Active NameNode process over SSH;
- zkfc additionally creates the persistent znode /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb.
private boolean becomeActive() {
  assert wantToBeInElection;
  if (state == State.ACTIVE) {
    // already active
    return true;
  }
  try {
    Stat oldBreadcrumbStat = fenceOldActive();
    writeBreadCrumbNode(oldBreadcrumbStat);
    // omitted
}
By default, when the active NameNode exits cleanly it deletes the /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb znode. If that znode was not deleted, its content will not match the NameNode that is now becoming active, so fenceOldActive is executed to fence the old active NameNode:
if (Arrays.equals(data, appData)) {
  LOG.info("But old node has our own data, so don't need to fence it.");
} else {
  appClient.fenceOldActive(data);
}
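As a rough illustration of this breadcrumb comparison, a simplified synchronous sketch might look like the following (the path is a placeholder and the real ActiveStandbyElector uses its own retrying, asynchronous helpers):

import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class BreadCrumbCheckSketch {
  // Returns true if fencing of the previous active is required.
  static boolean needsFencing(ZooKeeper zk, byte[] myAppData)
      throws KeeperException, InterruptedException {
    String breadCrumbPath = "/hadoop-ha/mycluster/ActiveBreadCrumb"; // placeholder nameservice
    Stat stat = new Stat();
    try {
      byte[] oldData = zk.getData(breadCrumbPath, false, stat);
      // If the breadcrumb already carries our own data, the previous active was us: no fencing needed.
      return !Arrays.equals(oldData, myAppData);
    } catch (KeeperException.NoNodeException e) {
      // The old active exited gracefully and deleted its breadcrumb: nothing to fence.
      return false;
    }
  }
}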
In the ResourceManager, fenceOldActive is empty, i.e. its embedded election does not handle split brain:
public void fenceOldActive(byte[] oldActiveData) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request to fence old active being ignored, " +
        "as embedded leader election doesn't support fencing");
  }
}
The NameNode's ZKFC, however, does handle it:
public void fenceOldActive(byte[] data) {
  ZKFailoverController.this.fenceOldActive(data);
}
ZKFailoverController.doFence:
- first tries, via an RPC request, to transition the old active to the standby state and checks whether that worked;
- if the graceful attempt fails, it falls back to fencing the split-brain node, e.g. over SSH.
private void doFence(HAServiceTarget target) {
  LOG.info("Should fence: " + target);
  // first try to gracefully transition the old active to standby over RPC
  boolean gracefulWorked = new FailoverController(conf,
      RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
  if (gracefulWorked) {
    // It's possible that it's in standby but just about to go into active,
    // no? Is there some race here?
    LOG.info("Successfully transitioned " + target + " to standby " +
        "state without fencing");
    return;
  }
  try {
    // make sure a fencing method is actually configured
    target.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.error("Couldn't fence old active " + target, e);
    recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
    throw new RuntimeException(e);
  }
  // finally run the fence method, e.g. isolating the process over SSH
  if (!target.getFencer().fence(target)) {
    throw new RuntimeException("Unable to fence " + target);
  }
}
Fencing can be configured either as a script or via an SSH port:
- with a script you can implement your own fencing logic, for example stopping the NameNode and then restarting it;
- with sshfence, zkfc logs in over SSH and simply kills the old active NameNode process, which is the simplest option.
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence
shell(/data/hadoop/scripts/fence.sh)</value>
</property>

<property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence(hadoop:32200)</value>
</property>
After the old active NameNode process has been fenced, the persistent znode /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb is written to ZooKeeper again with the new active's data.
5.2 Fallback safeguard against split brain
Every NameNode must present an epoch number when talking to the JournalNodes (epoch numbers are unique and strictly increasing), and each JournalNode keeps a local promised epoch. The NameNode with the larger epoch number makes the JournalNodes raise their promised epoch and thus wins the majority, while the NameNode with the smaller epoch number becomes the minority (the same idea as in the Paxos protocol).
As a result, only the NameNode with the larger epoch number is the true active NameNode and is allowed to write to the JournalNodes. The old active NameNode's epoch number is smaller, so the JournalNodes refuse to accept its edits, which prevents it from corrupting the existing edit log and fsimage.
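The idea can be sketched as the promised-epoch check a JournalNode performs for every writer (illustrative only, not the actual JournalNode code):

class JournalEpochSketch {
  private long promisedEpoch = 0; // highest epoch this JournalNode has promised to honor

  // Called for each request a NameNode sends along with its epoch number.
  synchronized void checkWriteRequest(long writerEpoch) {
    if (writerEpoch < promisedEpoch) {
      // A stale (old active) NameNode: reject its edits so it cannot corrupt the log.
      throw new IllegalStateException("epoch " + writerEpoch
          + " is less than the promised epoch " + promisedEpoch);
    }
    // A larger epoch raises the promise; the NameNode with the largest epoch wins the majority.
    promisedEpoch = writerEpoch;
  }
}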