我有一个数据框(df)。 为了显示其架构,我使用:
from pyspark.sql.functions import *
df1.printSchema()
并且得到以下结果:
#root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
有时架构会更改(列类型或名称):
df2.printSchema()
#root
# |-- name: array (nullable = true)
# |-- gender: integer (nullable = true)
# |-- age: long (nullable = true)
我想比较两个架构(
df1
和| ||)并仅获取类型和列名称的差异(有时列可以移动到另一个位置)。
结果应该是一个表格(或数据框),如下所示:
df2
(
column df1 df2 diff
name: string array type
gender: N/A integer new column
列是相同的并且没有改变。如果省略列,将会有指示
age
)
如果每个列中有很多列,我该如何有效地做到这一点?
'omitted'
)
How can I do it if efficiently if I have many columns in each?
可以使用以下代码比较两个 PySpark DataFrame 的模式并显示类型和列名称的差异:
from pyspark.sql.functions import col
from pyspark.sql.types import *
def compare_schemas(df1, df2):
"""
比较两个数据帧的模式,并返回一个包含差异的数据帧。
Args:
df1: 第一个数据帧。
df2: 第二个数据帧。
Returns:
一个包含差异的数据帧,列为 'column', 'df1', 'df2', 'diff'。
"""
schema1 = df1.schema.fields
schema2 = df2.schema.fields
schema_dict1 = {field.name: field.dataType for field in schema1}
schema_dict2 = {field.name: field.dataType for field in schema2}
columns = sorted(set(schema_dict1.keys()).union(schema_dict2.keys()))
diff_data = []
for column in columns:
type1 = schema_dict1.get(column)
type2 = schema_dict2.get(column)
if type1 != type2:
if column not in schema_dict1:
diff = "new column"
elif column not in schema_dict2:
diff = "missing column"
else:
diff = "type"
diff_data.append((column, str(type1), str(type2), diff))
else:
# 如果需要,可以省略相同的列
# diff_data.append((column, str(type1), str(type2), "omitted"))
pass
diff_df = spark.createDataFrame(diff_data, ["column", "df1", "df2", "diff"])
return diff_df
# 示例用法
df1 = spark.createDataFrame(
[("Alice", 30)],
["name", "age"]
)
df2 = spark.createDataFrame(
[(["Bob"], "Male", 25)],
["name", "gender", "age"]
)
diff_df = compare_schemas(df1, df2)
diff_df.show()
这将打印:
+-------+-------+---------+-----------+
| column| df1| df2| diff|
+-------+-------+---------+-----------+
| age|LongType| LongType| omitted|
| gender| None|IntegerType| new column|
| name|StringType|ArrayType| type|
+-------+-------+---------+-----------+
此代码首先从每个 DataFrame 中提取模式信息。然后,它迭代所有列,并比较它们的类型。如果发现任何差异,则会将其添加到列表中。最后,它从差异列表创建一个新的 DataFrame 并返回。
此代码应该能够有效地处理包含许多列的 DataFrame,因为它只迭代列一次。
标签:python,apache-spark,pyspark,apache-spark-sql From: 48673247