`
import time
import smtplib
from email.mime.text import MIMEText
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.service_account import Credentials
import yaml
def load_config(config_path='config.yaml'):
with open(config_path, 'r') as file:
return yaml.safe_load(file)
def get_service_account_credentials(credentials_path):
credentials = Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
return credentials
def create_authorized_session(credentials):
return AuthorizedSession(credentials)
def get_airflow_dags(composer_env_name, project_id, region, session):
airflow_url = (
f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
f"/environments/{composer_env_name}/dagRuns"
)
response = session.get(airflow_url)
response.raise_for_status()
return response.json()
def get_task_status(dag_run, composer_env_name, project_id, region, session):
dag_id = dag_run['dag_id']
run_id = dag_run['run_id']
tasks_url = (
f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
f"/environments/{composer_env_name}/dagRuns/{dag_id}/{run_id}/tasks"
)
response = session.get(tasks_url)
response.raise_for_status()
return response.json()
def should_alert_task(task, dag_run, monitored_tasks):
task_id = (dag_run['dag_id'], task['task_id'], dag_run['run_id'], dag_run['execution_date'])
if task_id not in monitored_tasks:
monitored_tasks.add(task_id)
return True
return False
def send_task_alert(task, dag_run, config):
task_details = {
"dag_id": dag_run['dag_id'],
"task_id": task['task_id'],
"state": task['state'],
"execution_date": dag_run['execution_date'],
"log_url": task.get('log_url')
}
html_content = f"""
<html>
<body>
<h2>Airflow Task {task['state'].capitalize()} Alert</h2>
<p>Task <strong>{task['task_id']}</strong> in DAG <strong>{dag_run['dag_id']}</strong> is in a <strong>{task['state']}</strong> state.</p>
<p>Execution Date: {dag_run['execution_date']}</p>
<p>Logs: <a href="{task.get('log_url')}">View Logs</a></p>
</body>
</html>
"""
for recipient in config['monitoring']['email_recipients']:
send_email(f"Airflow Task {task['state'].capitalize()}: {task['task_id']}", recipient, html_content, config)
def send_email(subject, recipient, html_content, config):
msg = MIMEText(html_content, 'html')
msg['From'] = config['email']['smtp_username']
msg['To'] = recipient
msg['Subject'] = subject
with smtplib.SMTP(config['email']['smtp_server'], config['email']['smtp_port']) as server:
server.starttls()
server.login(config['email']['smtp_username'], config['email']['smtp_password'])
server.sendmail(msg['From'], recipient, msg.as_string())
def monitor_airflow(credentials_path, composer_env_name, project_id, region, config):
credentials = get_service_account_credentials(credentials_path)
session = create_authorized_session(credentials)
monitored_tasks = set()
while True:
dags = get_airflow_dags(composer_env_name, project_id, region, session)
for dag_run in dags.get('dag_runs', []):
tasks = get_task_status(dag_run, composer_env_name, project_id, region, session)
for task in tasks.get('tasks', []):
if task['state'] in ['failed', 'retry']:
if should_alert_task(task, dag_run, monitored_tasks):
send_task_alert(task, dag_run, config)
time.sleep(config['monitoring']['interval_seconds'])
def main():
config = load_config()
credentials_path = config['gcp']['credentials_path']
composer_env_name = config['gcp']['composer_env_name']
import time
from datetime import datetime, timezone
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.service_account import Credentials
import yaml
def load_config(config_path='config.yaml'):
with open(config_path, 'r') as file:
return yaml.safe_load(file)
def get_service_account_credentials(credentials_path):
credentials = Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
return credentials
def create_authorized_session(credentials):
return AuthorizedSession(credentials)
def get_all_dags(session, airflow_url, today_str):
response = session.get(f"{airflow_url}/dags?execution_date_gte={today_str}")
response.raise_for_status()
return response.json()
def get_dag_runs(session, airflow_url, dag_id, today_str):
response = session.get(f"{airflow_url}/dags/{dag_id}/dagRuns?execution_date_gte={today_str}")
response.raise_for_status()
return response.json()
def get_task_instances(session, airflow_url, dag_id, dag_run_id, today_str):
response = session.get(f"{airflow_url}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances?execution_date_gte={today_str}")
response.raise_for_status()
return response.json()
def should_alert_task(task, dag_run, monitored_tasks):
task_id = (dag_run['dag_id'], task['task_id'], dag_run['run_id'], dag_run['execution_date'])
if task_id not in monitored_tasks:
monitored_tasks.add(task_id)
return True
return False
def monitor_airflow(credentials_path, airflow_url, config):
credentials = get_service_account_credentials(credentials_path)
session = create_authorized_session(credentials)
monitored_tasks = set()
today_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT00:00:00Z")
while True:
dags = get_all_dags(session, airflow_url, today_str)
for dag in dags['dags']:
dag_id = dag['dag_id']
dag_runs = get_dag_runs(session, airflow_url, dag_id, today_str)
for dag_run in dag_runs['dag_runs']:
task_instances = get_task_instances(session, airflow_url, dag_id, dag_run['dag_run_id'], today_str)
for task in task_instances['task_instances']:
if task['state'] in ['failed', 'up_for_retry']:
if should_alert_task(task, dag_run, monitored_tasks):
monitored_tasks.add((task['task_id'], task['state'], task['execution_date'], task.get('log_url')))
time.sleep(config['monitoring']['interval_seconds'])
def main():
config = load_config()
credentials_path = config['gcp']['credentials_path']
airflow_url = config['gcp']['airflow_url']
monitor_airflow(credentials_path, airflow_url, config)
if __name__ == "__main__":
main()
我希望它能够顺利运行并消除导入错误。自从我添加了 gcp 逻辑以来,这种情况就不断发生,我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加了 gcp 逻辑以来,这种情况就不断发生,我希望它能够顺利运行并消除导入错误。自从我添加了 gcp 逻辑以来,这种情况就不断发生,我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生
导入错误是由代码意外中断引起的,从而导致不完整的
main
函数定义。
以下是经过更正的代码:
import time
import smtplib
from email.mime.text import MIMEText
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.service_account import Credentials
import yaml
def load_config(config_path='config.yaml'):
with open(config_path, 'r') as file:
return yaml.safe_load(file)
def get_service_account_credentials(credentials_path):
credentials = Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
return credentials
def create_authorized_session(credentials):
return AuthorizedSession(credentials)
def get_airflow_dags(composer_env_name, project_id, region, session):
airflow_url = (
f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
f"/environments/{composer_env_name}/dagRuns"
)
response = session.get(airflow_url)
response.raise_for_status()
return response.json()
def get_task_status(dag_run, composer_env_name, project_id, region, session):
dag_id = dag_run['dag_id']
run_id = dag_run['run_id']
tasks_url = (
f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
f"/environments/{composer_env_name}/dagRuns/{dag_id}/{run_id}/tasks"
)
response = session.get(tasks_url)
response.raise_for_status()
return response.json()
def should_alert_task(task, dag_run, monitored_tasks):
task_id = (dag_run['dag_id'], task['task_id'], dag_run['run_id'], dag_run['execution_date'])
if task_id not in monitored_tasks:
monitored_tasks.add(task_id)
return True
return False
def send_task_alert(task, dag_run, config):
task_details = {
"dag_id": dag_run['dag_id'],
"task_id": task['task_id'],
"state": task['state'],
"execution_date": dag_run['execution_date'],
"log_url": task.get('log_url')
}
html_content = f"""
<html>
<body>
<h2>Airflow Task {task['state'].capitalize()} Alert</h2>
<p>Task <strong>{task['task_id']}</strong> in DAG <strong>{dag_run['dag_id']}</strong> is in a <strong>{task['state']}</strong> state.</p>
<p>Execution Date: {dag_run['execution_date']}</p>
<p>Logs: <a href="{task.get('log_url')}">View Logs</a></p>
</body>
</html>
"""
for recipient in config['monitoring']['email_recipients']:
send_email(f"Airflow Task {task['state'].capitalize()}: {task['task_id']}", recipient, html_content, config)
def send_email(subject, recipient, html_content, config):
msg = MIMEText(html_content, 'html')
msg['From'] = config['email']['smtp_username']
msg['To'] = recipient
msg['Subject'] = subject
with smtplib.SMTP(config['email']['smtp_server'], config['email']['smtp_port']) as server:
server.starttls()
server.login(config['email']['smtp_username'], config['email']['smtp_password'])
server.sendmail(msg['From'], recipient, msg.as_string())
def monitor_airflow(credentials_path, composer_env_name, project_id, region, config):
credentials = get_service_account_credentials(credentials_path)
session = create_authorized_session(credentials)
monitored_tasks = set()
while True:
dags = get_airflow_dags(composer_env_name, project_id, region, session)
for dag_run in dags.get('dag_runs', []):
tasks = get_task_status(dag_run, composer_env_name, project_id, region, session)
for task in tasks.get('tasks', []):
if task['state'] in ['failed', 'retry']:
if should_alert_task(task, dag_run, monitored_tasks):
send_task_alert(task, dag_run, config)
time.sleep(config['monitoring']['interval_seconds'])
def main():
config = load_config()
credentials_path = config['gcp']['credentials_path']
composer_env_name = config['gcp']['composer_env_name']
project_id = config['gcp']['project_id']
region = config['gcp']['region']
monitor_airflow(credentials_path, composer_env_name, project_id, region, config)
if __name__ == "__main__":
main()
所做的更改是:
-
修复了
main
函数定义: 添加了缺少的project_id
和region
变量的初始化,并将对monitor_airflow
的调用移至main
函数内。
通过此更改,代码现在应正确运行,而不会出现任何导入错误。
标签:python From: 78850623