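dbt actually ships a plugin architecture, which is a different mechanism from the adapter plugins. The official documentation does not describe it yet.
Below are some brief notes.
How it works internally
- Plugin interface definition
The interface is still quite simple. Its core is three methods: initialize, get_nodes and get_manifest_artifacts. As the docstring says, it is experimental and may change between dbt-core versions.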
```python
# Imports needed by this excerpt (module paths as of dbt-core 1.6)
from dbt.contracts.graph.manifest import Manifest
from dbt.exceptions import DbtRuntimeError
from dbt.plugins.contracts import PluginArtifacts
from dbt.plugins.manifest import PluginNodes


class dbtPlugin:
    """
    EXPERIMENTAL: dbtPlugin is the base class for creating plugins.
    Its interface is **not** stable and will likely change between dbt-core versions.
    """

    def __init__(self, project_name: str) -> None:
        self.project_name = project_name
        try:
            self.initialize()
        except DbtRuntimeError as e:
            # Remove the first line of DbtRuntimeError to avoid redundant "Runtime Error" line
            raise DbtRuntimeError("\n".join(str(e).split("\n")[1:]))
        except Exception as e:
            raise DbtRuntimeError(str(e))

    @property
    def name(self) -> str:
        return self.__class__.__name__

    def initialize(self) -> None:
        """
        Initialize the plugin. This function may be overridden by subclasses that have
        additional initialization steps.
        """
        pass

    def get_nodes(self) -> PluginNodes:
        """
        Provide PluginNodes to dbt for injection into dbt's DAG.
        Currently the only node types that are accepted are model nodes.
        """
        raise NotImplementedError(f"get_nodes hook not implemented for {self.name}")

    def get_manifest_artifacts(self, manifest: Manifest) -> PluginArtifacts:
        """
        Given a manifest, provide PluginArtifacts derived for writing by core.
        PluginArtifacts share the same lifecycle as the manifest.json file -- they
        will either be written or not depending on whether the manifest is written.
        """
        raise NotImplementedError(f"get_manifest_artifacts hook not implemented for {self.name}")
```
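To make plugin development easier, dbt also provides a decorator, dbt_hook, for marking plugin methods. The decorator sets an is_dbt_hook attribute on the method. When hooks are collected, PluginManager also checks the method name: only names defined on dbtPlugin are accepted.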
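The following is a minimal sketch of how a marker decorator like dbt_hook can work. It assumes that the only part that matters here is setting is_dbt_hook. The names dbt_hook_sketch and Demo are hypothetical, and this is not dbt's source code.

```python
import functools
from typing import Any, Callable


def dbt_hook_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical marker decorator: tags a method so a manager can recognize it as a hook."""

    @functools.wraps(func)  # keep __name__ so the hook is still found under its own name
    def inner(*args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)

    inner.is_dbt_hook = True  # the attribute PluginManager looks for
    return inner


class Demo:
    @dbt_hook_sketch
    def get_nodes(self) -> str:
        return "nodes"

    def helper(self) -> str:
        return "not a hook"


d = Demo()
# Bound methods forward attribute lookups to the underlying function
print(hasattr(d.get_nodes, "is_dbt_hook"), hasattr(d.helper, "is_dbt_hook"))  # True False
```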
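- Plugin discovery
Discovery uses the standard library. pkgutil.iter_modules lists the importable top-level modules, and importlib imports every module whose name starts with the dbt_ prefix: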
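```python
# Excerpt: classmethod of PluginManager (the module imports importlib and pkgutil)
@classmethod
def get_prefixed_modules(cls):
    # Import every top-level module whose name starts with PLUGIN_MODULE_PREFIX ("dbt_")
    return {
        name: importlib.import_module(name)
        for _, name, _ in pkgutil.iter_modules()
        if name.startswith(cls.PLUGIN_MODULE_PREFIX)
    }
```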
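You can try the discovery rule without dbt installed. The sketch below is a hypothetical re-implementation: discover_plugin_classes and the generated dbt_demo_plugin module are made up for illustration. It applies the same prefix filter as get_prefixed_modules, then reads the plugins attribute that plugin modules are expected to expose. The search_path parameter is an addition so the demo scans only a temporary directory instead of all of sys.path.

```python
import importlib
import pkgutil
import sys
import tempfile
from pathlib import Path
from typing import List, Optional

PLUGIN_MODULE_PREFIX = "dbt_"  # same prefix PluginManager uses
PLUGIN_ATTR_NAME = "plugins"   # same attribute name PluginManager reads


def discover_plugin_classes(search_path: Optional[List[str]] = None) -> list:
    """Import top-level modules named dbt_* and collect their `plugins` lists.

    search_path=None scans everything on sys.path, like get_prefixed_modules."""
    classes = []
    for _, name, _ in pkgutil.iter_modules(search_path):
        if name.startswith(PLUGIN_MODULE_PREFIX):
            module = importlib.import_module(name)
            classes.extend(getattr(module, PLUGIN_ATTR_NAME, []))
    return classes


def run_discovery_demo() -> List[str]:
    with tempfile.TemporaryDirectory() as tmp:
        # A module that follows the naming convention, and one that does not
        Path(tmp, "dbt_demo_plugin.py").write_text(
            "class DemoPlugin:\n    pass\n\nplugins = [DemoPlugin]\n"
        )
        Path(tmp, "not_a_plugin.py").write_text("plugins = ['ignored']\n")
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            found = discover_plugin_classes([tmp])
        finally:
            sys.path.remove(tmp)
        return [cls.__name__ for cls in found]


print(run_discovery_demo())  # ['DemoPlugin']
```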
- Hook registration
```python
from typing import Callable, Dict, List

# dbtPlugin is the base class shown above


class PluginManager:
    PLUGIN_MODULE_PREFIX = "dbt_"
    PLUGIN_ATTR_NAME = "plugins"

    def __init__(self, plugins: List[dbtPlugin]) -> None:
        # As described above, hooks are selected using the decorator attribute and the method name
        self._plugins = plugins
        self._valid_hook_names = set()
        # default hook implementations from dbtPlugin
        for hook_name in dir(dbtPlugin):
            if not hook_name.startswith("_"):
                self._valid_hook_names.add(hook_name)

        self.hooks: Dict[str, List[Callable]] = {}
        for plugin in self._plugins:
            for hook_name in dir(plugin):
                hook = getattr(plugin, hook_name)
                if (
                    callable(hook)
                    and hasattr(hook, "is_dbt_hook")
                    and hook_name in self._valid_hook_names
                ):
                    if hook_name in self.hooks:
                        self.hooks[hook_name].append(hook)
                    else:
                        self.hooks[hook_name] = [hook]
```
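The standalone sketch below shows which methods end up in self.hooks. It copies the filtering condition from PluginManager.__init__ but uses hypothetical stand-ins: toy_hook replaces dbt_hook and ToyBasePlugin replaces dbtPlugin. A method is registered only when it is decorated and its name is a public attribute of the base class.

```python
from typing import Callable, Dict, List


def toy_hook(func: Callable) -> Callable:
    """Hypothetical stand-in for dbt_hook: only tags the function."""
    func.is_dbt_hook = True
    return func


class ToyBasePlugin:
    """Hypothetical stand-in for dbtPlugin: its public names define the valid hooks."""

    def initialize(self) -> None:
        pass

    def get_nodes(self):
        raise NotImplementedError


class MyPlugin(ToyBasePlugin):
    def initialize(self) -> None:  # valid name, but not decorated
        pass

    @toy_hook
    def get_nodes(self):  # valid name and decorated: registered
        return ["model.my_project.orders"]

    @toy_hook
    def not_a_known_hook(self):  # decorated, but the name is not on the base class
        return None


def collect_hooks(plugins: List[ToyBasePlugin]) -> Dict[str, List[Callable]]:
    # Same rule as PluginManager.__init__
    valid_hook_names = {n for n in dir(ToyBasePlugin) if not n.startswith("_")}
    hooks: Dict[str, List[Callable]] = {}
    for plugin in plugins:
        for hook_name in dir(plugin):
            hook = getattr(plugin, hook_name)
            if callable(hook) and hasattr(hook, "is_dbt_hook") and hook_name in valid_hook_names:
                hooks.setdefault(hook_name, []).append(hook)
    return hooks


hooks = collect_hooks([MyPlugin()])
print(sorted(hooks))  # ['get_nodes']
```

The undecorated initialize override is skipped. The decorated not_a_known_hook is also skipped, because its name is not defined on the base class.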
- Hook invocation
Currently two hooks are invoked: get_manifest_artifacts and get_nodes.
get_manifest_artifacts is called while the manifest is being parsed:
```python
def parse_manifest(runtime_config, write_perf_info, write, write_json):
    register_adapter(runtime_config, get_mp_context())
    adapter = get_adapter(runtime_config)
    adapter.set_macro_context_generator(generate_runtime_macro_context)
    manifest = ManifestLoader.get_full_manifest(
        runtime_config,
        write_perf_info=write_perf_info,
    )

    if write and write_json:
        write_manifest(manifest, runtime_config.project_target_path)
        pm = plugins.get_plugin_manager(runtime_config.project_name)
        plugin_artifacts = pm.get_manifest_artifacts(manifest)
        for path, plugin_artifact in plugin_artifacts.items():
            plugin_artifact.write(path)
    return manifest
```
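parse_manifest only consumes a mapping from path to artifact and calls write(path) on each entry. The sketch below imitates that flow with hypothetical pieces. ToyArtifact stands in for a real plugin artifact. The two hook functions play the role of get_manifest_artifacts implementations from two plugins. Merging their results with dict.update is an assumption about how the manager combines them.

```python
import tempfile
from pathlib import Path
from typing import Callable, Dict, List


class ToyArtifact:
    """Hypothetical stand-in for a plugin artifact: anything with a write(path) method."""

    def __init__(self, content: str) -> None:
        self.content = content

    def write(self, path: str) -> None:
        Path(path).write_text(self.content)


ArtifactHook = Callable[[dict], Dict[str, ToyArtifact]]


def collect_artifacts(hooks: List[ArtifactHook], manifest: dict) -> Dict[str, ToyArtifact]:
    """Call every get_manifest_artifacts-style hook and merge the results.
    Assumption: on a path clash, the later plugin wins."""
    merged: Dict[str, ToyArtifact] = {}
    for hook in hooks:
        merged.update(hook(manifest))
    return merged


def run_demo() -> List[str]:
    manifest = {"nodes": {"model.shop.orders": {}, "model.shop.customers": {}}}  # toy manifest
    with tempfile.TemporaryDirectory() as target:
        def node_list(m: dict) -> Dict[str, ToyArtifact]:
            return {str(Path(target, "nodes.txt")): ToyArtifact("\n".join(m["nodes"]))}

        def node_count(m: dict) -> Dict[str, ToyArtifact]:
            return {str(Path(target, "count.txt")): ToyArtifact(str(len(m["nodes"])))}

        # Same loop as parse_manifest: one write() call per (path, artifact) pair
        for path, artifact in collect_artifacts([node_list, node_count], manifest).items():
            artifact.write(path)
        return sorted(p.name for p in Path(target).iterdir())


print(run_demo())  # ['count.txt', 'nodes.txt']
```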
get_nodes is called from ManifestLoader.load, through the inject_external_nodes method:
```python
def inject_external_nodes(self) -> bool:
    # Remove previously existing external nodes since we are regenerating them
    manifest_nodes_modified = False
    # Remove all dependent nodes before removing referencing nodes
    for unique_id in self.manifest.external_node_unique_ids:
        remove_dependent_project_references(self.manifest, unique_id)
        manifest_nodes_modified = True
    for unique_id in self.manifest.external_node_unique_ids:
        # remove external nodes from manifest only after dependent project references safely removed
        self.manifest.nodes.pop(unique_id)

    # Inject any newly-available external nodes
    pm = plugins.get_plugin_manager(self.root_project.project_name)
    plugin_model_nodes = pm.get_nodes().models
    for node_arg in plugin_model_nodes.values():
        node = ModelNode.from_args(node_arg)
        # node may already exist from package or running project - in which case we should avoid clobbering it with an external node
        if node.unique_id not in self.manifest.nodes:
            self.manifest.add_node_nofile(node)
            manifest_nodes_modified = True

    return manifest_nodes_modified
```
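The core rule of inject_external_nodes has two steps. First it drops the external nodes from the previous parse. Then it adds a plugin node only when its unique_id is not already taken. The toy version below works on plain dicts. inject_external and the node labels are hypothetical, and the sketch leaves out the remove_dependent_project_references step.

```python
from typing import Dict, Set


def inject_external(
    nodes: Dict[str, str],
    previous_external_ids: Set[str],
    plugin_nodes: Dict[str, str],
) -> bool:
    """Toy version of inject_external_nodes on dicts of unique_id -> label.
    Mutates `nodes` in place and reports whether anything changed."""
    modified = False
    # 1. Drop external nodes injected by an earlier parse; they are regenerated below
    for uid in previous_external_ids:
        nodes.pop(uid, None)
        modified = True
    # 2. Add plugin nodes, never clobbering a node the project or a package already defines
    for uid, node in plugin_nodes.items():
        if uid not in nodes:
            nodes[uid] = node
            modified = True
    return modified


nodes = {"model.shop.orders": "local", "model.core.customers": "external (stale)"}
changed = inject_external(
    nodes,
    previous_external_ids={"model.core.customers"},
    plugin_nodes={"model.shop.orders": "external", "model.core.customers": "external (fresh)"},
)
# orders keeps its local definition; customers is replaced by the fresh external node
print(changed, nodes)
```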
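Plugin development
To develop a plugin, subclass dbtPlugin and mark each hook method with the dbt_hook decorator. The plugin's top-level module name must start with dbt_, and the module must expose a plugins attribute that lists the plugin classes.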
- Reference template
```python
# The module name must start with dbt_, e.g. dbt_example_plugin
from dbt.plugins.contracts import PluginArtifacts
from dbt.plugins.manager import dbt_hook, dbtPlugin
from dbt.plugins.manifest import PluginNodes


class ExamplePlugin(dbtPlugin):
    """A demonstration plugin for dbt-core 1.6.x."""

    def initialize(self) -> None:
        """
        Initialize the plugin. This is where you'd setup connections,
        load files, or perform other I/O.
        """
        print("Initializing ExamplePlugin")

    @dbt_hook
    def get_manifest_artifacts(self, manifest) -> PluginArtifacts:
        """
        Return PluginArtifacts to dbt for writing to the file system.
        """
        # PluginArtifacts maps an output path to an artifact; an empty dict writes nothing
        return {}

    @dbt_hook
    def get_nodes(self) -> PluginNodes:
        """
        Return PluginNodes to dbt for injection into dbt's DAG.
        """
        return PluginNodes()


# PluginManager reads this module-level attribute to find the plugin classes
plugins = [ExamplePlugin]
```
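Notes
dbt-loom is an extension built on this plugin mechanism. Few such plugins seem to exist so far. Still, the plugin mechanism gives dbt a more convenient extension point and is worth studying.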
References
dbt/plugins/manager.py
https://nicholasyager.com/2023/08/dbt_plugin_api.html
https://github.com/nicholasyager/dbt-loom