开发自动化云 ELT 管道
寻求体现数据驱动的方法始于获取数据。该难题的一个基础部分是开发自动化 ELT 管道,以正确获取和交付必要的数据。
ELT , 乙 X 道 ** 大号** 负载 ** 吨** ransform 是一种通用架构方法,旨在处理大数据并最大限度地利用现代廉价云存储选项。
这个介绍性博客将重点介绍一种 ELT 管道方法,利用 Python 和 Azure 数据堆栈 .
比赛计划: 我们将从 API 查询数据,将其加载并写入存储,然后自动化该过程以按计划运行。
ELT Architecture
对于这个演示,我们将使用 CoinMarketCap API 因其成熟的文档和丰富的可访问数据。
我们的设置包括两个从 CoinMarketCap API 提取的 python 脚本。我们的第一个 python 脚本根据市值提取目前排名前 100 的代币。
然后,我们的第二个脚本从第一个查询中获取这 100 个条目,并执行另一个 API 调用来提取有关硬币的更详细信息。
我们的第二个脚本还充当一个编排脚本,运行包含我们的凭据的配置文件以及脚本 1。
脚本 1 ,CallandWrite。我们进行 API 调用,指定我们希望返回的记录数以及 api_key 以验证我们的请求。
_#API 调用 CoinMarketCap_
**定义** api_mapping_call(number_of_entries, api_key):
"""对 CMC API 进行 api 调用并提取指定条目数的基本硬币信息。
返回字典。"""
网址 **=** 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/map'
参数 **=** {
'listing_status':'活动',
'限制':number_of_entries,
“排序”:“cmc_rank”
}
标题 **=** {
'接受':'应用程序/json',
'X-CMC_PRO_API_KEY': api_key
}
会议 **=** 会议()
会议 **.** 标题 **.** 更新(标题)
回复 **=** 会议 **.** 获取(网址,参数 **=** 参数)
数据 **=** json **.** 负载(响应 **.** 文本)
**返回** 数据
初始查询以 JSON 格式返回数据,这对于 API 很常见。我们将分解我们想要的列,重命名一些列,为数据集添加一个文件名,然后在我们摄取这些数据时添加一个时间戳。我们可以将这些操作总结为我们的“原始”数据摄取。
最佳做法是从存储中读取新写入的文件,但是在此演示中,我将使用我们写入存储的数据帧。
**从** azure.storage.filedatalake **进口** 数据湖服务客户端
storage_account_name **=** storage_account_name
storage_account_key **=** storage_account_key
容器名称 **=** “硬币清单”
目录名 **=** “生的”
**定义** write_to_storage(storage_account_name,storage_account_key, container_name, directory_name,dataset, pandas_dataframe, file_name):
"""将数据帧写入存储的函数。指定存储帐户名称、存储帐户密钥、容器名称、目录名称和文件名。
该函数将检查容器是否已经存在,如果不存在,它将创建一个新容器。如果已经存在,它会将文件写入指定的容器。
数据集必须保存为指定的文件类型(csv、txt、parquet)
DataFrame 必须是 Pandas DF,将从函数返回
文件名必须以特定文件类型(csv、txt、parquet)结尾。"""
_#函数运行时的时间戳_
注入日期 **=** 现在 **.** strftime("%m/%d/%Y %H:%M:%S")
_#从日期时间中删除空格,反斜杠指示带有 ADLS 的新文件夹结构_
trimmed_injest_Date **=** 注入日期 **.** 代替( ” ”, ”_”) **.** 代替(' ', '_') **.** 代替(”/”,”_”)
_#将给定的文件名拆分为文件标题和文件类型_
文件名标题,文件类型 **=** 文件名 **.** 分裂('。')
_#created 存储文件,因为它将出现在 ADLS_
存储文件 **=** ("{}_{}.{}") **.** 格式(file_name_title,trimmed_injest_Date,file_type)
服务客户端 **=** DataLakeServiceClient(account_url **=** “{}://{}.dfs.core.windows.net” **.** 格式(
"https", storage_account_name), 凭证 **=** storage_account_key)
**尝试** :
文件系统客户端 **=** 服务客户端 **.** 创建文件系统(文件系统 **=** 容器名称)
dir_client **=** 文件系统客户端 **.** 获取目录客户端(目录名称)
dir_client **.** 创建目录()
_#将数据设置为适当的数据框_
文件 **=** 数据集
文件客户端 **=** dir_client **.** 创建文件(存储文件)
文件客户端 **.** append_data(文件,0,len(文件))
文件客户端 **.** 刷新数据(len(文件))
**除了** :
_#ResourceAlreadyExists_
文件系统客户端 **=** 服务客户端 **.** 获取文件系统客户端(文件系统 **=** 容器名称)
dir_client **=** 文件系统客户端 **.** 获取目录客户端(目录名称)
dir_client **.** 创建目录()
_#将数据设置为适当的数据框_
文件 **=** 数据集
文件客户端 **=** dir_client **.** 创建文件(存储文件)
文件客户端 **.** append_data(文件,0,len(文件))
文件客户端 **.** 刷新数据(len(文件))
**返回** pandas_dataframe
我们将通过选择所需的列来结束此脚本,然后将数据集作为 parquet 文件写入存储,为我们留下当前市值排名前 100 的代币。
继续前进 脚本 2 , 硬币查询。在这个脚本中,我们利用我们在前一个脚本中获得的前 100 个硬币,并执行另一个 API 调用,返回每个条目的辅助信息。
这里 API 调用的棘手部分是我们每分钟最多只能提交 30 个代码,所以我们需要分解我们的请求。
过程:隔离代码,将代码分成 25 个组,提交批处理并将 JSON 转换为数据框, 让一分钟过去 ,请求下一组 25 并将该数据帧附加到原始数据帧......重复直到所有代码都提交。此过程通常需要 5 分钟才能执行,请求之间有 1 分钟的暂停。
**进口** 时间
**定义** get_coin_info_breakup(list_of_coins_, api_key):
"""函数对硬币列表中的每个硬币 ID 进行 API 调用。
旨在从前 100 名硬币中获取列表并获取其最新价格信息。 """
_#创建一个空数据框,我们也将附加响应_
df **=** PD **.** 数据帧()
_#a 计数变量,用于在尝试连接时索引响应_
_#获取API调用参数_
number_of_coin_ids **=** len(list_of_coins_)
_#max 数量为每分钟 30 个 API_
number_of_queries_per_run **=** 25
四舍五入 **=** np **.** ceil(number_of_coin_ids **/** number_of_queries_per_run)
硬币数组 **=** list_of_coins_
coin_split **=** np **.** array_split(coin_array,rounded_up)
数数 **=** 0
**尽管** 数数 ** <** 四舍五入:
**为了** ID **在** coin_split[计数]:
**尝试** :
数据 **=** Make_Call(id, api_key)
cmc_df **=** PD **.** 数据框 **.** from_dict(data['data'], 东方 **=** '指数')
_#expand 包含价格信息的嵌套报价列_
扩展_df **=** PD **.** 数据框 **.** from_dict(cmc_df['quote'][0], 东方 **=** '指数')
_#set then index for count 即运行次数_
扩展_df **.** 设置索引(pd **.** 索引([count]),就地 **=** **真的** )
_#dropping 我们不会在本练习中探索的嵌套列_
cmc_df_raw **=** cmc_df **.** drop(['quote','tags', 'platform'], 轴 **=** 1)
_#将索引设置为计数变量,因此可以加入_
cmc_df_raw **.** 设置索引(pd **.** 索引([count]),就地 **=** **真的** )
_#concat 很棘手,必须添加_
_#必须添加密钥以使它们正确附加_
coin_price_df **=** PD **.** 连接([cmc_df_raw,expanded_df],轴 **=** 1、按键 **=** ['cmc_df_raw', 'expanded_df'])
coin_price_merge **=** PD **.** 合并(cmc_df_raw,expanded_df)
df **=** df **.** 追加(coin_price_merge)
**除了** 键错误:
**继续**
_#parse dict 并将相关数据加载到 DF_
时间 **.** 睡觉(60)
数数 **+=** 1
**返回** df
现在我们已经为我们的前 100 个硬币编译了我们的辅助信息。接下来我们将其写入存储,然后类似于脚本 1 细化数据集。
有了这些辅助信息,我们还可以将信息分解为单独的描述性数据集,例如考虑数量或供应,然后将它们写入存储。
检查点: 我们现在有两个脚本来获取相关信息并将该数据写入存储。下一步,编排我们的脚本以实现这一点 日常的 .
对于这一部分,我们将进入云端。在这个设置中,我们将有 Azure 数据块 托管我们的 Python 脚本,然后利用 Azure 数据工厂 来安排他们的预定运行。
有关如何执行的深入分步教程,请按照此操作 女士文档 .
笔记: 确保在执行管道时已在 spark 集群上安装了 azure 存储库。
Adding ADLS package
准备好管道后,请确保将其安排为每天运行。这将完成我们的云 ELT 管道。
总结 ,我们查询一个 API,将结果写入存储,优化数据集,然后每天自动执行该过程。
展望未来的进步,我们可以实施一个转换工具来转换和建模数据以建立我们的报告团队。
感谢您阅读并祝您创建自己的管道和自定义数据集好运。
参考
完整的源代码 在这里可用 .
关于有用的博客 使用 Python 写入 ADLS .
Python 包文档 用于与 ADLS 交互。
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明
本文链接:https://www.qanswer.top/10968/54320208
标签:文件,存储,_#,管道,API,自动化,数据,ELT,客户端 From: https://www.cnblogs.com/amboke/p/16648576.html