首页 > 其他分享 >lightdash 与dbt集成的玩法简单说明

lightdash 与dbt集成的玩法简单说明

时间:2024-04-09 13:59:52浏览次数:27  
标签:errors const await 玩法 dbt lightdash error options

lightdash 是依赖dbt 进行建模的bi 工具,以下说明下lightdash 是如何集成dbt 的

简单操作流程

如下,主要是安装cli,预处理表,然后创建项目

内部处理简单说明

packages/cli/src/index.ts 代码位置,主要是通过自定义一些dbt 的meta 信息,然后通过包装的dbt run 命令执行模型的创建(此处lightdash 会解析dbt
的manifest.json 文件)

  • lightdash dbt run 处理简单说明
    代码在 packages/cli/src/handlers/dbt/run.ts 中
    参考处理
export const dbtRunHandler = async (
    options: DbtRunHandlerOptions,
    command: Command,
) => {
    GlobalState.setVerbose(options.verbose);
 
    if (!command.parent) {
        throw new Error('Parent command not found');
    }
 
    await LightdashAnalytics.track({
        event: 'dbt_command.started',
        properties: {
            command: `${command.parent.args}`,
        },
    });
 
    const commands = command.parent.args.reduce<string[]>((acc, arg) => {
        if (arg === '--verbose') return acc;
        return [...acc, arg];
    }, []);
 
    GlobalState.debug(`> Running dbt command: ${commands}`);
 
    try {
        const subprocess = execa('dbt', commands, {
            stdio: 'inherit',
        });
        await subprocess;
    } catch (e: unknown) {
        const msg = e instanceof Error ? e.message : '-';
        await LightdashAnalytics.track({
            event: 'dbt_command.error',
            properties: {
                command: `${commands}`,
                error: `${msg}`,
            },
        });
        throw new ParseError(`Failed to run dbt:\n  ${msg}`);
    }
   //  生成解析,主要是对于dbt 的manifest 解析,获取模型信息,以及编译之后的模型信息(dbt 的run 与compile 生成的一些元数据是有差异的)
    await generateHandler({
        ...options,
        assumeYes: true,
        excludeMeta: options.excludeMeta,
    });
};

generateHandler 处理上边有一些简单的说明,我主要说明下核心处理

//  遍历编译的模型
for await (const compiledModel of compiledModels) {
        const spinner = GlobalState.startSpinner(
            `  Generating .yml for model ${styles.bold(compiledModel.name)}`,
        );
        try {
             // 获取数据仓库的表信息
            const table = await getWarehouseTableForModel({
                model: compiledModel,
                warehouseClient,
            });
            // 基于上边模型以及表信息更新模型的yaml 定义 packages/cli/src/dbt/models.ts ,具体处理比较复杂,详细的可以参考源码
            const { updatedYml, outputFilePath } = await findAndUpdateModelYaml(
                {
                    model: compiledModel,
                    table,
                    docs: manifest.docs,
                    includeMeta: !options.excludeMeta,
                    projectDir: absoluteProjectPath,
                    projectName: context.projectName,
                    assumeYes: options.assumeYes,
                },
            );
            try {
                const existingHeadComments = await getFileHeadComments(
                    outputFilePath,
                );
                const ymlString = yaml.dump(updatedYml, {
                    quotingType: '"',
                });
               // 写入项目schema 信息
                await fs.writeFile(
                    outputFilePath,
                    existingHeadComments
                        ? `${existingHeadComments}\n${ymlString}`
                        : ymlString,
                );
            } catch (e) {
                const msg = e instanceof Error ? e.message : '-';
                throw new ParseError(
                    `Failed to write file ${outputFilePath}\n ${msg}`,
                );
            }
            spinner.succeed(
                `  ${styles.bold(compiledModel.name)}${styles.info(
                    ` ➡️  ${path.relative(process.cwd(), outputFilePath)}`,
                )}`,
            );
        } catch (e: unknown) {
            const msg = e instanceof Error ? e.message : '-';
            await LightdashAnalytics.track({
                event: 'generate.error',
                properties: {
                    executionId,
                    trigger: 'generate',
                    error: `${msg}`,
                },
            });
            spinner.fail(`  Failed to generate ${compiledModel.name}.yml`);
            throw e;
        }
    }
  • lightdash deploy --create
    packages/cli/src/handlers/deploy.ts
    创建完成之后需要进行部署deployHandler 参考处理
export const deployHandler = async (options: DeployHandlerOptions) => {
    GlobalState.setVerbose(options.verbose);
    await checkLightdashVersion();
    const executionId = uuidv4();
    // 编译模型信息
    const explores = await compile(options);
 
    const config = await getConfig();
    let projectUuid: string;
 
    if (options.create !== undefined) {
        const project = await createNewProject(executionId, options);
        if (!project) {
            console.error(
                "To preview your project, you'll need to manually enter your warehouse connection details.",
            );
            const createProjectUrl =
                config.context?.serverUrl &&
                new URL('/createProject', config.context.serverUrl);
            if (createProjectUrl) {
                console.error(
                    `Fill out the project connection form here: ${createProjectUrl}`,
                );
            }
            return;
        }
        projectUuid = project.projectUuid;
        await setProject(projectUuid, project.name);
    } else {
        if (!(config.context?.project && config.context.serverUrl)) {
            throw new AuthorizationError(
                `No active Lightdash project. Run 'lightdash login --help'`,
            );
        }
        projectUuid = config.context.project;
    }
   // 部署的模型信息
    await deploy(explores, { ...options, projectUuid });
   // 项目信息
    const displayUrl = options.create
        ? `${config.context?.serverUrl}/createProject/cli?projectUuid=${projectUuid}`
        : `${config.context?.serverUrl}/projects/${projectUuid}/home`;
 
    console.error(`${styles.bold('Successfully deployed project:')}`);
    console.error('');
    console.error(`      ${styles.bold(`⚡️ ${displayUrl}`)}`);
    console.error('');
};

compile 方法
尽管方法名字为compile 但是实际上还是基于dbt run 生成的元数据信息进行check 然后转换为Explore 实体类型

export const compile = async (options: CompileHandlerOptions) => {
    const dbtVersion = await getDbtVersion();
    const manifestVersion = await getDbtManifest();
    GlobalState.debug(`> dbt version ${dbtVersion}`);
    const executionId = uuidv4();
    await LightdashAnalytics.track({
        event: 'compile.started',
        properties: {
            executionId,
            dbtVersion,
            useDbtList: !!options.useDbtList,
            skipWarehouseCatalog: !!options.skipWarehouseCatalog,
            skipDbtCompile: !!options.skipDbtCompile,
        },
    });
 
    if (!isSupportedDbtVersion(dbtVersion)) {
        if (process.env.CI === 'true') {
            console.error(
                `Your dbt version ${dbtVersion} does not match our supported versions (1.3.* - 1.7.*), this could cause problems on compile or validation.`,
            );
        } else {
            const answers = await inquirer.prompt([
                {
                    type: 'confirm',
                    name: 'isConfirm',
                    message: `${styles.warning(
                        `Your dbt version ${dbtVersion} does not match our supported version (1.3.* - 1.7.*), this could cause problems on compile or validation.`,
                    )}\nDo you still want to continue?`,
                },
            ]);
            if (!answers.isConfirm) {
                throw new Error(`Unsupported dbt version ${dbtVersion}`);
            }
        }
    }
 
    // Skipping assumes manifest.json already exists.
    let compiledModelIds: string[] | undefined;
    if (options.useDbtList) {
        compiledModelIds = await dbtList(options);
    } else if (!options.skipDbtCompile) {
        await dbtCompile(options);
    } else {
        GlobalState.debug('> Skipping dbt compile');
    }
 
    const absoluteProjectPath = path.resolve(options.projectDir);
    const absoluteProfilesPath = path.resolve(options.profilesDir);
 
    GlobalState.debug(`> Compiling with project dir ${absoluteProjectPath}`);
    GlobalState.debug(`> Compiling with profiles dir ${absoluteProfilesPath}`);
 
    const context = await getDbtContext({ projectDir: absoluteProjectPath });
    const profileName = options.profile || context.profileName;
    const { target } = await loadDbtTarget({
        profilesDir: absoluteProfilesPath,
        profileName,
        targetName: options.target,
    });
 
    GlobalState.debug(`> Compiling with profile ${profileName}`);
    GlobalState.debug(`> Compiling with target ${target}`);
 
    const credentials = await warehouseCredentialsFromDbtTarget(target);
    const warehouseClient = warehouseClientFromCredentials({
        ...credentials,
        startOfWeek: isWeekDay(options.startOfWeek)
            ? options.startOfWeek
            : undefined,
    });
    const manifest = await loadManifest({ targetDir: context.targetDir });
    const models = getModelsFromManifest(manifest).filter((model) => {
        if (compiledModelIds) {
            return compiledModelIds.includes(model.unique_id);
        }
        // in case they skipped the compile step, we check if the models are compiled
        return model.compiled;
    });
 
    const adapterType = manifest.metadata.adapter_type;
 
    const { valid: validModels, invalid: failedExplores } =
        await validateDbtModel(adapterType, models);
 
    if (failedExplores.length > 0) {
        const errors = failedExplores.map((failedExplore) =>
            failedExplore.errors.map(
                (error) => `- ${failedExplore.name}: ${error.message}\n`,
            ),
        );
        console.error(
            styles.warning(`Found ${
                failedExplores.length
            } errors when validating dbt models:
${errors.join('')}`),
        );
    }
 
    // Skipping assumes yml has the field types.
    let catalog: WarehouseCatalog = {};
    if (!options.skipWarehouseCatalog) {
        GlobalState.debug('> Fetching warehouse catalog');
        catalog = await warehouseClient.getCatalog(
            getSchemaStructureFromDbtModels(validModels),
        );
    } else {
        GlobalState.debug('> Skipping warehouse catalog');
    }
 
    const validModelsWithTypes = attachTypesToModels(
        validModels,
        catalog,
        false,
    );
 
    if (!isSupportedDbtAdapter(manifest.metadata)) {
        await LightdashAnalytics.track({
            event: 'compile.error',
            properties: {
                executionId,
                dbtVersion,
                error: `Dbt adapter ${manifest.metadata.adapter_type} is not supported`,
            },
        });
        throw new ParseError(
            `Dbt adapter ${manifest.metadata.adapter_type} is not supported`,
        );
    }
 
    GlobalState.debug(
        `> Converting explores with adapter: ${manifest.metadata.adapter_type}`,
    );
    const validExplores = await convertExplores(
        validModelsWithTypes,
        false,
        manifest.metadata.adapter_type,
        [DbtManifestVersion.V10, DbtManifestVersion.V11].includes(
            manifestVersion,
        )
            ? []
            : Object.values(manifest.metrics),
        warehouseClient,
    );
    console.error('');
 
    const explores = [...validExplores, ...failedExplores];
 
    explores.forEach((e) => {
        const status = isExploreError(e)
            ? styles.error('ERROR')
            : styles.success('SUCCESS');
        const errors = isExploreError(e)
            ? `: ${styles.error(e.errors.map((err) => err.message).join(', '))}`
            : '';
        console.error(`- ${status}> ${e.name} ${errors}`);
    });
    console.error('');
    const errors = explores.filter((e) => isExploreError(e)).length;
    console.error(
        `Compiled ${explores.length} explores, SUCCESS=${
            explores.length - errors
        } ERRORS=${errors}`,
    );
 
    await LightdashAnalytics.track({
        event: 'compile.completed',
        properties: {
            executionId,
            explores: explores.length,
            errors,
            dbtMetrics: Object.values(manifest.metrics).length,
            dbtVersion,
        },
    });
    return explores;
};

deploy 方法处理

export const deploy = async (
    explores: (Explore | ExploreError)[],
    options: DeployArgs,
): Promise<void> => {
    const errors = explores.filter((e) => isExploreError(e)).length;
    if (errors > 0) {
        if (options.ignoreErrors) {
            console.error(
                styles.warning(`\nDeploying project with ${errors} errors\n`),
            );
        } else {
            console.error(
                styles.error(
                    `Can't deploy with errors. If you still want to deploy, add ${styles.bold(
                        '--ignore-errors',
                    )} flag`,
                ),
            );
            process.exit(1);
        }
    }
   // 通过put 写入模型信息
    await lightdashApi<null>({
        method: 'PUT',
        url: `/api/v1/projects/${options.projectUuid}/explores`,
        body: JSON.stringify(explores),
    });
    await LightdashAnalytics.track({
        event: 'deploy.triggered',
        properties: {
            projectId: options.projectUuid,
        },
    });
};

Explore 类型定义

export type Explore = {
    name: string; // Must be sql friendly (a-Z, 0-9, _)
    label: string; // Friendly name
    tags: string[];
    groupLabel?: string;
    baseTable: string; // Must match a tableName in tables
    joinedTables: CompiledExploreJoin[]; // Must match a tableName in tables
    tables: { [tableName: string]: CompiledTable }; // All tables in this explore
    targetDatabase: SupportedDbtAdapter; // Type of target database e.g. postgres/redshift/bigquery/snowflake/databricks
    warehouse?: string;
    ymlPath?: string;
    sqlPath?: string;
};

说明

lightdash 对于dbt 的处理核心还是利用了dbt 的cli 命令,然后自己解析,之后通过接口写入数据到lightdash backend 服务中,整体上比较依赖manifest
同时部分解析上使用了cli output 为json 格式处理的,以上只是简单的关于dbt 集成部分的,实际上还有bi 分析部分的,后边说明下

参考资料

packages/cli/src/handlers/dbt/run.ts
packages/cli/src/handlers/deploy.ts
packages/cli/src/handlers/compile.ts
packages/common/src/compiler/translator.ts
https://docs.lightdash.com/get-started/setup-lightdash/get-project-lightdash-ready

标签:errors,const,await,玩法,dbt,lightdash,error,options
From: https://www.cnblogs.com/rongfengliang/p/18121475

相关文章

  • dbt meta 配置简单说明
    dbt的meta从dbt系统的角度来说,属于一些元数据的扩展,可以添加一个二外的描述信息,方便进行文档或者其他的扩展(比如开发自己的解析处理)lightdash是基于dbt的一个bi平台,就比较依赖dbt的meta能力配置说明dbtmeta可以通过dbt_project.yml的models属性配置,或者通过config......
  • dbt query_header 简单说明
    dbt对于每个实际执行的任务(实际sql)都会包含一个任务注释,可以方便的查看dbt版本,执行nodeid,target参考格式/*{"app":"dbt","dbt_version":"1.5.11","profile_name":"dremio_nessie","target_name":"dev",......
  • dbt debug macro 简单说明
    dbt支持debugmacro可以用来进行调试使用{%macromy_macro()%} {%setsomething_complex=my_complicated_macro()%} {{debug()}} {%endmacro%}参考实现实际上就是通过环境变量开启了一个debug上下文变量ifos.en......
  • MyDumper/MyLoader的进阶玩法
    一、前言从mydumperv0.11.5版本开始,mydumper提供了--load-data参数,使用此参数导出的sql文件将不再是insert语句,而是loaddata语句。在MySQL官方文档中关于loaddata是这么描述的:Whenloadingatablefromatextfile,useLOADDATA.Thisisusually20timesfasterthanus......
  • dbt statement macro 简单说明
    statementblocks实际上就是一个标准的jinja2macro调用包装,提供了方便的sql执行能力,因为需要进行查询结果的存储,dbt提供了一个store_result的macro,内部数据的处理基于了agate这个方便的python数据处理包为了查询使用提供了load_resultmacro以下只说明关于stateme......
  • 清明节开展趣味主题h5互动玩法的效果是什么
    清明节是一个较为重要的节日,祭祀、踏青春游、吃青团等,对企业商家来说可以借势清明节开展多种不同形式的玩法,其中h5小游戏由于其线上互动效果及页面展示信息,可以有效实现品牌传播、渠道引流涨粉、用户促活等效果。而商家如何开展品牌主题营销,关键还是要看营销工具如何选择,除了......
  • dbt macro 的执行简单说明
    BaseAdapter中包含了一个adapter实际运行依赖的转换,链接处理,当然也包含了macro的执行,具体方法有直接的execute_macroModelRunner中的materialization_macro(run命令)还有run-operation中RunOperationTask的_run_unsafe方法ModelRunnercall_macro处理参考调用......
  • 自营小说3.0版推文新玩法、文字一键生成AI漫画视频、实时数据小白当天591.33?
    文字一键生成AI漫画视频、实时数据小白当天591.33?-1-项目简介小说推文是老项目,存在至今,不断有人再做,但是之前的玩法太老套,因为之前都是解压视频,文字需要转换成语音,然后需要去找解压素材,我们需要用剪映再去剪辑,这样一个作品需要30分钟,浪费时间,还不知道有没有效果今天给大......
  • dbt this macro 处理简单说明
    dbtthismacro提供了一种方便的对于当前模型展现的方法,可以使用在增量模型以及pre&posthooks中this实际是就类似ref('<the_current_model>')是一个relation包含了database,schema以及模型标识使用示例一个增量处理的,基于this可以方便的引用模型{{config(mater......
  • dbt return macro 内部实现简单说明
    jinja2默认是没有returnmacro的,dbt在实现的时候比较有意思,通过一个exception触发的,以下是简单说明参考使用一个包含return的macro{%macrodemoapp(name,version)%}{%ifversion=='v1'%}{{return("appdemo")}}{%else%}......