首页 > 编程语言 >【Flink系列二十四】Flink HistoryServer 实现原理分析-源码解读

【Flink系列二十四】Flink HistoryServer 实现原理分析-源码解读

时间:2024-11-13 17:45:30浏览次数:1  
标签:... Flink HistoryServer gen JSON 源码 path archive

Flink系列二十四 Flink HistoryServer 实现原理

首先,作业停止或者故障时,调用 HistoryServerArchivist 进行归档

public interface HistoryServerArchivist {

    /**
     * Archives the given {@link ExecutionGraphInfo} on the history server.
     *
     * @param executionGraphInfo to store on the history server
     * @return Future which is completed once the archiving has been completed.
     */
    CompletableFuture<Acknowledge> archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo);
}

调用的入口

protected CompletableFuture<CleanupJobState> jobReachedTerminalState(
            ExecutionGraphInfo executionGraphInfo) {
}

...

private CompletableFuture<Acknowledge> archiveExecutionGraphToHistoryServer(
            ExecutionGraphInfo executionGraphInfo) {
}

FsJobArchivist将数据序列化成JSON文本

org.apache.flink.runtime.history.FsJobArchivist#archiveJob

数据会写入: jobmanager.archive.fs.dir 对应的目录

public static Path archiveJob(
            Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
        try {
            FileSystem fs = rootPath.getFileSystem();
            Path path = new Path(rootPath, jobId.toString());
            OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);

            try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
                gen.writeStartObject();
                gen.writeArrayFieldStart(ARCHIVE);
                for (ArchivedJson archive : jsonToArchive) {
                    gen.writeStartObject();
                    gen.writeStringField(PATH, archive.getPath());
                    gen.writeStringField(JSON, archive.getJson());
                    gen.writeEndObject();
                }
                gen.writeEndArray();
                gen.writeEndObject();
            } catch (Exception e) {
                fs.delete(path, false);
                throw e;
            }
            LOG.info("Job {} has been archived at {}.", jobId, path);
            return path;
        } catch (IOException e) {
            LOG.error("Failed to archive job.", e);
            throw e;
        }
    }

查看归档的JSON数据

hdfs dfs -get hdfs://corp.slankka-hdfs.com/application/app-logs/flink/da12f990aba5bcdff710e96c5a409123

查看该数据,其实是一个JSON结构

{
    "archive": [
        {
            "path": "/jobs/overview",
			"json": "..." },
        {
            "path": "/jobs/da12f990aba5bcdff710e96c5a409123/config",
			"json": "..."
		},
		{
            "path": "/jobs/da12f990aba5bcdff710e96c5a409123/checkpoints",
			"json": "..."
		}
	]
}

HistoryServer 启动后开始扫描归档的数据

public class HistoryServer {
  void start() throws IOException, InterruptedException {
     //...
	 executor.scheduleWithFixedDelay(
                    getArchiveFetchingRunnable(), 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
  }
  
   private Runnable getArchiveFetchingRunnable() {
        return Runnables.withUncaughtExceptionHandler(
                () -> archiveFetcher.fetchArchives(), FatalExitExceptionHandler.INSTANCE);
   }
}

处理读取到的归档:下载到HistoryServer本地磁盘

可以发现,写入归档和处理归档的代码逻辑在同一个类文件内:FsJobArchivist

org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher#processArchive

private void processArchive(String jobID, Path jobArchive) throws IOException {
        for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive)) {
            String path = archive.getPath();
            String json = archive.getJson();

            File target;
            if (path.equals(JobsOverviewHeaders.URL)) {
                target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
            } else if (path.equals("/joboverview")) { // legacy path
                LOG.debug("Migrating legacy archive {}", jobArchive);
                json = convertLegacyJobOverview(json);
                target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
            } else {
                // this implicitly writes into webJobDir
                target = new File(webDir, path + JSON_FILE_ENDING);
            }
			...
    }	
}

上述逻辑会写入 historyserver.web.tmpdir 对应位置,如果未定义,则下载到 java.io.tmpdir

相对路径

HistoryServerArchiveFetcher(...) {
        this.webDir = checkNotNull(webDir);
        this.webJobDir = new File(webDir, "jobs");
        Files.createDirectories(webJobDir.toPath());
        this.webOverviewDir = new File(webDir, "overviews");
}

API 返回相应的文件

History Server Available Requests

本质上都是拉取以及解析对应的文件

router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));

Router 相应的请求分为两类:

第一类

  1. Job 归档的JSON文件

第二类:History Server 的 JobManager 以及 TaskManager 的日志链接

  1. /jobs/<jobid>/jobmanager/log-url
  2. /jobs/<jobid>/taskmanagers/<taskmanagerid>/log-url

有关Container日志的信息

  1. JobManager信息:暂无,可以通过 <jobId> 反查 applicationId 直接调用Yarn REST API
  2. TaskManager信息:可以通过下方链接文章的顺序获得,或者直接通过上述JSON获得 Vertex 中的TaskManager信息
  • A. 寻找 path=/jobs/<jobid>/vertices/ 找到所有的 vertexid

  • B. 寻找 path=/jobs/<jobid>/vertices/<vertexid>/taskmanagers 的值并建立映射关系

例如

{
    "id": "cbc357ccb763df2852fee8c4fc7d55f2",
    "name": "Source: Custom Source -> Process -> Map",
    "now": 1731465012643,
    "taskmanagers": [
        {
            "host": "kka136119:40079",
            "status": "CANCELED",
            "taskmanager-id": "container_e87_1724243239726_2160_01_000003"
        },
        {
            "host": "kka129134:40853",
            "status": "CANCELED",
            "taskmanager-id": "container_e87_1724243239726_2160_01_000002"
         }
        ...
    ]
}

通过API调用和这里JSON数据是一样的。

TaskManager hostName截断

因为HostNameSupplier 即便在开启了TaskManager反解析,即 jobmanager.retrieve-taskmanager-hostname,也只会返回第一段。(默认开启,不建议关闭)

org.apache.flink.runtime.taskmanager.TaskManagerLocation.DefaultHostNameSupplier#getHostName

If the FQDN is the textual IP address, then the hostname is also the IP address
If the FQDN has only one segment (such as "localhost", or "host17"), then this is used as the hostname.
If the FQDN has multiple segments (such as "worker3.subgroup.company.net"), then the first segment (here "worker3") will be used as the hostname

如果对Yarn进行日志集成,则需要自行补全 HostName

日志链接的实现参考

【Flink系列十八】History Server 重新登场,如何实现Yarn日志集成

标签:...,Flink,HistoryServer,gen,JSON,源码,path,archive
From: https://www.cnblogs.com/slankka/p/18544469

相关文章

  • SpringBoot医院管理系统5nr0z--程序+源码+数据库+调试部署+开发环境
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、项目背景随着医疗技术的不断进步和医疗需求的日益增长,医院管理面临着前所未有的挑战。传统的医院管理方式已难以满足现代医疗服务的需要,存在信......
  • SpringBoot医疗信息共享平台k09w0(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、研究背景与意义随着信息技术的飞速发展,医疗领域正逐步迈向数字化、智能化。医疗信息共享平台作为连接医疗机构、患者及医疗数据的关键纽带,对于......
  • Flink 开发工程应加载哪些依赖
    在我们要开发Flink程序时,就会涉及到应该加载哪些Flinkjar的问题。本章内容就是向你展示如何配置你的项目,添加必要的依赖。每个应用程序都会依赖一些Flink libraries,比如至少依赖FlinkAPIs库,如果使用了connector,则还需要依赖connector相关的库,比如kafka、jdbc等,和你自己程......
  • 高效稳定的校园管理系统源码,APP小程序H5三端源码交付支持二开
    校园系统源码的核心优势在于其高度的定制化和可扩展性。可以根据自己的实际需求,对源码进行二次开发,实现个性化的功能定制。同时,源码的模块化设计使得系统能够轻松应对未来需求的增长和变化,为你的长期发展提供了有力保障 。校园管理系统源码系统特点:1.基于TP6+Uni-app框架开......
  • Temple OS源码研究
    博客:源码解读TOS源码:https://github.com/cia-foundation/TempleOS启动过程:https://minexew.github.io/2020/02/27/templeos-loader-part1.html建设性看法:http://www.codersnotes.com/notes/a-constructive-look-at-templeos/移植文字游戏:https://www.jwhitham.org/2015/07/p......
  • [开题报告]基于javaweb的宠物医院平台dz56j9计算机毕业设计源码、研究背景、意义、目
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着人们生活水平的提高和城市化进程的加速,宠物已成为许多家庭的重要成员。宠物数量的增加带动了宠物相关产业的发展,其中宠物医院作为宠物健康保障的......
  • springboot毕设高校毕业生就业岗位推荐系统源码+论文+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容一、研究背景随着高校的不断扩招,毕业生数量逐年递增,就业市场的竞争日益激烈。在这样的大环境下,高校毕业生面临着巨大的就业压力。一方面,毕业生需要花费大量的......
  • springboot毕设基于JavaWeb的校园点餐平台源码+论文+部署
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容一、研究背景随着校园数字化建设的不断推进以及学生生活节奏的加快,传统的校园餐饮模式面临着诸多挑战。目前,校园内餐饮需求多样且分散,学生在点餐过程中往往需......
  • DApp开发:定制化解决方案与源码部署的一站式指南
    去中心化应用(DApp)随着区块链技术的发展,成为众多行业探索与创新的重要方向。无论是金融、供应链、游戏,还是社交和艺术市场,DApp都为传统业务模式带来了全新可能。然而,开发一款DApp并非易事,从合约设计到前后端的搭建,再到部署与安全性考虑,整个过程涉及多项技术和策略。在这里,我们将......
  • 免费送源码:Java+python+django+MySQL 小区疫情订菜系统 计算机毕业设计原创定制
    摘 要随着科学技术的飞速发展,各行各业都在努力与现代先进技术接轨,通过科技手段提高自身的优势;对于小区疫情订菜系统当然也不能排除在外,随着网络技术的不断成熟,带动了小区疫情订菜系统,它彻底改变了过去传统的管理方式,不仅使服务管理难度变低了,还提升了管理的灵活性。这种......