首页 > 其他分享 >dbt plugin 系统简单说明

dbt plugin 系统简单说明

时间:2024-05-09 12:00:11浏览次数:13  
标签:name plugin self manifest hook 简单 nodes dbt

dbt 实际上提供了一个plugin 架构(属于扩展与adapter 的plugin 机制是不一样的)只是目前官方缺少文档的说明
以下是一些简单说明

内部处理

  • 插件接口定义
    目前相对简单,只提供了核心是3个方法initialize,get_nodes,get_manifest_artifacts
class dbtPlugin:
    """
    EXPERIMENTAL: dbtPlugin is the base class for creating plugins.
    Its interface is **not** stable and will likely change between dbt-core versions.
    """
 
    def __init__(self, project_name: str) -> None:
        self.project_name = project_name
        try:
            self.initialize()
        except DbtRuntimeError as e:
            # Remove the first line of DbtRuntimeError to avoid redundant "Runtime Error" line
            raise DbtRuntimeError("\n".join(str(e).split("\n")[1:]))
        except Exception as e:
            raise DbtRuntimeError(str(e))
 
    @property
    def name(self) -> str:
        return self.__class__.__name__
 
    def initialize(self) -> None:
        """
        Initialize the plugin. This function may be overridden by subclasses that have
        additional initialization steps.
        """
        pass
 
    def get_nodes(self) -> PluginNodes:
        """
        Provide PluginNodes to dbt for injection into dbt's DAG.
        Currently the only node types that are accepted are model nodes.
        """
        raise NotImplementedError(f"get_nodes hook not implemented for {self.name}")
 
    def get_manifest_artifacts(self, manifest: Manifest) -> PluginArtifacts:
        """
        Given a manifest, provide PluginArtifacts derived for writing by core.
        PluginArtifacts share the same lifecycle as the manifest.json file -- they
        will either be written or not depending on whether the manifest is written.
        """
        raise NotImplementedError(f"get_manifest_artifacts hook not implemented for {self.name}")

同时为了方便开发,官方开发了一个装饰器,方便标记插件方法(dbt_hook) 添加了属性is_dbt_hook 同时处理的时候还会进一步判断
方法的名称

  • 插件的查找
    使用了比较标准的importlib
@classmethod
def get_prefixed_modules(cls):
    return {
        name: importlib.import_module(name)
        for _, name, _ in pkgutil.iter_modules()
        if name.startswith(cls.PLUGIN_MODULE_PREFIX)
    }
  • 插件钩子处理
class PluginManager:
    PLUGIN_MODULE_PREFIX = "dbt_"
    PLUGIN_ATTR_NAME = "plugins"
 
    def __init__(self, plugins: List[dbtPlugin]) -> None:
        # 此处基本就是我上边说明的,会基于装饰器以及属性判断
        self._plugins = plugins
        self._valid_hook_names = set()
        # default hook implementations from dbtPlugin
        for hook_name in dir(dbtPlugin):
            if not hook_name.startswith("_"):
                self._valid_hook_names.add(hook_name)
 
        self.hooks: Dict[str, List[Callable]] = {}
        for plugin in self._plugins:
            for hook_name in dir(plugin):
                hook = getattr(plugin, hook_name)
                if (
                    callable(hook)
                    and hasattr(hook, "is_dbt_hook")
                    and hook_name in self._valid_hook_names
                ):
                    if hook_name in self.hooks:
                        self.hooks[hook_name].append(hook)
                    else:
                        self.hooks[hook_name] = [hook]
  • 具体插件hook 调用
    目前是包含了对于get_manifest_artifacts 以及get_nodes 的处理
    get_manifest_artifacts 是在manifest 解析部分处理的
def parse_manifest(runtime_config, write_perf_info, write, write_json):
    register_adapter(runtime_config, get_mp_context())
    adapter = get_adapter(runtime_config)
    adapter.set_macro_context_generator(generate_runtime_macro_context)
    manifest = ManifestLoader.get_full_manifest(
        runtime_config,
        write_perf_info=write_perf_info,
    )
 
    if write and write_json:
        write_manifest(manifest, runtime_config.project_target_path)
        pm = plugins.get_plugin_manager(runtime_config.project_name)
        plugin_artifacts = pm.get_manifest_artifacts(manifest)
        for path, plugin_artifact in plugin_artifacts.items():
            plugin_artifact.write(path)
    return manifest

get_nodes 是在ManifestLoader.load 方法处理的

def inject_external_nodes(self) -> bool:
    # Remove previously existing external nodes since we are regenerating them
    manifest_nodes_modified = False
    # Remove all dependent nodes before removing referencing nodes
    for unique_id in self.manifest.external_node_unique_ids:
        remove_dependent_project_references(self.manifest, unique_id)
        manifest_nodes_modified = True
    for unique_id in self.manifest.external_node_unique_ids:
        # remove external nodes from manifest only after dependent project references safely removed
        self.manifest.nodes.pop(unique_id)
 
    # Inject any newly-available external nodes
    pm = plugins.get_plugin_manager(self.root_project.project_name)
    plugin_model_nodes = pm.get_nodes().models
    for node_arg in plugin_model_nodes.values():
        node = ModelNode.from_args(node_arg)
        # node may already exist from package or running project - in which case we should avoid clobbering it with an external node
        if node.unique_id not in self.manifest.nodes:
            self.manifest.add_node_nofile(node)
            manifest_nodes_modified = True
 
    return manifest_nodes_modified

插件开发

开发插件,就是实现dbtPlugin 同时结合装饰器dbt_hook,主题模块的格式应该是dbt_ 开头的,同时模块应该包含一个plugins 的属性

  • 参考格式
from dbt.plugins.manager import dbt_hook, dbtPlugin
from dbt.plugins.manifest import PluginNodes
from dbt.plugins.contracts import PluginArtifacts
 
class ExamplePlugin(DbtPlugin):
    """A demonstration plugin for dbt-core 1.6.x."""
 
    def initialize(self) -> None:
        """
        Initialize the plugin. This is where you'd setup connections,
        load files, or perform other I/O.
        """
        print('Initializing ExamplePlugin')
        pass
 
    @dbt_hook
    def get_manifest_artifacts(self, manifest) -> PluginArtifacts:
        """
        Return PluginArtifacts to dbt for writing to the file system.
        """
        return PluginArtifacts()
 
    @dbt_hook
    def get_nodes(self) -> PluginNodes:
        """
        Return PluginNodes to dbt for injection into dbt's DAG.
        """
        return PluginNodes()
   
plugins = [ExamplePlugin]

说明

dbt-loom 是一个基于plugin 开发的扩展,目前似乎并不是很多,但是plugin 模式为dbt 的扩展提供了一个更加方便的口子,还是值得学习下的

参考资料

dbt/plugins/manager.py
https://nicholasyager.com/2023/08/dbt_plugin_api.html
https://github.com/nicholasyager/dbt-loom

标签:name,plugin,self,manifest,hook,简单,nodes,dbt
From: https://www.cnblogs.com/rongfengliang/p/18169927

相关文章

  • nicegui:Python 图形界面库,简单好用
    前言在现代计算机应用程序开发中,图形用户界面(GUI)是用户与程序交互的重要组成部分。然而,GUI开发往往需要大量的代码和复杂的布局,给开发者带来了一定的挑战。在本篇博文中,将介绍nicegui,它是一个简单易用的图形用户界面库,提供了一种简化GUI开发的方式,使开发者能够更快速地构建吸......
  • Altium PCB添加平衡铜/盗铜的方法(依旧是简单粗暴)
    最近画的板子遇到了PCB残铜率不足的问题,一般想法也是用整板覆铜的方法来填满空旷的区域,但是这个会带来很多碎铜,特别是表层有元器件,覆铜会产生更多碎铜,但是不覆铜又会导致残铜率低,板厂的说法是残铜率过低会导致PCB外层电镀时电流不均衡,后果就是铜箔厚度不均匀,内层残铜率过低会影响......
  • 矩阵之间的关系简单整理
    等价:可以通过初等变换互相转化的矩阵。当A和B为同型矩阵,且r(A)=r(B)时,A,B一定等价。相似:\(P^{-1}AP=B\)。本质是基坐标转换,表示在不同坐标系下效果相同的线性变换过程。P为基坐标转换矩阵,是新基向量按列排列形成的矩阵。重要性质(原理):A与B相似,则A与B有相同的特征值(亦有相同的迹与......
  • 【HarmonyOS Next】多线程网络请求简单封装
    importhttpfrom'@ohos.net.http';import{taskpool}from'@kit.ArkTS';exportclassRequest{staticasyncget(url:string,header?:Object,expectDataType:http.HttpDataType=http.HttpDataType.OBJECT):Promise<Object>{......
  • dbt on_configuration_change 简单说明
    dbton_configuration_change目前主要是在处理物化视图中,on_configuration_change包含了三类设置参考设置apply是默认参数,尝试进行更新存在的数据库对象continue允许继续运行,但是提供一个警告fail如果捕捉到变动就失败参考使用比如模型上的{{config(......
  • C#中Redis使用简单教程
    C#中Redis使用简单教程Curry30_chen已于2024-01-1013:54:25修改阅读量2.9k收藏16点赞数5文章标签:c#redis开发语言版权客户端redis-desktop-manager对Redis可视化管理工具客户端redis-desktop-manager对Redis可视化管理工具立即下载C#开发者的Redis入门指南Redis是......
  • 41.前端知识node.js中fs的简单学习整理
    在跟敲项目的实现时候会有很多困难后端还好前端差了很多那既然想要将整个项目从头到尾捋清楚那就需要对前端的学习那就从5.7号开始了前端知识的学习不能叫学习吧更算是重写认识从基础抓抓类似与这些知识点那废话不多说反正我有大把时间可以学习哈啊哈课程学习来......
  • jmeter插件管理器安装-Plugins Manager
    有些函数是jmeter自带函数,有些函数是自定义的需要通过插件安装的,例如jmeter没有自带base64加密函数,若要使用该函数,可以通过插件安装自定义函数1.下载jmeter插件管理器:https://jmeter-plugins.org/wiki/PluginsManager/ 2.重启在jmeter,在“选项”下显示插件管理器"Plugins......
  • unplugin-auto-import 工程项目 import 模块自动导入
    渡一(袁老师)视频详解[点击前往]unplugin-auto-import是一个用于**Vue3**(和Vue2的CompositionAPI)的插件,它可以自动导入你在代码中使用的VueCompositionAPI函数(如ref,reactive,computed等)以及来自其他库的函数(如VueRouter的useRoute,useRouter,或者Pinia的de......
  • pytorch训练简单加减验证码(一):数据加载器实现
    1、torch.utils.data.Datasettorch.utils.data.Dataset是代表自定义数据集方法的类,用户可以通过继承该类来自定义自己的数据集类,在继承时要求用户重载__len__()和__getitem__()这两个魔法方法。len():返回的是数据集的大小。我们构建的数据集是一个对象,而数据集不像序列类型(列表......