I have the following DataFrame:
df = spark.createDataFrame(
[
("D1", "D2", "H1", None, None),
("D1", "D2", "H1", "H2", None),
("D1", "D2", "H1", "H2", "H3")
],
["Dimension1", "Dimention2", "Hierarchy1", "Hierarchy2", "Hierarchy3"]
)
I want to transform it into something like this:
new_df = spark.createDataFrame(
[
("D1", "D2", "H1"),
("D1", "D2", "H2"),
("D1", "D2", "H3")
],
["Dimension1", "Dimention2", "Hierarchy"]
)
The logic is:
whencondition = when((col("Hierarchy1").isNotNull()) & (col("Hierarchy2").isNull()) & (col("Hierarchy3").isNull()), lit("H1")).when((col("Hierarchy1").isNotNull()) & (col("Hierarchy2").isNotNull()) & (col("Hierarchy3").isNull()), lit("H2")).when((col("Hierarchy1").isNotNull()) & (col("Hierarchy2").isNotNull()) & (col("Hierarchy3").isNotNull()), lit("H3")).alias("Hierarchy")
display(df.select("Dimension1", "Dimention2", whencondition))
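Of course, there can be any number of hierarchy columns, but in the final output I only want a single column showing which hierarchy level the record is at. I first created a list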
hierarchies = ["Hierarchy1", "Hierarchy2", "Hierarchy3"]
and came up with this:
when(reduce(lambda x, y: x & y, [(col( "`" + x + "`").isNotNull()) if x in hierarchies[:i+1] else (col( "`" + x + "`").isNull()) for x in hierarchies]), lit(hierarchies[i]))
which works for a given i < len(hierarchies), but unfortunately I could not get any further.
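For a single value of i, the comprehension above produces one AND-ed condition. The plain-Python sketch below needs no Spark. Its helper condition_pattern is hypothetical and simply mirrors the `x in hierarchies[:i+1]` test as SQL-like text. It prints the condition built for each i. The result is three separate conditions that still have to be chained into one when(...).when(...) expression.

```python
# Plain-Python sketch (no Spark): show the condition the comprehension builds for each level i.
hierarchies = ["Hierarchy1", "Hierarchy2", "Hierarchy3"]


def condition_pattern(i):
    # Same membership test as the question: columns in hierarchies[:i+1] must be non-null,
    # every other hierarchy column must be null.
    return [
        f"{h} IS NOT NULL" if h in hierarchies[: i + 1] else f"{h} IS NULL"
        for h in hierarchies
    ]


for i in range(len(hierarchies)):
    print(hierarchies[i], ":", " AND ".join(condition_pattern(i)))
```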
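from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

# Initialize SparkSession
spark = SparkSession.builder.appName("PivotDataFrame").getOrCreate()

# Sample DataFrame
df = spark.createDataFrame(
    [
        ("D1", "D2", "H1", None, None),
        ("D1", "D2", "H1", "H2", None),
        ("D1", "D2", "H1", "H2", "H3")
    ],
    ["Dimension1", "Dimention2", "Hierarchy1", "Hierarchy2", "Hierarchy3"]
)

# List of Hierarchy columns, ordered from the top level down
hierarchies = ["Hierarchy1", "Hierarchy2", "Hierarchy3"]


def level_condition(i):
    # Level i: hierarchies[0..i] are not null and hierarchies[i+1..] are null
    return reduce(
        lambda a, b: a & b,
        [col(f"`{h}`").isNotNull() if idx <= i else col(f"`{h}`").isNull()
         for idx, h in enumerate(hierarchies)]
    )


# Construct the when condition dynamically. Column.when() only works on a column
# produced by when(), so the chain is seeded with when() for level 0 and reduce
# appends one .when() per remaining level. Labels are the column names
# ("Hierarchy1", ...); adjust the lit(...) values if you need "H1", "H2", ...
when_condition = reduce(
    lambda acc, i: acc.when(level_condition(i), lit(hierarchies[i])),
    range(1, len(hierarchies)),
    when(level_condition(0), lit(hierarchies[0]))
).otherwise(lit(None)).alias("Hierarchy")  # Base case: no level matched

# Select desired columns with the dynamic when condition
new_df = df.select("Dimension1", "Dimention2", when_condition)
# Display the result
new_df.show()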
Explanation:
- Dynamic when condition:
  - For each hierarchy level i, we use reduce to combine one condition with &: every column up to and including hierarchies[i] must be non-null, and every later column must be null.
  - If that condition is met, the "Hierarchy" column is set to the current level's label with lit(hierarchies[i]).
  - A second reduce chains these conditions together. It has to start from a real when() for the first level, because Column.when() can only be called on a column that was itself produced by when(); reduce then appends one .when() per remaining level.
- Base case:
  - The chain ends with .otherwise(lit(None)), so the "Hierarchy" column is null for any row that matches no level, for example when all hierarchy values are null.
- Select and display:
  - Finally, we select the desired columns ("Dimension1", "Dimention2", and the dynamically generated "Hierarchy" column) and display the resulting DataFrame.
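This code works for any number of hierarchy columns, as long as they are listed in the hierarchies list in order from the top level down. Rows whose values do not fill the levels from left to right (for example H1, None, H3) match no condition and therefore get null.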
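To see the fold without a Spark cluster, here is a plain-Python stand-in for a when() chain. WhenChain, when and level_condition below are hypothetical helpers that only mimic the shape of pyspark.sql.functions.when, Column.when and Column.otherwise on ordinary tuples (None plays the role of NULL). The fold is seeded with the level-0 branch, and reduce appends one branch per remaining level.

```python
from functools import reduce
from typing import Callable, List, Optional, Sequence, Tuple

Row = Sequence[Optional[str]]
Predicate = Callable[[Row], bool]

hierarchies = ["Hierarchy1", "Hierarchy2", "Hierarchy3"]


class WhenChain:
    """Hypothetical stand-in for a CASE WHEN column: ordered (predicate, label) branches."""

    def __init__(self, branches: List[Tuple[Predicate, str]], default: Optional[str] = None):
        self.branches = branches
        self.default = default

    def when(self, predicate: Predicate, label: str) -> "WhenChain":
        # Like Column.when(): append a branch to an existing chain
        return WhenChain(self.branches + [(predicate, label)], self.default)

    def otherwise(self, default: Optional[str]) -> "WhenChain":
        # Value returned when no branch matches
        return WhenChain(self.branches, default)

    def evaluate(self, row: Row) -> Optional[str]:
        # First matching branch wins, as in SQL CASE WHEN
        for predicate, label in self.branches:
            if predicate(row):
                return label
        return self.default


def when(predicate: Predicate, label: str) -> WhenChain:
    # Like pyspark.sql.functions.when(): starts a new chain
    return WhenChain([(predicate, label)])


def level_condition(i: int) -> Predicate:
    # Columns 0..i must be non-null, columns i+1.. must be null
    return lambda row: all(
        (v is not None) if idx <= i else (v is None) for idx, v in enumerate(row)
    )


# Seed the fold with the level-0 branch, then append one branch per remaining level
chain = reduce(
    lambda acc, i: acc.when(level_condition(i), hierarchies[i]),
    range(1, len(hierarchies)),
    when(level_condition(0), hierarchies[0]),
).otherwise(None)

print(chain.evaluate(("H1", "H2", None)))  # prints Hierarchy2
```

Seeding with a real branch rather than a plain value is the same design choice the Spark version needs: only an object that already is a when-chain has a .when() method to extend.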