BaseAdapter 中包含了adapter 实际运行所依赖的转换、连接处理,当然也包含了macro 的执行。macro 执行的具体入口有:BaseAdapter 中直接提供的execute_macro、
ModelRunner 中对materialization_macro 的调用(run 命令),以及run-operation 命令中RunOperationTask 的_run_unsafe 方法
ModelRunner call_macro 处理
- 参考调用
def execute(self, model, manifest):
    """Run the materialization macro for ``model`` and build its run result.

    Steps, in order:
      1. Generate the runtime model context.
      2. Look up the materialization macro for the model's materialization
         and the current adapter type.
      3. Validate that the context carries a ``"config"`` entry and that the
         model's language is supported by the materialization.
      4. Render the materialization macro between the adapter's pre- and
         post-model hooks.
      5. Register every relation reported for the materialization in the
         adapter cache.

    :param model: The model node to materialize.
    :param manifest: The manifest used to build the context and to look up
        the materialization macro.
    :return: Whatever ``self._build_run_model_result(model, context)`` returns.
    :raises MissingMaterializationError: No materialization macro was found.
    :raises DbtInternalError: The generated context has no ``"config"`` key.
    :raises DbtValidationError: The materialization declares supported
        languages and the model's language is not one of them.
    """
    context = generate_runtime_model_context(model, self.config, manifest)

    materialization_macro = manifest.find_materialization_macro_by_name(
        self.config.project_name, model.get_materialization(), self.adapter.type()
    )
    if materialization_macro is None:
        raise MissingMaterializationError(
            materialization=model.get_materialization(), adapter_type=self.adapter.type()
        )

    if "config" not in context:
        raise DbtInternalError(
            "Invalid materialization context generated, missing config: {}".format(context)
        )
    context_config = context["config"]

    # Read ``supported_languages`` only once we know the attribute exists.
    # Accessing it before consulting hasattr() would raise AttributeError for
    # macros without the attribute and make the guard pointless.
    if hasattr(materialization_macro, "supported_languages"):
        supported_languages = materialization_macro.supported_languages
        if model.language not in supported_languages:
            str_langs = [str(lang) for lang in supported_languages]
            raise DbtValidationError(
                f'Materialization "{materialization_macro.name}" only supports languages {str_langs}; '
                f'got "{model.language}"'
            )

    hook_ctx = self.adapter.pre_model_hook(context_config)
    try:
        # This is where the materialization macro itself is rendered.
        result = MacroGenerator(
            materialization_macro, context, stack=context["context_macro_stack"]
        )()
    finally:
        # The post hook must run even when the macro raises.
        self.adapter.post_model_hook(context_config, hook_ctx)

    for relation in self._materialization_relations(result, model):
        self.adapter.cache_added(relation.incorporate(dbt_created=True))

    return self._build_run_model_result(model, context)
- MacroGenerator 处理
def __call__(self, *args, **kwargs):
    """Invoke the macro through ``call_macro`` inside the ``track_call()`` context.

    All positional and keyword arguments are forwarded unchanged, and the
    value produced by ``call_macro`` is returned.
    """
    with self.track_call():
        outcome = self.call_macro(*args, **kwargs)
        return outcome
- call_macro 处理
def call_macro(self, *args, **kwargs):
    """Build the macro callable and invoke it with the given arguments.

    Called from ``__call__`` methods.

    :return: The macro's return value, or the ``value`` carried by a
        ``MacroReturn`` raised while the macro runs.
    :raises DbtInternalError: ``self.context`` is still ``None``.
    """
    if self.context is None:
        raise DbtInternalError("Context is still None in call_macro!")
    assert self.context is not None

    # Fetch the macro first; get_macro() creates the macro module dynamically.
    rendered_macro = self.get_macro()

    with self.exception_handler():
        try:
            outcome = rendered_macro(*args, **kwargs)
        except MacroReturn as returned:
            # MacroReturn carries the macro's return value in ``value``.
            outcome = returned.value
        return outcome
RunOperationTask 调用
实际上就是execute_macro 参考处理
- 参考代码
dbt/adapters/base/impl.py
def execute_macro(
    self,
    macro_name: str,
    macro_resolver: Optional[MacroResolverProtocol] = None,
    project: Optional[str] = None,
    context_override: Optional[Dict[str, Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
) -> AttrDict:
    """Look ``macro_name`` up through the macro resolver and execute it.

    :param macro_name: The name of the macro to execute.
    :param macro_resolver: The resolver used to find the macro and to
        generate its execution context. When falsy, ``self._macro_resolver``
        is used instead.
    :param project: The name of the project to search in, or None for the
        first match.
    :param context_override: An optional dict to update() the macro
        execution context.
    :param kwargs: An optional dict of keyword args used to pass to the
        macro.
    :return: The value returned by calling the wrapped macro.
    :raises DbtInternalError: No macro resolver or no macro context
        generator is available.
    :raises DbtRuntimeError: No macro named ``macro_name`` could be found.
    """
    macro_kwargs = {} if kwargs is None else kwargs
    overrides = {} if context_override is None else context_override

    # Prefer the resolver passed in; otherwise fall back to the adapter's own.
    resolver = macro_resolver or self._macro_resolver
    if resolver is None:
        raise DbtInternalError("Macro resolver was None when calling execute_macro!")

    # dbt also wraps a context generator callable (MacroContextGeneratorCallable).
    if self._macro_context_generator is None:
        raise DbtInternalError("Macro context generator was None when calling execute_macro!")

    # Step 1: find the macro through the resolver.
    macro = resolver.find_macro_by_name(macro_name, self.config.project_name, project)
    if macro is None:
        package_name = (
            "any package" if project is None else 'the "{}" package'.format(project)
        )
        raise DbtRuntimeError(
            'dbt could not find a macro with the name "{}" in {}'.format(
                macro_name, package_name
            )
        )

    # Step 2: build the execution context and apply the caller's overrides.
    macro_context = self._macro_context_generator(macro, self.config, resolver, project)
    macro_context.update(overrides)

    # Step 3: wrap the macro with CallableMacroGenerator (dbt_common/clients/jinja.py),
    # which runs it through __call__. Per the accompanying notes, MacroGenerator is a
    # subclass of CallableMacroGenerator, so the handling mirrors the ModelRunner path.
    macro_function = CallableMacroGenerator(macro, macro_context)

    with self.connections.exception_handler(f"macro {macro_name}"):
        # Step 4: execute the macro.
        outcome = macro_function(**macro_kwargs)
    return outcome
- get_macro 处理
dbt_common/clients/jinja.py 中,此处有一个比较有意思的地方,因为默认macro 是不带dbt_macro__的,但是get_dbt_macro_name 会包含一个前缀,实际上是dbt 自己开发了一个MacroFuzzParser,对于macro 添加了一个前缀
def get_macro(self):
    """Build the template module and return the macro object stored in it.

    The macro is looked up in the module namespace under the name produced
    by ``get_dbt_macro_name`` for this generator's macro name.
    """
    name = self.get_name()
    template = self.get_template()

    # make_module is in jinja2.environment and returns a TemplateModule.
    # Previously both vars and local were set, but that was redundant:
    # they both end up in the same place.
    module = template.make_module(vars=self.context, shared=False)

    prefixed_name = get_dbt_macro_name(name)
    return module.__dict__[prefixed_name]
- MacroFuzzParser 处理
class MacroFuzzParser(jinja2.parser.Parser):
    """jinja2 parser that stores macros under the name from ``get_dbt_macro_name``."""

    def parse_macro(self):
        """Parse a macro block into a ``jinja2.nodes.Macro`` node and return it."""
        macro_token = next(self.stream)
        node = jinja2.nodes.Macro(lineno=macro_token.lineno)

        # modified to fuzz macros defined in the same file. this way
        # dbt can understand the stack of macros being called.
        # - @cmcarthur
        # Per the accompanying notes, this is where dbt adds its own
        # ``dbt_macro__`` prefix to the macro name.
        target = self.parse_assign_target(name_only=True)
        node.name = get_dbt_macro_name(target.name)

        self.parse_signature(node)
        node.body = self.parse_statements(("name:endmacro",), drop_needle=True)
        return node
- MacroFuzzParser 的使用
MacroFuzzEnvironment 扩展类中,默认dbt 使用的是MacroFuzzEnvironment 这个env
class MacroFuzzEnvironment(jinja2.sandbox.SandboxedEnvironment):
    """Sandboxed jinja2 environment that parses templates with ``MacroFuzzParser``."""

    def _parse(self, source, name, filename):
        """Parse ``source`` with ``MacroFuzzParser`` and return the parse result."""
        parser = MacroFuzzParser(self, source, name, filename)
        return parser.parse()
说明
dbt 对于macro 的处理直接使用的是字符串模版模式,同时对于macro 的解析添加了自己的实现,灵活性上也很方便,同时对于项目的macro 会在实际命令执行之前先解析生成好(包含了依赖处理),以上只是关于macro 执行的简单说明,后边会说明下dbt 对于jinja2的包装,同时
还有macro context 以及macro resolver
参考资料
dbt/adapters/base/impl.py (adapters 包)
dbt_common/clients/jinja.py (common 包)