Airflow Deployment
1. Build a virtual environment with virtualenv
yum install python3.11
python -V
ln -s /usr/bin/python3.11 /usr/bin/python
yum install virtualenv
useradd airflow
Create the environment (this creates a folder in the current directory):
(airflow) [root@localhost opt]# virtualenv -p python3.11 airflow
On Linux or macOS, activate the new Python environment:
source airflow/bin/activate
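To confirm the new environment is active, python -c "import sys; print(sys.prefix)" should print /opt/airflow (the folder created above).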
2. Install apache-airflow[celery]==2.7.1
Download the constraints file "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.11.txt"
Install with the constraints file, either directly from the URL or from a local copy (the second command also uses the Tsinghua PyPI mirror):
pip install "apache-airflow[celery]==2.7.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/constraints-3.11.txt"
pip install "apache-airflow[celery]==2.7.1" -c /opt/airflow/constraints-3.11.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
3. Install the database
[root@node1 ~]# dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-redhat-repo-latest.noarch.rpm
[root@node1 ~]# dnf -qy module disable postgresql
[root@node1 ~]# dnf install -y postgresql16-server
[root@node1 ~]# /usr/pgsql-16/bin/postgresql-16-setup initdb
[root@node1 ~]# systemctl enable postgresql-16
[root@node1 ~]# systemctl start postgresql-16
CREATE DATABASE airflow;
CREATE USER airflow WITH PASSWORD 'airflow128';
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
-- PostgreSQL 15 and later require additional privileges on the public schema.
-- Connect to the new database first (in psql the command is \c, not MySQL's USE):
\c airflow
GRANT ALL ON SCHEMA public TO airflow;
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload
pip install psycopg2-binary -i https://pypi.tuna.tsinghua.edu.cn/simple
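Before pointing Airflow at PostgreSQL, it is worth verifying connectivity with a small psycopg2 sketch; the credentials are the ones created above, and the host is an assumption (substitute your own PostgreSQL server address):

# test_pg.py - minimal connectivity check (host is a placeholder)
import psycopg2

conn = psycopg2.connect(
    host="192.168.1.42",   # assumption: the PostgreSQL host used in airflow.cfg below
    port=5432,
    dbname="airflow",
    user="airflow",
    password="airflow128",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])  # should print the PostgreSQL 16 banner
conn.close()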
Edit the configuration file airflow.cfg (covered in Section 5 below).
4. Install and configure the message queue
yum install centos-release-rabbitmq-38.noarch
yum install rabbitmq-server
systemctl enable rabbitmq-server.service
systemctl start rabbitmq-server.service
# Generic pattern: create a user, a vhost, optional tags, and permissions
rabbitmqctl add_user myuser mypassword
rabbitmqctl add_vhost myvhost
rabbitmqctl set_user_tags myuser mytag
rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
# For Airflow:
rabbitmqctl add_user airflow 123456
rabbitmqctl add_vhost airflow
rabbitmqctl set_permissions -p airflow airflow ".*" ".*" ".*"
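To check that the broker account works before starting any workers, a minimal sketch with kombu (pulled in as a Celery dependency) can try to open a connection; user, password, and vhost are the ones created above:

# test_broker.py - verify RabbitMQ accepts the airflow user on the airflow vhost
from kombu import Connection

with Connection("amqp://airflow:123456@localhost:5672/airflow") as conn:
    conn.connect()              # raises if the credentials or vhost are wrong
    print("broker OK:", conn.as_uri())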
Initialize the metadata database, then start the components (each in its own terminal):
airflow db init
airflow webserver
airflow scheduler
airflow celery worker
airflow users create \
--username airflow \
--firstname airflow \
--lastname airflow \
--role Admin \
--email [email protected]
airflow users create \
--username admin \
--firstname admin \
--lastname admin \
--role Admin \
--email [email protected]
5. Edit the configuration file airflow.cfg
broker_url = amqp://myuser:mypassword@localhost:5672/myvhost
# the host part must be the hostname; nothing else works (values in airflow.cfg are not quoted)
[core]
dags_folder = /home/airflow/airflow/dags
# set the timezone (IANA names use underscores)
default_timezone = Asia/Hong_Kong
# executor type; CeleryExecutor is recommended for a cluster
executor = CeleryExecutor

[database]
# metadata database connection (since Airflow 2.3 this belongs in [database], not [core])
sql_alchemy_conn = postgresql://postgres:[email protected]:5432/airflow

[webserver]
# set the UI timezone
default_ui_timezone = Asia/Hong_Kong

[scheduler_failover]
# Airflow master nodes, here node1 and node2; the two nodes need passwordless SSH.
# (This section comes from the scheduler-failover-controller add-on, not core Airflow.)
scheduler_nodes_in_cluster = node1,node2

[celery]
# message queue used as the Celery broker (Redis here; for the RabbitMQ vhost
# created above it would be amqp://airflow:123456@node1:5672/airflow)
broker_url = redis://node1:6379/0
# database Celery uses to record task state after completion
result_backend = db+postgresql://postgres:123456@node1:5432/airflow
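To confirm which values Airflow actually picked up (environment variables override airflow.cfg), a quick check through the configuration API:

# minimal sketch: print the effective broker, result backend, and DB connection
from airflow.configuration import conf

print(conf.get("celery", "broker_url"))
print(conf.get("celery", "result_backend"))
print(conf.get("database", "sql_alchemy_conn"))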
For reference, the SMTP account used in the email examples below (noted here in Alertmanager-style syntax; the airflow.cfg equivalents appear in Section 6):
smtp_smarthost: 'smtp.exmail.qq.com:587'
smtp_from: '[email protected]'
smtp_hello: 'youjivest.com'
smtp_auth_username: '[email protected]'
smtp_auth_password: 'kHThsAfxfsGHvcWF'
6. Email configuration
[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
You can configure the sender address, from_email, by setting it in the [email] section:
[email]
from_email = "John Doe <[email protected]>"
[smtp]
smtp_host = smtp.sendgrid.net # your SMTP server hostname
smtp_starttls = False # True enables STARTTLS, which encrypts the connection after connecting; usually left False here
smtp_ssl = False # set to True if your SMTP server requires an SSL connection
smtp_user = apikey # SMTP username; for SendGrid this is literally 'apikey'
smtp_password = your-api-key # your SMTP secret; replace with the real API key
smtp_port = 587 # SMTP server port, typically 587
smtp_mail_from = [email protected] # sender address
[smtp]
smtp_host=smtp.exmail.qq.com
# the port belongs in smtp_port below, not in smtp_host;
# note that port 587 normally expects STARTTLS, so smtp_starttls=True may be required
smtp_starttls=False
smtp_ssl=False
[email protected]
smtp_password=kHThsAfxfsGHvcWF
smtp_port=587
smtp_mail_from=<your-from-email>
7. send_email.py test
# !/usr/bin/python
# -*- coding: UTF-8 -*-
import smtplib
import pendulum
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python import PythonOperator  # the python_operator module is deprecated in 2.x
from email.mime.text import MIMEText
from email.header import Header
def send_email(mail_msg):
    """Send mail through a third-party SMTP service."""
    # ---------- sender
    mail_host = "smtp.exmail.qq.com"  # SMTP server (the port is passed to connect() below)
    mail_user = "[email protected]"  # username, i.e. the sender mailbox
    mail_pass = "kHThsAfxfsGHvcWF"  # password; found in your mailbox settings
    sender = '[email protected]'  # sender address
    # ---------- recipients
    receivers = ['[email protected]']  # recipients; set to your QQ mailbox or any other
    message = MIMEText(mail_msg, 'html', 'utf-8')
    message['From'] = Header("Airflow-Alert")
    message['To'] = Header("developer")
    subject = 'Airflow task alert'
    message['Subject'] = Header(subject, 'utf-8')
    try:
        smtp_obj = smtplib.SMTP()
        smtp_obj.connect(mail_host, 587)  # 587 is the SMTP submission port
        smtp_obj.starttls()               # port 587 normally requires STARTTLS before login
        smtp_obj.login(mail_user, mail_pass)
        smtp_obj.sendmail(sender, receivers, message.as_string())
    except smtplib.SMTPException:
        print("Error: failed to send mail")
def send_email_fun(msg):
    send_email(msg)
def failure_callback(context):
    """Called when a task in the DAG fails; context is passed in automatically."""
    dag_id = context['dag'].dag_id
    task_id = context['task_instance'].task_id
    reason = context['exception']
    mail_msg = f"""
    <p>Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}</p>
    <p>DAG ID: {dag_id}...</p>
    <p>Task id: {task_id}...</p>
    <P>STATE: Error!</P>
    <p>Exception: {reason}...</p>
    """
    send_email_fun(mail_msg)
local_tz = pendulum.timezone("Asia/Shanghai")
start_date = datetime(2023, 10, 14, 11, 13, 50, tzinfo=local_tz)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'on_failure_callback': failure_callback,  # invoked when a task fails; no arguments needed
    # 'on_success_callback': success_callback,  # define success_callback yourself if wanted;
    # success usually does not warrant an email, so it is left commented out here
}
dag = DAG(
    dag_id="test_send",
    default_args=default_args,
    schedule_interval=None,
)
def print_hello(**context):
    """Task logic; context is injected automatically when PythonOperator calls it."""
    print("hello")
    with open("./context.txt", "a", encoding="utf-8") as f:
        f.write(str(context))
send_email_task = PythonOperator(
    task_id='send_email',
    python_callable=print_hello,  # context is injected automatically in Airflow 2.x,
                                  # so provide_context is no longer needed
    dag=dag,
)
send_email_task
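To exercise the DAG, copy the file into the dags_folder configured above (/home/airflow/airflow/dags) and trigger it manually with airflow dags trigger test_send; making print_hello raise an exception is an easy way to test the failure callback and the alert mail.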
8. Set up startup via systemd
[root@pgnode1 systemd]# cat /etc/systemd/system/airflow-webserver.service
[Unit]
Description=Apache Airflow Web Server
After=network.target
[Service]
User=airflow
Group=airflow
ExecStart=/opt/airflow/bin/airflow webserver
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
[root@pgnode1 systemd]# cat /etc/systemd/system/airflow-scheduler.service
[Unit]
Description=Apache Airflow Scheduler
After=network.target
[Service]
User=airflow
Group=airflow
ExecStart=/opt/airflow/bin/airflow scheduler
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
[root@node3 ~]# cat /etc/systemd/system/airflow-worker.service
[Unit]
Description=Apache Airflow Worker
After=network.target
[Service]
User=airflow
Group=airflow
ExecStart=/opt/airflow/bin/airflow celery worker
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
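After creating the unit files, reload systemd and enable the services: systemctl daemon-reload, then systemctl enable --now airflow-webserver airflow-scheduler (and airflow-worker on the worker nodes).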
9. Configuring Airflow authentication: LDAP
Install the system dependencies needed to pip-install python-ldap:
yum install python3.11-devel openldap-devel gcc
This method validates the user's credentials against an LDAP server.
Warning: to use LDAP you need python-ldap installed.
python-ldap installation docs: https://www.python-ldap.org/en/python-ldap-3.4.3/
pip install python-ldap -i https://pypi.tuna.tsinghua.edu.cn/simple
Reference: https://flask-appbuilder.readthedocs.io/en/latest/security.html
(The Flask-AppBuilder docs also show a typical Microsoft AD setup, where all users can perform LDAP searches; the example below is the OpenLDAP variant.)
For a typical OpenLDAP setup (where LDAP searches require a special account):
AUTH_TYPE = AUTH_LDAP
AUTH_LDAP_SERVER = "ldap://ldap.example.com"
AUTH_LDAP_USE_TLS = False
# registration configs
AUTH_USER_REGISTRATION = True # allow users who are not already in the FAB DB
AUTH_USER_REGISTRATION_ROLE = "Public" # this role will be given in addition to any AUTH_ROLES_MAPPING
AUTH_LDAP_FIRSTNAME_FIELD = "givenName"
AUTH_LDAP_LASTNAME_FIELD = "sn"
AUTH_LDAP_EMAIL_FIELD = "mail" # if null in LDAP, email is set to: "{username}@email.notfound"
# search configs
AUTH_LDAP_SEARCH = "ou=users,dc=example,dc=com" # the LDAP search base
AUTH_LDAP_UID_FIELD = "uid" # the username field
AUTH_LDAP_BIND_USER = "uid=admin,ou=users,dc=example,dc=com" # the special bind username for search
AUTH_LDAP_BIND_PASSWORD = "admin_password" # the special bind password for search
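Before wiring LDAP into the webserver, a minimal python-ldap sketch can confirm that the bind account is able to search the user base; the server, bind DN, and password below are the placeholders from the example config:

# test_ldap.py - verify the bind account can search the LDAP user base
import ldap

conn = ldap.initialize("ldap://ldap.example.com")           # placeholder server
conn.set_option(ldap.OPT_REFERRALS, 0)
conn.simple_bind_s("uid=admin,ou=users,dc=example,dc=com",  # placeholder bind DN
                   "admin_password")
results = conn.search_s("ou=users,dc=example,dc=com",       # the LDAP search base
                        ldap.SCOPE_SUBTREE,
                        "(uid=*)",
                        ["uid", "mail"])
print(f"found {len(results)} entries")
conn.unbind_s()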
You can limit the LDAP search scope with a filter:
# only allow users with memberOf="cn=myTeam,ou=teams,dc=example,dc=com"
AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=myTeam,ou=teams,dc=example,dc=com)"
You can grant FlaskAppBuilder roles based on LDAP groups (note: this requires AUTH_LDAP_SEARCH to be set):
# a mapping from LDAP DN to a list of FAB roles
AUTH_ROLES_MAPPING = {
"cn=fab_users,ou=groups,dc=example,dc=com": ["User"],
"cn=fab_admins,ou=groups,dc=example,dc=com": ["Admin"],
}
# the LDAP user attribute which has their role DNs
AUTH_LDAP_GROUP_FIELD = "memberOf"
# if we should replace ALL the user's roles each login, or only on registration
AUTH_ROLES_SYNC_AT_LOGIN = True
# force users to re-auth after 30min of inactivity (to keep roles in sync)
PERMANENT_SESSION_LIFETIME = 1800
Transport Layer Security
For STARTTLS, configure an ldap:// server and set AUTH_LDAP_USE_TLS to True:
AUTH_LDAP_SERVER = "ldap://ldap.example.com"
AUTH_LDAP_USE_TLS = True
For LDAP over TLS (ldaps), configure the server with the ldaps:// scheme and set AUTH_LDAP_USE_TLS to False:
AUTH_LDAP_SERVER = "ldaps://ldap.example.com"
AUTH_LDAP_USE_TLS = False
"""Default configuration for the Airflow webserver"""
import os
#from airflow.www.fab_security.manager import AUTH_DB
from airflow.www.fab_security.manager import AUTH_LDAP
# from airflow.www.fab_security.manager import AUTH_OAUTH
# from airflow.www.fab_security.manager import AUTH_OID
# from airflow.www.fab_security.manager import AUTH_REMOTE_USER
basedir = os.path.abspath(os.path.dirname(__file__))
# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True
#AUTH_TYPE = AUTH_DB
AUTH_TYPE = AUTH_LDAP
# Uncomment to setup Full admin role name
AUTH_ROLE_ADMIN = 'Admin'
# The default user self registration role
# AUTH_USER_REGISTRATION_ROLE = "Public"
# When using LDAP Auth, setup the ldap server
#AUTH_LDAP_SERVER = "ldap://leojiang.com"
AUTH_LDAP_SERVER = "ldap://ipa.youji*.com"
AUTH_LDAP_USE_TLS = False
# registration configs
# allow LDAP users to self-register on first login
AUTH_USER_REGISTRATION = True # allow users who are not already in the FAB DB
# this must be set, otherwise authentication errors are raised
AUTH_USER_REGISTRATION_ROLE = "Admin" # this role will be given in addition to any AUTH_ROLES_MAPPING
# refresh the user's roles on every login
AUTH_ROLES_SYNC_AT_LOGIN = True
# search configs (fill in the details of your own LDAP tree below)
#AUTH_LDAP_SEARCH = "ou=ID,ou=Data,dc=leo,dc=se" # the LDAP search base
AUTH_LDAP_SEARCH = "cn=users,cn=accounts,dc=youji*,dc=com" # the LDAP search base
AUTH_LDAP_UID_FIELD = "uid" # the username field
#AUTH_LDAP_BIND_USER = "CN=leojiangdm,OU=CA,OU=SvcAccount,OU=P1,OU=ID,OU=Data,DC=leo,DC=se" # the special bind username for search
AUTH_LDAP_BIND_USER = "uid=admin,cn=users,cn=accounts,dc=youjivest,dc=com"
#AUTH_LDAP_BIND_PASSWORD = "Adfsadfasdfsd@123" # the special bind password for search
AUTH_LDAP_BIND_PASSWORD = "********"
# You can limit the LDAP search scope with a filter (note: if AUTH_ROLES_MAPPING is used, this parameter cannot be):
# only allow users with memberOf="cn=myTeam,ou=teams,dc=example,dc=com"
#AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=myTeam,ou=teams,dc=example,dc=com)"
AUTH_LDAP_SEARCH_FILTER = "(memberOf=cn=ipausers,cn=groups,cn=accounts,dc=youjivest,dc=com)"
# You can grant FlaskAppBuilder roles based on LDAP roles (note: this requires AUTH_LDAP_SEARCH to be set):
# a mapping from LDAP DN to a list of FAB roles
#AUTH_ROLES_MAPPING = {
# "cn=fab_users,ou=groups,dc=example,dc=com": ["User"],
# "cn=fab_admins,ou=groups,dc=example,dc=com": ["Admin"],
#}
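webserver_config.py lives in AIRFLOW_HOME (next to airflow.cfg); restart the webserver after editing it for the LDAP settings to take effect.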
10. Monitoring Airflow with Prometheus
https://www.redhat.com/en/blog/monitoring-apache-airflow-using-prometheus
The diagram in the post above shows the three Airflow components: webserver, scheduler, and worker. The solid lines starting at these components show the metrics flowing from them to statsd_exporter, which aggregates the metrics, converts them to Prometheus format, and exposes them as a Prometheus endpoint. A Prometheus server scrapes this endpoint periodically and saves the metrics in its database. The Airflow metrics stored in Prometheus can then be viewed in a Grafana dashboard.
The rest of this section builds the setup just described. We are going to:
- Configure Airflow to publish statsd metrics.
- Convert the statsd metrics into Prometheus metrics with statsd_exporter.
- Deploy a Prometheus server to collect the metrics and make them available to Grafana.
Create the Airflow database and the airflow.cfg configuration file (the source post uses Airflow 1.x's "airflow initdb"; on Airflow 2.x the equivalent is):
$ airflow db init
Open the Airflow configuration file airflow.cfg for editing:
$ vi ~/airflow/airflow.cfg
Turn on statsd metrics by setting statsd_on = True. Before saving your changes, the statsd configuration (found in the [metrics] section on Airflow 2.x) should look like this:
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
With this configuration, Airflow will send statsd metrics to a statsd server accepting metrics on localhost:8125. We will start that server in the next part.
The last step of this part is to start the Airflow webserver and scheduler processes. You may want to run these commands in two separate terminal windows. Make sure the Python virtual environment is activated before issuing them:
$ airflow webserver
$ airflow scheduler
At this point Airflow is running and sending statsd metrics to localhost:8125. In the next part we start statsd_exporter, which will collect the statsd metrics and export them as Prometheus metrics.
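You can also push a hand-rolled metric into the same port to test the pipeline end to end; statsd is a plain UDP text protocol, so a bare socket is enough (the metric name here is made up for illustration):

# statsd_probe.py - send one fake gauge into the statsd pipeline
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# statsd line format: <name>:<value>|<type>; "g" marks a gauge
sock.sendto(b"airflow.deploy_probe:42|g", ("localhost", 8125))
sock.close()

Once statsd_exporter (below) is running, airflow_deploy_probe should show up at localhost:9102/metrics.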
------------
Converting statsd metrics to Prometheus metrics
Let's start this section by installing statsd_exporter. If your machine has a working Golang environment, you can install it by issuing the following (on Go 1.17+, use go install github.com/prometheus/statsd_exporter@latest instead of go get to install binaries):
go get github.com/prometheus/statsd_exporter
------------
# Or download a statsd_exporter release tarball from GitHub:
wget https://github.com/prometheus/statsd_exporter/releases/
[root@airflow opt]# rz -E
rz waiting to receive.
[root@airflow opt]# ll
total 7716
drwxr-xr-x. 7 airflow airflow 137 Oct 10 17:23 airflow
-rw-r--r--. 1 root root 7898529 Oct 12 10:55 statsd_exporter-0.24.0.linux-amd64.tar.gz
[root@airflow opt]# tar xf statsd_exporter-0.24.0.linux-amd64.tar.gz
[root@airflow opt]# ll
total 7716
drwxr-xr-x. 7 airflow airflow 137 Oct 10 17:23 airflow
drwxr-xr-x. 2 1001 1002 58 Jun 2 16:07 statsd_exporter-0.24.0.linux-amd64
-rw-r--r--. 1 root root 7898529 Oct 12 10:55 statsd_exporter-0.24.0.linux-amd64.tar.gz
[root@airflow opt]# mv statsd_exporter-0.24.0.linux-amd64 statsd_exporter
[root@airflow opt]# cp statsd_exporter/statsd_exporter /usr/local/bin/
[root@airflow opt]# vim /usr/lib/systemd/system/statsd_exporter.service
[Unit]
Description=Prometheus statsd_exporter
Documentation=https://prometheus.io/
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/bin/statsd_exporter --statsd.listen-udp localhost:8125 --log.level debug
Restart=on-failure
[Install]
WantedBy=multi-user.target
Alternatively, you can deploy statsd_exporter with the prometheus/statsd-exporter container image; the image documentation describes how to pull and run it.
With Airflow running, start statsd_exporter on the same machine (via the unit file above, or directly):
$ statsd_exporter --statsd.listen-udp localhost:8125 --log.level debug
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:302 msg="Starting StatsD -> Prometheus Exporter" version="(version=, branch=, revision=)"
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:303 msg="Build context" context="(go=go1.14.7, user=, date=)"
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:304 msg="Accepting StatsD Traffic" udp=localhost:8125 tcp=:9125 unixgram=
level=info ts=2020-09-18T15:26:51.283Z caller=main.go:305 msg="Accepting Prometheus Requests" addr=:9102
level=debug ts=2020-09-18T15:26:52.534Z caller=listener.go:69 msg="Incoming line" proto=udp line=airflow.executor.open_slots:32|g
If everything went well, you should see Airflow metrics scrolling across the screen, as in the example above. You can also verify that statsd_exporter is doing its job and exposing the metrics in Prometheus format; they should be accessible at localhost:9102. You can fetch them with curl:
$ curl localhost:9102/metrics
# HELP airflow_collect_dags Metric autogenerated by statsd_exporter.
# TYPE airflow_collect_dags gauge
airflow_collect_dags 50.056391
# HELP airflow_dag_loading_duration_example_bash_operator Metric autogenerated by statsd_exporter.
# TYPE airflow_dag_loading_duration_example_bash_operator summary
airflow_dag_loading_duration_example_bash_operator{quantile="0.5"} 1.108e-06
airflow_dag_loading_duration_example_bash_operator{quantile="0.9"} 4.942e-06
airflow_dag_loading_duration_example_bash_operator{quantile="0.99"} 4.942e-06
airflow_dag_loading_duration_example_bash_operator_sum 1.8886000000000002e-05
airflow_dag_loading_duration_example_bash_operator_count 7
# HELP airflow_dag_loading_duration_example_branch_dop_operator_v3 Metric autogenerated by statsd_exporter.
# TYPE airflow_dag_loading_duration_example_branch_dop_operator_v3 summary
airflow_dag_loading_duration_example_branch_dop_operator_v3{quantile="0.5"} 1.61e-06
airflow_dag_loading_duration_example_branch_dop_operator_v3{quantile="0.9"} 5.776e-06
airflow_dag_loading_duration_example_branch_dop_operator_v3{quantile="0.99"} 5.776e-06
airflow_dag_loading_duration_example_branch_dop_operator_v3_sum 1.8076e-05
airflow_dag_loading_duration_example_branch_dop_operator_v3_count 6
...
Collecting the metrics with Prometheus
After the previous part, the Airflow metrics are now available in Prometheus format. As the next step, we deploy a Prometheus server to collect them. You can download a precompiled Prometheus binary from the project download page or the GitHub releases page, or use the existing Prometheus container image.
A minimal Prometheus configuration for collecting the Airflow metrics looks like this:
scrape_configs:
- job_name: airflow
static_configs:
- targets: ['localhost:9102']
It instructs the Prometheus server to periodically scrape metrics from the endpoint localhost:9102. Save the configuration above to a file named prometheus.yml and start the Prometheus server by issuing:
$ prometheus --config.file prometheus.yml
You can now point a browser at Prometheus's built-in dashboard at http://localhost:9090/graph and look at the Airflow metrics.
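Besides the built-in dashboard, the Prometheus HTTP API can be queried directly; a small sketch with requests (already installed as an Airflow dependency), fetching one of the metrics seen earlier:

# prom_query.py - query one Airflow metric from the Prometheus HTTP API
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "airflow_collect_dags"},
    timeout=5,
)
for result in resp.json()["data"]["result"]:
    print(result["metric"], result["value"])  # value is a [timestamp, value] pair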
Source: https://www.cnblogs.com/zttong/p/17760156.html