我正在尝试同步多个线程。我期望使用 threading.Condition 和 threading.Barrier 时的脚本输出大致相同,但事实并非如此。请解释一下为什么会发生这种情况。
一般来说,我需要线程在一个 无限循环中执行工作(一些IO操作) ,但是每个循环都是以主线程的权限开始的,而权限是仅在所有线程完成前一个周期后给出。
脚本 1
from threading import Barrier, Thread
from time import sleep, time
br = Barrier(3)
store = []
def f1():
while True:
br.wait()
sleep(1)
print("Calc part1")
def f2():
while True:
br.wait()
sleep(1)
print("Calc part2")
Thread(target=f1).start()
Thread(target=f2).start()
for i in range(10):
br.wait()
print(f'end iter {i}')
print(f'-------------')
预期行为
ent iter 0
-------------
Calc part1
Calc part2
ent iter 1
-------------
Calc part2
Calc part1
ent iter 2
-------------
Calc part1
...
脚本 2
from threading import Condition, Thread
from time import sleep
condition = False
cv = Condition()
def predicate():
return condition
def f1():
for i in range(3):
with cv:
cv.wait_for(predicate)
sleep(1)
print("Calc part1")
def f2():
for i in range(3):
with cv:
cv.wait_for(predicate)
sleep(1)
print("Calc part2")
Thread(target=f1).start()
Thread(target=f2).start()
with cv:
condition = True
cv.notify_all()
意外行为
Calc part1
Calc part1
Calc part1
Calc part2
Calc part2
Calc part2
为什么线程的标准输出不混合在第二个脚本的结果中?
第二个脚本中的线程输出没有像预期的那样混合,因为
cv.notify_all()
不会在
with cv:
块结束之前实际释放锁
。这意味着第一个获得 CPU 时间片的线程(在的情况下似乎是
f1
)将在释放锁并允许
f2
继续运行之前完成其所有 3 次迭代。
以下是更详细的解释:
-
获取锁:
当
f1
和f2
到达with cv:
块时,它们都会尝试获取cv
的底层锁。只有一个线程可以一次获取锁。 -
等待条件:
获取锁的线程会调用
cv.wait_for(predicate)
。由于condition
最初设置为False
,因此两个线程都会在锁释放的情况下被阻塞并等待。 -
主线程通知:
主线程进入其
with cv:
块,获取锁,设置condition = True
,并调用cv.notify_all()
。这将唤醒所有正在等待的线程(f1
和f2
)。 -
竞争锁:
f1
和f2
现在都在竞争获取锁。但是,cv.notify_all()
不会立即释放锁。 它只是标记线程在下次获取锁时可以继续运行。主线程仍然持有锁。 -
f1
继续运行: 在的情况下,f1
首先获取锁(这取决于线程调度)。由于condition
现在是True
,因此cv.wait_for(predicate)
立即返回,f1
执行其循环中的三轮操作,打印三次 “Calc part1”。重要的是, 在整个过程中,f1
一直持有锁 。 -
f2
获取锁: 只有在f1
释放锁(通过退出其with cv:
块)后,f2
才最终有机会获取锁并运行。然后它也执行其三轮操作,打印三次 “Calc part2”。
要实现类似于 Barrier 的行为,需要在每次迭代后通知线程并使用循环来处理潜在的虚假唤醒。
以下是修改后的代码:
from threading import Condition, Thread
from time import sleep
condition = False
cv = Condition()
def predicate():
return condition
def f1():
for i in range(3):
with cv:
while not predicate(): # 使用循环处理虚假唤醒
cv.wait()
sleep(1)
print("Calc part1")
cv.notify_all() # 通知其他线程
def f2():
for i in range(3):
with cv:
while not predicate(): # 使用循环处理虚假唤醒
cv.wait()
sleep(1)
print("Calc part2")
cv.notify_all() # 通知其他线程
Thread(target=f1).start()
Thread(target=f2).start()
for i in range(3):
with cv:
condition = True
cv.notify_all()
condition = False # 重置条件以便下次迭代
在这个版本中,每个线程在完成其工作后都会通知其他线程,并且循环确保只有在满足条件时才会继续运行。主线程负责在每次迭代后重置条件。
请注意,尽管此修改后的代码可以实现类似于 Barrier 的行为,但在实际应用程序中使用 Barrier 通常更简单、更不容易出错。
标签:python,multithreading,synchronization,infinite-loop From: 78780758