首页 > 其他分享 >在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

时间:2022-08-28 18:01:44浏览次数:88  
标签:StringType PySpark -- StructField dept UDF 可为 数组 True

在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

PySpark UDF on complex Data types

在处理系统日志或任何其他半结构化数据时,我们遇到了具有许多嵌套字段和嵌入式结构数组的数据。

我们要选择的第一个也是最简单的解决方案是展开字段,然后执行数据转换。如果您需要平面模式,这种方法并没有错,但为了保持模式完整,我们需要对嵌套字段应用转换。

一种方法是将 Dataframe 转换为 RDD 并使用低级 API 来转换 Dataframe。假设我们想使用 Spark SQL API 以方便使用

为了克服这个问题,我们可以使用 PySpark UDF,它可以将复杂字段作为参数并返回新字段。

让我们创建一个足够复杂的示例数据,以便为我们的用例处理。

 从 pyspark.sql 导入 SparkSession  
 从 pyspark.sql.types 导入 *  
 从 pyspark.sql 导入行  
 从 pyspark.sql.functions 导入 udf, col data = [(["James","","Smith","36636","M",3000, [{'dept':'HR','allocation':0.4},{'dept':'FIN ','分配':0.6}]],  
 ["Michael","Rose","","40288","M",4000,[{'dept':'HR','allocation':0.4},{'dept':'FIN','allocation ':0.6}]],  
 ["罗伯特","","威廉姆斯","42114","M",4000,[{'dept':'HR','allocation':0.9},{'dept':'FIN','allocation ':0.1}]],  
 ["Maria","Anne","Jones","39192","F",4000,[{'dept':'HR','allocation':0.75},{'dept':'FIN','分配':0.25}]],  
 ["Jen","Mary","Brown","","F",-1,[{'dept':'HR','allocation':0.30},{'dept':'FIN','分配':0.70}]])  
 ] 架构 =( ArrayType(StructType([  
 StructField("名字",StringType(),True),  
 StructField("中间名",StringType(),True),  
 StructField("姓氏",StringType(),True),  
 StructField("id", StringType(), True),  
 StructField("性别", StringType(), True),  
 StructField("salary", IntegerType(), True),  
 StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),  
 StructField('allocation', FloatType())]  
 )  
 ), 真的)  
 ])  
 )) spark = SparkSession.builder.appName('test_udf').getOrCreate()  
 df = spark.createDataFrame(data= data , schema=schema)

这只是一行数据,其中包含存储在 Array of Struct 中的许多员工的详细信息。

问题陈述:我们要添加 经理 内部结构中的字段 隶属关系 并保持 Dataframe 的结构完整,因此不希望通过爆炸来改变粒度。

Dataframe 的当前架构:

 根  
 |-- 值:数组(可为空=真)  
 | |-- 元素:结构(containsNull = true)  
 | | |-- 名字:字符串(可为空 = true)  
 | | |-- 中间名:字符串(可为空=真)  
 | | |-- 姓氏:字符串(可为空 = true)  
 | | |-- id: 字符串(可为空=真)  
 | | |-- 性别:字符串(可为空=真)  
 | | |-- 工资:整数(可为空 = true)  
 | | |-- 隶属关系:数组(可为空 = true)  
 | | | |-- 元素:结构(containsNull = true)  
 | | | | |-- 部门:字符串(可为空=真)  
 | | | | |-- 分配:浮动(可为空=真)

目标模式:在结构数组中添加新字段

 根  
 |-- 值:数组(可为空=真)  
 | |-- 元素:结构(containsNull = true)  
 | | |-- 名字:字符串(可为空 = true)  
 | | |-- 中间名:字符串(可为空=真)  
 | | |-- 姓氏:字符串(可为空 = true)  
 | | |-- id: 字符串(可为空=真)  
 | | |-- 性别:字符串(可为空=真)  
 | | |-- 工资:整数(可为空 = true)  
 | | |-- 隶属关系:数组(可为空 = true)  
 | | | |-- 元素:结构(containsNull = true)  
 | | | | |-- 部门:字符串(可为空=真)  
 | | | | |-- 分配:浮动(可为空=真)  
 | | | | |-- 经理:字符串(可为空=真)

为了嵌入新字段,我们将编写一个 UDF,它将未分解的字段作为参数并返回一个新字段 经理 嵌入到我们想要的级别,即隶属关系数组。

定义 return_schema 在数组中有新字段 隶属关系:

 return_schema = (ArrayType(StructType([  
 StructField("名字", StringType(), True),  
 StructField("中间名", StringType(), True),  
 StructField("姓氏", StringType(), True),  
 StructField("id", StringType(), True),  
 StructField("性别", StringType(), True),  
 StructField("salary", IntegerType(), True),  
 StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),  
 StructField('分配', FloatType()),  
 StructField('manager', StringType())]  
 )  
 ), 真的)  
 ])  
 ))

下一步是编写带有对象的 UDF 图式 并返回一个新对象 return_schema。

 @udf(returnType=return_schema)  
 def add_manager(p):  
 行 = []  
 对于 p 中的 ele:  
 inner_rows = []  
 对于 ele.affiliations 中的 aff_ele:  
 inner_rows.append(Row(dept=aff_ele.dept, clubs=aff_ele.allocation,manager='Mr. X')) # 例如:可以通过 API 调用或映射 DS 来拉取新字段  
 行.追加(  
 行(名字=ele.firstname,中间名=ele.middlename,姓氏=ele.lastname,id=ele.id,性别=ele.gender,  
 薪水=ele.salary,隶属关系=inner_rows))  
 返回行

最后一件事是在 select 语句中调用 udf:

 df.select(add_manager(col("value"))).show(truncate=False)

Dataframe 中的每条记录都作为 Row 对象传递到 UDF。我们要修改数组的模式 隶属关系不可变 Row[] 的行为不允许修改。因此,我们为每条记录创建一个新的 Row 对象,并返回一个嵌套的 Row 对象,该对象与 return_schema .

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明

本文链接:https://www.qanswer.top/1346/52362817

标签:StringType,PySpark,--,StructField,dept,UDF,可为,数组,True
From: https://www.cnblogs.com/amboke/p/16633259.html

相关文章

  • PySpark ML 预测流失用户
    PySparkML预测流失用户项目定义这是Udacity的Capstone项目,使用Spark分析来自音乐应用Sparkify的用户行为数据。主要目标是根据音乐应用程序的用户日志数据预测......
  • 【算法笔记】一文解决数组类型算法题(1)
    本文主要介绍数据结构中的数组,以及LeetCode题库下面相关题型的分类和解法套路。数组理论概述定义数组是存储在一块连续内存上的,由相同元素集合组成的数据结构。利用索......
  • 在排序数组中查找元素的第一个和最后一个位置
    目录题目描述解题思路解题代码题目描述题目地址:https://leetcode.cn/problems/find-first-and-last-position-of-element-in-sorted-array/题目要求给你一个按照非递......
  • 搜索旋转排序数组
    目录题目描述解题思路解题代码题目描述题目地址:https://leetcode.cn/problems/search-in-rotated-sorted-array/题目要求整数数组nums按升序排列,数组中的值互不相同......
  • vue中props定义对象和数组的区别
    扯开怎么定义,为什么要定义props,相信小伙伴们都知道,都会用了,但是有个问题,为什么有时候定义的是数组形式,有时候是对象形式呢?一句话:props对象形式才能给默认值和类型和必填......
  • js声明数组的四种方式
    js声明数组的四种方式_麦客子的博客-CSDN博客_js声明数组的写法 https://blog.csdn.net/a911711054/article/details/72869324<!DOCTYPEhtml><htmllang="en"><head......
  • LeetCode刷题23-在排序数组中查找元素的第一个和最后一个位置
    importjava.util.Arrays;/***功能描述**@authorASUS*@version1.0*@Date2022/8/27*/publicclassMain06{publicstaticvoidmain(String[]......
  • 排序数组中查找元素的第一个和最后一个位置
    给你一个按照非递减顺序排列的整数数组nums,和一个目标值target。请你找出给定目标值在数组中的开始位置和结束位置。如果数组中不存在目标值target,返回 [-1,-1]。你......
  • 字节数组流
    字节数组流ByteArrayInputStream和ByteArrayOutputStream都是用于需要流和数组转换的情况!字节数组输入流说白了,FIleInputStream是把文件当做数据源,而ByteArrayInputS......
  • JS-数组
    数组数据结构 数据的结构(逻辑结构存储结构算法)存储结构(数据存储的结构方式)线性结构数组(顺序表)队列栈堆链表非线性结构树图hash(散列表......