我正在尝试将大型 txt 文件读入数据帧。每个文件大小为 10-15 GB,
因为 IO 需要很长时间。我想并行读取多个文件并将它们放入单独的数据帧中。
我尝试了下面的代码
from multiprocessing.pool import ThreadPool
def read_file(file_path):
return spark.read.csv(file_path)
pool = ThreadPool(10)
df_list = pool.starmap(read_file,[[file1,file2,file3...]])
但它给出了 pickel 错误。 我该怎么做?,有什么替代方案可以满足我的要求吗?
我想并行读取多个文件并将它们放入单独的数据帧中。
在尝试使用多处理时遇到 pickle 错误,因为它与 PySpark 的分布式特性不兼容。PySpark 已经提供了在工作节点之间并行化操作(包括读取文件)的机制。
以下是如何并行读取多个文件并将它们存储在单独的数据帧中的方法:
from pyspark.sql import SparkSession
# 初始化 SparkSession
spark = SparkSession.builder.appName("ParallelFileReads").getOrCreate()
# 文件路径列表
file_paths = ["file1", "file2", "file3", ...]
# 并行读取文件到单独的数据帧
dataframes = {}
for file_path in file_paths:
df = spark.read.csv(file_path)
dataframes[file_path] = df
# 现在可以通过文件名访问每个数据帧
for file_path, df in dataframes.items():
print(f"数据帧 {file_path}:")
df.show()
此代码的工作原理:
- 创建 SparkSession: 这将启动 PySpark 环境。
- 定义文件路径: 列出要读取的所有文件路径。
- 循环并读取: 它循环遍历文件路径,为每个文件创建一个数据帧,并将数据帧存储在字典中。字典键是文件名,这使得以后更容易识别每个数据帧。
此方法利用了 PySpark 的分布式特性,而无需使用多处理。Spark 将自动在工作节点之间分配文件读取任务,从而实现并行化和高效地处理大型文件。
其他提示:
-
使用分区:
如果的文件特别大,请考虑在读取数据时使用
.option("numPartitions", ...)
指定分区数。这可以进一步提高性能。 - Parquet 文件: 为了获得更好的性能和效率,请考虑将数据存储在 Parquet 等列式格式中,而不是 CSV。
请记住,根据的集群资源和文件大小调整 Spark 配置(例如执行器数量、执行器内存)以获得最佳性能。
标签:python,pyspark,databricks From: 78794702