Why can't I overwrite the file with the code below?
I want to create the file b.csv and then append to it on each iteration. Although I pass mode='a' (append), somehow I can create the file but not append to it.
files = dbutils.fs.ls("/mnt/lake/RAW/test/billion-row-ingestion-time/table/")
parquet_file_list = [each.path for each in files if each.name!='_delta_log/']

for each in parquet_file_list:
    i=0
    df = spark.read.parquet(each).toPandas()
    df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')
    print("interation: ", i+1)
Output:
OSError: [Errno 95] Operation not supported
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:261, in CSVFormatter.save(self)
251 self.writer = csvlib.writer(
252 handles.handle,
253 lineterminator=self.line_terminator,
(...)
258 quotechar=self.quotechar,
259 )
--> 261 self._save()
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:266, in CSVFormatter._save(self)
265 self._save_header()
--> 266 self._save_body()
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:304, in CSVFormatter._save_body(self)
303 break
--> 304 self._save_chunk(start_i, end_i)
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:315, in CSVFormatter._save_chunk(self, start_i, end_i)
314 ix = self.data_index[slicer]._format_native_types(**self._number_format)
--> 315 libwriters.write_csv_rows(
316 data,
317 ix,
318 self.nlevels,
319 self.cols,
320 self.writer,
321 )
File /databricks/python/lib/python3.10/site-packages/pandas/_libs/writers.pyx:55, in pandas._libs.writers.write_csv_rows()
OSError: [Errno 95] Operation not supported
During handling of the above exception, another exception occurred:
OSError Traceback (most recent call last)
File <command-1964363491723333>, line 4
2 i=0
3 df = spark.read.parquet(each).toPandas()
----> 4 df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')
5 print("interation: ", i+1)
File /databricks/python/lib/python3.10/site-packages/pandas/core/generic.py:3551, in NDFrame.to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, decimal, errors, storage_options)
3540 df = self if isinstance(self, ABCDataFrame) else self.to_frame()
3542 formatter = DataFrameFormatter(
3543 frame=df,
3544 header=header,
(...)
3548 decimal=decimal,
3549 )
-> 3551 return DataFrameRenderer(formatter).to_csv(
3552 path_or_buf,
3553 line_terminator=line_terminator,
3554 sep=sep,
3555 encoding=encoding,
3556 errors=errors,
3557 compression=compression,
3558 quoting=quoting,
3559 columns=columns,
3560 index_label=index_label,
3561 mode=mode,
3562 chunksize=chunksize,
3563 quotechar=quotechar,
3564 date_format=date_format,
3565 doublequote=doublequote,
3566 escapechar=escapechar,
3567 storage_options=storage_options,
3568 )
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/format.py:1180, in DataFrameRenderer.to_csv(self, path_or_buf, encoding, sep, columns, index_label, mode, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, errors, storage_options)
1159 created_buffer = False
1161 csv_formatter = CSVFormatter(
1162 path_or_buf=path_or_buf,
1163 line_terminator=line_terminator,
(...)
1178 formatter=self.fmt,
1179 )
-> 1180 csv_formatter.save()
1182 if created_buffer:
1183 assert isinstance(path_or_buf, StringIO)
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:241, in CSVFormatter.save(self)
237 """
238 Create the writer & save.
239 """
240 # apply compression and byte/text conversion
--> 241 with get_handle(
242 self.filepath_or_buffer,
243 self.mode,
244 encoding=self.encoding,
245 errors=self.errors,
246 compression=self.compression,
247 storage_options=self.storage_options,
248 ) as handles:
249
250 # Note: self.encoding is irrelevant here
251 self.writer = csvlib.writer(
252 handles.handle,
253 lineterminator=self.line_terminator,
(...)
258 quotechar=self.quotechar,
259 )
261 self._save()
File /databricks/python/lib/python3.10/site-packages/pandas/io/common.py:124, in IOHandles.__exit__(self, *args)
123 def __exit__(self, *args: Any) -> None:
--> 124 self.close()
File /databricks/python/lib/python3.10/site-packages/pandas/io/common.py:116, in IOHandles.close(self)
114 self.created_handles.remove(self.handle)
115 for handle in self.created_handles:
--> 116 handle.close()
117 self.created_handles = []
118 self.is_wrapped = False
OSError: [Errno 95] Operation not supported
The error "OSError: [Errno 95] Operation not supported" in Databricks means you are trying to perform an append write on a file system that does not support that operation. Although the Databricks file system lets you read from and write to dbfs:/FileStore paths, it does not support direct appends.
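For instance, on a cluster where the /dbfs FUSE mount rejects appends, a minimal reproduction might look like the sketch below (the test path is only an illustration, and whether the initial plain write succeeds depends on your workspace and runtime configuration):

# Plain (sequential) writes to the FUSE mount generally work
with open('/dbfs/FileStore/append_test.csv', 'w') as f:
    f.write('a,b\n')
# Re-opening the same file in append mode can fail with
# OSError: [Errno 95] Operation not supported
with open('/dbfs/FileStore/append_test.csv', 'a') as f:
    f.write('1,2\n')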
You should not iterate in a loop and append dataframe chunks to a single file in a distributed environment such as Spark on Databricks. It performs poorly and, as you have seen, can also cause errors.
Here are better approaches:
1. Use coalesce or repartition to have Spark write the dataframe to a single file:
# ...your code that reads the parquet files into a Spark dataframe df...

# Coalesce to a single partition so Spark writes one output part file
df = df.coalesce(1)

# Optional: repartition instead if you need a specific number of files
# df = df.repartition(4)  # creates 4 output files

# Spark writers expect DBFS paths (dbfs:/...), not the /dbfs FUSE mount used by local file APIs
df.write.csv('dbfs:/FileStore/raw/billion-row-ingestion-time/b.csv', mode='overwrite', header=True)
This approach lets Spark write all of the data in one pass: coalesce(1) produces a single output part file, and mode='overwrite' replaces any existing output on each run. Keep in mind that Spark writes a directory named b.csv containing a part-*.csv file rather than one flat file.
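If you need one flat CSV instead of a directory, one option is to copy the single part file out afterwards with dbutils.fs. A sketch, assuming the output directory used above (the final file name b_flat.csv is just an example):

out_dir = 'dbfs:/FileStore/raw/billion-row-ingestion-time/b.csv'
# coalesce(1) leaves exactly one part file in the output directory
part_file = [f.path for f in dbutils.fs.ls(out_dir) if f.name.startswith('part-')][0]
# Copy it to a flat file; the original directory can then be removed if desired
dbutils.fs.cp(part_file, 'dbfs:/FileStore/raw/billion-row-ingestion-time/b_flat.csv')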
2. Use pandas to collect the data on the driver (if the data is small):
import os
import pandas as pd
# ...your code that reads each parquet file into df...
pandas_df = df.toPandas()  # collect to the driver
# Append to a path on the driver's local disk; the /dbfs FUSE mount rejects appends
local_path = '/tmp/b.csv'
pandas_df.to_csv(local_path, mode='a', header=not os.path.exists(local_path))
This approach collects the data to the driver node with toPandas() and appends it to a file on the driver's local disk, since appending to the /dbfs mount is exactly what triggered the error; once the loop finishes, copy the completed CSV to DBFS (for example with dbutils.fs.cp).
Note: only use this approach when the data fits in the driver node's memory.
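Applied to the loop from the question, a sketch (assuming the same parquet_file_list, appending on the driver's local disk and copying the result to DBFS at the end):

import os

local_path = '/tmp/b.csv'
for i, each in enumerate(parquet_file_list):
    df = spark.read.parquet(each).toPandas()
    df.to_csv(local_path, mode='a', header=not os.path.exists(local_path))
    print("iteration:", i + 1)

# Copy the finished file from the driver's local disk to DBFS
dbutils.fs.cp('file:' + local_path, 'dbfs:/FileStore/raw/billion-row-ingestion-time/b.csv')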
Other tips:
- Avoid calling toPandas() on large datasets inside a loop, as it can lead to out-of-memory errors.
- Consider using Spark's built-in file formats such as Parquet or Delta Lake for better performance and scalability, especially with large datasets (a minimal example follows below).
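For example, a minimal sketch of writing the same dataframe as a Delta table instead of CSV (the output path here is only an assumption):

# Write the dataframe as a Delta table; downstream jobs can read it back with spark.read.format('delta')
df.write.format('delta').mode('overwrite').save('dbfs:/FileStore/raw/billion-row-ingestion-time/delta_table')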
Remember to choose the method that best fits your needs and data size.
Tags: python, pandas, csv, databricks From: 78284654