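The core job of dbt docs generate is to collect the metadata of a dbt project (project-level information plus the tables behind the dbt models) and then display it through the bundled web page that parses that metadata.
Docs are currently static: the artifacts are generated first, then parsed and rendered entirely in the browser. That leaves several ways to serve them: the dbt docs serve command, or your own static web server (nginx or S3). A brief walkthrough follows.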
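Because the output is static, any file server can host it. The following is a minimal sketch using only the Python standard library, not how dbt docs serve itself is implemented. It assumes the default target directory name `target` and the three artifact names `index.html`, `manifest.json` and `catalog.json`; the helper names `missing_docs_files` and `serve_docs` and the port are hypothetical.

```python
import functools
import http.server
import os
from typing import List

# Files the static docs page is assumed to need next to each other.
REQUIRED_FILES = ("index.html", "manifest.json", "catalog.json")


def missing_docs_files(target_dir: str) -> List[str]:
    """Return the required artifact names that are absent from target_dir."""
    return [
        name
        for name in REQUIRED_FILES
        if not os.path.isfile(os.path.join(target_dir, name))
    ]


def serve_docs(target_dir: str = "target", port: int = 8080) -> None:
    """Serve an already generated docs directory over HTTP (blocks forever)."""
    missing = missing_docs_files(target_dir)
    if missing:
        raise FileNotFoundError(
            f"run `dbt docs generate` first; missing files: {missing}"
        )
    # SimpleHTTPRequestHandler accepts a `directory` keyword to pick the root.
    handler = functools.partial(
        http.server.SimpleHTTPRequestHandler, directory=target_dir
    )
    with http.server.ThreadingHTTPServer(("127.0.0.1", port), handler) as httpd:
        httpd.serve_forever()
```

Calling `serve_docs("target")` after a successful generate run would then expose the docs on localhost; nginx or an S3 bucket play the same role in production.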
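Internal processing
The actual work is done by the GenerateTask class, which inherits from CompileTask.
- CLI decorators
As shown below, the command depends on the profile, project, runtime_config and manifest. Although write=False is passed, the manifest file is still written in practice (by write_manifest in GenerateTask.run when compilation is enabled), because the docs page depends on it.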
@requires.profile
@requires.project
@requires.runtime_config
@requires.manifest(write=False)
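The order of the stacked decorators matters: the top-most one runs first, so each later step can rely on what the earlier ones prepared. The toy model below illustrates that ordering only. The `requires` function here is a hypothetical stand-in, not dbt's `requires` module, and the dependencies between steps are assumptions made for the illustration.

```python
from functools import wraps


def requires(name, needs=()):
    """Hypothetical stand-in for a dbt `requires.*` decorator (toy model)."""

    def decorator(func):
        @wraps(func)
        def wrapper(ctx, *args, **kwargs):
            # Fail early if an earlier step has not populated the context.
            missing = [n for n in needs if n not in ctx]
            if missing:
                raise RuntimeError(f"{name} requires {missing}")
            ctx[name] = f"<{name}>"
            ctx.setdefault("order", []).append(name)
            return func(ctx, *args, **kwargs)

        return wrapper

    return decorator


@requires("profile")
@requires("project", needs=("profile",))
@requires("runtime_config", needs=("profile", "project"))
@requires("manifest", needs=("runtime_config",))
def docs_generate(ctx):
    # By the time the command body runs, every prerequisite is in ctx.
    return ctx["order"]


order = docs_generate({})
print(order)  # ['profile', 'project', 'runtime_config', 'manifest']
```

Reordering the decorators in this toy model (for example putting `manifest` on top) raises the `RuntimeError`, which is the reason the stacking order is significant.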
- GenerateTask
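The main method of this class is run. It compiles the project, copies the static web assets, fetches the catalog (table information), and writes the manifest via write_manifest. The code is fairly easy to follow.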
class GenerateTask(CompileTask):
    def run(self) -> CatalogArtifact:
        # Optionally compile first; bail out with an empty catalog on errors.
        compile_results = None
        if self.args.compile:
            compile_results = CompileTask.run(self)
            if any(r.status == NodeStatus.Error for r in compile_results):
                fire_event(CannotGenerateDocs())
                return CatalogArtifact.from_results(
                    nodes={},
                    sources={},
                    generated_at=datetime.utcnow(),
                    errors=None,
                    compile_results=compile_results,
                )

        # Copy the static docs page into the target directory.
        shutil.copyfile(
            DOCS_INDEX_FILE_PATH, os.path.join(self.config.project_target_path, "index.html")
        )

        # Refresh any user-defined asset directories in the target directory.
        for asset_path in self.config.asset_paths:
            to_asset_path = os.path.join(self.config.project_target_path, asset_path)

            if os.path.exists(to_asset_path):
                shutil.rmtree(to_asset_path)

            if os.path.exists(asset_path):
                shutil.copytree(asset_path, to_asset_path)

        if self.manifest is None:
            raise DbtInternalError("self.manifest was None in run!")

        # Query the warehouse for the catalog (table and column metadata).
        adapter = get_adapter(self.config)
        with adapter.connection_named("generate_catalog"):
            fire_event(BuildingCatalog())
            catalog_table, exceptions = adapter.get_catalog(self.manifest)

        catalog_data: List[PrimitiveDict] = [
            dict(zip(catalog_table.column_names, map(dbt.utils._coerce_decimal, row)))
            for row in catalog_table
        ]

        catalog = Catalog(catalog_data)

        errors: Optional[List[str]] = None
        if exceptions:
            errors = [str(e) for e in exceptions]

        nodes, sources = catalog.make_unique_id_map(self.manifest)
        results = self.get_catalog_results(
            nodes=nodes,
            sources=sources,
            generated_at=datetime.utcnow(),
            compile_results=compile_results,
            errors=errors,
        )

        # Write the catalog, and the manifest too when compilation ran.
        path = os.path.join(self.config.project_target_path, CATALOG_FILENAME)
        results.write(path)
        if self.args.compile:
            write_manifest(self.manifest, self.config.project_target_path)

        if exceptions:
            fire_event(WriteCatalogFailure(num_exceptions=len(exceptions)))
        fire_event(CatalogWritten(path=os.path.abspath(path)))
        return results
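To check what run actually wrote, the catalog file can be inspected directly. The sketch below assumes the written file is JSON whose top-level keys mirror the nodes, sources and errors arguments passed to get_catalog_results above, and that each entry carries a `columns` mapping. The helper names `load_catalog` and `summarize_catalog`, the default path, and the sample data are hypothetical.

```python
import json
from typing import Any, Dict, List


def load_catalog(path: str = "target/catalog.json") -> Dict[str, Any]:
    """Read a generated catalog file (the path is an assumed default)."""
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)


def summarize_catalog(catalog: Dict[str, Any]) -> List[str]:
    """List every catalogued node and source with its column count."""
    lines: List[str] = []
    for section in ("nodes", "sources"):
        for unique_id, entry in sorted(catalog.get(section, {}).items()):
            n_cols = len(entry.get("columns", {}))
            lines.append(f"{section}: {unique_id} ({n_cols} columns)")
    # `errors` is None when every catalog query succeeded.
    for err in catalog.get("errors") or []:
        lines.append(f"error: {err}")
    return lines


# Made-up sample standing in for a real catalog file.
sample = {
    "nodes": {"model.demo.orders": {"columns": {"id": {}, "amount": {}}}},
    "sources": {},
    "errors": ["permission denied"],
}
print("\n".join(summarize_catalog(sample)))
```

With a real project, `summarize_catalog(load_catalog())` gives a quick view of which models made it into the catalog and which queries failed.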
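- get_catalog: building the catalog from the Manifest
The core idea is to collect the schema information from the Manifest, then query the database (one task per information_schema) to fetch the actual catalogs.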
def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
    schema_map = self._get_catalog_schemas(manifest)

    with executor(self.config) as tpe:
        futures: List[Future[agate.Table]] = []
        for info, schemas in schema_map.items():
            if len(schemas) == 0:
                continue
            name = ".".join([str(info.database), "information_schema"])

            fut = tpe.submit_connected(
                self, name, self._get_one_catalog, info, schemas, manifest
            )
            futures.append(fut)

        catalogs, exceptions = catch_as_completed(futures)

    return catalogs, exceptions
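The fan-out pattern above (one task per database, results and exceptions collected separately) can be sketched with the standard library alone. This is an analogy built on `concurrent.futures`, not dbt's `executor` or `catch_as_completed`; the names `fetch_catalogs` and `fake_fetch` and the sample schema map are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Set, Tuple


def fetch_catalogs(
    schema_map: Dict[str, Set[str]],
    fetch_one: Callable[[str, Set[str]], List[str]],
    max_workers: int = 4,
) -> Tuple[List[str], List[BaseException]]:
    """Run fetch_one per database in parallel; keep failures as a list."""
    rows: List[str] = []
    exceptions: List[BaseException] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures: List[Future] = []
        for database, schemas in schema_map.items():
            if not schemas:
                continue  # nothing to query for this database
            futures.append(pool.submit(fetch_one, database, schemas))
        for fut in as_completed(futures):
            exc = fut.exception()
            if exc is not None:
                exceptions.append(exc)  # record the failure, keep going
            else:
                rows.extend(fut.result())
    return rows, exceptions


def fake_fetch(database: str, schemas: Set[str]) -> List[str]:
    """Stand-in for a real information_schema query."""
    if database == "broken":
        raise RuntimeError("cannot query broken.information_schema")
    return [f"{database}.{schema}" for schema in sorted(schemas)]


schema_map = {"analytics": {"core", "staging"}, "broken": {"raw"}, "empty": set()}
rows, exceptions = fetch_catalogs(schema_map, fake_fetch)
print(sorted(rows), [str(e) for e in exceptions])
```

Collecting exceptions instead of raising on the first one matches what run does with the returned list: a partial catalog is still written, and the failures are reported alongside it.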
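Notes
For the web rendering side, see the references below. Dagster also maintains an open-source implementation (supercharged-dbt-docs) aimed at faster loading and parsing.
References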
core/dbt/task/generate.py
https://docs.getdbt.com/reference/commands/cmd-docs
https://github.com/dbt-labs/dbt-docs
https://github.com/dagster-io/supercharged-dbt-docs