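Statement blocks are essentially a wrapper around a standard Jinja2 macro call (a call block) that provides a convenient way to execute SQL.
Because the query result has to be stored somewhere, dbt provides a store_result macro; internally the data is handled with agate, a convenient Python data-processing package.
To read the result back, dbt provides the load_result macro.
This post covers only the statement part; the store_result and load_result macros will be covered separately later.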
Statement handling
The statement macro is defined in the dbt-adapters package, while the underlying implementation lives in dbt core; dbt-adapters contains dbt's global macros (under the package name dbt).
- Example usage
{# Define the query #}
{%- call statement('states', fetch_result=True) -%}
select distinct state from {{ ref('users') }}
{%- endcall -%}
{# Load the stored result #}
{%- set states = load_result('states') -%}
{%- set states_data = states['data'] -%}
{%- set states_status = states['response'] -%}
- Reference source
{#--
The macro override naming method (spark__statement) only works for macros which are called with adapter.dispatch. For macros called directly, you can just redefine them.
--#}
{%- macro statement(name=None, fetch_result=False, auto_begin=True, language='sql') -%}
{%- if execute: -%}
{#- Get the SQL passed in through caller() -#}
{%- set compiled_code = caller() -%}
{%- if name == 'main' -%}
{{ log('Writing runtime {} for node "{}"'.format(language, model['unique_id'])) }}
{{ write(compiled_code) }}
{%- endif -%}
{%- if language == 'sql'-%}
{#- Run the SQL query through the adapter's execute method -#}
{%- set res, table = adapter.execute(compiled_code, auto_begin=auto_begin, fetch=fetch_result) -%}
{%- elif language == 'python' -%}
{#- Submit the Python code for execution -#}
{%- set res = submit_python_job(model, compiled_code) -%}
{#-- TODO: What should table be for python models? --#}
{%- set table = None -%}
{%- else -%}
{% do exceptions.raise_compiler_error("statement macro didn't get supported language") %}
{%- endif -%}
{%- if name is not none -%}
{#- Store the result under the given name -#}
{{ store_result(name, response=res, agate_table=table) }}
{%- endif -%}
{%- endif -%}
{%- endmacro %}
{% macro noop_statement(name=None, message=None, code=None, rows_affected=None, res=None) -%}
{%- set sql = caller() -%}
{%- if name == 'main' -%}
{{ log('Writing runtime SQL for node "{}"'.format(model['unique_id'])) }}
{{ write(sql) }}
{%- endif -%}
{%- if name is not none -%}
{{ store_raw_result(name, message=message, code=code, rows_affected=rows_affected, agate_table=res) }}
{%- endif -%}
{%- endmacro %}
{# run_query: a friendlier wrapper around statement #}
{# a user-friendly interface into statements #}
{% macro run_query(sql) %}
{% call statement("run_query_statement", fetch_result=true, auto_begin=false) %}
{{ sql }}
{% endcall %}
{% do return(load_result("run_query_statement").table) %}
{% endmacro %}
Notes
The official docs recommend using run_query in place of statement. run_query is a wrapper around statement and is more convenient to use.
References
dbt/include/global_project/macros/etc/statement.sql
core/dbt/context/providers.py
https://docs.getdbt.com/reference/dbt-jinja-functions/run_query
https://docs.getdbt.com/reference/dbt-jinja-functions/statement-blocks
https://jinja.palletsprojects.com/en/3.0.x/templates/#call
https://agate.readthedocs.io/en/latest/
https://github.com/wireservice/agate