首页 > 其他分享 >高效批量工作流导入及脚本上线,利用DolphinScheduler接口轻松实现

高效批量工作流导入及脚本上线,利用DolphinScheduler接口轻松实现

时间:2025-01-22 14:34:19浏览次数:1  
标签:code 批量 url DolphinScheduler 接口 id headers 导入 response

file

实现了批量生成DolphinScheduler的任务,当导入时发现只能逐个导入,因此通过接口实现会更方便。

DolphinScheduler接口文档

DolphinScheduler是有接口文档的,地址是

http://IP:12345/dolphinscheduler/swagger-ui/index.html?language=zh_CN&lang=cn

不过这文档写的比较简略,自己需要研究研究。

token:所有的接口都需要用到token

file

在安全中心-令牌管理 创建一个token 。记住这个token,后面所有的接口都需要用到 。

header:根据上面的token组成请求要用的header

token = ''
headers = {
    'Accept': 'application/json',
    'token': token
}

项目ID project_id 可以在查看项目工作流时,在url中找到。

DolphinScheduler导入任务接口

导入任务的接口是

import_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import'

知道接口 就可以导入了。

def import_job(file_path):
# 打开文件并读取为二进制数据
    with open(file_path, 'rb') as file:
        files = {'file': file}
        # 导入工作流
        response = requests.post(import_url, headers=headers, files=files)
        print(response.status_code)
        if response.status_code != 200:
            print('上传失败  '+file_path)

需要注意的是,导入任务时 只支持二进制。

file_path 是工作流文件,具体实现 可以工作流中导出一个作为参考。
重复使用上述方法,就可以实现批量导入任务。

工作流上线

使用上述方法批量完成任务上传后,依旧有问题,逐个上线工作量也是个不小的工作量,因此继续使用接口。

经过研究发现,上线工作流需要先获取工作流的调度ID 。

获取工作流列表 - > 获取工作流code -> 获取所有工作流的调度ID -> 工作流上线

  • 获取工作流列表
    这是接口地址
jobs_url = 'http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition'

不过这个要分页查询,稍微有一点点麻烦

def get_jobs_list():
    # 分页查询
    # 初始化分页参数
    pageNo = 1
    pageSize = 10
    url = f'{jobs_url}?pageSize=10&pageNo=1&searchVal='
    # 构建完整的URL
    # 存储所有结果
    all_items = list()
    while True:
        # 构建完整的URL
        url = f'{jobs_url}?pageSize={pageSize}&pageNo={pageNo}&searchVal='

        # 发送GET请求
        response = requests.get(url, headers=headers)

        # 检查响应状态码
        if response.status_code == 200:
            # 请求成功,处理响应数据
            items = response.content.decode()
            total = json.loads(items)["data"]["total"]
            item = json.loads(items)["data"]["totalList"]
            # 将当前页的数据添加到结果列表中
            for i in item:
                all_items.append(i)

            # 如果当前页没有数据,退出循环
            if pageNo * pageSize > total:
                break
            if not items:
                break
            # 增加页码
            pageNo += 1
        else:
            # 请求失败,打印错误信息
            print('请求失败:', response.status_code, response.text)
            break

    return all_items

all_items 是所有工作流的具体内容,需要提取一下

 all_jobs = get_jobs_list()
        job_codes = [job['code'] for job in all_jobs]

这样就是所有的工作流code。

  • 获取调度ID
    下面是调度ID的接口,因为不想分页,直接一页1000个。
schedules_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize=1000&pageNo=1&processDefinitionCode='

使用这个接口就能拿到所有的调度ID

def schedule_id(job_code):
    url = schedules_url+str(job_code)
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = response.content.decode()
        js = json.loads(data)
        if len(js['data']['totalList'])>0 and js['data']['totalList'][0]['releaseState']=='OFFLINE':
            return js['data']['totalList'][0]['id']
    else:return ''

这里过滤了已经上线的调度ID 。

  • 上线
    万事俱备 终于可以上线了
online_url = 'http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online'

具体实现:

def online_job(scheduler_id):
    url = online_url.format(scheduler_id=scheduler_id)
    response = requests.post(url, headers=headers)
    if response.status_code == 200:
        print('success')
    else:
        print('online job failed')

到此 就可以实现导入-批量全自动了。

打完收工,祝你不加班。

原文链接:https://blog.csdn.net/weixin_45399602/article/details/143226396

本文由 白鲸开源 提供发布支持!

标签:code,批量,url,DolphinScheduler,接口,id,headers,导入,response
From: https://www.cnblogs.com/DolphinScheduler/p/18685793

相关文章

  • 为什么UI导入png图会出现白边
    1)为什么UI导入png图会出现白边2)升级Unity后产生的Objects泄露现象3)Unity升级后,加载Framework白屏4)如何优化轮廓线比较细的锯齿现象这是第419篇UWA技术知识分享的推送,精选了UWA社区的热门话题,涵盖了UWA问答、社区帖子等技术知识点,助力大家更全面地掌握和学习。UWA社区主页:commun......
  • 请问如何批量修改网站模板?
    批量修改网站模板可以提高工作效率,以下是几种常用的工具和方法:工具名称描述Notepad++支持多文件搜索和替换,适合批量修改文本文件。SublimeText高性能的文本编辑器,支持插件扩展,可以安装批量替换插件。VisualStudioCode微软开发的代码编辑器,支持多文件编辑和......
  • 批量检查微信小程序是否被封禁的 Go 源码接口
    本文展示了一个使用Go语言编写的示例,能够批量检查微信小程序是否被封禁。通过调用接口https://down.ychengsnsm.com/xcx/checkxcx.php?appid={appid},根据返回的code字段判断小程序的状态。若code为1,则表示小程序正常;若code为0,则表示小程序被封禁,并且封禁原因会......
  • 【YashanDB知识库】多csv文件一键式导入yashandb
    本文内容来自YashanDB官网,原文内容请见https://www.yashandb.com/newsinfo/7253738.html?templateId=1718516背景:csv文件导入yashandb存在以下两个痛点:1、导入通过yasboot或者yasldr导入csv文件均需要配置表的字段名,如果表比较多,字段也很多的情况下比较费力;2、一些大表需要导......
  • 电脑端利用Winrar软件批量解压加密文件
    最近在下载资源的时候发现,出现了使用同一压缩密码加密的一系列压缩包(这里只放了一部分,实际上有100多个……)想要使用电脑自带的Winrar软件解压缩变得非常困难,于是在网上查找了一下,有帖子说可以在winrar中选中所有文件以后,用鼠标右键可以输入一个统一密码?但是在实际操作中,并......
  • 如何解决虚拟主机上的数据库导入失败问题?
    当您在虚拟主机上尝试导入数据库时遇到失败,这通常是由几个常见原因造成的。为了帮助您顺利解决问题,我们将从以下几个角度出发,为您提供详细的解决方案。1. 确认数据库文件格式首先,确保您要导入的SQL文件是正确的格式,并且与目标数据库版本兼容。不同版本的MySQL/MariaDB之间可能......
  • WordPress产品导入后内容出现乱码,以及附属一些别的功能
    效果图如下  该插件附带了一个可以把产品描述里面的超链接给去掉,以及有的产品图片点击会在地址栏上面显示图片的路径,在该插件可以进行关闭,并且替换成一个模态窗,还有对产品邮费展示进行了处理,到金额到达包邮的时候,别的邮费进行隐藏下面是该插件源码目录结构duoladuola.......
  • 解决数据库导入失败及字符编码不一致的问题
    当您在导入SQL文件时遇到500错误,并且需要确认数据库编码是否为UTF-8时,可以按照以下步骤进行排查和解决:备份现有数据:在执行任何数据库操作之前,请确保已经对现有数据进行了完整备份。可以通过导出当前数据库结构和数据的方式创建备份文件,以防止意外丢失重要信息。检查SQL......
  • 多平台批量无水印视频图集下载器 DyD v3.0.3 免费版
    好久没有给大家推荐去水印的软件了,今天带来的这款DyD下载助手对自媒体小伙伴非常有帮助。它不仅能对视频进行编辑修改,帮助你打造原创内容,还自带了不少实用工具,提升你的创作效率。支持抖音、快手、小红书等平台。如今短视频非常火爆,任何人都能用手机拍摄一段视频。有了这个工具......
  • 震惊!体积不到 1M,竟能吊打众多批量收费软件!
    点击蓝字关注我作者|风雨软件前言今天,为大家隆重介绍一款堪称图片处理利器的软件——Jpg-C图片批量修整工具。它完全免费,极为好用,体积却仅有652K,小巧玲珑却功能强大!Jpg-C  图片批量修整工具Jpg-C 功能非常强大,不仅能对图片进行修整、压缩、裁剪,还支持批量压......