首页 > 其他分享 >dbt query_header 简单说明

dbt query_header 简单说明

时间:2024-04-06 09:16:05浏览次数:16  
标签:comment self header str sql query dbt

dbt 对于每个实际执行的任务(实际sql)都会包含一个任务注释,可以方便的查看dbt 版本,执行nodeid,target

参考格式

/* {"app": "dbt", "dbt_version": "1.5.11", "profile_name": "dremio_nessie", "target_name": "dev", "node_id": "model.dremio_demo_app.my_mydemoapp"} */

内部实现

因为实际的执行是对于实际db 的操作,所以query_header 是在connections 内部处理的

  • BaseConnectionManager 内部定义
def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext) -> None:
    self.profile = profile
    self.thread_connections: Dict[Hashable, Connection] = {}
    self.lock: RLock = mp_context.RLock()
    self.query_header: Optional[MacroQueryStringSetter] = None
 
def set_query_header(self, query_header_context: Dict[str, Any]) -> None:
    self.query_header = MacroQueryStringSetter(self.profile, query_header_context)
  • MacroQueryStringSetter 的实现
class MacroQueryStringSetter:
    def __init__(self, config: AdapterRequiredConfig, manifest: Manifest):
        self.manifest = manifest
        self.config = config
        # 可以看出是一个macro,实际如果debug 也可以看出会有一个query comment 的macro 
        comment_macro = self._get_comment_macro()
        self.generator: QueryStringFunc = lambda name, model: ""
        # if the comment value was None or the empty string, just skip it
        if comment_macro:
            assert isinstance(comment_macro, str)
            macro = "\n".join(
                (
                    "{%- macro query_comment_macro(connection_name, node) -%}",
                    comment_macro,
                    "{% endmacro %}",
                )
            )
            ctx = self._get_context()
            self.generator = QueryStringGenerator(macro, ctx)
        self.comment = _QueryComment(None)
        self.reset()
 
    def _get_comment_macro(self) -> Optional[str]:
        return self.config.query_comment.comment
 
    def _get_context(self) -> Dict[str, Any]:
        return generate_query_header_context(self.config, self.manifest)
 
    def add(self, sql: str) -> str:
        return self.comment.add(sql)
 
    def reset(self):
        self.set("master", None)
 
    def set(self, name: str, node: Optional[ResultNode]):
        wrapped: Optional[NodeWrapper] = None
        if node is not None:
            wrapped = NodeWrapper(node)
        comment_str = self.generator(name, wrapped)
 
        append = False
        if isinstance(self.config.query_comment, QueryComment):
            append = self.config.query_comment.append
        self.comment.set(comment_str, append)
  • _QueryComment 实现
class _QueryComment(local):
    """A thread-local class storing thread-specific state information for
    connection management, namely:
        - the current thread's query comment.
        - a source_name indicating what set the current thread's query comment
    """
 
    def __init__(self, initial) -> None:
        self.query_comment: Optional[str] = initial
        self.append: bool = False
 
    def add(self, sql: str) -> str:
        if not self.query_comment:
            return sql
 
        if self.append:
            # replace last ';' with '<comment>;'
            sql = sql.rstrip()
            if sql[-1] == ";":
                sql = sql[:-1]
                return "{}\n/* {} */;".format(sql, self.query_comment.strip())
 
            return "{}\n/* {} */".format(sql, self.query_comment.strip())
 
        return "/* {} */\n{}".format(self.query_comment.strip(), sql)
 
    def set(self, comment: Optional[str], append: bool):
        if isinstance(comment, str) and "*/" in comment:
            # tell the user "no" so they don't hurt themselves by writing
            # garbage
            raise DbtRuntimeError(f'query comment contains illegal value "*/": {comment}')
        self.query_comment = comment
        self.append = append
  • 实际内容来自AdapterRequiredConfig 实际上是配置相关的
class AdapterRequiredConfig(HasCredentials, Protocol):
    project_name: str
    query_comment: QueryComment
    cli_vars: Dict[str, Any]
    target_path: str
    log_cache_events: bool
  • QueryComment 中默认comment macro
    可以看出此信息与上边示例的一致
 
DEFAULT_QUERY_COMMENT = """
{%- set comment_dict = {} -%}
{%- do comment_dict.update(
    app='dbt',
    dbt_version=dbt_version,
    profile_name=target.get('profile_name'),
    target_name=target.get('target_name'),
) -%}
{%- if node is not none -%}
  {%- do comment_dict.update(
    node_id=node.unique_id,
  ) -%}
{% else %}
  {# in the node context, the connection name is the node_id #}
  {%- do comment_dict.update(connection_name=connection_name) -%}
{%- endif -%}
{{ return(tojson(comment_dict)) }}
"""
  • SQLConnectionManager会使用相关信息
    实际上就是拼接sql
 def execute(
        self,
        sql: str,
        auto_begin: bool = False,
        fetch: bool = False,
        limit: Optional[int] = None,
    ) -> Tuple[AdapterResponse, agate.Table]:
        sql = self._add_query_comment(sql)
        _, cursor = self.add_query(sql, auto_begin)
        response = self.get_response(cursor)
        if fetch:
            table = self.get_result_from_cursor(cursor, limit)
        else:
            table = empty_table()
        return response, table
  • 参数传递
    目前是dbt 在解析dbt 项目的manifest 生成过程中传递的
    dbt core 中的manifest 模块生成
class QueryHeaderContext(ManifestContext):
    def __init__(self, config: AdapterRequiredConfig, manifest: Manifest) -> None:
        super().__init__(config, manifest, config.project_name)
 
def generate_query_header_context(config: AdapterRequiredConfig, manifest: Manifest):
    ctx = QueryHeaderContext(config, manifest)
    return ctx.to_dict()

传递是dbt cli 的context 处理装饰器中 (core requires 模块)

register_adapter(runtime_config, get_mp_context())
          adapter = get_adapter(runtime_config)
          adapter.set_macro_context_generator(generate_runtime_macro_context)
          adapter.set_macro_resolver(ctx.obj["manifest"])
          query_header_context = generate_query_header_context(
              adapter.config, ctx.obj["manifest"]
          )
          adapter.connections.set_query_header(query_header_context)
      return func(*args, **kwargs)

说明

dbt 的注释还是比较方便的,可以进行执行信息查看分析,实际上基于macro 的sql 查询注释信息生成

参考资料

dbt/adapters/base/connections.py
dbt/adapters/base/query_headers.py
core/dbt/cli/requires.py

标签:comment,self,header,str,sql,query,dbt
From: https://www.cnblogs.com/rongfengliang/p/18105085

相关文章

  • dbt debug macro 简单说明
    dbt支持debugmacro可以用来进行调试使用{%macromy_macro()%} {%setsomething_complex=my_complicated_macro()%} {{debug()}} {%endmacro%}参考实现实际上就是通过环境变量开启了一个debug上下文变量ifos.en......
  • Elastic学习之旅 (6) Query DSL
    大家好,我是Edison。首先说声抱歉,这个ES学习系列很久没更新了,现在继续吧。上一篇:ES的倒排索引和Analyzer什么是QueryDSLDSL是DomainSpecificLanguage的缩写,指的是为特定问题领域设计的计算机语言。这种语言专注于某特定领域的问题解决,因而比通用编程语言更有效率。在Elastic......
  • Excel、PowerQuery 和 ChatGPT 终极手册(上)
    原文:UltimateChatGPTHandbookforEnterprises译者:飞龙协议:CCBY-NC-SA4.0序言在不断发展的数据管理和分析领域中,掌握Excel的查找功能不仅是一种技能,更是高效数据处理的基石。《使用PowerQuery和ChatGPT的终极Excel》不仅仅是一本书;它是为数据爱好者、Excel爱好......
  • jquery的blockUI遮罩层的使用(1),bootstrap前端开发
    //pluginmethodforblockingelementcontent$.fn.block=function(opts){if(this[0]===window){$.blockUI(opts);returnthis;}varfullOpts=$.extend({},$.blockUI.defaults,opts||{});this.each(function(){var$el=$(this);if(fullOpts......
  • dbt statement macro 简单说明
    statementblocks实际上就是一个标准的jinja2macro调用包装,提供了方便的sql执行能力,因为需要进行查询结果的存储,dbt提供了一个store_result的macro,内部数据的处理基于了agate这个方便的python数据处理包为了查询使用提供了load_resultmacro以下只说明关于stateme......
  • JQuery
    目录简介JQuery对象JQuery的使用Dom对象与JQuery包装集对象JQuery选择器基础选择器层次选择器表单选择器JQueryDom操作操作元素属性操作元素样式操作元素的内容创建元素添加元素删除元素遍历元素JQuery事件ready预加载事件绑定事件JQueryAjax$.ajax$.get$.post$.getJSON简介......
  • 界面控件Kendo UI for jQuery 2024 Q1亮点 - 新的ToggleButton组件
    Telerik & KendoUI 2024Q1版本于2024年初发布,在此版本中将AI集成到了UI组件中,在整个产品组合中引入AIPrompt组件以及10多个新的UI控件、支持Angular17、多个数据可视化功能增强等。P.S:KendoUIforjQuery提供了在短时间内构建现代Web应用程序所需的一切,从众多UI子控件中......
  • 界面控件Kendo UI for jQuery 2024 Q1亮点 - 新的ToggleButton组件
    Telerik & KendoUI 2024Q1版本于2024年初发布,在此版本中将AI集成到了UI组件中,在整个产品组合中引入AIPrompt组件以及10多个新的UI控件、支持Angular17、多个数据可视化功能增强等。P.S:KendoUIforjQuery提供了在短时间内构建现代Web应用程序所需的一切,从众多UI子控......
  • dbt macro 的执行简单说明
    BaseAdapter中包含了一个adapter实际运行依赖的转换,链接处理,当然也包含了macro的执行,具体方法有直接的execute_macroModelRunner中的materialization_macro(run命令)还有run-operation中RunOperationTask的_run_unsafe方法ModelRunnercall_macro处理参考调用......
  • dbt this macro 处理简单说明
    dbtthismacro提供了一种方便的对于当前模型展现的方法,可以使用在增量模型以及pre&posthooks中this实际是就类似ref('<the_current_model>')是一个relation包含了database,schema以及模型标识使用示例一个增量处理的,基于this可以方便的引用模型{{config(mater......