
airflow DAG/PIPELINE examples reference


data-pipelines-with-apache-airflow

https://github.com/BasPH/data-pipelines-with-apache-airflow

Code for Data Pipelines with Apache Airflow

https://www.manning.com/books/data-pipelines-with-apache-airflow

A successful pipeline moves data efficiently, minimizing pauses and blockages between tasks, keeping every process along the way operational. Apache Airflow provides a single customizable environment for building and managing data pipelines, eliminating the need for a hodgepodge collection of tools, snowflake code, and homegrown processes. Using real-world scenarios and examples, Data Pipelines with Apache Airflow teaches you how to simplify and automate data pipelines, reduce operational overhead, and smoothly integrate all the technologies in your stack.
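For orientation, a minimal Airflow DAG wires a couple of Python callables into a dependency graph that the scheduler runs on an interval. The sketch below is illustrative only (the dag_id, task names and print statements are made up for this note, not taken from the book):

import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.python import PythonOperator

# A minimal, illustrative DAG: two tasks run in order, once per day.
dag = DAG(
    dag_id="hello_pipeline",
    schedule_interval="@daily",
    start_date=airflow.utils.dates.days_ago(1),
    catchup=False,
)


def _extract(**_):
    print("extract step")


def _load(**_):
    print("load step")


extract = PythonOperator(task_id="extract", python_callable=_extract, dag=dag)
load = PythonOperator(task_id="load", python_callable=_load, dag=dag)

extract >> load  # load runs only after extract succeeds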

 

Scrape data from the web and store it: the chapter 14 NYC transport DAG below pulls Citi Bike and taxi ride data every 15 minutes, stages the raw files in a MinIO data lake, transforms them with pandas/geopandas, and loads the results into Postgres.

https://github.com/BasPH/data-pipelines-with-apache-airflow/blob/master/chapter14/dags/nyc_dag.py

import io
import json

import airflow.utils.dates
import geopandas
import pandas as pd
import requests
from airflow.hooks.base import BaseHook
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from minio import Minio
from nyctransport.operators.pandas_operator import PandasOperator
from nyctransport.operators.s3_to_postgres import MinioPandasToPostgres
from requests.auth import HTTPBasicAuth

dag = DAG(
    dag_id="nyc_dag",
    schedule_interval="*/15 * * * *",
    start_date=airflow.utils.dates.days_ago(1),
    catchup=False,
)


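# Fetch the last 15 minutes of Citi Bike ride data and stage the raw JSON in the "datalake" bucket.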
def _download_citi_bike_data(ts_nodash, **_):
    citibike_conn = BaseHook.get_connection(conn_id="citibike")

    url = f"http://{citibike_conn.host}:{citibike_conn.port}/recent/minute/15"
    response = requests.get(
        url, auth=HTTPBasicAuth(citibike_conn.login, citibike_conn.password)
    )
    data = response.json()

    s3_hook = S3Hook(aws_conn_id="s3")
    s3_hook.load_string(
        string_data=json.dumps(data),
        key=f"raw/citibike/{ts_nodash}.json",
        bucket_name="datalake",
    )


download_citi_bike_data = PythonOperator(
    task_id="download_citi_bike_data", python_callable=_download_citi_bike_data, dag=dag
)


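# List the files exposed by the taxi API, upload each one under raw/taxi/ in the data lake,
# and return the uploaded keys so downstream tasks can pick them up via XCom.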
def _download_taxi_data():
    taxi_conn = BaseHook.get_connection(conn_id="taxi")
    s3_hook = S3Hook(aws_conn_id="s3")

    url = f"http://{taxi_conn.host}"
    response = requests.get(url)
    files = response.json()

    exported_files = []
    for filename in [f["name"] for f in files]:
        response = requests.get(f"{url}/{filename}")
        s3_key = f"raw/taxi/{filename}"
        try:
            s3_hook.load_string(
                string_data=response.text, key=s3_key, bucket_name="datalake"
            )
            print(f"Uploaded {s3_key} to MinIO.")
            exported_files.append(s3_key)
        except ValueError:
            print(f"File {s3_key} already exists.")

    return exported_files


download_taxi_data = PythonOperator(
    task_id="download_taxi_data", python_callable=_download_taxi_data, dag=dag
)


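# Read one or more objects from MinIO and return them as a single concatenated DataFrame.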
def get_minio_object(
    pandas_read_callable, bucket, paths, pandas_read_callable_kwargs=None
):
    s3_conn = BaseHook.get_connection(conn_id="s3")
    minio_client = Minio(
        s3_conn.extra_dejson["host"].split("://")[1],
        access_key=s3_conn.extra_dejson["aws_access_key_id"],
        secret_key=s3_conn.extra_dejson["aws_secret_access_key"],
        secure=False,
    )

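    # The taxi branch passes its uploaded keys through a templated XCom string
    # (e.g. "['raw/taxi/...csv']"), so turn that string back into a Python list.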
    if isinstance(paths, str):
        if paths.startswith("[") and paths.endswith("]"):
            paths = eval(paths)
        else:
            paths = [paths]

    if pandas_read_callable_kwargs is None:
        pandas_read_callable_kwargs = {}

    dfs = []
    for path in paths:
        minio_object = minio_client.get_object(bucket_name=bucket, object_name=path)
        df = pandas_read_callable(minio_object, **pandas_read_callable_kwargs)
        dfs.append(df)
    return pd.concat(dfs)


def transform_citi_bike_data(df):
    # Map citi bike lat,lon coordinates to taxi zone ids
    taxi_zones = geopandas.read_file(
        "https://s3.amazonaws.com/nyc-tlc/misc/taxi_zones.zip"
    ).to_crs("EPSG:4326")
    start_gdf = geopandas.GeoDataFrame(
        df,
        crs="EPSG:4326",
        geometry=geopandas.points_from_xy(
            df["start_station_longitude"], df["start_station_latitude"]
        ),
    )
    end_gdf = geopandas.GeoDataFrame(
        df,
        crs="EPSG:4326",
        geometry=geopandas.points_from_xy(
            df["end_station_longitude"], df["end_station_latitude"]
        ),
    )
    df_with_zones = geopandas.sjoin(
        start_gdf, taxi_zones, how="left", op="within"
    ).rename(columns={"LocationID": "start_location_id"})
    end_zones = geopandas.sjoin(end_gdf, taxi_zones, how="left", op="within")
    df_with_zones["end_location_id"] = end_zones["LocationID"]
    return df_with_zones[
        [
            "tripduration",
            "starttime",
            "start_location_id",
            "stoptime",
            "end_location_id",
        ]
    ]


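# Serialize a DataFrame with the given pandas write method and upload the resulting bytes to MinIO.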
def write_minio_object(
    df, pandas_write_callable, bucket, path, pandas_write_callable_kwargs=None
):
    s3_conn = BaseHook.get_connection(conn_id="s3")
    minio_client = Minio(
        s3_conn.extra_dejson["host"].split("://")[1],
        access_key=s3_conn.extra_dejson["aws_access_key_id"],
        secret_key=s3_conn.extra_dejson["aws_secret_access_key"],
        secure=False,
    )
    # Guard against the None default so the kwargs can always be unpacked below.
    if pandas_write_callable_kwargs is None:
        pandas_write_callable_kwargs = {}
    bytes_buffer = io.BytesIO()
    pandas_write_method = getattr(df, pandas_write_callable.__name__)
    pandas_write_method(bytes_buffer, **pandas_write_callable_kwargs)
    nbytes = bytes_buffer.tell()
    bytes_buffer.seek(0)
    minio_client.put_object(
        bucket_name=bucket, object_name=path, length=nbytes, data=bytes_buffer
    )


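# PandasOperator (from the book's nyctransport package) chains three callables:
# input_callable reads the raw data, transform_callable reshapes it, output_callable writes the result.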
process_citi_bike_data = PandasOperator(
    task_id="process_citi_bike_data",
    input_callable=get_minio_object,
    input_callable_kwargs={
        "pandas_read_callable": pd.read_json,
        "bucket": "datalake",
        "paths": "raw/citibike/{{ ts_nodash }}.json",
    },
    transform_callable=transform_citi_bike_data,
    output_callable=write_minio_object,
    output_callable_kwargs={
        "bucket": "datalake",
        "path": "processed/citibike/{{ ts_nodash }}.parquet",
        "pandas_write_callable": pd.DataFrame.to_parquet,
        "pandas_write_callable_kwargs": {"engine": "auto"},
    },
    dag=dag,
)


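# Derive trip duration in seconds and rename taxi columns to match the Citi Bike schema.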
def transform_taxi_data(df):
    df[["pickup_datetime", "dropoff_datetime"]] = df[
        ["pickup_datetime", "dropoff_datetime"]
    ].apply(pd.to_datetime)
    df["tripduration"] = (
        (df["dropoff_datetime"] - df["pickup_datetime"]).dt.total_seconds().astype(int)
    )
    df = df.rename(
        columns={
            "pickup_datetime": "starttime",
            "pickup_locationid": "start_location_id",
            "dropoff_datetime": "stoptime",
            "dropoff_locationid": "end_location_id",
        }
    ).drop(columns=["trip_distance"])
    return df


process_taxi_data = PandasOperator(
    task_id="process_taxi_data",
    input_callable=get_minio_object,
    input_callable_kwargs={
        "pandas_read_callable": pd.read_csv,
        "bucket": "datalake",
        "paths": "{{ ti.xcom_pull(task_ids='download_taxi_data') }}",
    },
    transform_callable=transform_taxi_data,
    output_callable=write_minio_object,
    output_callable_kwargs={
        "bucket": "datalake",
        "path": "processed/taxi/{{ ts_nodash }}.parquet",
        "pandas_write_callable": pd.DataFrame.to_parquet,
        "pandas_write_callable_kwargs": {"engine": "auto"},
    },
    dag=dag,
)

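# Load the processed Parquet files from MinIO into the Postgres result tables.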
taxi_to_db = MinioPandasToPostgres(
    task_id="taxi_to_db",
    dag=dag,
    minio_conn_id="s3",
    minio_bucket="datalake",
    minio_key="processed/taxi/{{ ts_nodash }}.parquet",
    pandas_read_callable=pd.read_parquet,
    postgres_conn_id="result_db",
    postgres_table="taxi_rides",
    pre_read_transform=lambda x: io.BytesIO(x.data),
)

citi_bike_to_db = MinioPandasToPostgres(
    task_id="citi_bike_to_db",
    dag=dag,
    minio_conn_id="s3",
    minio_bucket="datalake",
    minio_key="processed/citibike/{{ ts_nodash }}.parquet",
    pandas_read_callable=pd.read_parquet,
    postgres_conn_id="result_db",
    postgres_table="citi_bike_rides",
    pre_read_transform=lambda x: io.BytesIO(x.data),
)

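# Wire up the two branches: each downloads raw data, processes it, and loads the result into Postgres.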
download_citi_bike_data >> process_citi_bike_data >> citi_bike_to_db
download_taxi_data >> process_taxi_data >> taxi_to_db

 

awesome-apache-airflow

https://github.com/jghoman/awesome-apache-airflow

 

use-case-airflow-llm-rag-finance

Scrape web pages, parse the documents, and store them in a vector database.

https://github.com/fanqingsong/use-case-airflow-llm-rag-finance

LLMOps: Automatic retrieval-augmented generation with Airflow, GPT-4 and Weaviate

This repository contains the DAG code used in the LLMOps: Automatic retrieval-augmented generation with Airflow, GPT-4 and Weaviate use case. The pipeline was modelled after the Ask Astro reference architecture.

The DAGs in this repository use Airflow together with GPT-4 (for generation) and Weaviate (as the vector store), among other tools.
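As a rough illustration of the ingest side of such a pipeline (scrape, parse, store), here is a minimal sketch. It assumes a reachable Weaviate instance with a pre-created "Document" class that vectorizes the "text" property, the weaviate-client v3 API, and requests/BeautifulSoup for parsing; the URLs, class name and chunking rule are made up for this note and are not taken from the repository:

import airflow.utils.dates
import requests
import weaviate
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from bs4 import BeautifulSoup

WEAVIATE_URL = "http://weaviate:8080"  # assumed endpoint
SOURCE_URL = "https://example.com/finance-news"  # illustrative source page

dag = DAG(
    dag_id="rag_ingest_sketch",
    schedule_interval="@daily",
    start_date=airflow.utils.dates.days_ago(1),
    catchup=False,
)


def _scrape_and_parse(**_):
    # Fetch the page and strip it down to plain text.
    html = requests.get(SOURCE_URL, timeout=30).text
    text = BeautifulSoup(html, "html.parser").get_text(separator=" ", strip=True)
    # Naive fixed-size chunking; the real pipeline uses smarter document splitting.
    return [text[i : i + 1000] for i in range(0, len(text), 1000)]


def _store_in_weaviate(ti, **_):
    # Pull the chunks produced upstream and write them to the vector database.
    chunks = ti.xcom_pull(task_ids="scrape_and_parse")
    client = weaviate.Client(WEAVIATE_URL)
    with client.batch as batch:
        for chunk in chunks:
            batch.add_data_object(
                data_object={"text": chunk, "source": SOURCE_URL},
                class_name="Document",
            )


scrape_and_parse = PythonOperator(
    task_id="scrape_and_parse", python_callable=_scrape_and_parse, dag=dag
)
store_in_weaviate = PythonOperator(
    task_id="store_in_weaviate", python_callable=_store_in_weaviate, dag=dag
)

scrape_and_parse >> store_in_weaviate

The retrieval and GPT-4 generation side is handled separately in the use case; the sketch above only covers getting documents into Weaviate.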

 

    redis管道pipeline举个例子:小卖铺免费让你拿50瓶饮料,你是一次拿一瓶拿回家,还是打包一次或者多次拿回家?概念Redis管道(pipelining)是一种在客户端向服务端发送多个请求而不等待响应的技术。它可以显著提高Redis应用程序的性能。管道的主要思想是客户端向服务端发送多个请求......