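1. Background
I have previously written two articles about YARN's event dispatching and state machines.
https://blog.51cto.com/u_15327484/4940200 introduced the AsyncDispatcher thread, which provides the following mechanisms:
- Calling its register() method registers a handler for each event type; the handlers are kept in a Map.
- Calling its handle() method puts a concrete event into the event queue, a BlockingQueue.
- An internal eventHandlingThread consumes events from the BlockingQueue, finds the handler registered for each event's type, and calls that handler's handle() method.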
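To make the three AsyncDispatcher mechanisms above concrete, here is a minimal Java sketch of the same idea. It is not Hadoop's AsyncDispatcher: MiniAsyncDispatcher, DemoEvent, DemoEventType and DemoHandler are hypothetical names, and draining on shutdown, error handling and metrics are left out. Handlers are keyed by the enum class of the event type, handle() only enqueues, and a single thread takes events and calls the matching handler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event types and event class, used only by this sketch.
enum DemoEventType { START, STOP }

final class DemoEvent {
    final DemoEventType type;
    final String payload;

    DemoEvent(DemoEventType type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

@FunctionalInterface
interface DemoHandler {
    void handle(DemoEvent event);
}

final class MiniAsyncDispatcher {
    // Handlers keyed by the enum class of the event type (call register() before start()).
    private final Map<Class<?>, DemoHandler> handlers = new HashMap<>();
    private final BlockingQueue<DemoEvent> eventQueue = new LinkedBlockingQueue<>();
    private final Thread eventHandlingThread;

    MiniAsyncDispatcher() {
        eventHandlingThread = new Thread(() -> {
            try {
                while (true) {
                    DemoEvent event = eventQueue.take();   // blocks until an event arrives
                    DemoHandler handler = handlers.get(event.type.getDeclaringClass());
                    if (handler != null) {
                        handler.handle(event);             // run the matching handler
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();        // stop() interrupts take()
            }
        }, "eventHandlingThread");
        eventHandlingThread.setDaemon(true);
    }

    void register(Class<?> eventTypeClass, DemoHandler handler) {
        handlers.put(eventTypeClass, handler);
    }

    // Only enqueues; the caller never runs the handler itself.
    void handle(DemoEvent event) {
        eventQueue.add(event);
    }

    void start() {
        eventHandlingThread.start();
    }

    void stop() {
        eventHandlingThread.interrupt();
        try {
            eventHandlingThread.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> demo() {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);
        MiniAsyncDispatcher dispatcher = new MiniAsyncDispatcher();
        dispatcher.register(DemoEventType.class, ev -> {
            seen.add(ev.type + ":" + ev.payload);
            done.countDown();
        });
        // Enqueued before the thread starts: still processed first, in FIFO order.
        dispatcher.handle(new DemoEvent(DemoEventType.START, "app_1"));
        dispatcher.handle(new DemoEvent(DemoEventType.STOP, "app_1"));
        dispatcher.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.stop();
        return new ArrayList<>(seen);
    }
}
```

demo() enqueues two events before the consumer thread starts; because the queue is FIFO they are still handled first and in order, which is the property the "Dispatcher is not yet started" comment in RMAppManager.submitApplication() relies on.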
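https://blog.51cto.com/u_15327484/5015131 introduced the state machines, which provide the following mechanisms:
- ResourceManager maintains four kinds of state machines: RMApp (the state of each application), RMAppAttempt (the state of each application attempt), RMContainer (the state of each container), and RMNode (the state of each NodeManager).
- When an EventHandler runs, it usually moves a state machine to a new state by calling the state machine's doTransition() method.
- Each state machine keeps its transition methods, which are built when the state machine is initialized.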
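The transition-table idea can be sketched in a few lines of Java. This is a hypothetical simplification, not Hadoop's StateMachineFactory: AppState, AppEventType, TransitionHook, MiniStateMachineFactory and MiniStateMachine are made-up names, and only single-arc transitions (one fixed target state per event) are modeled. The table is built once with chained addTransition() calls, and doTransition() looks up (current state, event type), runs the hook, then enters the target state.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical states and event types, named after the RMApp ones.
enum AppState { NEW, NEW_SAVING, SUBMITTED }

enum AppEventType { START, APP_NEW_SAVED }

@FunctionalInterface
interface TransitionHook {
    void transition(String appId);   // side effect executed during the transition
}

final class MiniStateMachineFactory {
    // One arc: the target state plus the hook to run.
    static final class Arc {
        final AppState to;
        final TransitionHook hook;

        Arc(AppState to, TransitionHook hook) {
            this.to = to;
            this.hook = hook;
        }
    }

    // Transition table: (from state, event type) -> arc
    private final Map<AppState, Map<AppEventType, Arc>> table = new EnumMap<>(AppState.class);

    MiniStateMachineFactory addTransition(AppState from, AppState to,
                                          AppEventType type, TransitionHook hook) {
        table.computeIfAbsent(from, k -> new EnumMap<>(AppEventType.class))
             .put(type, new Arc(to, hook));
        return this;   // allows chained addTransition(...) calls
    }

    MiniStateMachine make(AppState initial) {
        return new MiniStateMachine(table, initial);
    }
}

final class MiniStateMachine {
    private final Map<AppState, Map<AppEventType, MiniStateMachineFactory.Arc>> table;
    private AppState current;

    MiniStateMachine(Map<AppState, Map<AppEventType, MiniStateMachineFactory.Arc>> table,
                     AppState initial) {
        this.table = table;
        this.current = initial;
    }

    AppState doTransition(AppEventType type, String appId) {
        Map<AppEventType, MiniStateMachineFactory.Arc> arcs = table.get(current);
        MiniStateMachineFactory.Arc arc = (arcs == null) ? null : arcs.get(type);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + type + " at " + current);
        }
        arc.hook.transition(appId);   // run the transition logic first
        current = arc.to;             // then enter the target state
        return current;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniStateMachine sm = new MiniStateMachineFactory()
            .addTransition(AppState.NEW, AppState.NEW_SAVING, AppEventType.START,
                id -> log.add("store " + id))
            .addTransition(AppState.NEW_SAVING, AppState.SUBMITTED, AppEventType.APP_NEW_SAVED,
                id -> log.add("add " + id + " to scheduler"))
            .make(AppState.NEW);
        log.add(sm.doTransition(AppEventType.START, "app_1").name());
        log.add(sm.doTransition(AppEventType.APP_NEW_SAVED, "app_1").name());
        try {
            sm.doTransition(AppEventType.START, "app_1");   // no arc from SUBMITTED on START
        } catch (IllegalStateException e) {
            log.add("rejected");
        }
        return log;
    }
}
```

demo() replays NEW to NEW_SAVING on START and NEW_SAVING to SUBMITTED on APP_NEW_SAVED, the two RMApp transitions discussed in this article, and shows that an event with no registered arc is rejected.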
2. Submitting a job with YarnClient
The client calls submitApplication() on an RPC proxy built for ApplicationClientProtocol, which sends the submitApplication request to the server:
// Submit the application request through the ApplicationClientProtocol proxy
protected ApplicationClientProtocol rmClient;

@Override
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  // Poll the application report until the app has left NEW / NEW_SAVING
  while (true) {
    try {
      YarnApplicationState state =
          getApplicationReport(applicationId).getYarnApplicationState();
      if (!state.equals(YarnApplicationState.NEW) &&
          !state.equals(YarnApplicationState.NEW_SAVING)) {
        LOG.info("Submitted application " + applicationId);
        break;
      }
      // ... (omitted: timeout check and sleep between polls)
    } // ... (omitted: the catch clause of this try)
  }
  return applicationId;
}
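The waiting loop at the end of submitApplication() can be isolated into a small sketch. ClientAppState and SubmitPoller are hypothetical stand-ins for YarnApplicationState and the client code; the real client also has its own timeout and re-submission handling, which the excerpt above omits. The state source is a Supplier so that the loop can be exercised without a running RM.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical stand-in for the YarnApplicationState values seen by the client.
enum ClientAppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }

final class SubmitPoller {
    // Polls until the state is neither NEW nor NEW_SAVING.
    // Returns the number of polls needed; throws once maxPolls is exceeded.
    static int waitUntilSubmitted(Supplier<ClientAppState> stateSource,
                                  int maxPolls, long pollIntervalMillis) {
        for (int pollCount = 1; pollCount <= maxPolls; pollCount++) {
            ClientAppState state = stateSource.get();
            if (state != ClientAppState.NEW && state != ClientAppState.NEW_SAVING) {
                return pollCount;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for submission", e);
            }
        }
        throw new IllegalStateException("Timed out after " + maxPolls + " polls");
    }

    // Fake report source: NEW, then NEW_SAVING, then SUBMITTED.
    static int demo() {
        Iterator<ClientAppState> reports = Arrays.asList(
            ClientAppState.NEW, ClientAppState.NEW_SAVING, ClientAppState.SUBMITTED).iterator();
        return waitUntilSubmitted(reports::next, 10, 1);
    }
}
```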
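On the server side, the RPC layer invokes the ApplicationClientProtocol implementation (ClientRMService), which hands the request to RMAppManager.submitApplication(), shown below. It creates an RMAppImpl and puts an RMAppEventType.START event into the AsyncDispatcher (when security is enabled, the START event is sent by DelegationTokenRenewer after it has handled the application's tokens):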
protected void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime,
    String user) throws YarnException {
  ApplicationId applicationId = submissionContext.getApplicationId();

  // Passing start time as -1. It will be eventually set in RMAppImpl
  // constructor.
  // Create the RMAppImpl and store it in the RMContext
  RMAppImpl application = createAndPopulateNewRMApp(
      submissionContext, submitTime, user, false, -1);
  try {
    if (UserGroupInformation.isSecurityEnabled()) {
      this.rmContext.getDelegationTokenRenewer()
          .addApplicationAsync(applicationId,
              BuilderUtils.parseCredentials(submissionContext),
              submissionContext.getCancelTokensWhenComplete(),
              application.getUser(),
              BuilderUtils.parseTokensConf(submissionContext));
    } else {
      // Dispatcher is not yet started at this time, so these START events
      // enqueued should be guaranteed to be first processed when dispatcher
      // gets started.
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
  } catch (Exception e) {
    LOG.warn("Unable to parse credentials for " + applicationId, e);
    // Sending APP_REJECTED is fine, since we assume that the
    // RMApp is in NEW state and thus we haven't yet informed the
    // scheduler about the existence of the application
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId,
            RMAppEventType.APP_REJECTED, e.getMessage()));
    throw RPCUtil.getRemoteException(e);
  }
}
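RMActiveService, an inner class of ResourceManager, registers ApplicationEventDispatcher as the handler for RMAppEvent. ApplicationEventDispatcher passes each RMAppEvent to the corresponding RMAppImpl, and RMAppImpl registers the following transition for the START event type: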
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
    RMAppEventType.START, new RMAppNewlySavingTransition())
This means: when an application in RMAppState.NEW receives an RMAppEventType.START event, RMAppNewlySavingTransition.transition() is executed and the application moves to RMAppState.NEW_SAVING.
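Many more state transitions follow. The whole sequence of events is listed first; only the important transitions are analyzed afterwards:
- An RMAppEvent of type start.
- An RMStateStoreEvent of type storeApp, which writes the app information to the state store (ZooKeeper when ZKRMStateStore is configured).
- An RMAppEvent of type newSaved.
- A SchedulerEvent of type appAdded, which the Scheduler handles.
- An RMAppEvent of type Accepted.
- An RMAppEvent of type app_accepted.
- An RMAppAttemptEvent of type start.
- A SchedulerEvent of type appAttemptAdd.
- An RMAppAttemptEvent of type AttemptAdded; the attempt is then scheduled by the Scheduler.
- An RMStateStoreEvent of type storeAppAttempt, which writes the app attempt information to the state store (ZooKeeper when ZKRMStateStore is configured).
- An RMAppAttemptEvent of type AttemptNewSaved.
- An AMLauncherEvent of type launch, which starts the ApplicationMaster on a NodeManager.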
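The order of the list above can be replayed with a single-threaded sketch in which every handler only emits the next event. This is a deliberate simplification: in the real RM, scheduler events go through their own EventDispatcher, LAUNCH goes through ApplicationMasterLauncher, and between the attempt being added and the attempt being stored the scheduler first has to allocate the AM container. The names below use the enum constants as they appear in the Hadoop source (for example APP_NEW_SAVED, APP_ATTEMPT_ADDED), and the two accepted-related RMAppEvent entries are merged into a single APP_ACCEPTED step. EventChainReplay is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, single-threaded replay of the event chain. Each "handler" only
// emits the next event; real handlers also change states, write to the state
// store, or call the scheduler.
final class EventChainReplay {
    static List<String> replay() {
        Map<String, String> next = new HashMap<>();
        next.put("RMAppEvent:START", "RMStateStoreEvent:STORE_APP");
        next.put("RMStateStoreEvent:STORE_APP", "RMAppEvent:APP_NEW_SAVED");
        next.put("RMAppEvent:APP_NEW_SAVED", "SchedulerEvent:APP_ADDED");
        next.put("SchedulerEvent:APP_ADDED", "RMAppEvent:APP_ACCEPTED");
        next.put("RMAppEvent:APP_ACCEPTED", "RMAppAttemptEvent:START");
        next.put("RMAppAttemptEvent:START", "SchedulerEvent:APP_ATTEMPT_ADDED");
        next.put("SchedulerEvent:APP_ATTEMPT_ADDED", "RMAppAttemptEvent:ATTEMPT_ADDED");
        // Collapsed step: in reality the AM container must be allocated first.
        next.put("RMAppAttemptEvent:ATTEMPT_ADDED", "RMStateStoreEvent:STORE_APP_ATTEMPT");
        next.put("RMStateStoreEvent:STORE_APP_ATTEMPT", "RMAppAttemptEvent:ATTEMPT_NEW_SAVED");
        next.put("RMAppAttemptEvent:ATTEMPT_NEW_SAVED", "AMLauncherEvent:LAUNCH");

        Deque<String> queue = new ArrayDeque<>();
        List<String> processed = new ArrayList<>();
        queue.add("RMAppEvent:START");
        while (!queue.isEmpty()) {
            String event = queue.poll();        // FIFO, like the dispatcher queue
            processed.add(event);
            String emitted = next.get(event);   // the "handler" emits at most one event
            if (emitted != null) {
                queue.add(emitted);
            }
        }
        return processed;
    }
}
```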
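3. The STORE_APP event saves the app state to ZK
As shown below, StoreAppTransition handles the STORE_APP event. It calls RMStateStore.storeApplicationStateInternal(); with ZKRMStateStore, this method uses a ZooKeeper client to save the app state. On success it notifies the app with an APP_NEW_SAVED event. Note that the app attempt state is also saved to ZK later, through the STORE_APP_ATTEMPT event: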
//RMStateStore.java
stateMachineFactory = new StateMachineFactory<RMStateStore,
                                              RMStateStoreState,
                                              RMStateStoreEventType,
                                              RMStateStoreEvent>(
    RMStateStoreState.ACTIVE)
    .addTransition(RMStateStoreState.ACTIVE,
        EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
        RMStateStoreEventType.STORE_APP, new StoreAppTransition());
    // ... (other transitions omitted)

private static class StoreAppTransition
    implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
                                     RMStateStoreState> {
  @Override
  public RMStateStoreState transition(RMStateStore store,
      RMStateStoreEvent event) {
    if (!(event instanceof RMStateStoreAppEvent)) {
      // should never happen
      LOG.error("Illegal event type: " + event.getClass());
      return RMStateStoreState.ACTIVE;
    }
    boolean isFenced = false;
    ApplicationStateData appState =
        ((RMStateStoreAppEvent) event).getAppState();
    ApplicationId appId =
        appState.getApplicationSubmissionContext().getApplicationId();
    LOG.info("Storing info for app: " + appId);
    try {
      // Persist the app state (a ZooKeeper write in ZKRMStateStore)
      store.storeApplicationStateInternal(appId, appState);
      // Tell the RMApp that it has been saved
      store.notifyApplication(
          new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
    } catch (Exception e) {
      LOG.error("Error storing app: " + appId, e);
      if (e instanceof StoreLimitException) {
        store.notifyApplication(
            new RMAppEvent(appId, RMAppEventType.APP_SAVE_FAILED,
                e.getMessage()));
      } else {
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
    }
    return finalState(isFenced);
  }
}

/**
 * This method is called to notify the application that
 * new application is stored or updated in state store
 * @param event App event containing the app id and event type
 */
private void notifyApplication(RMAppEvent event) {
  rmDispatcher.getEventHandler().handle(event);
}
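The control flow of StoreAppTransition (persist first, then notify the app, and treat a size-limit failure differently from other failures) can be sketched with an in-memory map standing in for ZooKeeper. StoreAppSketch, StoreState and DemoStoreLimitException are hypothetical, and the size limit is an arbitrary number. In particular, the real code does not always fence on a generic failure; it lets notifyStoreOperationFailedInternal() decide.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for RMStateStoreState and StoreLimitException.
enum StoreState { ACTIVE, FENCED }

class DemoStoreLimitException extends Exception {
    DemoStoreLimitException(String message) {
        super(message);
    }
}

final class StoreAppSketch {
    final Map<String, String> backingStore = new HashMap<>();  // stands in for ZooKeeper
    final List<String> notifications = new ArrayList<>();     // events sent back to the app
    int maxAppDataChars = 16;                                  // arbitrary size limit
    boolean storeUnavailable = false;                          // simulates a store outage

    // Stand-in for storeApplicationStateInternal().
    void storeInternal(String appId, String appData) throws IOException, DemoStoreLimitException {
        if (storeUnavailable) {
            throw new IOException("state store unavailable");
        }
        if (appData.length() > maxAppDataChars) {
            throw new DemoStoreLimitException("application data too large");
        }
        backingStore.put(appId, appData);
    }

    // Same control flow as StoreAppTransition.transition(): store, then notify.
    StoreState storeApp(String appId, String appData) {
        try {
            storeInternal(appId, appData);
            notifications.add(appId + ":APP_NEW_SAVED");
            return StoreState.ACTIVE;
        } catch (DemoStoreLimitException e) {
            // Size-limit failure: tell the app, keep the store ACTIVE.
            notifications.add(appId + ":APP_SAVE_FAILED");
            return StoreState.ACTIVE;
        } catch (IOException e) {
            // Simplification: the real code asks notifyStoreOperationFailedInternal(e)
            // whether to fence; this sketch always fences.
            return StoreState.FENCED;
        }
    }
}
```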
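4. AppAdded: scheduling through the scheduler
When RMAppImpl receives the RMAppEventType.APP_NEW_SAVED event, it moves from NEW_SAVING to SUBMITTED and puts an AppAddedSchedulerEvent into the AsyncDispatcher: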
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
    RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

private static final class AddApplicationToSchedulerTransition extends
    RMAppTransition {
  @Override
  public void transition(RMAppImpl app, RMAppEvent event) {
    app.handler.handle(
        new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
            app.applicationPriority, app.placementContext));
    // send the ATS create Event
    app.sendATSCreateEvent();
  }
}

// The event carries the app id, the queue, and other information.
// (The 5-argument constructor used above takes the app id, queue and
// reservation id from the submission context and delegates to this one.)
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
    String user, boolean isAppRecovering, ReservationId reservationID,
    Priority appPriority, ApplicationPlacementContext placementContext) {
  super(SchedulerEventType.APP_ADDED);
  this.applicationId = applicationId;
  this.queue = queue;
  this.user = user;
  this.reservationID = reservationID;
  this.isAppRecovering = isAppRecovering;
  this.appPriority = appPriority;
  this.placementContext = placementContext;
}
When RMActiveService, the inner class of ResourceManager, is initialized, it registers an EventDispatcher to handle events of type SchedulerEvent:
protected void serviceInit(Configuration configuration) throws Exception {
  scheduler = createScheduler();
  scheduler.setRMContext(rmContext);
  addIfService(scheduler);
  rmContext.setScheduler(scheduler);

  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);
  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
}

protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
  return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
}
EventDispatcher is similar to AsyncDispatcher: its handle() method puts events into eventQueue, the eventProcessor thread consumes them, and handler processes each one:
public class EventDispatcher<T extends Event> extends AbstractService
    implements EventHandler<T> {
  private final EventHandler<T> handler;
  private final BlockingQueue<T> eventQueue =
      new LinkedBlockingDeque<>();
  private final Thread eventProcessor;
}
Here, the handler is the ResourceScheduler implementation, so SchedulerEvent events are processed by the scheduler.
ResourceScheduler has three commonly used implementations: FifoScheduler, FairScheduler and CapacityScheduler. Their handle() methods switch on the event type, for example:
@Override
public void handle(SchedulerEvent event) {
  switch(event.getType()) {
  case APP_ADDED:
  {
    AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
    // addApplication() records the app id and the app in the concurrent map
    // held by AbstractYarnScheduler
    addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser(),
        appAddedEvent.getIsAppRecovering());
  }
  break;
  case APP_ATTEMPT_ADDED:
  {
    AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
        (AppAttemptAddedSchedulerEvent) event;
    addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
  }
  break;
  // ... (other event types omitted)
  }
}
FifoScheduler simply adds the application to its single default queue and then sends APP_ACCEPTED:
public synchronized void addApplication(ApplicationId applicationId,
    String queue, String user, boolean isAppRecovering) {
  SchedulerApplication<FifoAppAttempt> application =
      new SchedulerApplication<>(DEFAULT_QUEUE, user);
  // Record the app id and the app in AbstractYarnScheduler's concurrent map
  applications.put(applicationId, application);
  metrics.submitApp(user);
  LOG.info("Accepted application " + applicationId + " from user: " + user
      + ", currently num of applications: " + applications.size());
  if (isAppRecovering) {
    if (LOG.isDebugEnabled()) {
      LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
    }
  } else {
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
  }
}
FairScheduler first validates the queue name, then assigns the application to a queue and enforces the queue ACLs:
protected void addApplication(ApplicationId applicationId,
    String queueName, String user, boolean isAppRecovering) {
  if (queueName == null || queueName.isEmpty()) {
    String message = "Reject application " + applicationId +
        " submitted by user " + user + " with an empty queue name.";
    LOG.info(message);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppRejectedEvent(applicationId, message));
    return;
  }

  if (queueName.startsWith(".") || queueName.endsWith(".")) {
    String message = "Reject application " + applicationId
        + " submitted by user " + user + " with an illegal queue name "
        + queueName + ". "
        + "The queue name cannot start/end with period.";
    LOG.info(message);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppRejectedEvent(applicationId, message));
    return;
  }

  // Acquire the lock before entering try, so that unlock() in finally
  // only runs when the lock is actually held
  writeLock.lock();
  try {
    RMApp rmApp = rmContext.getRMApps().get(applicationId);
    // Assign the app to a leaf queue
    FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
    if (queue == null) {
      return;
    }

    // Enforce ACLs
    UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
    if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
        && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
      String msg = "User " + userUgi.getUserName() +
          " cannot submit applications to queue " + queue.getName();
      LOG.info(msg);
      rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, msg));
      return;
    }

    SchedulerApplication<FSAppAttempt> application =
        new SchedulerApplication<FSAppAttempt>(queue, user);
    applications.put(applicationId, application);
    queue.getMetrics().submitApp(user);

    LOG.info("Accepted application " + applicationId + " from user: " + user
        + ", in queue: " + queueName + ", currently num of applications: "
        + applications.size());
    if (isAppRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
    }
  } finally {
    writeLock.unlock();
  }
}
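The admission checks in FairScheduler.addApplication() can be pulled out into a pure function for illustration. QueueAdmission is hypothetical, the ACL model is reduced to two sets of user names, and the ACL check here is applied to the requested queue name, whereas the real code checks the leaf queue returned by assignToQueue(). The order of the checks and the messages follow the excerpt above.

```java
import java.util.Optional;
import java.util.Set;

final class QueueAdmission {
    // Returns a rejection message, or Optional.empty() if the app may be accepted.
    static Optional<String> check(String appId, String user, String queueName,
                                  Set<String> submitUsers, Set<String> adminUsers) {
        if (queueName == null || queueName.isEmpty()) {
            return Optional.of("Reject application " + appId + " submitted by user "
                + user + " with an empty queue name.");
        }
        if (queueName.startsWith(".") || queueName.endsWith(".")) {
            return Optional.of("Reject application " + appId + " submitted by user "
                + user + " with an illegal queue name " + queueName + ". "
                + "The queue name cannot start/end with period.");
        }
        // Either SUBMIT_APPLICATIONS or ADMINISTER_QUEUE permission is enough.
        if (!submitUsers.contains(user) && !adminUsers.contains(user)) {
            return Optional.of("User " + user
                + " cannot submit applications to queue " + queueName);
        }
        return Optional.empty();
    }
}
```

An empty result corresponds to the branch that records the app and sends APP_ACCEPTED; any message corresponds to an RMAppRejectedEvent.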
5. Launching the ApplicationMaster with AMLauncherEvent
RMActiveService, the inner class of ResourceManager, registers ApplicationMasterLauncher with the AsyncDispatcher as the handler for events of type AMLauncherEventType:
protected void serviceInit(Configuration configuration) throws Exception {
  applicationMasterLauncher = createAMLauncher();
  rmDispatcher.register(AMLauncherEventType.class,
      applicationMasterLauncher);
}

protected ApplicationMasterLauncher createAMLauncher() {
  return new ApplicationMasterLauncher(this.rmContext);
}
The handle() method of ApplicationMasterLauncher wraps a LAUNCH event into a Runnable (an AMLauncher) and puts it into a LinkedBlockingQueue:
// Queue of Runnables; it acts as a buffer when many AM-launch events
// are submitted within a short time
private final BlockingQueue<Runnable> masterEvents
    = new LinkedBlockingQueue<Runnable>();

@Override
public synchronized void handle(AMLauncherEvent appEvent) {
  AMLauncherEventType event = appEvent.getType();
  RMAppAttempt application = appEvent.getAppAttempt();
  switch (event) {
  case LAUNCH:
    launch(application);
    break;
  case CLEANUP:
    cleanup(application);
    break;
  default:
    break;
  }
}

private void launch(RMAppAttempt application) {
  // Create the AMLauncher
  Runnable launcher = createRunnableLauncher(application,
      AMLauncherEventType.LAUNCH);
  // Add it to the Runnable queue
  masterEvents.add(launcher);
}
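The buffering described in the comment above (enqueue in handle(), consume elsewhere) can be sketched as a queue plus one consumer thread that hands each Runnable to a thread pool, which as far as I know matches how the RM's launcher thread works. MiniAmLauncher is hypothetical and the pool size of 2 is arbitrary. The point of the design is that the dispatcher thread calling launch() only does a non-blocking add, while the slow RPC to the NodeManager runs on a pool thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class MiniAmLauncher {
    // Buffer of pending launches, like masterEvents above.
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
    private final Thread launcherThread = new Thread(this::drain, "LauncherThread");

    void start() {
        launcherThread.setDaemon(true);
        launcherThread.start();
    }

    // Called on the dispatcher thread: a non-blocking enqueue, no RPC here.
    void launch(Runnable amLauncher) {
        masterEvents.add(amLauncher);
    }

    // Consumer loop: take one launcher at a time and run it on the pool.
    private void drain() {
        try {
            while (true) {
                launcherPool.execute(masterEvents.take());
            }
        } catch (InterruptedException | RejectedExecutionException e) {
            // stop() was called: take() was interrupted or the pool is shut down
        }
    }

    void stop() {
        launcherThread.interrupt();
        launcherPool.shutdownNow();
    }

    static int demo(int launches) {
        MiniAmLauncher launcher = new MiniAmLauncher();
        launcher.start();
        CountDownLatch done = new CountDownLatch(launches);
        AtomicInteger started = new AtomicInteger();
        for (int i = 0; i < launches; i++) {
            // Each Runnable stands in for an AMLauncher sending startContainers.
            launcher.launch(() -> {
                started.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        launcher.stop();
        return started.get();
    }
}
```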
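A dedicated launcher thread takes these Runnables from masterEvents and hands them to a thread pool for execution. When an AMLauncher runs, it calls its launch() method, which sends a startContainers request to the NodeManager to start the AM: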
private void launch() throws IOException, YarnException {
  connect();
  ContainerId masterContainerID = masterContainer.getId();
  ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + masterContainer
      + " for AM " + application.getAppAttemptId());
  // Build the ContainerLaunchContext, which contains the command that starts the AM
  ContainerLaunchContext launchContext =
      createAMContainerLaunchContext(applicationContext, masterContainerID);

  // Build the StartContainerRequest. The AM is the app's first container on
  // a NM, and its start request is sent by the RM; all later
  // StartContainerRequests are sent by the AM
  StartContainerRequest scRequest =
      StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
  List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
  list.add(scRequest);
  // The RPC message type is StartContainersRequest (a batch of requests)
  StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);

  // Send the RPC through the ContainerManagementProtocol proxy
  StartContainersResponse response =
      containerMgrProxy.startContainers(allRequests);
  if (response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(masterContainerID)) {
    Throwable t =
        response.getFailedRequests().get(masterContainerID).deSerialize();
    parseAndThrowException(t);
  } else {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
  }
}
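6. Summary
- ResourceManager stores events in the AsyncDispatcher's queue, and a dedicated thread consumes and processes them. ResourceManager registers handlers for the different event types.
- ResourceManager represents application state with state machines, each with its own transition methods. When an AsyncDispatcher handler processes an event, it usually transitions a state machine.
- While a job submission is being processed, the STORE_APP event stores the App information in ZK, and the STORE_APP_ATTEMPT event later stores the App Attempt information.
- When the AppAdded event is handled, the Scheduler schedules the job, for example deciding which queue it belongs to. These events are put into the blocking queue of an EventDispatcher and consumed by that dispatcher's own thread.
- When the LAUNCH event is handled, the RM asks the NodeManager to start the AM. These events are put into the blocking queue of ApplicationMasterLauncher and consumed by a separate thread.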