首页 > 其他分享 >dbt snapshot 处理简单说明

dbt snapshot 处理简单说明

时间:2024-05-11 09:01:58浏览次数:40  
标签:node set target relation 简单 snapshot config dbt

dbt 的snapshot 实际上也是一种物化处理,支持与test,docs,稍有不同就是dbt 没定义独立的block 扩展,以下是一个简单说明
dbt 目前默认的snapshot是基于了scd2 模型

使用

包含了配置以及snapshot 定义,配置支持dbt_project 项目级以及独立snapshot 定义,对于snapshot 是需要指定策略的

  • 参考配置
{% snapshot orders_snapshot %}
    {{
        config(
          unique_key='id',
          strategy='timestamp',
          updated_at='updated_at'
        )
    }}
    -- Pro-Tip: Use sources in snapshots!
    select * from {{ source('jaffle_shop', 'orders') }}
{% endsnapshot %}

处理机制

dbt snapshot 实际上也是物化,所以dbt 处理的时候只是进行了snapshot block 的parse,实际编译的元数据也是materialization

  • 参考解析

ManifestLoader 的load方法中调用的解析

parser_types: List[Type[Parser]] = [
                ModelParser,
                SnapshotParser,
                AnalysisParser,
                SingularTestParser,
                SeedParser,
                DocumentationParser,
                HookParser,
            ]
            for project in self.all_projects.values():
                if project.project_name not in project_parser_files:
                    continue
                self.parse_project(
                    project, project_parser_files[project.project_name], parser_types
                )

SnapshotParser 解析处理

class SnapshotParser(SQLParser[IntermediateSnapshotNode, SnapshotNode]):
    def parse_from_dict(self, dct, validate=True) -> IntermediateSnapshotNode:
        if validate:
            IntermediateSnapshotNode.validate(dct)
        return IntermediateSnapshotNode.from_dict(dct)
 
    @property
    def resource_type(self) -> NodeType:
        return NodeType.Snapshot
 
    @classmethod
    def get_compiled_path(cls, block: FileBlock):
        return block.path.relative_path
 
    def set_snapshot_attributes(self, node):
        # use the target_database setting if we got it, otherwise the
        # `database` value of the node (ultimately sourced from the `database`
        # config value), and if that is not set, use the database defined in
        # the adapter's credentials.
        if node.config.target_database:
            node.database = node.config.target_database
        elif not node.database:
            node.database = self.root_project.credentials.database
 
        # the target schema must be set if we got here, so overwrite the node's
        # schema
        node.schema = node.config.target_schema
        # We need to set relation_name again, since database/schema might have changed
        self._update_node_relation_name(node)
 
        return node
 
    def get_fqn(self, path: str, name: str) -> List[str]:
        """Get the FQN for the node. This impacts node selection and config
        application.
 
        On snapshots, the fqn includes the filename.
        """
        no_ext = os.path.splitext(path)[0]
        fqn = [self.project.project_name]
        fqn.extend(split_path(no_ext))
        fqn.append(name)
        return fqn
 
    def transform(self, node: IntermediateSnapshotNode) -> SnapshotNode:
        try:
            # The config_call_dict is not serialized, because normally
            # it is not needed after parsing. But since the snapshot node
            # does this extra to_dict, save and restore it, to keep
            # the model config when there is also schema config.
            config_call_dict = node.config_call_dict
            dct = node.to_dict(omit_none=True)
            parsed_node = SnapshotNode.from_dict(dct)
            parsed_node.config_call_dict = config_call_dict
            self.set_snapshot_attributes(parsed_node)
            return parsed_node
        except ValidationError as exc:
            raise SnapshopConfigError(exc, node)
 
    def parse_file(self, file_block: FileBlock) -> None:
       # 此处文件处理只支持snapshot 的,BlockSearcher 处理部分会使用自己的一套blocktag 解析处理
        blocks = BlockSearcher(
            source=[file_block],
            allowed_blocks={"snapshot"},
            source_tag_factory=BlockContents,
        )
        for block in blocks:
            self.parse_node(block)
  • 生成格式参考

如下图

 

  • 执行

通过了解我们知道也是物化,那么就会使用物化相关定义处理
参考处理snapshot.sql ,里边东西还是比较多的, 同时也调用了不少其他macro 定义,详细的后边解决物化完整介绍下,默认的实现是基于了merge into

{% materialization snapshot, default %}
  {%- set config = model['config'] -%}
 
  {%- set target_table = model.get('alias', model.get('name')) -%}
 
  {%- set strategy_name = config.get('strategy') -%}
  {%- set unique_key = config.get('unique_key') %}
  -- grab current tables grants config for comparision later on
  {%- set grant_config = config.get('grants') -%}
 
  {% set target_relation_exists, target_relation = get_or_create_relation(
          database=model.database,
          schema=model.schema,
          identifier=target_table,
          type='table') -%}
 
  {%- if not target_relation.is_table -%}
    {% do exceptions.relation_wrong_type(target_relation, 'table') %}
  {%- endif -%}
   
  {{ run_hooks(pre_hooks, inside_transaction=False) }}
 
  {{ run_hooks(pre_hooks, inside_transaction=True) }}
 
  {% set strategy_macro = strategy_dispatch(strategy_name) %}
  {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
 
  {% if not target_relation_exists %}
 
      {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
      {% set final_sql = create_table_as(False, target_relation, build_sql) %}
 
  {% else %}
 
      {{ adapter.valid_snapshot_target(target_relation) }}
 
      {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}
 
      -- this may no-op if the database does not require column expansion
      {% do adapter.expand_target_column_types(from_relation=staging_table,
                                               to_relation=target_relation) %}
 
      {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
                                   | rejectattr('name', 'equalto', 'dbt_change_type')
                                   | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
                                   | rejectattr('name', 'equalto', 'dbt_unique_key')
                                   | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
                                   | list %}
 
      {% do create_columns(target_relation, missing_columns) %}
 
      {% set source_columns = adapter.get_columns_in_relation(staging_table)
                                   | rejectattr('name', 'equalto', 'dbt_change_type')
                                   | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
                                   | rejectattr('name', 'equalto', 'dbt_unique_key')
                                   | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
                                   | list %}
 
      {% set quoted_source_columns = [] %}
      {% for column in source_columns %}
        {% do quoted_source_columns.append(adapter.quote(column.name)) %}
      {% endfor %}
 
      {% set final_sql = snapshot_merge_sql(
            target = target_relation,
            source = staging_table,
            insert_cols = quoted_source_columns
         )
      %}
 
  {% endif %}
 
  {% call statement('main') %}
      {{ final_sql }}
  {% endcall %}
 
  {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
 
  {% do persist_docs(target_relation, model) %}
 
  {% if not target_relation_exists %}
    {% do create_indexes(target_relation) %}
  {% endif %}
 
  {{ run_hooks(post_hooks, inside_transaction=True) }}
 
  {{ adapter.commit() }}
 
  {% if staging_table is defined %}
      {% do post_snapshot(staging_table) %}
  {% endif %}
 
  {{ run_hooks(post_hooks, inside_transaction=False) }}
 
  {{ return({'relations': [target_relation]}) }}
 
{% endmaterialization %}

说明

以上只是一个简单说明,dbt 不少功能都是基于物化block 处理的,后边说明下物化的处理

参考资料

core/dbt/parser/snapshots.py
core/dbt/parser/manifest.py
core/dbt/parser/search.py
core/dbt/clients/_jinja_blocks.py
core/dbt/contracts/graph/nodes.py
core/dbt/include/global_project/macros/materializations/snapshots/snapshot.sql
core/dbt/include/global_project/macros/materializations/snapshots/strategies.sql
https://docs.getdbt.com/reference/snapshot-properties

标签:node,set,target,relation,简单,snapshot,config,dbt
From: https://www.cnblogs.com/rongfengliang/p/18134063

相关文章

  • Dawn Launcher Windows快捷启动工具 Maye是一款体积小巧、简单易用的快速启动工具 Luc
    DawnLauncherWindows快捷启动工具,帮助您整理杂乱无章的桌面,分门别类管理您的桌面快捷方式,让您的桌面保持干净整洁。支持关联文件夹(实时同步文件夹内容)、快速搜索、相对路径(便携路径)、扫描本机开始菜单、本地扫描本机Appx应用列表、添加网址并一键获取网址信息。Maye是一......
  • 闭包函数最简单的理解
    闭包函数是指在编程中,一个函数可以访问其词法范围内的变量,即使在其定义之外执行。这意味着函数可以“捕获”其周围的环境,并在稍后的时间访问这些值。闭包函数通常用于创建函数工厂,或者用于在程序中创建私有变量和方法。在许多编程语言中,包括Python、JavaScript和Swift等,都支......
  • dbt fromyaml 上下文方法简单说明
    fromyaml上下文方法可以用来加载yaml内容,属于一个工具类,比如automate-dv就使用了不少方法参考使用{%-setinfo-%}source_model:raw_staging:"raw_customer"derived_columns:SOURCE:"!1"LOAD_DATETIME:"CRM_DATA_INGESTION_TIME"E......
  • Android Studio简单入门教程
    1.建立项目首先点击new——newproject新建项目选择想要创建的Android的模板,建议选择emptyactivity(空模板),然后nextName:给你的项目起一个名字APIlevel:选择Android虚拟机的版本,版本越低运行起来越快剩下的就按默认的就行,点击finish(由于版本不一样,步骤2和步骤3的顺序......
  • dbt macro 中获取relation 的几种方法
    很多时候我们是希望在自己开发的macro中引用relation这样可以获取实际模型在数据库中的信息,方便数据的写入,或者进行查询实现动态能力,尤其在进行数据质量方便的处理时候,以下简单说明下一些可选的方法参考方法直接使用api.Relation.create创建新的如果知道一些信息(database......
  • 平衡树的简单替代品
    1、STL/gnu_pbds1、vector<int>常用,动态空间注意比较慢,远古题数据小才建议使用。支持操作复杂度序列类别随机访问\(O(1)\)尾部插入删除\(O(1)\)随机插入删除\(O(玄学),O(\sqrt{n})\)集合类别none2、set<int>维护数集的,它的常数真的很奇妙......
  • 一个简单的MD5加盐
    虽然都说MD5加密一下密码比较好,但是如果密码过于简单,比如123456,经过MD5加密之后还是不安全,因为别有用心的人可以使用彩虹表来撞库得到密码。因此为了加大破解难度,需要给MD5算法加盐。下面是一个简单的加盐算法。当然,我不是说加了盐就一劳永逸了,下面的代码也不安全,这样做只是为了......
  • dremio CatalogMaintenanceService 服务简单说明
    说明此服务是从25.0开始包含的,同时在releasenote中也有说明,以下主要说明下内部实现release信息如下,具体就不翻译了,主要是添加了一个每个任务进行每个view最大保留50个历史信息Addeddailycatalogmaintenancetaskstotrimhistoryofviewstoamaximumof50......
  • react + antd + js 简单Cron组件,支持国际化
    Cron.jsimportReact,{Fragment,useState,useCallback,useRef,useEffect}from'react';import{Select,TimePicker,Input}from'antd';constOption=Select.Option;constmwidth80={minWidth:80,marginRight:10};constwidt......
  • 前端导出简单的Excel
    //报表导出exportProjectCount:asyncfunction(){letthat=this;awaitthat.getProjectCount().then(()=>{console.log("日志输出",that.dataCount.length)letdataLi......