首页 > 其他分享 >开发自动化云 ELT 管道

开发自动化云 ELT 管道

时间:2022-09-02 09:14:36浏览次数:77  
标签:文件 存储 _# 管道 API 自动化 数据 ELT 客户端

开发自动化云 ELT 管道

寻求体现数据驱动的方法始于获取数据。该难题的一个基础部分是开发自动化 ELT 管道,以正确获取和交付必要的数据。

ELT , X 道 ** 大号** 负载 ** ** ransform 是一种通用架构方法,旨在处理大数据并最大限度地利用现代廉价云存储选项。

这个介绍性博客将重点介绍一种 ELT 管道方法,利用 PythonAzure 数据堆栈 .

比赛计划: 我们将从 API 查询数据,将其加载并写入存储,然后自动化该过程以按计划运行。

ELT Architecture

对于这个演示,我们将使用 CoinMarketCap API 因其成熟的文档和丰富的可访问数据。

我们的设置包括两个从 CoinMarketCap API 提取的 python 脚本。我们的第一个 python 脚本根据市值提取目前排名前 100 的代币。

然后,我们的第二个脚本从第一个查询中获取这 100 个条目,并执行另一个 API 调用来提取有关硬币的更详细信息。

我们的第二个脚本还充当一个编排脚本,运行包含我们的凭据的配置文件以及脚本 1。

脚本 1 ,CallandWrite。我们进行 API 调用,指定我们希望返回的记录数以及 api_key 以验证我们的请求。

_#API 调用 CoinMarketCap_  
 **定义** api_mapping_call(number_of_entries, api_key):  
 """对 CMC API 进行 api 调用并提取指定条目数的基本硬币信息。  
 返回字典。"""  
 网址 **=** 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/map'  
 参数 **=** {  
 'listing_status':'活动',  
 '限制':number_of_entries,  
 “排序”:“cmc_rank”  
 }  
 标题 **=** {  
 '接受':'应用程序/json',  
 'X-CMC_PRO_API_KEY': api_key  
 }  
  
 会议 **=** 会议()  
 会议 **.** 标题 **.** 更新(标题)  
 回复 **=** 会议 **.** 获取(网址,参数 **=** 参数)  
 数据 **=** json **.** 负载(响应 **.** 文本)  
  **返回** 数据

初始查询以 JSON 格式返回数据,这对于 API 很常见。我们将分解我们想要的列,重命名一些列,为数据集添加一个文件名,然后在我们摄取这些数据时添加一个时间戳。我们可以将这些操作总结为我们的“原始”数据摄取。

最佳做法是从存储中读取新写入的文件,但是在此演示中,我将使用我们写入存储的数据帧。

**从** azure.storage.filedatalake **进口** 数据湖服务客户端  
 storage_account_name **=** storage_account_name  
 storage_account_key **=** storage_account_key  
 容器名称 **=** “硬币清单”  
 目录名 **=** “生的”  
  
 **定义** write_to_storage(storage_account_name,storage_account_key, container_name, directory_name,dataset, pandas_dataframe, file_name):  
 """将数据帧写入存储的函数。指定存储帐户名称、存储帐户密钥、容器名称、目录名称和文件名。  
 该函数将检查容器是否已经存在,如果不存在,它将创建一个新容器。如果已经存在,它会将文件写入指定的容器。  
 数据集必须保存为指定的文件类型(csv、txt、parquet)  
 DataFrame 必须是 Pandas DF,将从函数返回  
 文件名必须以特定文件类型(csv、txt、parquet)结尾。"""  
  
    _#函数运行时的时间戳_  
 注入日期 **=** 现在 **.** strftime("%m/%d/%Y %H:%M:%S")  
    _#从日期时间中删除空格,反斜杠指示带有 ADLS 的新文件夹结构_  
 trimmed_injest_Date **=** 注入日期 **.** 代替( ” ”, ”_”) **.** 代替(' ', '_') **.** 代替(”/”,”_”)  
    _#将给定的文件名拆分为文件标题和文​​件类型_  
 文件名标题,文件类型 **=** 文件名 **.** 分裂('。')  
    _#created 存储文件,因为它将出现在 ADLS_  
 存储文件 **=** ("{}_{}.{}") **.** 格式(file_name_title,trimmed_injest_Date,file_type)  
      
  
 服务客户端 **=** DataLakeServiceClient(account_url **=** “{}://{}.dfs.core.windows.net” **.** 格式(  
 "https", storage_account_name), 凭证 **=** storage_account_key)  
    **尝试** :  
 文件系统客户端 **=** 服务客户端 **.** 创建文件系统(文件系统 **=** 容器名称)  
 dir_client **=** 文件系统客户端 **.** 获取目录客户端(目录名称)  
 dir_client **.** 创建目录()  
        _#将数据设置为适当的数据框_  
 文件 **=** 数据集  
 文件客户端 **=** dir_client **.** 创建文件(存储文件)  
 文件客户端 **.** append_data(文件,0,len(文件))  
 文件客户端 **.** 刷新数据(len(文件))  
    **除了** :  
        _#ResourceAlreadyExists_  
 文件系统客户端 **=** 服务客户端 **.** 获取文件系统客户端(文件系统 **=** 容器名称)  
 dir_client **=** 文件系统客户端 **.** 获取目录客户端(目录名称)  
 dir_client **.** 创建目录()  
        _#将数据设置为适当的数据框_  
 文件 **=** 数据集  
 文件客户端 **=** dir_client **.** 创建文件(存储文件)  
 文件客户端 **.** append_data(文件,0,len(文件))  
 文件客户端 **.** 刷新数据(len(文件))  
  
    **返回** pandas_dataframe

我们将通过选择所需的列来结束此脚本,然后将数据集作为 parquet 文件写入存储,为我们留下当前市值排名前 100 的代币。

继续前进 脚本 2 , 硬币查询。在这个脚本中,我们利用我们在前一个脚本中获得的前 100 个硬币,并执行另一个 API 调用,返回每个条目的辅助信息。

这里 API 调用的棘手部分是我们每分钟最多只能提交 30 个代码,所以我们需要分解我们的请求。

过程:隔离代码,将代码分成 25 个组,提交批处理并将 JSON 转换为数据框, 让一分钟过去 ,请求下一组 25 并将该数据帧附加到原始数据帧......重复直到所有代码都提交。此过程通常需要 5 分钟才能执行,请求之间有 1 分钟的暂停。

**进口** 时间  
 **定义** get_coin_info_breakup(list_of_coins_, api_key):  
 """函数对硬币列表中的每个硬币 ID 进行 API 调用。  
 旨在从前 100 名硬币中获取列表并获取其最新价格信息。 """  
  
    _#创建一个空数据框,我们也将附加响应_  
 df **=** PD **.** 数据帧()  
    _#a 计数变量,用于在尝试连接时索引响应_  
    _#获取API调用参数_  
 number_of_coin_ids **=** len(list_of_coins_)  
    _#max 数量为每分钟 30 个 API_  
 number_of_queries_per_run **=** 25  
 四舍五入 **=** np **.** ceil(number_of_coin_ids **/** number_of_queries_per_run)  
 硬币数组 **=** list_of_coins_  
 coin_split **=** np **.** array_split(coin_array,rounded_up)  
 数数 **=** 0  
    **尽管** 数数 ** <** 四舍五入:  
        **为了** ID **在** coin_split[计数]:  
            **尝试** :  
 数据 **=** Make_Call(id, api_key)  
 cmc_df **=** PD **.** 数据框 **.** from_dict(data['data'], 东方 **=** '指数')  
  
                _#expand 包含价格信息的嵌套报价列_  
 扩展_df **=** PD **.** 数据框 **.** from_dict(cmc_df['quote'][0], 东方 **=** '指数')  
                _#set then index for count 即运行次数_  
 扩展_df **.** 设置索引(pd **.** 索引([count]),就地 **=** **真的** )  
  
                _#dropping 我们不会在本练习中探索的嵌套列_  
 cmc_df_raw **=** cmc_df **.** drop(['quote','tags', 'platform'], 轴 **=** 1)  
                  
                _#将索引设置为计数变量,因此可以加入_  
 cmc_df_raw **.** 设置索引(pd **.** 索引([count]),就地 **=** **真的** )  
                  
                _#concat 很棘手,必须添加_  
                _#必须添加密钥以使它们正确附加_  
 coin_price_df **=** PD **.** 连接([cmc_df_raw,expanded_df],轴 **=** 1、按键 **=** ['cmc_df_raw', 'expanded_df'])  
 coin_price_merge **=** PD **.** 合并(cmc_df_raw,expanded_df)  
 df **=** df **.** 追加(coin_price_merge)  
            **除了** 键错误:  
                **继续**  
              
            _#parse dict 并将相关数据加载到 DF_  
 时间 **.** 睡觉(60)  
 数数 **+=** 1  
    **返回** df

现在我们已经为我们的前 100 个硬币编译了我们的辅助信息。接下来我们将其写入存储,然后类似于脚本 1 细化数据集。

有了这些辅助信息,我们还可以将信息分解为单独的描述性数据集,例如考虑数量或供应,然后将它们写入存储。

检查点: 我们现在有两个脚本来获取相关信息并将该数据写入存储。下一步,编排我们的脚本以实现这一点 日常的 .

对于这一部分,我们将进入云端。在这个设置中,我们将有 Azure 数据块 托管我们的 Python 脚本,然后利用 Azure 数据工厂 来安排他们的预定运行。

有关如何执行的深入分步教程,请按照此操作 女士文档 .

笔记: 确保在执行管道时已在 spark 集群上安装了 azure 存储库。

Adding ADLS package

准备好管道后,请确保将其安排为每天运行。这将完成我们的云 ELT 管道。

总结 ,我们查询一个 API,将结果写入存储,优化数据集,然后每天自动执行该过程。

展望未来的进步,我们可以实施一个转换工具来转换和建模数据以建立我们的报告团队。

感谢您阅读并祝您创建自己的管道和自定义数据集好运。

参考

完整的源代码 在这里可用 .

关于有用的博客 使用 Python 写入 ADLS .

Python 包文档 用于与 ADLS 交互。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明

本文链接:https://www.qanswer.top/10968/54320208

标签:文件,存储,_#,管道,API,自动化,数据,ELT,客户端
From: https://www.cnblogs.com/amboke/p/16648576.html

相关文章

  • EDA 自动化库:SpeedML
    EDA自动化库:SpeedML必须知道的库之一,才能拥有深刻的EDA!在数据科学领域,我们知道探索性数据分析或EDA是最重要和最耗时的部分,并且要拥有可用于模型的数据,我们必须花费......
  • 如何使用大型语言模型推动自动化?
    如何使用大型语言模型推动自动化?自动化(几乎)无需软件开发如果每封电子邮件、每一份带有订单、发票、投诉、要约请求或工作申请的PDF文件都可以翻译成机器可读的数据,那......
  • WEB自动化-01-Cypress 介绍
    1.Cypress介绍1.1Cypress简介  Cypress是一款基于JavaScript的下一代前端测试工具。可以对浏览器中运行的任何内容进行快速、简单和可靠的测试。  Cypress......
  • 接口自动化测试-反射机制
    反射就是通过字符串的形式,导入模块;通过字符串的形式,去模块寻找指定函数,并执行。利用字符串的形式去对象(模块)中操作(查找/获取/删除/添加)成员,一种基于字符串的事件驱动!一、......
  • Java接口自动化测试框架系列(二)表格设计与数据读取
    一、测试系统分析不同系统有不同的接口,通过分析这些接口,提取共同点可以得到不同地区的系统共有的接口。如:登录、登出、用户信息完善等接口二、表格设计  不同列......
  • Go 语言入门 1-管道的特性及实现原理
    入坑go也快一年了,从今天开始会定期分享一下Go语言学习过程中的一些基础知识。 go语言中的管道,主要是用于协程之间的通信,比UNIX的管道更加轻量和易用。 我们......
  • 史上最全 Appium 自动化测试从入门到框架实战精华学习笔记(三)
    ⬇️点击“下方链接”,提升测试核心竞争力!>>更多技术文章分享和免费资料领取本系列文章汇总了从Appium自动化测试从基础到框架高级实战中,所涉及到的方方面面的知识点精华......
  • pyest+appium实现APP自动化测试,思路全总结在这里
    每天进步一点点,关注我们哦,每天分享测试技术文章本文章出自【码同学软件测试】码同学公众号:自动化软件测试,领取资料可加:magetest码同学抖音号:小码哥聊软件测试01appium......
  • K8S源码之deltafifo
    1、获取key的地方funcMetaNamespaceKeyFunc(objinterface{})(string,error){ ifkey,ok:=obj.(ExplicitKey);ok{ returnstring(key),nil } meta,err:......
  • 【iOS自动化测试】第一章:方案调研
    背景目前Android端已完成了相应的框架搭建,并实际落地产出了,由于Android使用的是Unittest+HtmlTestRunner产出报告,需要增加新功能的话需要改动到底层框架,所以目前在负责的i......