我有一个巨大的 PySpark 数据框,其中包含 1.5B 行,包括列
fieldA
我有一个 8.8M 唯一
fieldA
值的列表,我想从 1.5B 行中过滤掉。但是,我认为由于数据量较大,我不断收到类似
StackOverflowError
或
OutOfMemoryError
的错误。
我尝试将 8.8M 列表拆分为 20K 值的较小列表,并拆分 1.5B将数据帧分成每个 15M 行的较小数据帧。然后,对于 15M 行的每个数据帧,连续(循环)过滤掉不同的 20K 个
fieldA
值 (
temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))
),直到过滤掉所有 8.8M 值,然后将最终的
temp_df
写入 parquet 文件。对接下来的 15M 行数据帧重复此操作。然而,我认为这导致了数百个
.filter()
,这可能就是当我尝试在第一个 15M 数据帧上写入镶木地板文件时给我带来的
StackOverflowError
的原因。
然后我尝试过滤掉完整的 8.8每个 15M 数据帧的 M 值。对于每个 15M 数据帧,我会将过滤结果写入 parquet 文件。但是,当我尝试写入镶木地板文件时,我在第一个 15M 数据帧上得到了
OutOfMemoryError
如何从数据帧的 1.5B 行中过滤掉与任何 8.8M
fieldA
值匹配的行,以有效的方式?
遇到的问题是一个常见问题,即在 Spark 中处理大型数据集时,会遇到“内存不足”或“堆栈溢出”错误。拆分数据帧和筛选列表的想法是正确的方向,但实现可能会导致遇到的问题。
以下是对方法的改进,可以以更有效的方式解决此问题:
-
广播列表: 不要在循环中对筛选列表进行切片和迭代,而应使用
sparkContext.broadcast()
将 8.8MfieldA
值列表广播到所有执行器节点。广播可以更有效地分发数据,并防止每个任务重复读取列表。python fieldA_broadcast = spark.sparkContext.broadcast(set(fieldA_list))
-
使用
join
进行筛选: 可以使用leftAnti
联接代替filter
来更有效地筛选数据。首先,从的 8.8M 值列表中创建一个临时数据帧。然后,在fieldA
列上执行leftAnti
联接。这将返回所有在原始数据帧中但不在临时数据帧中的行。 ```python # 从列表创建数据帧 fieldA_df = spark.createDataFrame(fieldA_list, StringType()).toDF("fieldA")
# 执行 leftAnti 联接 filtered_df = df.join(fieldA_df, "fieldA", "leftanti") ```
-
分区和写入:
处理完过滤操作后,在写入结果之前,请确保的数据帧已正确分区。良好的分区策略(例如,基于
fieldA
或任何其他合适的列)可以显著提高写入性能。 ```python # 假设想要根据 "fieldA" 对数据进行分区 filtered_df = filtered_df.repartition("fieldA")
# 将结果写入 Parquet 文件 filtered_df.write.parquet("path/to/output") ```
以下是完整代码:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
# 初始化 SparkSession
spark = SparkSession.builder().appName("FilterLargeDataset").getOrCreate()
# 示例数据
data = [("A", 1), ("B", 2), ("C", 3), ("D", 4), ("E", 5)]
df = spark.createDataFrame(data, ["fieldA", "value"])
# 要筛选的值列表
fieldA_list = ["A", "C", "E"]
# 广播列表
fieldA_broadcast = spark.sparkContext.broadcast(set(fieldA_list))
# 从列表创建数据帧
fieldA_df = spark.createDataFrame(fieldA_list, StringType()).toDF("fieldA")
# 执行 leftAnti 联接
filtered_df = df.join(fieldA_df, "fieldA", "leftanti")
# 分区并写入结果
filtered_df = filtered_df.repartition("fieldA")
filtered_df.write.parquet("path/to/output")
# 停止 SparkSession
spark.stop()
通过结合这些技术,应该能够有效地筛选大型数据集,而不会遇到内存不足或堆栈溢出错误。请记住调整 Spark 配置设置(例如,执行器内存、驱动程序内存等),以找到适合集群大小和数据集的最佳配置。
标签:python,pandas,pyspark,out-of-memory From: 78815102