首页 > 其他分享 >Apache DolphinScheduler将上游Task执行结果传递给下游

Apache DolphinScheduler将上游Task执行结果传递给下游

时间:2024-11-07 16:32:20浏览次数:1  
标签:info Task String DolphinScheduler List sqlParameters Apache outputProperty

01 背景

公司的数据开发平台需要用到DolphinScheduler做任务调度,其中一个场景是:上游任务执行结束后,需要将任务执行结果传递给下游任务。

DolphinScheduler肯定是能实现任务之间的传参的,具体的可以看:DolphinScheduler | 文档中心 (https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/guide/parameter/context)。

但是官方案例中介绍的任务之间传参是提前在管理台上配置好的,OK,那么问题来了,如何实现任务之间的动态传参呢?比如说我们自定义Task,然后在Task执行结束后将执行结果封装,传递给DAG中的下一个Task。

02 分析

如果DolphinScheduler官方的案例没有演示如何动态传,我们开发者应该如何去处理这种需求?

我是这么做的:分析DolphinScheduler内置的Task,总有一个Task是需要传递参数给下游的。我这里盲猜两个,一个是_SqlTask,一个是HttpTask。我的观点是:总不能做完SQL查询,或者做完HTTP请求后就不管结果吧?_

分析 HttpTask 源码

分析HttpTask源码,直接找到HttpTask的handle方法,DolphinScheduler中,任何Task的具体执行逻辑都在这个handle方法中。

handle方法分析

@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
    long startTime = System.currentTimeMillis();
    String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
    String statusCode = null;
    String body = null;

    try (
            CloseableHttpClient client = createHttpClient();
            CloseableHttpResponse response = sendRequest(client)) {
        statusCode = String.valueOf(getStatusCode(response));
        body = getResponseBody(response);
        exitStatusCode = validResponse(body, statusCode);
        // 看名字应该就能猜到是处理请求结果的
        addDefaultOutput(body);
        long costTime = System.currentTimeMillis() - startTime;
        log.info(
                "startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {} milliseconds, statusCode : {}, body : {}, log : {}",
                formatTimeStamp, httpParameters.getUrl(),
                httpParameters.getHttpMethod(), costTime, statusCode, body, output);
    } catch (Exception e) {
        appendMessage(e.toString());
        exitStatusCode = -1;
        log.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
        throw new TaskException("Execute http task failed", e);
    }

}

继续看addDefaultOutput方法

public void addDefaultOutput(String response) {
    // put response in output
    // 创建Property对象
    Property outputProperty = new Property();
    // 设置Prop,也就是设置Key
    outputProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), "response"));
    // 设置是入参还是出参,这里是出参,因为是将结果给下游任务
    outputProperty.setDirect(Direct.OUT);
    // 设置参数类型,VARCHAR表示就是字符串
    outputProperty.setType(DataType.VARCHAR);
    // 设置Value,就是http请求结果
    outputProperty.setValue(response);
    // 重点:将Property添加到varPool中
    httpParameters.addPropertyToValPool(outputProperty);
}

分析SqlTask源码

handler方法分析

@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
    log.info("Full sql parameters: {}", sqlParameters);
    log.info(
            "sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit  {}",
            sqlParameters.getType(),
            sqlParameters.getDatasource(),
            sqlParameters.getSql(),
            sqlParameters.getLocalParams(),
            sqlParameters.getUdfs(),
            sqlParameters.getShowType(),
            sqlParameters.getConnParams(),
            sqlParameters.getVarPool(),
            sqlParameters.getLimit());
    try {

        // get datasource
        baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
                sqlTaskExecutionContext.getConnectionParams());
        List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType)
                .splitAndRemoveComment(sqlParameters.getSql());

        // ready to execute SQL and parameter entity Map
        List<SqlBinds> mainStatementSqlBinds = subSqls
                .stream()
                .map(this::getSqlAndSqlParamsMap)
                .collect(Collectors.toList());

        List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
                .orElse(new ArrayList<>())
                .stream()
                .map(this::getSqlAndSqlParamsMap)
                .collect(Collectors.toList());
        List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
                .orElse(new ArrayList<>())
                .stream()
                .map(this::getSqlAndSqlParamsMap)
                .collect(Collectors.toList());

        List<String> createFuncs = createFuncs(sqlTaskExecutionContext.getUdfFuncParametersList());

        // execute sql task
        // 这个方法就是处理sql结果的
        executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);

        setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);

    } catch (Exception e) {
        setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
        log.error("sql task error", e);
        throw new TaskException("Execute sql task failed", e);
    }
}

所以我们在看下executeFuncAndSql方法内部实现

public void executeFuncAndSql(List<SqlBinds> mainStatementsBinds,
                              List<SqlBinds> preStatementsBinds,
                              List<SqlBinds> postStatementsBinds,
                              List<String> createFuncs) throws Exception {
    try (
            Connection connection =
                    DataSourceClientProvider.getAdHocConnection(DbType.valueOf(sqlParameters.getType()),
                            baseConnectionParam)) {

        // create temp function
        if (CollectionUtils.isNotEmpty(createFuncs)) {
            createTempFunction(connection, createFuncs);
        }

        // pre execute
        executeUpdate(connection, preStatementsBinds, "pre");

        // main execute
        String result = null;
        // decide whether to executeQuery or executeUpdate based on sqlType
        if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
            // query statements need to be convert to JsonArray and inserted into Alert to send
            result = executeQuery(connection, mainStatementsBinds.get(0), "main");
        } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
            // non query statement
            String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
            result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
        }
        // deal out params
        // 这个方法就是来处理结果的
        sqlParameters.dealOutParam(result);

        // post execute
        executeUpdate(connection, postStatementsBinds, "post");
    } catch (Exception e) {
        log.error("execute sql error: {}", e.getMessage());
        throw e;
    }
}

通过dealOutParam看具体处理细节

public void dealOutParam(String result) {
    if (CollectionUtils.isEmpty(localParams)) {
        return;
    }
    List<Property> outProperty = getOutProperty(localParams);
    if (CollectionUtils.isEmpty(outProperty)) {
        return;
    }
    if (StringUtils.isEmpty(result)) {
        varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));
        return;
    }
    List<Map<String, String>> sqlResult = getListMapByString(result);
    if (CollectionUtils.isEmpty(sqlResult)) {
        return;
    }
    // if sql return more than one line
    if (sqlResult.size() > 1) {
        Map<String, List<String>> sqlResultFormat = new HashMap<>();
        // init sqlResultFormat
        Set<String> keySet = sqlResult.get(0).keySet();
        for (String key : keySet) {
            sqlResultFormat.put(key, new ArrayList<>());
        }
        for (Map<String, String> info : sqlResult) {
            for (String key : info.keySet()) {
                sqlResultFormat.get(key).add(String.valueOf(info.get(key)));
            }
        }
        for (Property info : outProperty) {
            if (info.getType() == DataType.LIST) {
                info.setValue(JSONUtils.toJsonString(sqlResultFormat.get(info.getProp())));
            }
        }
    } else {
        // result only one line
        Map<String, String> firstRow = sqlResult.get(0);
        for (Property info : outProperty) {
            info.setValue(String.valueOf(firstRow.get(info.getProp())));
        }
    }
    
    // 本质还是将sql结果处理后保存在varPool中,varPool才是关键所在
    varPool = VarPoolUtils.mergeVarPool(Lists.newArrayList(varPool, outProperty));

}

所以,源代码分析到这,我们就知道了:如果想实现动态传参,那么我们需要将传递的数据封装成_org.apache.dolphinscheduler.plugin.task.api.model.Property,然后添加到内置集合变量org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#varPool中_

03 具体实现

这里我们不去讨论自定义Task的具体实现步骤,这不是本文的重点。

当我们实现自定义Task后,可以这样编码实现动态传参:

Property outputProperty = new Property();
// 添加我们要传递的数据Key
outputProperty.setProp("xxxxKey"));
// OUT
outputProperty.setDirect(Direct.OUT);
// 这里传递的数据是什么类型就写什么类型,建议通过json字符串处理数据
outputProperty.setType(DataType.VARCHAR);
// 添加我们要传递的数据Key
outputProperty.setValue("xxxxValue");
// 这里的xxxxParameters是我们自己自定义的,一般情况下,一个Task对应一个Parameters
xxxxParameters.addPropertyToValPool(outputProperty);

DolphinScheduler内部有将_List<Property> varPool转换成Map<String, Property> varParams的逻辑,然后会将varParams与其他的参数合并,最后通过taskExecutionContext.setPrepareParamsMap(propertyMap) 将数据设置给Map<String, Property> prepareParamsMap。_

04 总结

关于DolphinScheduler(海豚调度器)是什么,能做什么,怎么使用等等,这里我就不再赘述,大家感兴趣的可以去看看官方文档:DolphinScheduler | 文档中心 (https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2)

希望通过本篇文章能让各位读者掌握Task之间的动态传参,然后应用在实际工作中。如果本篇文章能给屏幕前的你们或多或少的一些帮助,也是我喜闻乐见的。

本文由 白鲸开源 提供发布支持!

标签:info,Task,String,DolphinScheduler,List,sqlParameters,Apache,outputProperty
From: https://www.cnblogs.com/DolphinScheduler/p/18533078

相关文章

  • Task.Delay 和 Thread.Sleep 的区别
    Task.Delay 和 Thread.Sleep 都可以用来在代码中引入延迟,但它们之间有几个关键的区别:异步vs.同步:Thread.Sleep 是一个同步方法,它会阻塞当前线程,直到指定的时间过去。这意味着在 Thread.Sleep 执行期间,当前线程不能做任何其他事情,这可能会导致应用程序的响应性降低,......
  • Apache DolphinScheduler + OceanBase,搭建分布式大数据调度平台的实践
    本文整理自白鲸开源联合创始人,ApacheDolphinSchedulerPMCChair,ApacheFoundationMember代立冬的演讲。主要介绍了DolphinScheduler及其架构、DolphinScheduler与OceanBase的联合大数据方案。DolphinScheduler是什么?ApacheDolphinScheduler,作为一款云原生且配备强大的可视......
  • 8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
    当前,各企业正面临前所未有的数据增量,不仅体现在数据规模的急剧上升,还体现在数据的类型多样性和产生速度的加快。数据体量大固然蕴藏着更大的潜力及可能性,但如何有效利用这些数据,解决实际问题、赋能业务增长,才是各企业发展的关键。因此,企业亟需搭建高效的数据处理与分析平台,以帮......
  • DolphinScheduler 限制秒级别的定时调度
    背景DolphinScheduler定时任务配置采用的7位Crontab表达式,分别对应秒、分、时、月天、月、周天、年。在团队日常开发工作中,工作流的定时调度一般不会细化到秒级别。但历史上出现过因配置的疏忽大意而产生故障时间,如应该配置每分钟执行的工作流被配置长了每秒执行,造成短时......
  • Pbootcms网站,从Apache切换为Nginx后网站打不开
    打开网站设置登录宝塔面板。选择需要配置的网站,点击“设置”。进入伪静态设置在网站设置页面中,找到并点击“伪静态”选项卡。添加Nginx伪静态规则在伪静态设置中,清空原有规则或选择自定义规则。输入以下Nginx伪静态规则:location/{if(!-e$request_f......
  • 数据可视化——Apache ECharts实现
    目录1、什么是ECharts     2、官网入口3.工具准备 4.插入html文件5.小例子1、什么是ECharts             ECharts(EnterpriseCharts,商业级数据图表)是一个使用JavaScript实现的开源可视化库,能够流畅地运行在PC和移动设备上,兼容当前绝大部分浏......
  • Apache 配置出错常见问题及解决方法
    Apache配置出错常见问题及解决方法一、端口被占用问题问题描述:在启动Apache时,出现“Addressalreadyinuse”或类似的错误提示,这意味着Apache想要使用的端口已经被其他程序占用,导致Apache无法正常启动。原因分析:系统中已经有其他的应用程序在使用Apache......
  • Apache HTTP Sever 的初级操作指南
    Apache初级操作指南摘要:本文详细介绍了ApacheHTTPServer的初级操作方法,包括安装与配置、启动与停止服务、虚拟主机设置以及访问日志与错误日志的查看与分析等内容。通过本文的学习,初学者可以快速掌握Apache的基本操作,为搭建和管理网站奠定基础。一、引言ApacheH......
  • C++多线程:package_task
    std::packaged_taskstd::packaged_task包装一个可调用对象,并允许获取该可调用对象计算的结果,可调用对象内部包含两个基本元素:1.被包装的任务任务是一个可调用对象,如函数指针或函数对象,该对象的执行结果会传递给共享状态2.共享状态用于保存任务的返回结果,并可通过future对象异......
  • Goby 漏洞发布|Apache Solr /solr/admin/info/properties:/admin/info/key 权限绕过漏
    漏洞名称:ApacheSolr/solr/admin/info/properties:/admin/info/key权限绕过漏洞(CVE-2024-45216)EnglishName:ApacheSolr/solr/admin/info/properties:/admin/info/keyPermissionBypassVulnerability(CVE-2024-45216)CVSScore:7.3漏洞描述:ApacheSolr是一个开源搜索服......