首页 > 其他分享 >hadoop组件---spark实战-----airflow----调度工具airflow定时运行任务的理解

hadoop组件---spark实战-----airflow----调度工具airflow定时运行任务的理解

时间:2023-08-08 23:34:07浏览次数:45  
标签:airflow schedule interval 28 hadoop ---- start date


我们在前面已经初步了解了 airflow:

hadoop组件—spark实战-----airflow----调度工具airflow的介绍和使用示例

但是 我们开始 尝试使用 airflow的 定时任务的时候 ,常常遇到一个尴尬的情况, 任务没有成功运行,或者说 设置开始时间是今天,但是 明天 才开始运行。

本篇文章 尝试 说明 其中的 原理。

首先 需要声明:

schedule_interval 的crontab的语法与 linux的crontab语法 类似,但是 运行的原理并不一样,它需要与airflow中的 start_date等设置协同生效。

几个概念

我们首先来结合实例,看几个概念

以DAG流程projects为例。

案例一

现在是2020-03-12 00:00:00

我计划让一个 任务在每天凌晨两点运行。

使用配置

tz = pytz.timezone('Asia/Shanghai')

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 12, tzinfo=tz)
}

dag = DAG(
    "projects",
    default_args=default_args,
    schedule_interval='28 2 * * *')

我们会发现 这个任务并不会在 2020-3-12的2点28分执行,而是会在 2020-3-13的2点28分第一次执行。

案例二

假设当前时间是 2020-03-12 15:05 。

我们在projects的python代码中设置如下:

default_args = {
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1)
}

dag = DAG(
    "projects",
    default_args=default_args,
    schedule_interval='28 2 * * *')

我们期望projects流程 从昨天开始 每天 凌晨2点28分执行,然后 启动scheduler。

会发现airflow成功触发了定时运行任务。 输出的时间节点如下:

Schedule:   28 2 * * *

Last Run: 2020-03-11 02:28

Start Date:  2020-03-12 15:05

则运行情况如下:

hadoop组件---spark实战-----airflow----调度工具airflow定时运行任务的理解_时间间隔

hadoop组件---spark实战-----airflow----调度工具airflow定时运行任务的理解_sed_02

概念理解

start_date 配置中使用的变量,含义为 调度计划开始时间 是一个静态的 期望值
Start Date 区别与start_date,指的是 调度计划开始运行的 时间, 是一个动态的 实际值
schedule interval 调度执行周期间隔
execution date 执行时间,具体任务开始运行的时间, 是一个动态的 实际值
Last Run 上一次的 execution date
period 窗口周期,一段有start 和 end 的时间段

原理

官方中的说法是

Let’s Repeat That The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period 。

Airflow sets execution_date based on the left bound of the schedule period it is covering, not based on when it fires (which would be the right bound of the period)

也就是说 调度计划的 执行时间 execution date 应该是 start_date + schedule interval 。

但 任务的真正 运行 时间 其实 是在 周期的 末尾end,也就是 第二个满足 schedule interval的时间点。

第一次任务真正运行的时间 是 配置的 start_date 的第二个满足 schedule interval 的时间点

并且记录的 execution date 为 配置的 start_date 的第一个满足 schedule interval 的时间点,也就是Last Run。

另外,当作业已经执行过之后,start_date 的配置将不会再生效,这个作业的调度开始时间将直接按照上次调度所对应的 execution date 来计算。

hadoop组件---spark实战-----airflow----调度工具airflow定时运行任务的理解_定时_03

也就是 说 airflow会在start_date开始后,符合schedule_interval定义的第一个时间点记为execution_date,但是会在下个时间点到达是才开始运行任务,也就是说由于schedule interval 是一个窗口周期,它是一段的,并且在这一段时间 的end 才会开始运行, 因为这个窗口的原因,表现处理 我们的定时任务 实际执行会滞后一个周期。

怎么让任务当天运行

我们根据原理知道 之所以 任务不会当天运行 是因为 当我们的 时间间隔 以 天为单位时 ,比如 每天2点28 .

它就会在 第一个时间间隔的时间点 开始任务, 但是 实际上 是在 第二个时间 间隔的 时间点 才真正执行任务。

也就是 说 如果我想让 任务当天 运行 只要把 时间间隔 使用小时 或者 分钟来 表示就可以了 。

把schedule_interval设短一点。

比如设置成*/20 15 * * *这样的形式。

dag会在15:00-16:00间每隔20分钟跑一次

也就是 在 15点40的时候会 进行 第一次任务的真正运行

需要注意的是 防止 重复多次运行 程序需要有判断重复运行的 逻辑

airflow最佳实践

airflow官方实际上不推荐使用 动态的start_date。

We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing. The task is triggered once the period closes, and in theory an @hourly DAG would never get to an hour after now as now() moves along.

标签:airflow,schedule,interval,28,hadoop,----,start,date
From: https://blog.51cto.com/u_16218512/7013767

相关文章

  • k8s 学习笔记之配置存储——ConfigMap&Secret
    配置存储ConfigMapConfigMap是一种比较特殊的存储卷,它的主要作用是用来存储配置信息的。创建configmap.yaml,内容如下:apiVersion:v1kind:ConfigMapmetadata:name:configmapnamespace:devdata:info:|(这个|后面整个都是值)username:adminpassword:12......
  • hadoop组件---spark实战-----airflow----调度工具airflow部署到k8s中使用
    在之前的文章中我们已经了解了airflow和它的工作原理。hadoop组件—spark实战-----airflow----调度工具airflow的介绍和使用示例Scheduler进程,WebServer进程和Worker进程需要单独启动。Scheduler和WebServer可以跑在一个操作系统内,也可以分开,而通常Worker需要很多,如果是部署特定......
  • k8s---使用ingress配置域名转发时的traefik路径规则详解
    ingress中traefik的使用方式如下:apiVersion:extensions/v1beta1kind:Ingressmetadata:name:spark-client-testnamespace:defaultannotations:kubernetes.io/ingress.class:traefiktraefik.frontend.rule.type:PathPrefixspec:rules:-host:......
  • hadoop组件---spark实战-----airflow----调度工具airflow的介绍和使用示例
    Airflow是什么Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理,......
  • 数据挖掘(七) -----在python程序中使用hail
    我们在之前的文章中已经尝试安装了hail和简单的使用数据挖掘(五)-----基于Spark的可伸缩基因数据分析平台开源存储运算架构hail全面了解和安装但是我们发现这种hail的运行方式是需要进入到conda的hail的虚拟环境中才能运行的。我们业务一般来说都是在外层执行,还有其他的业务逻......
  • 数据挖掘(五) -----基于Spark的可伸缩基因数据分析平台开源存储运算架构hail全面了解
    hail简介hail是一个开源的、通用的、面向python数据类型的处理基因数据专用的分析库和方法解决方案。hail的存在是为了支持多维度的复杂的数据结构,比如全基因组关联数据研究(GWAS).GWASTutorialhail的底层是通过python,scala,java和apachespark来实现的。hail官网gitlab官方文......
  • 云监控---grafana使用mysql数据源创建dashboard--全面解析
    grafana的dashboard简介经常被用作基础设施的时间序列数据和应用程序分析的可视化。Grafana主要特性:灵活丰富的图形化选项;可以混合多种风格;支持多个数据源;拥有丰富的插件扩展;支持用户权限管理。Grafana有着非常漂亮的图表和布局展示,功能齐全的度量仪表盘dashboard和图形编辑......
  • 遇到问题--Kubernetes--argo--output does not exist
    情况在使用argo进行流程串联时使用了output进行文件输出。在生产环境的argo中运行,即时需要output的文件在pod中不存在,也能正常运行进入后续步骤。但是内测环境的argo同样的情况下会报错。报错如下:path/mendel/need_update_barcode.txtdoesnotexist(or/mendel/need_update_......
  • Android平台GB28181设备接入端如何实现多视频通道接入?
    技术背景我们在设计Android平台GB28181设备接入模块的时候,有这样的场景诉求,一个设备可能需要多个通道,常见的场景,比如车载终端,一台设备,可能需要接入多个摄像头,那么这台车载终端设备可以作为主设备,然后,主设备下,配置多个通道,听起来是不是有点儿类似于DVR或NVR?技术实现这里,我们说下,我们......
  • 操作系统概述
    2.1.1操作系统的概念操作系统 是一组控制盒管理计算机系统的硬件和软件资源、控制程序执行、改善人机界面、合理地组织计算机工作流程并未用户使用计算机提供良好运行环境的一种系统软件。 目的:提高计算机系统的效率,增强系统的处理能力,提高系统资源的利用率,方便用户使用计算机。2......