首页 > 编程问答 >pyarrow ipc 流 - 如何使用它进行通信?

pyarrow ipc 流 - 如何使用它进行通信?

时间:2024-07-25 09:32:07浏览次数:14  
标签:python pyarrow

阅读 pyarrow IPC 文档后,我的印象是 RecordBatchStreamReader 会读取流直到完成(最后的 0-s,是通过关闭流编写器写入的)。

但是我看到 reader 很快就停止了,这不是我期望看到的。

下面的示例开始在一个进程中写入文件流,并在延迟后开始在另一个进程中读取该文件流。我希望读者能够读取作者编写的所有批次,但它只读取第一批并退出。

我怎样才能 reader() 读取 writer() 写入的所有内容?添加 sink.flush() 是为了检查它是否有效 - 它不能,无论有没有它,代码的行为都是相同的。

import pyarrow as pa
import time
import multiprocessing as mp
import os

schema = pa.schema([("value", pa.int32())])

if os.path.exists("/tmp/stream.arrow"):
    os.remove("/tmp/stream.arrow")


def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                print(f"writer wrote {i}")
                time.sleep(0.25)
                sink.flush()
    print("writer done")


def reader():
    print("reader started")
    with pa.OSFile("/tmp/stream.arrow", "r") as source:
        with pa.ipc.open_stream(source) as reader:
            try:
                while True:
                    batch = reader.read_next_batch()
                    print(batch)
            except StopIteration:
                print("reader done")
                return


write_process = mp.Process(target=writer)
read_process = mp.Process(target=reader)

write_process.start()
time.sleep(2)
read_process.start()

write_process.join()
read_process.join()

示例输出

writer started
writer wrote 0
writer wrote 1
writer wrote 2
writer wrote 3
reader started
pyarrow.RecordBatch
value: int32
----
value: [0]
pyarrow.RecordBatch
value: int32
----
value: [1]
pyarrow.RecordBatch
value: int32
----
value: [2]
pyarrow.RecordBatch
value: int32
----
value: [3]
reader done
writer wrote 4
writer wrote 5
writer wrote 6
writer wrote 7

问题在于正在使用缓冲文件 I/O,并且缓冲区在写入进程退出之前没有被完全刷新到磁盘。

可以通过以下两种方法之一解决此问题:

1. 在写入进程中显式刷新缓冲区:

可以通过在每次写入后调用 sink.flush() 来手动刷新缓冲区。已经在代码中这样做了,但是需要在 writer.write_batch() 之后 立即执行此操作,以确保数据被写入文件:

def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                sink.flush() # 在这里刷新缓冲区
                print(f"writer wrote {i}")
                time.sleep(0.25)
    print("writer done")

2. 禁用缓冲 I/O:

可以通过在打开 OSFile 时将 buffering=0 传递给它来禁用缓冲 I/O。这将强制每次写入都直接写入磁盘:

def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb", buffering=0) as sink:  # 禁用缓冲
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                print(f"writer wrote {i}")
                time.sleep(0.25)
    print("writer done")

通过这两种方法中的任何一种,读取进程现在应该能够读取写入进程写入的所有批次。

标签:python,pyarrow
From: 78791134

相关文章

  • 如何在 Mac 上运行 Python 文件来读取 txt 文件并将其写入外部硬盘?
    我目前有一个充满了我想阅读的epub的文件夹,一个我已经阅读过并想再次阅读的epub的文件夹,以及一个相应的文件,其中每个文件都有epub文件的名称。问题是,这些文件夹仅位于我的外部硬盘上。我想要做的是让我的脚本解析这些文件夹中的epub列表,并在我的下载文件夹中创建最新的副......
  • 深入探索:使用Python进行网站数据加载逻辑分析与请求
    作为一名资深的Python程序员,我经常需要从网站中提取数据以供分析或进一步处理。这项任务涉及到对网站数据加载逻辑的深入分析,以及使用Python进行高效的网络请求。在本文中,我将分享如何分析网站的数据加载方式,并使用Python的requests库来模拟浏览器行为,获取所需的数据。网站......
  • 如何将 Python 列表添加到 Excel 中已有值的列的末尾?
    我目前正在尝试编写一个程序,将值附加到列表中,然后将这些值添加到Excel数据表中的列中。每次运行该程序时,我都希望在同一列的末尾添加更多值。所以我不确定如何解决这个问题,而且我在网上找到的其他答案也没有取得多大成功。以下是使用openpyxl库在Python中将......
  • 如何学习Python:糙快猛的大数据之路(学习地图)
    在这个AI和大数据主宰的时代,Python无疑是最炙手可热的编程语言之一。无论你是想转行还是提升技能,学习Python都是一个明智之选。但是,该如何开始呢?今天,让我们聊聊"糙快猛"的Python学习之道。什么是"糙快猛"学习法?"糙快猛"学习法,顾名思义,就是:糙:不追求完美,允许存......
  • Python 中 __get__ 方法的内部原理
    我正在摆弄描述符,结果碰壁了。我以为我可以像使用任何其他方法一样直接调用它,但显然,它似乎不一致或者我遗漏了一些东西。假设我有一个用作描述符的坐标类:|||还有一个Point类,它有2个坐标属性:classCoordinate:def__set_name__(self,owner,name):self._na......
  • 使用带有私钥的云前端生成签名 URL 的问题..使用 Python 3.7 为带有空格的 S3 对象生
    我在使用Python3.7为S3对象生成签名URL时遇到问题。具体来说,键中带有空格的对象的URL会导致“访问被拒绝”错误,而没有空格的对象的URL通常工作正常。但是,并非所有不带空格的对象都能正常工作,带空格的对象始终会失败。fromdatetimeimportdatetime,timedeltaimpo......
  • 有没有更好的方法来在存储库中的一组 python 程序之间共享公共代码
    当我想要快速、轻松地做许多不同的事情时,我会选择Python-即我总是会得到许多Python“程序”-例如一组脚本-或者如果我正在玩一些东西,一堆测试程序等-即始终是许多不同程序的松散集合。但是,我会分享某些内容。例如,如果我正在使用AI-我可能有30个左右完全不相......
  • 如何在Python中从两个不同长度的列表创建DataFrame,为第二个列表中的每个值重复第一个
    我是一个超级初学者,所以请耐心等待。我觉得这应该很容易,但我无法弄清楚。我不确定是否应该创建两个列表,然后将它们组合起来,或者是否有办法以这种方式直接创建DataFrame。我需要一列包含这些值:df=pd.DataFrame({'x1':np.linspace(-2.47,2.69,num=101)})然后我将值A......
  • Python multiprocessing.connection.Connection 的行为不符合规范
    根据python规范,recv()pythonConnection的方法,(从multiprocessing.Pipe()返回,当管道为空且管道的另一端关闭时抛出EOFError(这里参考:https://docs.python.org/3.9/library/multiprocessing.html#multiprocessing.connection.Connection.re......
  • 使用 python Flask 发送邮件中的图像
    我想发送一封包含html代码和图像的电子邮件但在gmail中它说图像已附加,我不想要这样,我只想要电子邮件正文中的图像。html_content=f"<imgsrc="cid:banner"alt=""style="width:80%;">"msg=MIMEMultipart('related')html_part=MIMEText(html_c......