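dbt supports quick modeling based on seeds, which is a good fit for test environments. We only need to provide a CSV file and run dbt seed; dbt then creates the corresponding table, and we can reference it from our dbt models. Below is a brief look at how this is used and how it is implemented internally.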
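Usage
- Seed file location
Seed files normally live in the seed directory of the dbt project (seeds by default in recent dbt versions). Each one is a CSV file, which is currently a hard requirement.
- Referencing a seed from a model
models/demoapp.sql (here app is the seed name, that is, the CSV file name without the extension)
select * from {{ref('app')}}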
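Internals
Internally, a seed is really just another materialization. To load the seed files dbt uses agate, a capable Python data library, rather than pandas. The schema of the table created from a seed (the column types) comes from agate's automatic type inference; columns listed in the seed's column_types config are passed to the loader as text columns instead of being inferred.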
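To make the idea of type inference concrete, here is a minimal standard-library sketch. It is not agate's implementation: the function names, the candidate type list, and the set of null values are assumptions chosen for illustration. It only shows the general approach of trying candidate types from most to least specific, and why forcing a column to text can matter.

```python
import csv
import io
from datetime import datetime

# Assumption for this sketch: these strings are treated as NULL and skipped.
NULL_VALUES = {"", "null"}


def _is_int(value: str) -> bool:
    try:
        int(value)
        return True
    except ValueError:
        return False


def _is_number(value: str) -> bool:
    try:
        float(value)
        return True
    except ValueError:
        return False


def _is_date(value: str) -> bool:
    try:
        datetime.strptime(value, "%Y-%m-%d")
        return True
    except ValueError:
        return False


# Candidate types, tried from most to least specific; "text" always matches.
CANDIDATES = [
    ("integer", _is_int),
    ("number", _is_number),
    ("date", _is_date),
    ("text", lambda value: True),
]


def infer_column_types(csv_text: str, text_columns=()) -> dict:
    """Return {column name: inferred type name} for a CSV document.

    Columns named in text_columns skip inference and are reported as text.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    inferred = {}
    for name in reader.fieldnames or []:
        if name in text_columns:
            inferred[name] = "text"
            continue
        values = [
            row[name] for row in rows
            if (row[name] or "").lower() not in NULL_VALUES
        ]
        # Pick the first candidate type that accepts every non-null value.
        inferred[name] = next(
            type_name for type_name, matches in CANDIDATES
            if all(matches(v) for v in values)
        )
    return inferred


SAMPLE = (
    "id,name,price,created,zip\n"
    "1,alpha,9.5,2024-01-01,01234\n"
    "2,beta,12,2024-02-15,98765\n"
)

print(infer_column_types(SAMPLE))
print(infer_column_types(SAMPLE, text_columns=("zip",)))
```

In the sample, the zip column looks like an integer to the inference step, which would drop the leading zero; forcing it to text keeps the value as written.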
- Reference implementation of the materialization
core/dbt/include/global_project/macros/materializations/seeds/seed.sql
{% materialization seed, default %}
  {%- set identifier = model['alias'] -%}
  {%- set full_refresh_mode = (should_full_refresh()) -%}
  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
  {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}
  {%- set grant_config = config.get('grants') -%}
  {# load_agate_table is a context method exposed by dbt #}
  {%- set agate_table = load_agate_table() -%}
  -- grab current tables grants config for comparision later on
  {# store_result is a context method exposed by dbt; the result is stored so it can be reused #}
  {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}
  {{ run_hooks(pre_hooks, inside_transaction=False) }}
  -- `BEGIN` happens here:
  {{ run_hooks(pre_hooks, inside_transaction=True) }}
  -- build model
  {% set create_table_sql = "" %}
  {% if exists_as_view %}
    {{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }}
  {% elif exists_as_table %}
    {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %}
  {% else %}
    {# the create_csv_table macro combines the agate_table data above to produce the CREATE TABLE SQL #}
    {% set create_table_sql = create_csv_table(model, agate_table) %}
  {% endif %}
  {% set code = 'CREATE' if full_refresh_mode else 'INSERT' %}
  {% set rows_affected = (agate_table.rows | length) %}
  {% set sql = load_csv_rows(model, agate_table) %}
  {# noop_statement records the SQL and the status message as the 'main' result #}
  {% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %}
    {{ get_csv_sql(create_table_sql, sql) }};
  {% endcall %}
  {% set target_relation = this.incorporate(type='table') %}
  {% set should_revoke = should_revoke(old_relation, full_refresh_mode) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
  {% do persist_docs(target_relation, model) %}
  {% if full_refresh_mode or not exists_as_table %}
    {% do create_indexes(target_relation) %}
  {% endif %}
  {{ run_hooks(post_hooks, inside_transaction=True) }}
  -- `COMMIT` happens here
  {{ adapter.commit() }}
  {{ run_hooks(post_hooks, inside_transaction=False) }}
  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
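The branching in the macro above can be restated as a small Python sketch. This is only a reading aid: SeedPlan and plan_seed are hypothetical names, and the sketch covers just the decisions visible in the macro (view versus table versus new relation, the CREATE/INSERT status code, and when indexes are created), not what reset_csv_table or create_csv_table do internally.

```python
from dataclasses import dataclass


@dataclass
class SeedPlan:
    table_action: str     # "create" (create_csv_table) or "reset" (reset_csv_table)
    status: str           # message recorded for the run, e.g. "INSERT 3"
    create_indexes: bool  # whether create_indexes is called afterwards


def plan_seed(exists_as_table: bool, exists_as_view: bool,
              full_refresh: bool, row_count: int) -> SeedPlan:
    """Mirror the decisions made by the seed materialization macro."""
    if exists_as_view:
        # The macro raises a compiler error in this case.
        raise ValueError("Cannot seed to target, it is a view")
    table_action = "reset" if exists_as_table else "create"
    code = "CREATE" if full_refresh else "INSERT"
    return SeedPlan(
        table_action=table_action,
        status=f"{code} {row_count}",
        create_indexes=full_refresh or not exists_as_table,
    )


# First run: no relation exists yet.
print(plan_seed(exists_as_table=False, exists_as_view=False,
                full_refresh=False, row_count=3))
# Later run against an existing table, without and with full refresh.
print(plan_seed(True, False, False, 3))
print(plan_seed(True, False, True, 3))
```

Note that the status code depends only on the full-refresh flag, so a first run without full refresh still reports INSERT even though the table is newly created.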
- Loading the CSV with agate (core/dbt/context/providers.py)
@contextmember
def load_agate_table(self) -> agate.Table:
    if not isinstance(self.model, SeedNode):
        raise LoadAgateTableNotSeedError(self.model.resource_type, node=self.model)
    # include package_path for seeds defined in packages
    package_path = (
        os.path.join(self.config.packages_install_path, self.model.package_name)
        if self.model.package_name != self.config.project_name
        else "."
    )
    path = os.path.join(self.config.project_root, package_path, self.model.original_file_path)
    if not os.path.exists(path):
        assert self.model.root_path
        path = os.path.join(self.model.root_path, self.model.original_file_path)
    column_types = self.model.config.column_types
    try:
        # load the CSV file through dbt's agate_helper wrapper
        table = agate_helper.from_csv(path, text_columns=column_types)
    except ValueError as e:
        raise LoadAgateTableValueError(e, node=self.model)
    table.original_abspath = os.path.abspath(path)
    return table
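The path lookup in load_agate_table can be tried out in isolation with the following sketch. resolve_seed_path is a hypothetical standalone function, not part of dbt; its parameters stand in for the config and model attributes used above, and the project layout in the demo (a project named demo, a package named shared, a dbt_packages directory) is made up. One deliberate difference: where the original asserts that root_path is set, this sketch simply skips the fallback when it is missing.

```python
import os
import tempfile


def resolve_seed_path(project_root, project_name, packages_install_path,
                      package_name, original_file_path, root_path=None):
    """Locate a seed CSV, following the same order as load_agate_table."""
    # Seeds defined in an installed package live under the packages directory;
    # seeds of the current project are relative to the project root.
    package_path = (
        os.path.join(packages_install_path, package_name)
        if package_name != project_name
        else "."
    )
    path = os.path.join(project_root, package_path, original_file_path)
    # Fall back to the node's own root path if the first guess does not exist.
    if not os.path.exists(path) and root_path:
        path = os.path.join(root_path, original_file_path)
    return os.path.normpath(path)


with tempfile.TemporaryDirectory() as root:
    # Build a throwaway project with one local seed and one packaged seed.
    local_csv = os.path.join(root, "seeds", "app.csv")
    packaged_csv = os.path.join(root, "dbt_packages", "shared", "seeds", "codes.csv")
    for csv_path in (local_csv, packaged_csv):
        os.makedirs(os.path.dirname(csv_path), exist_ok=True)
        with open(csv_path, "w") as handle:
            handle.write("id\n1\n")

    own = resolve_seed_path(root, "demo", "dbt_packages", "demo",
                            os.path.join("seeds", "app.csv"))
    pkg = resolve_seed_path(root, "demo", "dbt_packages", "shared",
                            os.path.join("seeds", "codes.csv"))
    print(own)
    print(pkg)
```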
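- What the manifest shows
The manifest figure also shows that the seed node carries a seed materialization, so its handling follows the usual materialization pattern.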
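Summary
The above is a brief walkthrough of how seeds are used and how they are processed internally. Reading the source alongside it gives a better understanding of the internals and helps in making good use of the seed feature.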
References
core/dbt/include/global_project/macros/materializations/seeds/seed.sql
core/dbt/clients/agate_helper.py
core/dbt/context/providers.py
https://agate.readthedocs.io/en/latest/cookbook/create.html