首页 > 其他分享 >Airflow 搭建

Airflow 搭建

时间:2024-04-08 13:33:06浏览次数:25  
标签:03 Airflow py 2024 dag airflow DAG 搭建

安装

采用pip安装

# 下面的安装方式是通过pip采用清华源来安装,一般安装的版本比较低
pip install apache-airflow -i https://pypi.tuna.tsinghua.edu.cn/simple

采用anaconda安装--推荐

anacoand/miniforge 安装方式参考这里

conda install apache-airflow

初始化数据库

airflow db init
启动
airflow webserver -p 8080 -D  # 启动web服务
airflow scheduler -D  # 启动调度器
启动airflow后创建用户:
airflow users create\
 --username admin\
 --firstname feng\
 --lastname jinxiong\
 --role Admin\
 --email [email protected]

替换数据库为mysql

安装mysql数据库连接驱动

pip3 install mysql-connector-python

修改airflow配置

vim airflow.cfg

配置方法:

点我查阅

查看当前使用的数据库信息:

airflow config get-value database sql_alchemy_conn

安装 ssh连接类型

参阅资料
可以通过ssh方式来远程执行远程设备上的脚本,这个时候默认的是没有ssh连接类型的,需要手动安装

pip install apache-airflow-providers-ssh

通过上方的安装,然后重启airflow就可以看到多出了一个ssh连接类型:
image

版本升级

可以直接通过pip升级

pip install --upgrade apache-airflow

升级后重启系统,可能发现服务无法正常启动,可以手动运行airflow scheduler -D
根据提示,可能需要输入Y来升级数据,升级完成之后,再启动调度器,应该就正常了。

删除自带的模板dag

可以直接从web端看到文件的绝对路径
image

Ssh operator写法

from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime,timedelta


default_args = {
    'owner': 'babyfengfjx',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

with DAG(
        'APP23_test_dag',
        description='自动化执行V23应用验证',
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
        start_date=datetime(2023, 11, 1),
        catchup=False,
        max_active_runs=1,
        tags=['APP23','APPTEST']

         ) as dag:
    run_python_script = SSHOperator(
        task_id='run_python_script',
        ssh_conn_id='10.20.52.212_connection',  # 在Airflow配置中定义SSH连接
        command='python3 /home/app23/PycharmProjects/pythonProject/AppCompatibility/AppCompatibility_remote.py',
        dag=dag,
    )

在上面那种方式遇到了一些问题,执行过程中会报SSH超时,后来更换成下面这种模式:

BashOperator的模式

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
ssh_password = '1'
default_args = {
    'owner': 'babyfengfjx',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

with DAG(
        'APP23_test_dag',
        description='自动化执行V23应用验证',
        default_args=default_args,
        schedule_interval=timedelta(minutes=5),
        start_date=datetime(2023, 11, 1),
        catchup=False,
        max_active_runs=1,
        tags=['APP23', 'APPTEST']
) as dag:
    run_python_script = BashOperator(
        task_id='run_python_script',
        bash_command='sshpass -p "{password}" ssh [email protected] "python3 /home/app23/PycharmProjects/pythonProject/AppCompatibility/AppCompatibility_remote.py"'.format(
            password=ssh_password),
        dag=dag,
    )

FAQ

01.无法后台启动的原因

数据库没启动

首先一种情况是因为切换了mysql数据库,数据库没有启动,所以会导致scheduler进程无法启动,造成服务启动不成功。
因异常关闭,导致webserver后台PID文件不正常,
日志中也会有记录,可以查看airflow/目录下的日志文件,通过手动删除对应的pid文件,然后再次启动即可。

02.执行脚本涉及本地读写均无法正常运行

在执行脚本中,我有一个使用sqlite的场景,直接在本地执行脚本是可以正常运营,但是通过airflow调度就有问题,无法打开数据库文件。
实际原因是数据库文件没有写绝对路径,airflow运行时找不到具体文件,所以会不成功,此时只需要将文件名写成绝对路径即可。

03.时区不是中国时区

在使用过程中发现,使用cron表达式填写的时间条件都与我本地相差8小时,是因为默认时区是UTC,得修改一下
如果您想将Airflow的时区设置为中国的时区(中国标准时间,CST),可以按照以下步骤进行配置:

  1. 打开Airflow的配置文件 airflow.cfg。该文件通常位于Airflow安装目录的 airflow 子目录下。
  2. 查找并编辑配置项 default_timezone,将其设置为 Asia/Shanghai。
  3. 保存配置文件并重启Airflow的调度器和Web服务器,以使配置更改生效。
    通过将 default_timezone 设置为 Asia/Shanghai,Airflow将使用中国标准时间作为默认时区,以便任务的调度时间与中国的本地时间一致。
    请注意,时区名称遵循标准的IANA时区命名约定。在这种情况下,Asia/Shanghai 是中国上海的标准时区名称。
    完成以上配置后,Airflow将根据中国标准时间(CST)计算任务的调度时间,以与中国的本地时间保持一致。

04.通过python脚本执行linux命令启动应用无响应

主要原因是airflow执行环境的环境变量与人为使用的桌面环境的环境变量不一致,其中一个核心的环境变量DISPLAY 没有,导致图形界面的应用无法启动成功。
在启动应用前,手动添加如下变量信息即可

import os
# 设置环境变量
os.environ['DISPLAY'] = ':0' 

其中 os.environ是系统的环境变量,可以手动添加上面的环境变量信息即可。

05.被执行设备IP变更后,就无法执行成功

有一台设备B是作为自动化测试应用的设备,在一次IP变更后,同步调整airflow的IP地址参数,也无法顺利执行了。
执行后就一直报错:

deepin-PC
*** Found local files:
***   * /home/deepin/airflow/logs/dag_id=14.APP20_test_dag/run_id=manual__2024-03-04T05:50:58.873417+00:00/task_id=run_pythonV20_script/attempt=1.log
[2024-03-04, 13:51:01 CST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: 14.APP20_test_dag.run_pythonV20_script manual__2024-03-04T05:50:58.873417+00:00 [queued]>
[2024-03-04, 13:51:01 CST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: 14.APP20_test_dag.run_pythonV20_script manual__2024-03-04T05:50:58.873417+00:00 [queued]>
[2024-03-04, 13:51:01 CST] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2024-03-04, 13:51:01 CST] {taskinstance.py:1382} INFO - Executing <Task(BashOperator): run_pythonV20_script> on 2024-03-04 05:50:58.873417+00:00
[2024-03-04, 13:51:01 CST] {standard_task_runner.py:57} INFO - Started process 8455 to run task
[2024-03-04, 13:51:01 CST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', '14.APP20_test_dag', 'run_pythonV20_script', 'manual__2024-03-04T05:50:58.873417+00:00', '--job-id', '510771', '--raw', '--subdir', 'DAGS_FOLDER/14.APP20_remote_DAG.py', '--cfg-path', '/tmp/tmp_epptm3p']
[2024-03-04, 13:51:01 CST] {standard_task_runner.py:85} INFO - Job 510771: Subtask run_pythonV20_script
[2024-03-04, 13:51:01 CST] {task_command.py:416} INFO - Running <TaskInstance: 14.APP20_test_dag.run_pythonV20_script manual__2024-03-04T05:50:58.873417+00:00 [running]> on host deepin-PC
[2024-03-04, 13:51:02 CST] {taskinstance.py:1662} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='李晓飞' AIRFLOW_CTX_DAG_ID='14.APP20_test_dag' AIRFLOW_CTX_TASK_ID='run_pythonV20_script' AIRFLOW_CTX_EXECUTION_DATE='2024-03-04T05:50:58.873417+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-04T05:50:58.873417+00:00'
[2024-03-04, 13:51:02 CST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-03-04, 13:51:02 CST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', 'sshpass -p "1" ssh [email protected] "python3 /home/apptest/Documents/PycharmProjects/pythonProject/AppCompatibility/product_auto/main.py"']
[2024-03-04, 13:51:02 CST] {subprocess.py:86} INFO - Output:
[2024-03-04, 13:51:02 CST] {subprocess.py:97} INFO - Command exited with return code 6  # 在这里没有到执行脚本的步骤就已经异常了,很有可能压根就没登录进来……
[2024-03-04, 13:51:02 CST] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/deepin/anaconda3/lib/python3.11/site-packages/airflow/operators/bash.py", line 210, in execute
    raise AirflowException(
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 6.
[2024-03-04, 13:51:02 CST] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=14.APP20_test_dag, task_id=run_pythonV20_script, execution_date=20240304T055058, start_date=20240304T055101, end_date=20240304T055102
[2024-03-04, 13:51:03 CST] {standard_task_runner.py:104} ERROR - Failed to execute job 510771 for task run_pythonV20_script (Bash command failed. The command returned a non-zero exit code 6.; 8455)
[2024-03-04, 13:51:03 CST] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-03-04, 13:51:03 CST] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

通过查询上面的错误信息:The command returned a non-zero exit code 6,说是权限问题,路径问题,但这些所有的信息都没有变化,这个问题一直没有得到彻底解决。

最后通过分析日志内容,发现在airflow执行远程连接尝试运行脚本的时候,下一步并没有到执行脚本的这一步,也就是说,很有可能是在远程连接设备B的时候就已经出问题了,但是通过其他设备ssh连接变更后的IP地址,是没有任何问题的,为何airflow执行就出现这个问题呢?

通过猜测,可能是ssh登录的时候保存的秘钥或者主机信息有一些信息相同,但是IP又不同导致的,因为我在以前连接过的设备中再次SSH连接这个变更IP的设备后,发现客户端会提示一些信息,意思是主机的信息变化了,问我是否继续。

根据这个信息,想到很有可能是因为这个原因导致的,于是直接在服务器中删除~/.ssh/known_hosts 文件,再次尝试,发现问题得到解决。

06.如何给dag任务指定超时时间

在实际生产过程中,发现有些任务在程序都已经结束的时候,dag任务仍旧无法退出,导致一直处于运行状态,无法结束。
这里就添加了一个超时参数,主要放在default_args
(当然这个实际有没有问题还得持续观测了)
default_args = {
'owner': '罗鑫思',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'dagrun_timeout': timedelta(minutes=30),
}
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
ssh_password = '1'
default_args = {
'owner': '罗鑫思',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'dagrun_timeout': timedelta(minutes=30),
}

with DAG(
'15.APP23_test_dag',
description='自动化执行V23应用验证',
default_args=default_args,
schedule_interval=('*/5 9-19 * * 1-7'),
start_date=datetime(2024, 2, 18),
catchup=False,
max_active_runs=1,
tags=['APP23', 'APPTEST']
) as dag:
run_python_script = BashOperator(
task_id='run_pythonV23_script',
bash_command='sshpass -p "{password}" ssh [email protected] "python3 '
'/home/app23/PycharmProjects/pythonProject/AppCompatibility/product_auto/main.py"'.format(password=ssh_password),
dag=dag,
)
任务编写的原子性
在我们编写airflow脚本任务时,尽可能以原子性方针为指导,每个任务均是不可分割的一个独立功能,即使单个功能失败,不影响其他功能的结果。
例如:
考虑对用户事件DAG的一个简单扩展,我们希望在其中添加一些功能,以便在每次
运行结束时发送前10名用户的电子邮件。 一个简单的方法是扩展我们前面的函数,额外调用
一些发送包含统计信息的电子邮件的函数。
def _send_stats(email,**context):
stats = pd.read_csv(context[“templates_dict”][“stats_path”])
email_stats(stats,email=email)

send_stats = PythonOperator
(task_id=“send_stats”,
python_callable=_send_stats,
op_kwargs={“email”:
[email protected]“},
templates_dict={“stats_path”:“/data/stats/{{ds}}.csv”},
dag=dag,

calculate_stats >> send_stats
不幸的是,这种方法的缺点是任务不再是原子的。你知道为什么吗?如果没有,考虑一下
如果我们的_send_stats函数失败会发生什么(如果我们的电子邮件服务器有点故障,这
肯定会发生 在本例中,我们已经将统计信息写入了输出文件的output_path中,使得任
务看起来好像成功了,即使它以失败告终。
要以原子方式实现此功能,我们可以简单地将电子邮件功能拆分为一个单独的任

编写airflow服务
当在 Linux 中创建一个服务时,需要使用 systemd 作为服务管理器。systemd 是一个用于管理系统进程和服务的工具,它能够自动启动、停止和监视服务,并提供了一些额外的功能,如进程管理、日志记录等。

在 systemd 中,服务由单独的服务文件定义,通常存储在 /etc/systemd/system 目录下。这些服务文件使用 INI 格式,以 .service 扩展名结尾。服务文件包含了服务的配置信息,如服务名称、描述、启动命令、环境变量等。

下面是上面提供的 Airflow Web 服务器的服务文件的详细解释:
[Unit]
Description=Airflow Webserver
After=network.target
这部分指定了服务的描述和依赖关系。

  • Description 是对服务的简短描述,
  • After 指定了服务启动所依赖的目标(这里是 network.target,表示在网络启动后才启动服务)。
    [Service]
    Environment="PATH=/home/deepin/anaconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
    Environment="PYTHONPATH=/home/deepin/airflow/dags/moduler_FJX:$PYTHONPATH"
    User=deepin
    Group=deepin
    Type=simple
    ExecStart=/home/deepin/anaconda3/bin/airflow webserver -p 8080 -D
    Restart=on-failure
    RestartSec=5s
    PrivateTmp=true
    这部分是服务的主要配置信息。在 Environment 指令中,我们设置了两个环境变量:PATH 和 PYTHONPATH。PATH 环境变量指定了可执行文件的搜索路径,PYTHONPATH 环境变量指定了 Python 模块的搜索路径。通过设置这两个环境变量,我们确保了在服务启动时,正确的可执行文件和 Python 模块路径会被包含在搜索路径中。
    User 和 Group 指定了服务运行的用户和用户组。在这里,服务将以 deepin 用户和 deepin 用户组的身份运行。
    Type 指定了服务的类型,这里是 simple。simple 类型表示这是一个简单的服务,即服务启动后,systemd 不会监视服务的状态。
    ExecStart 指定了要执行的命令,即启动 Airflow Web 服务器的命令。在这里,我们使用了 /home/deepin/anaconda3/bin/airflow webserver -p 8080 -D 命令来启动服务器。-p 8080 指定了服务器监听的端口为 8080,-D 表示以守护进程方式运行。
    Restart 和 RestartSec 指定了服务的重启策略。Restart=on-failure 表示在服务失败时自动重启,RestartSec=5s 表示在服务停止后等待 5 秒再重启。
    PrivateTmp=true 指定服务使用私有的临时文件系统。
    [Install]
    WantedBy=multi-user.target
    这部分定义了服务的安装信息。WantedBy=multi-user.target 指定了服务随系统启动时自动启动,并作为多用户模式的一部分。
    这就是一个基本的 systemd 服务文件的解释和格式。通过编辑和配置服务文件,你可以定义自己的服务,并使用 systemd 来管理和控制它们。

FAQ

Airflow 运行后报脚本中找不到模块

主要是airflow运行的环境没有找到对应个人模块的pythonpath路径导致的,实际如何能自动加载路径也没搞清楚,最后通过在DAG文件开头手动加载对应目录来解决的。

下面是/home/deepin/airflow/dags文件夹的目录结构:

(base) deepin@deepin-PC:~/airflow/dags$ tree -I '*.pyc|__pycache__'  # tree -I 是排除显示一些内容
├── appcomment_DAG.py
├── appshelve_DAG.py
├── bbs_DAG.py
├── bug_DAG.py
├── case_DAG.py
├── casesheet_DAG.py
├── gerrit_DAG.py
├── github_DAG.py
├── moduler_FJX
│   ├── appcomment.py
│   ├── AppShelveInfo.py
│   ├── bbsmonitor.py
│   ├── buglist_oper.py
│   ├── caselist_oper.py
│   ├── casesheet_oper.py
│   ├── data_oper_module_mysql.py
│   ├── gerrit_rsync.py
│   ├── github2mdy.py
│   ├── __init__.py
│   ├── PMSlog.py
│   ├── pms_oper.py
│   ├── product_oper.py
│   ├── srlist_oper.py
│   └── tasklist_oper.py
├── pmslog_DAG.py
├── product_DAG.py
├── sr_DAG.py
└── task_DAG.py

标签:03,Airflow,py,2024,dag,airflow,DAG,搭建
From: https://www.cnblogs.com/babyfengfjx/p/18120937

相关文章

  • Window下SRS流媒体服务器的搭建+RTMP视频推流
     一、前期准备SRS流媒体服务器无法直接在Windows上运行,依赖于Linux内核环境,一般需要下载虚拟机。本文采用Docker作为容器,打开win自带的Hyper-V虚拟机。在安装docker之前先要打开window的虚拟机。 1.1 开启Hyper-V(1)“控制面板”——“程序”—......
  • 茴香豆:搭建你的 RAG 智能助理(笔记)
    视频地址:https://www.bilibili.com/video/BV1QA4m1F7t4文档地址:https://github.com/InternLM/Tutorial/blob/camp2/huixiangdou/readme.md作业地址:https://github.com/InternLM/Tutorial/blob/camp2/huixiangdou/homework.md茴香豆项目地址:  https://github.com/InternLM/......
  • 【UnityRPG游戏制作】Unity_RPG项目之界面面板分离和搭建
    ......
  • Vscode+gcc-arm+openocd搭建STM32开发环境
    1简介尝试使用Vscode搭建STM32开发环境,自己记录一下详细的配置过程2工具下载设计到的相关软件以及资源包括Vscode软件、STM32CubeMX、mingw64以及openocd,相应的软件介绍以及下载链接如下:Vscode软件:宇宙第一编辑器,开源,插件丰富CubeMx:初始化代码生产器,HAL库mingw64:因......
  • windows环境下使用tomcat搭建文件服务器(带权限验证)
    操作系统:Windows11 jdk:jdk1.8tomcat版本:8.5.100 开始准备所需物料。tomcat下载地址:https://tomcat.apache.org/download-80.cgi选择windows64位选择64-bitWindowszip。查看tomcat版本说明支持,tomcat8支持1.7及以上,我这边下载1.8版本。jdk下载路径:https://www.or......
  • 五一假期来临,各地景区云旅游、慢直播方案设计与平台搭建
    一、行业背景经文化和旅游部数据中心测算,今年清明节假期3天全国国内旅游出游1.19亿人次,按可比口径较2019年同期增长11.5%;国内游客出游花费539.5亿元,较2019年同期增长12.7%。踏青赏花和户外徒步成为假期的热门出游主题。随着清明假期的过去以及即将到来的五一小长假,各地景区又将再......
  • Java商城免费搭建 VR全景商城 saas商城 b2b2c商城 o2o商城 积分商城 秒杀商城 拼团商
     1.涉及平台平台管理、商家端(PC端、手机端)、买家平台(H5/公众号、小程序、APP端(IOS/Android)、微服务平台(业务服务) 2.核心架构SpringCloud、SpringBoot、Mybatis、Redis3.前端框架VUE、Uniapp、Bootstrap/H5/CSS3、IOS、Android、小程序4.核心思想分布式、微服务......
  • 免费小程序商城搭建之b2b2c o2o 多商家入驻商城 直播带货商城 电子商务b2b2c o2o 多商
     1.涉及平台平台管理、商家端(PC端、手机端)、买家平台(H5/公众号、小程序、APP端(IOS/Android)、微服务平台(业务服务) 2.核心架构SpringCloud、SpringBoot、Mybatis、Redis3.前端框架VUE、Uniapp、Bootstrap/H5/CSS3、IOS、Android、小程序4.核心思想分布式、微服务......
  • .net core中EF core的环境搭建
    //数据上下文MyDbContext.csusingMicrosoft.EntityFrameworkCore;namespaceLearn00.Models{publicclassMyDbContext:DbContext{//摘要://把Employees{get;set;},理解成是一个容器//用来存放Employee类型的实体,该实......
  • 使用miniforge平替anaconda,重建airflow服务
    背景因公司通知不能使用anaconda,可以采用miniforge作为开源平替,因之前环境搭建使用的就是anaconda,当前需要卸载并替换成miniforge。那为什么一定要用这个呢,其实也不是一定,而是用这个搭建环境比较省事,如果没用这个,我当前环境的python版本过低,解决这个问题耗费的时间会更久,所以最......