1. 背景
在https://blog.51cto.com/u_15327484/7894282文章中,介绍了Yarn的两种调度器。在https://blog.51cto.com/u_15327484/7920197文章中,介绍了FairScheduler迁移Capacity Scheduler的迁移实践。
在实际迁移之前,必须要确保Capacity Scheduler能够达到足够的收益,即吞吐率或调度时间等指标都有所优化,才能开启迁移操作。
而为了评估迁移收益,很难直接通过搭建1k+个NodeManager的Yarn集群来测试调度器性能,这样太浪费成本。为了解决这个问题,Hadoop官方提供了Scheduler load Simulator工具,可以用它在单机上模拟超大Yarn集群,测试调度器性能。
2. Yarn SLS概要
为了压测调度器性能,需要实现两方面的需求:
- 启动压测的集群,该集群能够读取作业,并根据作业信息进行调度。
- 提供作业信息,最好根据线上已经运行过的作业,再跑到SLS集群中测试。
对于作业信息的输入,Hadoop提供了Rumen工具,可以读取Mapreduce中historyServer保存的作业历史信息,生成作业信息文件sls-jobs.json
和节点信息文件sls-nodes.json
。
对于集群的模拟,启动SLS脚本时,会启动三个重要组件:
- 启动ResourceManager进程,用于接受资源请求。
- 启动NMSimulator,读取
sls-nodes.json
文件,获取文件中的节点信息,通过线程池的方法将每个节点的心跳请求发送给ResourceManager。 - 启动AMSimulator,读取
sls-jobs.json
文件,获取文件中的作业信息,通过线程池的方法将每个节点的资源请求发送给ResourceManager。
其架构图如下所示:
注意,SLS的职责并不是执行任务,而是执行资源的分配。因此ResourceManager接受AM的请求后,不会向NM发送启动作业的请求;分配完资源后,作业直接结束。
3. SLS重要执行流程解析
在hadoop包的share/hadoop/tools/sls/bin目录下,有脚本slsrun.sh,它就是SLS的启动脚本。脚本中,最重要的启动命令如下:
hadoop_java_exec sls org.apache.hadoop.yarn.sls.SLSRunner ${args}
它会通过java命令执行SLSRunner类,SLSRunner启动过程如下:
- 启动ResourceManager服务。
- 启动NodeManager心跳线程。
- 启动AM资源请求线程。
public void start() throws IOException, ClassNotFoundException, YarnException,
InterruptedException {
enableDNSCaching(getConf());
// start resource manager
//启动ResourceManager服务。
startRM();
// start node managers
//启动NodeManager心跳线程。
startNM();
// start application masters
//启动AM资源请求线程。
startAM();
// set queue & tracked apps information
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setQueueSet(this.queueAppNumMap.keySet());
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setTrackedAppSet(this.trackedApps);
// print out simulation info
printSimulationInfo();
// blocked until all nodes RUNNING
waitForNodesRunning();
// starting the runner once everything is ready to go,
runner.start();
}
3.1 ResourceMananger启动
它调用startRM方法启动ResourceManager服务。注意,这个ResourceManager服务重写了createAMLauncher方法:
final SLSRunner se = this;
rm = new ResourceManager() {
@Override
protected ApplicationMasterLauncher createAMLauncher() {
return new MockAMLauncher(se, this.rmContext, appIdAMSim);
}
};
// Across runs of parametrized tests, the JvmMetrics objects is retained,
// but is not registered correctly
JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
jvmMetrics.registerIfNeeded();
// Init and start the actual ResourceManager
rm.init(rmConf);
rm.start();
返回的ApplicationMasterLauncher对象是MockAMLauncher,该类handle方法用户指定am的启动行为,这里AM不会启动容器了,如果调度到资源,就结束执行。(对于正常的ApplicationMasterLauncher,是会通知NM启动容器的)。如下所示:
Container amContainer = event.getAppAttempt().getMasterContainer();
setupAMRMToken(event.getAppAttempt());
// Notify RMAppAttempt to change state
super.context.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
ams.notifyAMContainerLaunched(
event.getAppAttempt().getMasterContainer());
LOG.info("Notify AM launcher launched:" + amContainer.getId());
se.getNmMap().get(amContainer.getNodeId())
.addNewContainer(amContainer, 100000000L);
return;
3.2 NodeManager和ApplicationMaster线程模型
在介绍NM和AM实现之前,先了解下它的线程池模型。SLSRunner的成员变量TaskRunner维护了一个线程池。NM的心跳任务和AM的资源申请任务都会提交到该线程池中,向ResourceManager发送请求。
TaskRunner包含了一个队列queue对象,通过schedule方法向queue中添加任务。queue作为线程池的参数,会直接将queue中的任务放到线程池中执行:
private static TaskRunner runner = new TaskRunner();
private DelayQueue queue;
executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0,
TimeUnit.MILLISECONDS, queue);
private void schedule(Task task, long timeNow) {
task.timeRebase(timeNow);
task.setQueue(queue);
queue.add(task);
}
NM和AM都继承了TaskRunner的内部类Task,实现的run方法。它们的任务依次执行firstStep、middleStep、lastStep。如下:
public final void run() {
try {
if (nextRun == startTime) {
firstStep();
nextRun += repeatInterval;
if (nextRun <= endTime) {
queue.add(this);
}
} else if (nextRun < endTime) {
middleStep();
nextRun += repeatInterval;
queue.add(this);
} else {
lastStep();
}
} catch (Exception e) {
e.printStackTrace();
Thread.getDefaultUncaughtExceptionHandler()
.uncaughtException(Thread.currentThread(), e);
}
}
3.3 NodeManager心跳上报
SLS对NodeManager逻辑进行简化,阉割启动容器的能力。直接从sls-nodes.json获取所有NodeManager节点信息,解析到的每个节点,都进行心跳上报。
如下,NMSimulator通过多线程的方式讲NM心跳任务放到TaskRunner中的线程池中:
public class NMSimulator extends TaskRunner.Task {
//读取sls-nodes.json文件,获取节点信息
nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
//创建线程池
ExecutorService executorService = Executors.
newFixedThreadPool(threadPoolSize);
for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
executorService.submit(new Runnable() {
@Override public void run() {
try {
// we randomize the heartbeat start time from zero to 1 interval
NMSimulator nm = new NMSimulator();
Resource nmResource = nodeManagerResource;
String hostName = entry.getKey();
if (entry.getValue() != null) {
nmResource = entry.getValue();
}
nm.init(hostName, nmResource,
random.nextInt(heartbeatInterval),
heartbeatInterval, rm, resourceUtilizationRatio);
nmMap.put(nm.getNode().getNodeID(), nm);
//将NM包含的心跳任务放到runner中的线程池中
runner.schedule(nm);
rackSet.add(nm.getNode().getRackName());
} catch (IOException | YarnException e) {
LOG.error("Got an error while adding node", e);
}
}
});
}
线程执行时,依次执行firstStep、middleStep、lastStep方法,NMSimulator只实现了middleStep方法。它负责进行心跳上报:
if (resourceUtilizationRatio > 0 && resourceUtilizationRatio <=1) {
int pMemUsed = Math.round(node.getTotalCapability().getMemorySize()
* resourceUtilizationRatio);
float cpuUsed = node.getTotalCapability().getVirtualCores()
* resourceUtilizationRatio;
ResourceUtilization resourceUtilization = ResourceUtilization.newInstance(
pMemUsed, pMemUsed, cpuUsed);
ns.setContainersUtilization(resourceUtilization);
ns.setNodeUtilization(resourceUtilization);
}
beatRequest.setNodeStatus(ns);
NodeHeartbeatResponse beatResponse =
rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
3.4 AM资源请求
AMSimulator读取sls-jobs.json
文件,获取其中的任务信息:
try (Reader input = new InputStreamReader(
new FileInputStream(inputTrace), "UTF-8")) {
Iterator<Map> jobIter = mapper.readValues(
jsonF.createParser(input), Map.class);
while (jobIter.hasNext()) {
try {
createAMForJob(jobIter.next());
} catch (Exception e) {
LOG.error("Failed to create an AM: {}", e.getMessage());
}
}
}
封装请求后,放倒入TaskRunner.queue中。依次执行firstStep、middleStep、lastStep方法。
firstStep中,提交任务:
rm.getClientRMService().submitApplication(subAppRequest);
middleStep中,处理响应,发送容器请求:
public void middleStep() throws Exception {
if (isAMContainerRunning) {
// process responses in the queue
processResponseQueue();
// send out request
sendContainerRequest();
// check whether finish
checkStop();
}
}
lastStep中,发送作业完成信息:
rm.getApplicationMasterService().finishApplicationMaster(finishAMRequest);
4. Mapreduce HistoryServer日志介绍
SLS需要HistoryServer保存的作业统计信息来模拟集群和作业请求。需要线了解下HistoryServer作业历史信息相关内容。
首先,在mapred-site.xml,可以看到HistoryServer相关配置:
<property>
#作业历史文件的保存位置,设置none表示不记录日志,如果开启intermediate-done-dir和done-dir就可以记录了
<name>hadoop.job.history.user.location</name>
<value>none</value>
</property>
<property>
#JobHistory服务器地址,与ResourceManager、NodeManager、NameNode等组件通信
<name>mapreduce.jobhistory.address</name>
<value>启动historyserver服务的机器ip:10020</value>
</property>
<property>
#供用户访问的地址
<name>mapreduce.jobhistory.webapp.address</name>
<value>启动historyserver服务的机器ip:19888</value>
</property>
<property>
#存储作业执行状态信息的临时目录
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/user/history/temp</value>
</property>
<property>
#存储作业执行完成后,将临时目录的文件转移到该目录下
<name>mapreduce.jobhistory.done-dir</name>
<value>/user/history/done</value>
</property>
<!-- MapReduce Job History Server security configs -->
<property>
<name>mapreduce.jobhistory.keytab</name>
<value>{% $KEYTAB_DIR %}/{% $DFS_NAMESERVICE %}.keytab</value>
</property>
<property>
<name>mapreduce.jobhistory.principal</name>
<value>hadoop/{%- $jobhisoryserver -%}@NIE.NETEASE.COM</value>
</property>
<property>
#开启历史作业信息定时清理功能
<name>mapreduce.jobhistory.cleaner.enable</name>
<value>true</value>
</property>
<property>
#每2.5小时清理一次历史作业信息
<name>mapreduce.jobhistory.cleaner.interval-ms</name>
<value>8640000</value>
</property>
<property>
#历史作业信息最多存活存活2天
<name>mapreduce.jobhistory.max-age-ms</name>
<value>172800000</value>
</property>
<property>
#能同时处理历史记录请求的线程数量
<name>mapreduce.jobhistory.client.thread-count</name>
<value>256</value>
</property>
<property>
#将历史状态信息从intermediate-done-dir临时目录移动到done-dir的线程数
<name>mapreduce.jobhistory.move.thread-count</name>
<value>32</value>
</property>
可以从mapreduce.jobhistory.done-dir指定路径中找到历史作业数据文件:
job_xxx_conf.xml保存作业配置信息:
job_xxx.jhist保存作业历史执行相关状态信息:
5. SLS实践
首先,需要使用hadoop的rumen工具,将historyserver中的日志转为rumen格式的历史作业数据文件:job-trace.json和job-topology.json:
hadoop jar ~/hadoop/share/hadoop/tools/lib/hadoop-rumen-2.6.0-cdh5.6.0.jar \
org.apache.hadoop.tools.rumen.TraceBuilder \
file:///home/hadoop/var/rumen/job-trace.json \
file:///home/hadoop/var/rumen/job-topology.json \
hdfs:///user/history/done/2022/11/25/000358
再将job-trace.json文件转为sls-nodes.json和sls-jobs.json文件:
~/hadoop/share/hadoop/tools/sls/bin/rumen2sls.sh --rumen-file=/home/brightness/rumen/job-trace.json --output-dir=/home/hadoop/var/sls
sls-jobs.json文件如下:
sls-nodes.json文件如下: SLS将sls-nodes.json和sls-jobs.json文件作为输入,执行:
cd /home/hadoop/hadoop/share/hadoop/tools/sls
/home/hadoop/hadoop/share/hadoop/tools/sls/bin/slsrun.sh \
--input-sls=/home/hadoop/var/sls/sls-jobs.json \
--nodes=/home/hadoop/var/sls/sls-nodes.json \
--output-dir=/home/hadoop/var/sls/out1 \
--print-simulation > /home/hadoop/var/sls/out1/run.log 2>&1
可以查看网页查看resourcemanager及模拟过程:
http://启动ip:8088/cluster/apps/RUNNING
执行结束后,生成调度耗时、心跳耗时、Container平均耗时等信息。当然还有各种指标的曲线图:
它和日志聚合不同,日志聚合是yarn自身提供的服务。
通过上述流程,通过在yarn-site.xml中设置不同的调度器,就能够模拟不同调度器的性能结果。同时,它还可以将同一种调度在不同配置下的性能表现进行对比。
标签:启动,代码,作业,hadoop,Yarn,sls,json,线程,SLS From: https://blog.51cto.com/u_15327484/7939194