dbt 对于每个实际执行的任务(实际sql)都会包含一个任务注释,可以方便的查看dbt 版本,执行nodeid,target
参考格式
/* {"app": "dbt", "dbt_version": "1.5.11", "profile_name": "dremio_nessie", "target_name": "dev", "node_id": "model.dremio_demo_app.my_mydemoapp"} */
内部实现
因为实际的执行是对于实际db 的操作,所以query_header 是在connections 内部处理的
- BaseConnectionManager 内部定义
def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext) -> None:
self.profile = profile
self.thread_connections: Dict[Hashable, Connection] = {}
self.lock: RLock = mp_context.RLock()
self.query_header: Optional[MacroQueryStringSetter] = None
def set_query_header(self, query_header_context: Dict[str, Any]) -> None:
self.query_header = MacroQueryStringSetter(self.profile, query_header_context)
- MacroQueryStringSetter 的实现
class MacroQueryStringSetter:
def __init__(self, config: AdapterRequiredConfig, manifest: Manifest):
self.manifest = manifest
self.config = config
# 可以看出是一个macro,实际如果debug 也可以看出会有一个query comment 的macro
comment_macro = self._get_comment_macro()
self.generator: QueryStringFunc = lambda name, model: ""
# if the comment value was None or the empty string, just skip it
if comment_macro:
assert isinstance(comment_macro, str)
macro = "\n".join(
(
"{%- macro query_comment_macro(connection_name, node) -%}",
comment_macro,
"{% endmacro %}",
)
)
ctx = self._get_context()
self.generator = QueryStringGenerator(macro, ctx)
self.comment = _QueryComment(None)
self.reset()
def _get_comment_macro(self) -> Optional[str]:
return self.config.query_comment.comment
def _get_context(self) -> Dict[str, Any]:
return generate_query_header_context(self.config, self.manifest)
def add(self, sql: str) -> str:
return self.comment.add(sql)
def reset(self):
self.set("master", None)
def set(self, name: str, node: Optional[ResultNode]):
wrapped: Optional[NodeWrapper] = None
if node is not None:
wrapped = NodeWrapper(node)
comment_str = self.generator(name, wrapped)
append = False
if isinstance(self.config.query_comment, QueryComment):
append = self.config.query_comment.append
self.comment.set(comment_str, append)
- _QueryComment 实现
class _QueryComment(local):
"""A thread-local class storing thread-specific state information for
connection management, namely:
- the current thread's query comment.
- a source_name indicating what set the current thread's query comment
"""
def __init__(self, initial) -> None:
self.query_comment: Optional[str] = initial
self.append: bool = False
def add(self, sql: str) -> str:
if not self.query_comment:
return sql
if self.append:
# replace last ';' with '<comment>;'
sql = sql.rstrip()
if sql[-1] == ";":
sql = sql[:-1]
return "{}\n/* {} */;".format(sql, self.query_comment.strip())
return "{}\n/* {} */".format(sql, self.query_comment.strip())
return "/* {} */\n{}".format(self.query_comment.strip(), sql)
def set(self, comment: Optional[str], append: bool):
if isinstance(comment, str) and "*/" in comment:
# tell the user "no" so they don't hurt themselves by writing
# garbage
raise DbtRuntimeError(f'query comment contains illegal value "*/": {comment}')
self.query_comment = comment
self.append = append
- 实际内容来自AdapterRequiredConfig 实际上是配置相关的
class AdapterRequiredConfig(HasCredentials, Protocol):
project_name: str
query_comment: QueryComment
cli_vars: Dict[str, Any]
target_path: str
log_cache_events: bool
- QueryComment 中默认comment macro
可以看出此信息与上边示例的一致
DEFAULT_QUERY_COMMENT = """
{%- set comment_dict = {} -%}
{%- do comment_dict.update(
app='dbt',
dbt_version=dbt_version,
profile_name=target.get('profile_name'),
target_name=target.get('target_name'),
) -%}
{%- if node is not none -%}
{%- do comment_dict.update(
node_id=node.unique_id,
) -%}
{% else %}
{# in the node context, the connection name is the node_id #}
{%- do comment_dict.update(connection_name=connection_name) -%}
{%- endif -%}
{{ return(tojson(comment_dict)) }}
"""
- SQLConnectionManager会使用相关信息
实际上就是拼接sql
def execute(
self,
sql: str,
auto_begin: bool = False,
fetch: bool = False,
limit: Optional[int] = None,
) -> Tuple[AdapterResponse, agate.Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor, limit)
else:
table = empty_table()
return response, table
- 参数传递
目前是dbt 在解析dbt 项目的manifest 生成过程中传递的
dbt core 中的manifest 模块生成
class QueryHeaderContext(ManifestContext):
def __init__(self, config: AdapterRequiredConfig, manifest: Manifest) -> None:
super().__init__(config, manifest, config.project_name)
def generate_query_header_context(config: AdapterRequiredConfig, manifest: Manifest):
ctx = QueryHeaderContext(config, manifest)
return ctx.to_dict()
传递是dbt cli 的context 处理装饰器中 (core requires 模块)
register_adapter(runtime_config, get_mp_context())
adapter = get_adapter(runtime_config)
adapter.set_macro_context_generator(generate_runtime_macro_context)
adapter.set_macro_resolver(ctx.obj["manifest"])
query_header_context = generate_query_header_context(
adapter.config, ctx.obj["manifest"]
)
adapter.connections.set_query_header(query_header_context)
return func(*args, **kwargs)
说明
dbt 的注释还是比较方便的,可以进行执行信息查看分析,实际上基于macro 的sql 查询注释信息生成
参考资料
dbt/adapters/base/connections.py
dbt/adapters/base/query_headers.py
core/dbt/cli/requires.py