我目前正在开发一个多处理 Python 程序,其中每个进程将其索引作为连续的 4 字节整数写入共享内存。并且有一个读取器可以在没有任何锁的情况下读取其他进程的索引。因为我没有使用任何同步原语,所以我担心读取器进程可能会由于逐字节写入内存而读取损坏的数据(例如,一个索引的前 2 个字节和另一个索引的最后 2 个字节)。|| |阅读以前的索引对我来说不是问题;我只需要确保读取索引在历史上有效。
我观察到,在 macOS 上,不会发生此问题(可能是由于 Python 或 macOS 特定的行为)。但是,该程序也需要在其他操作系统(例如 Windows 和 Ubuntu)上运行。
我在下面创建了一个小示例。
任何确保数据完整性的建议或示例将不胜感激!
import time
import random
import struct
import multiprocessing
from multiprocessing import shared_memory
def writer(rank, shared_memory):
index = 0
while True:
byte_array = struct.pack("i", index)
shared_memory.buf[rank * 4: rank * 4 + 4] = byte_array
index += 1
time.sleep(random.random())
def reader(shared_memory, size):
while True:
indexes = struct.unpack(f"{size}i", shared_memory.buf[:size*4])
print(f"Read indexes: {indexes}")
time.sleep(1)
if __name__ == "__main__":
size = 10
shared_memory = shared_memory.SharedMemory(
create=True, size= 4 * size
)
ctx = multiprocessing.get_context("spawn")
processes = []
for i in range(size):
p = ctx.Process(target=writer, args=(i, shared_memory))
processes.append(p)
p.start()
reader_process = multiprocessing.Process(target=reader, args=(shared_memory, size))
reader_process.start()
for p in processes:
p.join()
reader_process.join()
Any advice or examples to ensure data integrity would be appreciated!
你绝对正确地担心在没有同步原语的情况下从共享内存中读取数据会出现数据损坏。即使在 macOS 上一切看起来正常运行,你的代码也存在竞争条件,最终会在 macOS 或任何其他操作系统上导致错误的读取。
问题在于,读取器进程可以捕获写入器进程只写入了其 4 字节索引中的一部分,从而导致读取的值损坏。
为了解决这个问题,你需要在写入器和读取器之间引入同步机制。以下是如何使用
multiprocessing.Lock
来确保数据完整性:
import time
import random
import struct
import multiprocessing
from multiprocessing import shared_memory
def writer(rank, shared_memory, lock):
index = 0
while True:
byte_array = struct.pack("i", index)
with lock: # Acquire lock before writing
shared_memory.buf[rank * 4: rank * 4 + 4] = byte_array
index += 1
time.sleep(random.random())
def reader(shared_memory, size, lock):
while True:
with lock: # Acquire lock before reading
indexes = struct.unpack(f"{size}i", shared_memory.buf[:size*4])
print(f"Read indexes: {indexes}")
time.sleep(1)
if __name__ == "__main__":
size = 10
shared_memory = shared_memory.SharedMemory(create=True, size= 4 * size)
lock = multiprocessing.Lock() # Create a lock
ctx = multiprocessing.get_context("spawn")
processes = []
for i in range(size):
p = ctx.Process(target=writer, args=(i, shared_memory, lock)) # Pass the lock to the writer
processes.append(p)
p.start()
reader_process = multiprocessing.Process(target=reader, args=(shared_memory, size, lock)) # Pass the lock to the reader
reader_process.start()
for p in processes:
p.join()
reader_process.join()
在这个修改后的代码中:
-
我们创建了一个
multiprocessing.Lock
对象,并在写入器和读取器进程之间共享它。 - 写入器进程在更新共享内存之前获取锁,并在完成后释放锁。
- 读取器进程在从共享内存读取数据之前获取锁,并在完成后释放锁。
这可以确保读取器进程永远不会在写入器进程更新 4 字节索引的过程中读取共享内存,从而防止出现数据损坏并保持数据完整性。
还应注意的是,虽然这种基于锁的方法可以有效地防止数据损坏,但它可能会给你的代码带来一些开销并影响性能,尤其是在争用锁的情况下。如果你需要一个性能更高的解决方案,你可能需要研究其他方法,例如无锁数据结构或原子操作,但这些方法的实现可能更加复杂。
标签:python,multiprocessing,data-integrity From: 78805755