YarnDaemon 与DremioDaemon 基本类似,都是启动dremio 服务,只是YarnDaemon 更多是关于执行器节点的运行模式与
DremioDaemon 差异比较大,DremioDaemon 就是一个long running 任务,但是YarnDaemon 实际上是一个由线程调度出发的
任务,运行时间不如DremioDaemon长,同时会有watchdog 进行服务状态的监控
参考代码
public void run() {
try (TimedBlock b = Timer.time("main")) {
// create a temporary local write path
final Path localWritePath = Files.createTempDirectory("dremio-executor");
final DACConfig config = DACConfig.newConfig()
.writePath(localWritePath.toString());
logger.info("Local write path set to '{}'", localWritePath);
final SabotConfig sabotConfig = config.getConfig().getSabotConfig();
final DACModule module = sabotConfig.getInstance(DremioDaemon.DAEMON_MODULE_CLASS, DACModule.class, DACDaemonModule.class);
try (final DACDaemon daemon = DACDaemon.newDremioDaemon(config, ClassPathScanner.fromPrescan(sabotConfig),
module)) {
dacDaemon = daemon;
// 此部分属于标准DACDaemon 玩法
daemon.init();
// Start yarn watchdog
// 特定于yarn 模式的watchdog
startYarnWatchdog(daemon);
daemon.closeOnJVMShutDown();
daemon.awaitClose();
}
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
YarnDaemon 的启动
YarnDaemon 是直接由YarnController使用的,属于标准的apache twill
// 启动集群
public TwillController startCluster(YarnConfiguration yarnConfiguration, List<Property> propertyList) {
TwillController tmpController = createPreparer(yarnConfiguration, propertyList).start();
return tmpController;
}
// TwillPreparer 包装
protected TwillPreparer createPreparer(YarnConfiguration yarnConfiguration, List<Property> propertyList) {
// 入口参数定义,就使用了YarnDaemon
AppBundleRunnable.Arguments discoveryArgs = new AppBundleRunnable.Arguments(
YARN_BUNDLED_JAR_NAME,
"com.dremio.dac.daemon.YarnDaemon",
new String[] {});
DacDaemonYarnApplication dacDaemonApp = new DacDaemonYarnApplication(dremioConfig, yarnConfiguration,
new DacDaemonYarnApplication.Environment());
TwillRunnerService twillRunner = startTwillRunner(yarnConfiguration);
Map<String, String> envVars = Maps.newHashMap();
envVars.put("MALLOC_ARENA_MAX", "4");
envVars.put("MALLOC_MMAP_THRESHOLD_", "131072");
envVars.put("MALLOC_TRIM_THRESHOLD_", "131072");
envVars.put("MALLOC_TOP_PAD_", "131072");
envVars.put("MALLOC_MMAP_MAX_", "65536");
// Set ${DREMIO_HOME} for YarnDaemon to avoid config substitution failure
envVars.put(DREMIO_HOME, ".");
try {
String userName = UserGroupInformation.getCurrentUser().getUserName();
envVars.put("HADOOP_USER_NAME", userName);
} catch (IOException e) {
logger.error("Exception while trying to fill out HADOOP_USER_NAME with current user", e);
}
for (Property prop : propertyList) {
// add if it is env var
if (PropertyType.ENV_VAR.equals(prop.getType())) {
envVars.put(prop.getKey(), prop.getValue());
}
}
String[] yarnClasspath = yarnConfiguration.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
final TwillPreparer preparer = twillRunner.prepare(dacDaemonApp)
.addLogHandler(new YarnTwillLogHandler())
.withApplicationClassPaths(yarnClasspath)
.withBundlerClassAcceptor(new HadoopClassExcluder())
.setLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, yarnContainerLogLevel()))
.withEnv(YARN_RUNNABLE_NAME, envVars)
.withMaxRetries(YARN_RUNNABLE_NAME, MAX_APP_RESTART_RETRIES)
.withArguments(YARN_RUNNABLE_NAME, discoveryArgs.toArray())
.setJVMOptions(YARN_RUNNABLE_NAME, prepareCommandOptions(yarnConfiguration, propertyList))
.withClassPaths(dacDaemonApp.getJarNames());
String queue = yarnConfiguration.get(DacDaemonYarnApplication.YARN_QUEUE_NAME);
if (queue != null) {
preparer.setSchedulerQueue(queue);
}
if (dremioConfig.getBoolean(DremioConfig.DEBUG_YARN_ENABLED)) {
preparer.enableDebugging(true, YARN_RUNNABLE_NAME);
}
return preparer;
}
说明
以上只是一个简单的说明,后变会介绍下关于yarn 包装调度部分,通过以上我们可以大致了解到是如果关联系统的
参考资料
dac/daemon/src/main/java/com/dremio/dac/daemon/YarnDaemon.java
provision/yarn/yarntwill/src/main/java/com/dremio/provision/yarn/YarnController.java