首页 > 其他分享 >dbt adapter dispatch 处理简单说明

dbt adapter dispatch 处理简单说明

时间:2024-06-02 09:03:32浏览次数:38  
标签:search name macro self namespace dispatch dbt adapter

以前是结合使用对于adapter 的dispatch 有过简单说明,以下结合源码分析下

内部参考处理

  • 参考处理
def dispatch(
    self,
    macro_name: str,
    macro_namespace: Optional[str] = None,
    packages: Optional[List[str]] = None,  # eventually remove since it's fully deprecated
) -> MacroGenerator:
    search_packages: List[Optional[str]]
    #  对于macro 包含了 . 的调用进行处理,所以自己开发的macro 注意命名
    if "." in macro_name:
        suggest_macro_namespace, suggest_macro_name = macro_name.split(".", 1)
        msg = (
            f'In adapter.dispatch, got a macro name of "{macro_name}", '
            f'but "." is not a valid macro name component. Did you mean '
            f'`adapter.dispatch("{suggest_macro_name}", '
            f'macro_namespace="{suggest_macro_namespace}")`?'
        )
        raise CompilationError(msg)
 
    if packages is not None:
        raise MacroDispatchArgError(macro_name)
   # 对于package 的搜索,此
    search_packages = self._get_search_packages(macro_namespace)
 
    attempts = []
   # 进行遍历操作
    for package_name in search_packages:
        for prefix in self._get_adapter_macro_prefixes():
            search_name = f"{prefix}__{macro_name}"
            try:
                # this uses the namespace from the context
                macro = self._namespace.get_from_package(package_name, search_name)
            except CompilationError:
                # Only raise CompilationError if macro is not found in
                # any package
                macro = None
 
            if package_name is None:
                attempts.append(search_name)
            else:
                attempts.append(f"{package_name}.{search_name}")
 
            if macro is not None:
                return macro
 
    searched = ", ".join(repr(a) for a in attempts)
    msg = f"In dispatch: No macro named '{macro_name}' found within namespace: '{macro_namespace}'\n    Searched for: {searched}"
    raise CompilationError(msg)
  • 包搜索处理

会包含搜索顺序以及搜索包的处理,与官方文档介绍是一致的

def _get_search_packages(self, namespace: Optional[str] = None) -> List[Optional[str]]:
    search_packages: List[Optional[str]] = [None]
 
    if namespace is None:
        search_packages = [None]
    elif isinstance(namespace, str):
        macro_search_order = self._adapter.config.get_macro_search_order(namespace)
        if macro_search_order:
            search_packages = macro_search_order
        elif not macro_search_order and namespace in self._adapter.config.dependencies:
            search_packages = [self.config.project_name, namespace]
    else:
        raise CompilationError(
            f"In adapter.dispatch, got a {type(namespace)} macro_namespace argument "
            f'("{namespace}"), but macro_namespace should be None or a string.'
        )
 
    return search_packages
  • macronamespace 处理
class MacroNamespace(Mapping):
    def __init__(
        self,
        global_namespace: FlatNamespace,  # root package macros
        local_namespace: FlatNamespace,  # packages for *this* node
        global_project_namespace: FlatNamespace,  # internal packages
        packages: Dict[str, FlatNamespace],  # non-internal packages
    ):
        self.global_namespace: FlatNamespace = global_namespace
        self.local_namespace: FlatNamespace = local_namespace
        self.packages: Dict[str, FlatNamespace] = packages
        self.global_project_namespace: FlatNamespace = global_project_namespace
 
    def _search_order(self) -> Iterable[Union[FullNamespace, FlatNamespace]]:
        yield self.local_namespace  # local package
        yield self.global_namespace  # root package
        # TODO CT-211
        yield self.packages  # type: ignore[misc] # non-internal packages
        yield {
            # TODO CT-211
            GLOBAL_PROJECT_NAME: self.global_project_namespace,  # type: ignore[misc] # dbt
        }
        yield self.global_project_namespace  # other internal project besides dbt
 
    # provides special keys method for MacroNamespace iterator
    # returns keys from local_namespace, global_namespace, packages,
    # global_project_namespace
    def _keys(self) -> Set[str]:
        keys: Set[str] = set()
        for search in self._search_order():
            keys.update(search)
        return keys
 
    # special iterator using special keys
    def __iter__(self) -> Iterator[str]:
        for key in self._keys():
            yield key
 
    def __len__(self):
        return len(self._keys())
 
    def __getitem__(self, key: str) -> NamespaceMember:
        for dct in self._search_order():
            if key in dct:
                return dct[key]
        raise KeyError(key)
 
    def get_from_package(self, package_name: Optional[str], name: str) -> Optional[MacroGenerator]:
        if package_name is None:
            return self.get(name)
        elif package_name == GLOBAL_PROJECT_NAME:
            return self.global_project_namespace.get(name)
        elif package_name in self.packages:
            return self.packages[package_name].get(name)
        else:
            raise PackageNotFoundForMacroError(package_name)
  • macro prefix 的处理
    可以看到先是当前adapter 的,然后是其他adapter (通过依赖指定的),之后是default
def _get_adapter_macro_prefixes(self) -> List[str]:
    # order matters for dispatch:
    #  1. current adapter
    #  2. any parent adapters (dependencies)
    #  3. 'default'
    search_prefixes = get_adapter_type_names(self._adapter.type()) + ["default"]
    return search_prefixes
  • get_adapter_type_names 的处理
def get_adapter_type_names(self, name: Optional[str]) -> List[str]:
    return [p.adapter.type() for p in self.get_adapter_plugins(name)]
  • get_adapter_plugins 部分会进行依赖plugin 的处理

注意dependencies 这个也是我们开发plugin 定义的一个,也会进行查找

def get_adapter_plugins(self, name: Optional[str]) -> List[AdapterPlugin]:
    """Iterate over the known adapter plugins. If a name is provided,
    iterate in dependency order over the named plugin and its dependencies.
    """
    if name is None:
        return list(self.plugins.values())
 
    plugins: List[AdapterPlugin] = []
    seen: Set[str] = set()
    plugin_names: List[str] = [name]
    while plugin_names:
        plugin_name = plugin_names[0]
        plugin_names = plugin_names[1:]
        try:
            plugin = self.plugins[plugin_name]
        except KeyError:
            raise DbtInternalError(f"No plugin found for {plugin_name}") from None
        plugins.append(plugin)
        seen.add(plugin_name)
        for dep in plugin.dependencies:
            if dep not in seen:
                plugin_names.append(dep)
    return plugins

说明

结合官方文档以及源码可以更好的了解dbt adapter dispatch 的内部处理

参考资料

core/dbt/context/macros.py
core/dbt/context/providers.py
https://docs.getdbt.com/reference/dbt-jinja-functions/dispatch
https://docs.getdbt.com/reference/dbt-jinja-functions/adapter

标签:search,name,macro,self,namespace,dispatch,dbt,adapter
From: https://www.cnblogs.com/rongfengliang/p/18169081

相关文章

  • dbt dbt-audit-helper 包compare_relation_columns 处理简单说明
    dbtdbt-audit-helper包在进行compare_relation_columns处理的时候进行数据表列字段创建顺序的判断参考使用我按照test处理的,同时进行的测试异常进行存储使用{{audit_helper.compare_relation_columns(a_relation=source("dalongdemo","mytest_appv2")......
  • lora_adapter 模型和原模型合并成一个模型
    lora部分合并到原模型参数上importtorchfrompeftimportPeftModelfromtransformersimportAutoTokenizer,AutoModelForCausalLM,LlamaTokenizerfromtransformers.generation.utilsimportGenerationConfigdefapply_lora(model_name_or_path,output_path,lor......
  • Delphi CxGrid/CxDBTreeList等将排序筛选条件改为中文方法
    Delphi CxGrid/CxDBTreeList等将排序筛选条件改为中文方法一、加入cxLocalizer控件二、在FormCreate里加入以下代码procedureTForm1.FormCreate(Sender:TObject);begin cxLocalizer1.LoadFromResource(HInstance); cxLocalizer1.Language:='中文(简体,中国)';......
  • 设计模式:适配器模式(Adapter)
    设计模式:适配器模式(Adapter)设计模式:适配器模式(Adapter)模式动机模式定义模式结构时序图模式实现在单线程环境下的测试在多线程环境下的测试模式分析优缺点适用场景应用场景应用实例适配器模式和代理模式的区别模式扩展默认适配器模式(DefaultAdapterPattern)接口适配器模......
  • Scan Your Truck Using Nexiq Adapter: Simplifying Your Diagnostic Process
    Intoday'sfast-pacedworld,ensuringthesmoothfunctioningofyourtruckisessentialforavoidingdowntimeandmaintainingefficiency.Withtheadventofadvancedtechnology,diagnosingandtroubleshootingissueshasbecomemoreconvenientthanev......
  • dbt adapter 的get_relation 简单说明
    dbt的adapter.get_relation可以方便的获取存在的relcation信息,以下是一个简单说明参考实现内部处理@available.parse_nonedefget_relation(self,database:str,schema:str,identifier:str)->Optional[BaseRelation]:relations_list=self.lis......
  • dbt Relation check_schema_exists 一个有意思的功能
    dbt内部总有一些隐藏的小细节,官方文档没有说明,但是在一些adapter实现中包含,一些是关于check_schema_exists的一些说明内部处理dbt/adapters/sql/impl.pydefcheck_schema_exists(self,database:str,schema:str)->bool:information_schema=self.Re......
  • ComfyUI使用基础工作流+面部adapter修复
    json{"last_node_id":13,"last_link_id":19,"nodes":[{"id":6,"type":"CLIPTextEncode","pos":[415,186],"size&qu......
  • dbt snapshot 处理简单说明
    dbt的snapshot实际上也是一种物化处理,支持与test,docs,稍有不同就是dbt没定义独立的block扩展,以下是一个简单说明dbt目前默认的snapshot是基于了scd2模型使用包含了配置以及snapshot定义,配置支持dbt_project项目级以及独立snapshot定义,对于snapshot是需要指定策略的......
  • dbt fromyaml 上下文方法简单说明
    fromyaml上下文方法可以用来加载yaml内容,属于一个工具类,比如automate-dv就使用了不少方法参考使用{%-setinfo-%}source_model:raw_staging:"raw_customer"derived_columns:SOURCE:"!1"LOAD_DATETIME:"CRM_DATA_INGESTION_TIME"E......