首页 > 编程问答 >无法过滤掉 PySpark 中巨大数据集中的数据帧

无法过滤掉 PySpark 中巨大数据集中的数据帧

时间:2024-07-31 16:56:48浏览次数:10  
标签:python pandas pyspark out-of-memory

我有一个巨大的 PySpark 数据框,其中包含 1.5B 行,包括列 fieldA 我有一个 8.8M 唯一 fieldA 值的列表,我想从 1.5B 行中过滤掉。但是,我认为由于数据量较大,我不断收到类似 StackOverflowError OutOfMemoryError 的错误。

我尝试将 8.8M 列表拆分为 20K 值的较小列表,并拆分 1.5B将数据帧分成每个 15M 行的较小数据帧。然后,对于 15M 行的每个数据帧,连续(循环)过滤掉不同的 20K 个 fieldA 值 ( temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list)) ),直到过滤掉所有 8.8M 值,然后将最终的 temp_df 写入 parquet 文件。对接下来的 15M 行数据帧重复此操作。然而,我认为这导致了数百个 .filter() ,这可能就是当我尝试在第一个 15M 数据帧上写入镶木地板文件时给我带来的 StackOverflowError 的原因。

然后我尝试过滤掉完整的 8.8每个 15M 数据帧的 M 值。对于每个 15M 数据帧,我会将过滤结果写入 parquet 文件。但是,当我尝试写入镶木地板文件时,我在第一个 15M 数据帧上得到了 OutOfMemoryError

如何从数据帧的 1.5B 行中过滤掉与任何 8.8M fieldA 值匹配的行,以有效的方式?


遇到的问题是一个常见问题,即在 Spark 中处理大型数据集时,会遇到“内存不足”或“堆栈溢出”错误。拆分数据帧和筛选列表的想法是正确的方向,但实现可能会导致遇到的问题。

以下是对方法的改进,可以以更有效的方式解决此问题:

  1. 广播列表: 不要在循环中对筛选列表进行切片和迭代,而应使用 sparkContext.broadcast() 将 8.8M fieldA 值列表广播到所有执行器节点。广播可以更有效地分发数据,并防止每个任务重复读取列表。 python fieldA_broadcast = spark.sparkContext.broadcast(set(fieldA_list))

  2. 使用 join 进行筛选: 可以使用 leftAnti 联接代替 filter 来更有效地筛选数据。首先,从的 8.8M 值列表中创建一个临时数据帧。然后,在 fieldA 列上执行 leftAnti 联接。这将返回所有在原始数据帧中但不在临时数据帧中的行。 ```python # 从列表创建数据帧 fieldA_df = spark.createDataFrame(fieldA_list, StringType()).toDF("fieldA")

# 执行 leftAnti 联接 filtered_df = df.join(fieldA_df, "fieldA", "leftanti") ```

  1. 分区和写入: 处理完过滤操作后,在写入结果之前,请确保的数据帧已正确分区。良好的分区策略(例如,基于 fieldA 或任何其他合适的列)可以显著提高写入性能。 ```python # 假设想要根据 "fieldA" 对数据进行分区 filtered_df = filtered_df.repartition("fieldA")

# 将结果写入 Parquet 文件 filtered_df.write.parquet("path/to/output") ```

以下是完整代码:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# 初始化 SparkSession
spark = SparkSession.builder().appName("FilterLargeDataset").getOrCreate()

# 示例数据
data = [("A", 1), ("B", 2), ("C", 3), ("D", 4), ("E", 5)]
df = spark.createDataFrame(data, ["fieldA", "value"])

# 要筛选的值列表
fieldA_list = ["A", "C", "E"]

# 广播列表
fieldA_broadcast = spark.sparkContext.broadcast(set(fieldA_list))

# 从列表创建数据帧
fieldA_df = spark.createDataFrame(fieldA_list, StringType()).toDF("fieldA")

# 执行 leftAnti 联接
filtered_df = df.join(fieldA_df, "fieldA", "leftanti")

# 分区并写入结果
filtered_df = filtered_df.repartition("fieldA")
filtered_df.write.parquet("path/to/output")

# 停止 SparkSession
spark.stop()

通过结合这些技术,应该能够有效地筛选大型数据集,而不会遇到内存不足或堆栈溢出错误。请记住调整 Spark 配置设置(例如,执行器内存、驱动程序内存等),以找到适合集群大小和数据集的最佳配置。

标签:python,pandas,pyspark,out-of-memory
From: 78815102

相关文章

  • VSCode:Python 虚拟环境未在集成终端中自动激活
    我最近安装了VSCode,并注意到当我打开集成终端时,Python虚拟环境不会自动激活。从此链接中VSCode内提供的信息:https://github.com/microsoft/vscode-python/wiki/Activate-Environments-in-Terminal-Using-Environment-Variables看来Python扩展可能不会对......
  • Python 中的多元回归
    我想在Python中基于多个相关数据数组和多个独立数据执行多元线性回归。我见过很多多重线性回归,具有多个独立输入,几乎每个人都认为多重=多元,但事实并非如此。我在互联网上看不到任何真正的多元教程。我想要的是多个输出+多个输入。frompandasimportDataFramefromsklear......
  • Python 复选框和 Excel
    我有一个Python系统,它收集用户的条目,进行计算并显示结果。当用户单击条目数据按钮时,所有数据条目和结果都会转换为Excel。在系统界面中有是部分的复选框,结果显示在Excel中。我需要一个助手来编写代码,以便我可以在每次用户选择选定的复选框时转换结果,将复选框的每......
  • Python - Class Decorators
    Wehaveusedfunctionstodecoratefunctionsandtodecorateclasses.Now,wewillseehowtodefineaclassasadecorator.Atthestartofthischapter,inthedefinitionofdecorator,wehadseenthatadecoratorisacallable;acallableisanyobject......
  • 基于Python的高校成绩分析【源码+LW+PPT+部署讲解】
    作者主页:编程千纸鹤作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待......
  • 基于Python网络爬虫的电子产品信息查询可视化系统
    作者主页:编程千纸鹤作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待......
  • [SUCTF 2019]Pythonginx (unicode转IDNA域名分割漏洞)
    代码我看到这两个就感觉有问题了,第一个转编码这个搜了一下是unicode转idna的问题,第二个urlopen是Python标准库中urllib.request模块中的一个函数,用于向指定的URL发送HTTP请求并获取响应参考文章:https://xz.aliyun.com/t/6070?time__1311=n4%2BxnD0DgDcmG%3DrDsYoxCqiIQ7KDtH......
  • 基于Python的高校成绩分析【源码+LW+PPT+部署讲解】
    作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待与各位高校教师、企......
  • 基于Python网络爬虫的电子产品信息查询可视化系统
    作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待与各位高校教师、企业......
  • 使用 Python 读取 .xlsx 文件的最快方法
    我正在尝试使用Python将.xlsx文件中的数据读入MySQL数据库。这是我的代码:wb=openpyxl.load_workbook(filename="file",read_only=True)ws=wb['MyWorksheet']conn=MySQLdb.connect()cursor=conn.cursor()cursor.execute("SETautocommit=0"......