首页 > 编程问答 >在此出现导入错误,不确定是什么损坏了

在此出现导入错误,不确定是什么损坏了

时间:2024-08-09 15:28:39浏览次数:14  
标签:python

`

import time
import smtplib
from email.mime.text import MIMEText
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.service_account import Credentials
import yaml

def load_config(config_path='config.yaml'):
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)

def get_service_account_credentials(credentials_path):
    credentials = Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return credentials

def create_authorized_session(credentials):
    return AuthorizedSession(credentials)

def get_airflow_dags(composer_env_name, project_id, region, session):
    airflow_url = (
        f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
        f"/environments/{composer_env_name}/dagRuns"
    )
    response = session.get(airflow_url)
    response.raise_for_status()
    return response.json()

def get_task_status(dag_run, composer_env_name, project_id, region, session):
    dag_id = dag_run['dag_id']
    run_id = dag_run['run_id']
    tasks_url = (
        f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
        f"/environments/{composer_env_name}/dagRuns/{dag_id}/{run_id}/tasks"
    )
    response = session.get(tasks_url)
    response.raise_for_status()
    return response.json()

def should_alert_task(task, dag_run, monitored_tasks):
    task_id = (dag_run['dag_id'], task['task_id'], dag_run['run_id'], dag_run['execution_date'])
    if task_id not in monitored_tasks:
        monitored_tasks.add(task_id)
        return True
    return False

def send_task_alert(task, dag_run, config):
    task_details = {
        "dag_id": dag_run['dag_id'],
        "task_id": task['task_id'],
        "state": task['state'],
        "execution_date": dag_run['execution_date'],
        "log_url": task.get('log_url')
    }
    html_content = f"""
    <html>
    <body>
        <h2>Airflow Task {task['state'].capitalize()} Alert</h2>
        <p>Task <strong>{task['task_id']}</strong> in DAG <strong>{dag_run['dag_id']}</strong> is in a <strong>{task['state']}</strong> state.</p>
        <p>Execution Date: {dag_run['execution_date']}</p>
        <p>Logs: <a href="{task.get('log_url')}">View Logs</a></p>
    </body>
    </html>
    """
    for recipient in config['monitoring']['email_recipients']:
        send_email(f"Airflow Task {task['state'].capitalize()}: {task['task_id']}", recipient, html_content, config)

def send_email(subject, recipient, html_content, config):
    msg = MIMEText(html_content, 'html')
    msg['From'] = config['email']['smtp_username']
    msg['To'] = recipient
    msg['Subject'] = subject

    with smtplib.SMTP(config['email']['smtp_server'], config['email']['smtp_port']) as server:
        server.starttls()
        server.login(config['email']['smtp_username'], config['email']['smtp_password'])
        server.sendmail(msg['From'], recipient, msg.as_string())

def monitor_airflow(credentials_path, composer_env_name, project_id, region, config):
    credentials = get_service_account_credentials(credentials_path)
    session = create_authorized_session(credentials)
    monitored_tasks = set()

    while True:
        dags = get_airflow_dags(composer_env_name, project_id, region, session)
        for dag_run in dags.get('dag_runs', []):
            tasks = get_task_status(dag_run, composer_env_name, project_id, region, session)
            for task in tasks.get('tasks', []):
                if task['state'] in ['failed', 'retry']:
                    if should_alert_task(task, dag_run, monitored_tasks):
                        send_task_alert(task, dag_run, config)
        time.sleep(config['monitoring']['interval_seconds'])

def main():
    config = load_config()
    credentials_path = config['gcp']['credentials_path']
    composer_env_name = config['gcp']['composer_env_name']
import time
from datetime import datetime, timezone
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.service_account import Credentials
import yaml

def load_config(config_path='config.yaml'):
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)

def get_service_account_credentials(credentials_path):
    credentials = Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return credentials

def create_authorized_session(credentials):
    return AuthorizedSession(credentials)

def get_all_dags(session, airflow_url, today_str):
    response = session.get(f"{airflow_url}/dags?execution_date_gte={today_str}")
    response.raise_for_status()
    return response.json()

def get_dag_runs(session, airflow_url, dag_id, today_str):
    response = session.get(f"{airflow_url}/dags/{dag_id}/dagRuns?execution_date_gte={today_str}")
    response.raise_for_status()
    return response.json()

def get_task_instances(session, airflow_url, dag_id, dag_run_id, today_str):
    response = session.get(f"{airflow_url}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances?execution_date_gte={today_str}")
    response.raise_for_status()
    return response.json()

def should_alert_task(task, dag_run, monitored_tasks):
    task_id = (dag_run['dag_id'], task['task_id'], dag_run['run_id'], dag_run['execution_date'])
    if task_id not in monitored_tasks:
        monitored_tasks.add(task_id)
        return True
    return False

def monitor_airflow(credentials_path, airflow_url, config):
    credentials = get_service_account_credentials(credentials_path)
    session = create_authorized_session(credentials)
    monitored_tasks = set()
    
    today_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT00:00:00Z")
    
    while True:
        dags = get_all_dags(session, airflow_url, today_str)
        for dag in dags['dags']:
            dag_id = dag['dag_id']
            dag_runs = get_dag_runs(session, airflow_url, dag_id, today_str)
            for dag_run in dag_runs['dag_runs']:
                task_instances = get_task_instances(session, airflow_url, dag_id, dag_run['dag_run_id'], today_str)
                for task in task_instances['task_instances']:
                    if task['state'] in ['failed', 'up_for_retry']:
                        if should_alert_task(task, dag_run, monitored_tasks):
                            monitored_tasks.add((task['task_id'], task['state'], task['execution_date'], task.get('log_url')))
        time.sleep(config['monitoring']['interval_seconds'])

def main():
    config = load_config()
    credentials_path = config['gcp']['credentials_path']
    airflow_url = config['gcp']['airflow_url']

    monitor_airflow(credentials_path, airflow_url, config)

if __name__ == "__main__":
    main()

我希望它能够顺利运行并消除导入错误。自从我添加了 gcp 逻辑以来,这种情况就不断发生,我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加了 gcp 逻辑以来,这种情况就不断发生,我希望它能够顺利运行并消除导入错误。自从我添加了 gcp 逻辑以来,这种情况就不断发生,我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生 我希望它能够顺利运行并消除导入错误。自从我添加 gcp 逻辑以来,这种情况不断发生


导入错误是由代码意外中断引起的,从而导致不完整的 main 函数定义。

以下是经过更正的代码:

import time
import smtplib
from email.mime.text import MIMEText
from google.auth.transport.requests import AuthorizedSession
from google.oauth2.service_account import Credentials
import yaml

def load_config(config_path='config.yaml'):
    with open(config_path, 'r') as file:
        return yaml.safe_load(file)

def get_service_account_credentials(credentials_path):
    credentials = Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    return credentials

def create_authorized_session(credentials):
    return AuthorizedSession(credentials)

def get_airflow_dags(composer_env_name, project_id, region, session):
    airflow_url = (
        f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
        f"/environments/{composer_env_name}/dagRuns"
    )
    response = session.get(airflow_url)
    response.raise_for_status()
    return response.json()

def get_task_status(dag_run, composer_env_name, project_id, region, session):
    dag_id = dag_run['dag_id']
    run_id = dag_run['run_id']
    tasks_url = (
        f"https://{region}-composer.googleapis.com/v1/projects/{project_id}/locations/{region}"
        f"/environments/{composer_env_name}/dagRuns/{dag_id}/{run_id}/tasks"
    )
    response = session.get(tasks_url)
    response.raise_for_status()
    return response.json()

def should_alert_task(task, dag_run, monitored_tasks):
    task_id = (dag_run['dag_id'], task['task_id'], dag_run['run_id'], dag_run['execution_date'])
    if task_id not in monitored_tasks:
        monitored_tasks.add(task_id)
        return True
    return False

def send_task_alert(task, dag_run, config):
    task_details = {
        "dag_id": dag_run['dag_id'],
        "task_id": task['task_id'],
        "state": task['state'],
        "execution_date": dag_run['execution_date'],
        "log_url": task.get('log_url')
    }
    html_content = f"""
    &lt;html&gt;
    &lt;body&gt;
        &lt;h2&gt;Airflow Task {task['state'].capitalize()} Alert&lt;/h2&gt;
        &lt;p&gt;Task &lt;strong&gt;{task['task_id']}&lt;/strong&gt; in DAG &lt;strong&gt;{dag_run['dag_id']}&lt;/strong&gt; is in a &lt;strong&gt;{task['state']}&lt;/strong&gt; state.&lt;/p&gt;
        &lt;p&gt;Execution Date: {dag_run['execution_date']}&lt;/p&gt;
        &lt;p&gt;Logs: &lt;a href="{task.get('log_url')}"&gt;View Logs&lt;/a&gt;&lt;/p&gt;
    &lt;/body&gt;
    &lt;/html&gt;
    """
    for recipient in config['monitoring']['email_recipients']:
        send_email(f"Airflow Task {task['state'].capitalize()}: {task['task_id']}", recipient, html_content, config)

def send_email(subject, recipient, html_content, config):
    msg = MIMEText(html_content, 'html')
    msg['From'] = config['email']['smtp_username']
    msg['To'] = recipient
    msg['Subject'] = subject

    with smtplib.SMTP(config['email']['smtp_server'], config['email']['smtp_port']) as server:
        server.starttls()
        server.login(config['email']['smtp_username'], config['email']['smtp_password'])
        server.sendmail(msg['From'], recipient, msg.as_string())

def monitor_airflow(credentials_path, composer_env_name, project_id, region, config):
    credentials = get_service_account_credentials(credentials_path)
    session = create_authorized_session(credentials)
    monitored_tasks = set()

    while True:
        dags = get_airflow_dags(composer_env_name, project_id, region, session)
        for dag_run in dags.get('dag_runs', []):
            tasks = get_task_status(dag_run, composer_env_name, project_id, region, session)
            for task in tasks.get('tasks', []):
                if task['state'] in ['failed', 'retry']:
                    if should_alert_task(task, dag_run, monitored_tasks):
                        send_task_alert(task, dag_run, config)
        time.sleep(config['monitoring']['interval_seconds'])

def main():
    config = load_config()
    credentials_path = config['gcp']['credentials_path']
    composer_env_name = config['gcp']['composer_env_name']
    project_id = config['gcp']['project_id']
    region = config['gcp']['region']
    monitor_airflow(credentials_path, composer_env_name, project_id, region, config)

if __name__ == "__main__":
    main()

所做的更改是:

  1. 修复了 main 函数定义: 添加了缺少的 project_id region 变量的初始化,并将对 monitor_airflow 的调用移至 main 函数内。

通过此更改,代码现在应正确运行,而不会出现任何导入错误。

标签:python
From: 78850623

相关文章

  • 为什么 True/False 在 Python 中要大写?
    所有成员都是骆驼案吧?为什么True/False而不是true/false,哪个更宽松?在Python中,True和False使用大写而不是小写,这与语言的禅宗(设计原则)和它们在Python中的特殊地位有关。让我们来分析一下:1.特殊地位:在Python中,True和False......
  • 如何使用 Python 从 Excel 工作表中读取正斜杠
    我有20多列的Excel工作表,如果我想选择不包含文本n/a的行,是否可以将n/a作为文本传递并找到它?我尝试过的代码是,''''将pandas导入为pd进口重新导入操作系统defextract_data(input_file):#读取输入的Excel文件df=pd.read_excel(input_file)#Checkif'......
  • 零基础学习人工智能—Python—Pytorch学习(三)
    前言这篇文章主要两个内容。一,把上一篇关于requires_grad的内容补充一下。二,介绍一下线性回归。关闭张量计算关闭张量计算。这个相对简单,阅读下面代码即可。print("============关闭require_grad==============")x=torch.randn(3,requires_grad=True)print(x)x.requir......
  • python配置pip镜像
    Python配置pip的镜像国内的网络通过pip下载软件包只有不到10k的下载速度。不仅下载的慢,还容易引发超时错误,导致下载失败。而将给pip配置国内的镜像源可以完美的解决这个问题。本文讲解了pip在windows和macos/linux的配置过程(在windows实操成功)一、找到配置文件1.windows在......
  • Pytorch深度学习入门基础(三):python 加载数据初认识
    目录 一、 导入二、数据集中数据和label的组成形式三、Dataset读入数据四、Dataset类代码实战4.1创建函数4.2  设置初始化函数4.3读取每一个图片4.4设置获取数据长度函数4.5创建实例4.5.1单个图片数据集4.5.2 多个图片数据集    现在来开......
  • python多版本共存和虚拟环境
    多版本共存1.调用方式"py-3.10",即可使用对应版本的python虚拟环境1.vscode底部切到CMD,敲"py-3.8-mvenv.venv"(虚拟环境文件夹名,通常用".venv")2.点击vscode右下角,选择虚拟环境作为解释器注:以"."开头的文件在计算机系统中通常被称为隐藏文件。这些文件在许多操作系统中默认是不......
  • Python练习:数据类型篇
    一、逻辑推理练习  1、在不运行下面程序的前提下,说出答案。 1.4.0==42."4.0"==43.bool("1")4.bool("0")5.str(32)6.int(6.26)7.float(32)8.float("3.21")9.int("434")10.int("3.42")11.bool(-1)12......
  • MySQL——使用Python操作MySQL
    文章目录安装PyMySQL使用PyMySQL操作MySQL在Python中操作MySQL数据库时,我们使用较多的库是PyMySQL,如果你选择使用PyMySQL库,那么首先需要通过pip安装它。pipinstallpymysql命令就是用来安装PyMySQL的。安装PyMySQL1.打开你的命令行工具(如cmd、PowerShell、......
  • Python按条件删除Excel表格数据的方法
      本文介绍基于Python语言,读取Excel表格文件,基于我们给定的规则,对其中的数据加以筛选,将不在指定数据范围内的数据剔除,保留符合我们需要的数据的方法。  首先,我们来明确一下本文的具体需求。现有一个Excel表格文件(在本文中我们就以.csv格式的文件为例),如下图所示。  其中,Ex......
  • Python 提取出SQL语句中Where的值的方法
    1.方法一:使用sqlparse库的方法为了提取SQL语句中WHERE子句的值,我们可以利用Python的sqlparse库,这是一个专门用于解析SQL语句的库。以下是一个示例代码,演示如何使用sqlparse来提取WHERE子句中的条件。首先,确保安装了sqlparse库。如果未安装,可以使用pip安装:bash复制代码pipins......