dlt 直接基于cli包装了dbt 的运行,对于希望通过dlt 进行etl 之后,还想运行dbt 的模型处理的场景就比较方便了,而且dlt 与dbt 的集成也是官方
一个很不错的特性,以下是一个简单试用
环境准备
- docker-compose
version: "3"
services:
pg:
image: postgres:16.0
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=postgres
- dlt 配置
.dlt/secrets.toml
[destination.postgres.credentials]
database = "postgres"
username = "postgres"
password = "postgres" # replace with your password
host = "localhost" # or the IP address location of your database
port = 5432
connect_timeout = 15
- dlt 以及dbt 初始化
基于了venv
pip install dbt-postgres dlt[postgres]
- dbt 基本项目结构
安装完成dbt 包之后可以通过dbt cli 提示操作
集成
- dlt 与dbt 简单数据处理
app.py
import dlt
# have data? dlt likes data
data = [{'id': 1, 'name': 'John','age':111}, {'id': 2, 'name': 'Jane'}]
# open connection
pipeline = dlt.pipeline(
pipeline_name="dalong",
destination='postgres',
dataset_name='postgres_data'
)
# 数据加载的
load_info = pipeline.run(
data,
write_disposition="merge",
primary_key="id",
table_name="users"
)
print(load_info)
# dlt dbt pipeline 定义
pipeline = dlt.pipeline(
pipeline_name='dalong',
destination='postgres',
dataset_name='postgres_data_dbt'
)
venv = dlt.dbt.get_venv(pipeline)
# dbt 项目定义,主要是dbt 相关的配置
dbt = dlt.dbt.package(
pipeline,
"mydlt_dbt",
venv=venv
)
# 运行
models = dbt.run_all()
# 完成之后输出模型信息
for m in models:
print(
f"Model {m.model_name} materialized" +
f"in {m.time}" +
f"with status {m.status}" +
f"and message {m.message}"
)
dbt 简单模型 (详细的参考github)
models/users/my_first_dbt_model.sql
{{ config(materialized='table') }}
with users as (
select * from postgres_data.users
)
select *
from users
运行
因为直接基于了代码,通过python 运行任务就行了
- cli
python app.py
- 效果
说明
以上是基于代码包含pipeline 集成的,实际上也可以不包含pipeline,参考代码
import os
from dlt.helpers.dbt import create_runner
runner = create_runner(
None, # use current virtual env to run dlt
None, # we do not need dataset name and we do not pass any credentials in environment to dlt
working_dir=".", # the package below will be cloned to current dir
package_location="https://github.com/dbt-labs/jaffle_shop.git",
package_profiles_dir=os.path.abspath("."), # profiles.yml must be placed in this dir
package_profile_name="duckdb_dlt_dbt_test", # name of the profile
)
models = runner.run_all()
完整代码我已经push github 了可以参考
参考资料
https://dlthub.com/docs/dlt-ecosystem/transformations/dbt/
https://github.com/rongfengliang/dlt_dbt_learning/tree/main