首页 > 编程问答 >Pandas to_csv 无法附加到 Databricks 云中的文件(OSError Errno 95 不支持操作)

Pandas to_csv 无法附加到 Databricks 云中的文件(OSError Errno 95 不支持操作)

时间:2024-07-30 12:29:15浏览次数:8  
标签:python pandas csv databricks

为什么我无法使用下面的代码覆盖文件?
我希望创建文件 b.csv,然后附加每次迭代。虽然我给出 mode='a' (追加),但不知何故我可能会创建文件但不追加到它。

files = dbutils.fs.ls("/mnt/lake/RAW/test/billion-row-ingestion-time/table/")
parquet_file_list = [each.path for each in files if each.name!='_delta_log/']

for each in parquet_file_list:
    i=0
    df = spark.read.parquet(each).toPandas()
    df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')
    print("interation: ", i+1)

输出

OSError: [Errno 95] Operation not supported
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:261, in CSVFormatter.save(self)
    251 self.writer = csvlib.writer(
    252     handles.handle,
    253     lineterminator=self.line_terminator,
   (...)
    258     quotechar=self.quotechar,
    259 )
--> 261 self._save()

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:266, in CSVFormatter._save(self)
    265     self._save_header()
--> 266 self._save_body()

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:304, in CSVFormatter._save_body(self)
    303     break
--> 304 self._save_chunk(start_i, end_i)

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:315, in CSVFormatter._save_chunk(self, start_i, end_i)
    314 ix = self.data_index[slicer]._format_native_types(**self._number_format)
--> 315 libwriters.write_csv_rows(
    316     data,
    317     ix,
    318     self.nlevels,
    319     self.cols,
    320     self.writer,
    321 )

File /databricks/python/lib/python3.10/site-packages/pandas/_libs/writers.pyx:55, in pandas._libs.writers.write_csv_rows()

OSError: [Errno 95] Operation not supported

During handling of the above exception, another exception occurred:

OSError                                   Traceback (most recent call last)
File <command-1964363491723333>, line 4
      2 i=0
      3 df = spark.read.parquet(each).toPandas()
----> 4 df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a')
      5 print("interation: ", i+1)

File /databricks/python/lib/python3.10/site-packages/pandas/core/generic.py:3551, in NDFrame.to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, decimal, errors, storage_options)
   3540 df = self if isinstance(self, ABCDataFrame) else self.to_frame()
   3542 formatter = DataFrameFormatter(
   3543     frame=df,
   3544     header=header,
   (...)
   3548     decimal=decimal,
   3549 )
-> 3551 return DataFrameRenderer(formatter).to_csv(
   3552     path_or_buf,
   3553     line_terminator=line_terminator,
   3554     sep=sep,
   3555     encoding=encoding,
   3556     errors=errors,
   3557     compression=compression,
   3558     quoting=quoting,
   3559     columns=columns,
   3560     index_label=index_label,
   3561     mode=mode,
   3562     chunksize=chunksize,
   3563     quotechar=quotechar,
   3564     date_format=date_format,
   3565     doublequote=doublequote,
   3566     escapechar=escapechar,
   3567     storage_options=storage_options,
   3568 )

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/format.py:1180, in DataFrameRenderer.to_csv(self, path_or_buf, encoding, sep, columns, index_label, mode, compression, quoting, quotechar, line_terminator, chunksize, date_format, doublequote, escapechar, errors, storage_options)
   1159     created_buffer = False
   1161 csv_formatter = CSVFormatter(
   1162     path_or_buf=path_or_buf,
   1163     line_terminator=line_terminator,
   (...)
   1178     formatter=self.fmt,
   1179 )
-> 1180 csv_formatter.save()
   1182 if created_buffer:
   1183     assert isinstance(path_or_buf, StringIO)

File /databricks/python/lib/python3.10/site-packages/pandas/io/formats/csvs.py:241, in CSVFormatter.save(self)
    237 """
    238 Create the writer & save.
    239 """
    240 # apply compression and byte/text conversion
--> 241 with get_handle(
    242     self.filepath_or_buffer,
    243     self.mode,
    244     encoding=self.encoding,
    245     errors=self.errors,
    246     compression=self.compression,
    247     storage_options=self.storage_options,
    248 ) as handles:
    249 
    250     # Note: self.encoding is irrelevant here
    251     self.writer = csvlib.writer(
    252         handles.handle,
    253         lineterminator=self.line_terminator,
   (...)
    258         quotechar=self.quotechar,
    259     )
    261     self._save()

File /databricks/python/lib/python3.10/site-packages/pandas/io/common.py:124, in IOHandles.__exit__(self, *args)
    123 def __exit__(self, *args: Any) -> None:
--> 124     self.close()

File /databricks/python/lib/python3.10/site-packages/pandas/io/common.py:116, in IOHandles.close(self)
    114     self.created_handles.remove(self.handle)
    115 for handle in self.created_handles:
--> 116     handle.close()
    117 self.created_handles = []
    118 self.is_wrapped = False

OSError: [Errno 95] Operation not supported

错误“OSError: [Errno 95] Operation not supported”在 Databricks 中意味着你尝试在一个不支持该操作的文件系统上执行追加写入操作。虽然 Databricks 文件系统支持读取和写入 dbfs:/FileStore 路径,但它不支持直接追加。

你不应该在分布式环境(例如 Spark 和 Databricks)中,循环迭代并将数据帧片段附加到单个文件中。这会导致性能低下,并且如你所见,还可能导致错误。

以下是更好的方法:

1. 使用 coalesce repartition 将 Spark 数据帧收集到单个文件中:

from pyspark.sql.functions import spark_partition_id

# ...你的读取 parquet 文件的代码...

# 合并到单个分区以确保单个输出文件
df = df.coalesce(1) 

# 可选:如果需要特定数量的文件,请使用 repartition
# df = df.repartition(4) # 创建 4 个输出文件

df.write.csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='overwrite', header=True)

此方法使用 Spark 将所有数据写入单个 CSV 文件。 coalesce(1) 确保写入单个文件, mode='overwrite' 确保每个迭代都覆盖现有文件。

2. 使用 Pandas 在驱动程序上收集数据(如果数据较小):

import pandas as pd

# ...你的读取 parquet 文件的代码...

pandas_df = df.toPandas()  # 收集到驱动程序
pandas_df.to_csv('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv', mode='a', header=not os.path.exists('/dbfs/FileStore/raw/billion-row-ingestion-time/b.csv'))

此方法首先使用 toPandas() 将整个数据帧收集到驱动程序节点。 注意:仅当数据适合驱动程序节点的内存时才应使用此方法。

其他提示:

  • 避免在循环中对大型数据集执行 toPandas() ,因为它可能会导致内存不足错误。
  • 考虑使用 Spark 的内置文件格式(例如 Parquet 或 Delta Lake)来获得更好的性能和可扩展性,尤其是在处理大型数据集时。

请记住,选择最适合你的需求和数据大小的方法。

标签:python,pandas,csv,databricks
From: 78284654

相关文章

  • 从零开始的Python开发日记(7):短信验证功能开发流程
    短信验证功能开发流程在开发一个包含登录、注册以及短信验证的功能时,你需要遵循一个系统的开发流程。以下是实现这一功能的基本步骤,包括所需的技术和代码示例。1.环境配置首先,确保你的开发环境已经配置好,并安装了必要的库和工具。pipinstallfastapiuvicornsqlalche......
  • 【Python数值分析】革命:引领【数学建模】新时代的插值与拟合前沿技术
    目录​编辑第一部分:插值的基本原理及应用1.插值的基本原理1.1插值多项式1.2拉格朗日插值 1.3牛顿插值 1.4样条插值2.插值的Python实现2.1使用NumPy进行插值2.2使用SciPy进行插值2.2.1一维插值​编辑2.2.2二维插值3.插值的应用场景3.1数据平......
  • 在家用电脑上设置 Python 和 Jupyter,尝试打开 Jupyter 笔记本并显示错误,无法获取
    我有最新的Python版本3.12.4和以下版本的Jupyter:SelectedJupytercorepackages...IPython:8.26.0ipykernel:6.29.5ipywidgets:notinstalledjupyter_client:8.6.2jupyter_core:5.7.2jupyter_server:2.14.2jupyterlab......
  • Python - Reloading a module
    Eachmoduleisloadedintomemoryonlyonceduringaninterpretersessionorduringaprogramrun,regardlessofthenumberoftimesitisimportedintoaprogram.Ifmultipleimportsoccur,themodule’scodewillnotbeexecutedagainandagain.Suppose......
  • vscode python 3.7 pylance debugpy 插件 vsix
    可能报错  crashed5timesinthelast3minutes.Theserverwillnotberestarted.  ---pylance 可能报错  cannotreadpropertiesofundefinedreadingresolveEnvironment   --- debugger可能      vscodepython3.7调试没有反应......
  • Python获取秒级时间戳与毫秒级时间戳的方法[通俗易懂]
    参考资料:https://cloud.tencent.com/developer/article/21581481、获取秒级时间戳与毫秒级时间戳、微秒级时间戳代码语言:javascript复制importtimeimportdatetimet=time.time()print(t)#原始时间数据print(int(t))......
  • CEFPython
    在Tkinter界面中直接嵌入Selenium的浏览器视图并不是一件直接的事情,因为Selenium本身并不提供图形界面嵌入的功能。Selenium主要用于自动化web浏览器,但它并不直接控制浏览器窗口的显示方式,而是依赖于WebDriver来与浏览器交互。然而,你可以使用一些替代方案来在Tkinter应用中模拟或......
  • 《最新出炉》系列初窥篇-Python+Playwright自动化测试-58 - 文件下载
    1.简介前边几篇文章讲解完如何上传文件,既然有上传,那么就可能会有下载文件。因此宏哥就接着讲解和分享一下:自动化测试下载文件。可能有的小伙伴或者童鞋们会觉得这不是很简单吗,还用你介绍和讲解啊,不说就是访问到下载页面,然后定位到要下载的文件的下载按钮后,点击按钮就可以了。其实......
  • Python - Function Annotations
     deffunc(s:str,i:int,j:int)->str:returns[i:j]Theparametersissupposedtobeastring,soweplaceacolonaftertheparameternameandthenwritestr.Parametersiandjaresupposedtobeintegerssowewriteintforthem.Returntypeis......
  • 使用带有 pythonKit XCODE 的嵌入式 Python,在 iOS 应用程序中与 OpenCV-python 签名不
    我根据Beewares使用指南在XCODE中将Python嵌入到我的iOS项目中https://github.com/beeware/Python-Apple-support/blob/main/USAGE.md运行时,我得到pythonKit找不到由ultralytics导入的cv2错误。当我将OpenCV-python添加到我的app_packages文件夹时......