After reading the pyarrow IPC documentation, my impression was that RecordBatchStreamReader reads a stream until completion (the trailing zeros written when the stream writer is closed).
But the reader stops much sooner than that, which is not what I expected.
The example below starts writing a file stream in one process and, after a delay, starts reading that stream in another process. I expected the reader to read every batch the writer wrote, but it only reads the first few batches and exits.
How can I make reader() read everything that writer() writes? The sink.flush() call was added to check whether it helps - it doesn't; the code behaves the same with or without it.
import pyarrow as pa
import time
import multiprocessing as mp
import os

schema = pa.schema([("value", pa.int32())])

if os.path.exists("/tmp/stream.arrow"):
    os.remove("/tmp/stream.arrow")

def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                print(f"writer wrote {i}")
                time.sleep(0.25)
            sink.flush()
    print("writer done")

def reader():
    print("reader started")
    with pa.OSFile("/tmp/stream.arrow", "r") as source:
        with pa.ipc.open_stream(source) as reader:
            try:
                while True:
                    batch = reader.read_next_batch()
                    print(batch)
            except StopIteration:
                print("reader done")
                return

write_process = mp.Process(target=writer)
read_process = mp.Process(target=reader)

write_process.start()
time.sleep(2)
read_process.start()

write_process.join()
read_process.join()
Sample output:
writer started
writer wrote 0
writer wrote 1
writer wrote 2
writer wrote 3
reader started
pyarrow.RecordBatch
value: int32
----
value: [0]
pyarrow.RecordBatch
value: int32
----
value: [1]
pyarrow.RecordBatch
value: int32
----
value: [2]
pyarrow.RecordBatch
value: int32
----
value: [3]
reader done
writer wrote 4
writer wrote 5
writer wrote 6
writer wrote 7
The problem is that buffered file I/O is being used, and the buffer is not fully flushed to disk before the writing process exits.
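The effect of userspace buffering can be seen even without pyarrow. The sketch below (my own illustration using Python's built-in open, not part of the original answer) writes a few bytes through a buffered file object: the on-disk size stays at zero until flush() hands the buffer to the operating system.

```python
import os

path = "/tmp/buffer_demo.bin"

# Default buffering: writes land in a userspace buffer first.
with open(path, "wb") as f:
    f.write(b"x" * 10)
    size_before = os.path.getsize(path)  # data still sits in the buffer
    f.flush()
    size_after = os.path.getsize(path)   # buffer pushed to the OS

print(size_before, size_after)  # prints: 0 10
```

The same thing happens with the Arrow stream: another process reading the file sees nothing until the writer's buffer is flushed, either explicitly or when the file is closed.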
This can be fixed in one of two ways:
1. Explicitly flush the buffer in the writing process:
You can flush the buffer manually by calling sink.flush() after each write. The code already does this, but the call needs to come immediately after writer.write_batch() so that each batch reaches the file as soon as it is written:
def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb") as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                sink.flush()  # flush the buffer here
                print(f"writer wrote {i}")
                time.sleep(0.25)
    print("writer done")
2. Disable buffered I/O:
You can disable buffering by passing buffering=0 when opening the OSFile. This forces every write to go straight to disk:
def writer():
    print("writer started")
    with pa.OSFile("/tmp/stream.arrow", "wb", buffering=0) as sink:  # disable buffering
        with pa.ipc.new_stream(sink, schema) as writer:
            for i in range(100):
                writer.write_batch(pa.RecordBatch.from_pylist([{"value": i}], schema))
                print(f"writer wrote {i}")
                time.sleep(0.25)
    print("writer done")
With either approach, the reading process should now be able to read every batch the writing process writes.
Tags: python, pyarrow
From: 78791134