我有一个问题要问对 Python 的 多处理 库有更多经验的人,此时我几乎迷失了方向。
我目前正在构建一个应该在 Windows 11 和 Windows 11 上运行的图像处理应用程序装有 Debian Linux 的 OrangePi5。我的设置是,除了主程序之外,还有另外两个进程,一个用于处理不间断的 按钮输入 和其他IO,另一个用于将 相机的功能 与应用程序的其余部分分开。
保存这些进程的两个类的多处理核心定义相同:
-
共有三个队列。
- 数据队列 保存一个元组,其中包含该类的主要输出。数据定期放入其中,并且在任何给定点最多有一个元组。主程序要么获取现有内容,要么使用默认值。
- 配置队列 用于将带有配置方法名称和相应参数的元组发送到工作进程。
- 状态队列 用于获取确认来自工作进程的配置设置。
- 无论操作系统如何,多处理启动方法始终设置为 spawn
-
两个类都有 start_process 和 stop_process 方法。
- start 方法初始化与多处理相关的字段,并使用 init 事件启动工作线程。
- stop 方法设置一个停止事件,尝试加入工作线程,如果不起作用,它会终止它并递归调用自身以记录有关是否加入工作线程的详细信息以及进程是如何停止的。
问题是,即使按钮进程加入没有问题(当我将加入超时设置为 0.1 秒或更大时),相机始终必须终止,无论我使用多长时间使加入超时,它永远不会加入。我在想,由于相机的响应有时可能非常耗时,因此可能需要很长时间才能识别已设置停止事件,但我多次成功地捕捉到正在停止的进程join 仍然失败。
在两个操作系统上,无论是单独使用类还是相互使用类,都会发生同样的情况。我开始认为我在不知不觉中阻塞了一些资源,使相机工作人员无法加入,但我不知道如何解决这个问题。按钮发送不同类型的不可变值,相机发送一个或两个 np.array 图像(它将它们作为 image.copy() 获取)以及 None 或 Exception。
任何微移在正确的方向上,无论是到底是什么导致了这种情况,还是我如何查明到底是什么阻止了加入过程,将不胜感激。预先感谢!
按钮进程(加入没有问题)
def _worker_process(self) -> None:
try:
while not self._stop_event.is_set():
if not self._is_parent_alive():
break
if not self._config_queue.empty():
config_message: tuple = self._config_queue.get()
self._apply_config(config_message)
is_pressed: bool = self._button.check_if_pressed()
button_state_from_info, time_until_long_press = self._button.get_button_info(is_pressed)
# Determine if the state should be updated
state_idx: int = self._button_state_map.get(button_state_from_info, -1)
update_state: bool = False
# Button state pattern matching and state change record update here
...
self._clear_queue(self._data_queue)
self._data_queue_send(self._worker_but_st, time_until_long_press, st_ch_rec_tuple)
time.sleep(0.05)
except Exception:
full_traceback = traceback.format_exc()
self._clear_queue(self._data_queue)
self._data_queue_send(exception=full_traceback)
相机进程(似乎无法加入,无论我等多久)
def _worker_process(self) -> None:
try:
while not self._stop_event.is_set():
if not self._is_parent_alive():
break
if not self._config_queue.empty():
config_message: tuple = self._config_queue.get()
self._apply_config(config_message)
# Capture image(s)
self.camera.capture()
# Get the latest captured images based on exposure mode
if self.camera.exp_mode == ExposureMode.SINGLE_EXP:
latest_img_loexp = self.camera.get_last_img().copy()
latest_img_hiexp = None
elif self.camera.exp_mode == ExposureMode.DOUBLE_EXP:
latest_img_loexp = self.camera.get_last_img_loexp().copy()
latest_img_hiexp = self.camera.get_last_img_hiexp().copy()
else:
raise ValueError(f"Exposure mode ({self.camera.exp_mode}) either invalid or not recognised.")
self._clear_queue(self._data_queue)
self._data_queue_send(latest_img_loexp, latest_img_hiexp, None)
time.sleep(0.05)
except Exception:
full_traceback = traceback.format_exc()
self._clear_queue(self._data_queue)
self._data_queue_send(exception=full_traceback)
stop_process 方法(两个类相同)|| |如果它可以帮助判断问题所在,我可以提供测试文件、测试日志或定义“self.camera”的类的相关部分。
def stop_process(self, is_recursive_call=False) -> None:
self._stop_event.set()
if self._process is not None and self._process.is_alive():
try:
self._process.join(timeout=0.2)
# If the process is still alive and this is not a recursive call, terminate and retry
if self._process.is_alive() and not is_recursive_call:
print(f"{self.CLASS_NAME}: Failed to join worker process, terminating.")
self._process.terminate()
self.stop_process(is_recursive_call=True)
else:
self._report_on_stop_attempt()
except KeyboardInterrupt as e:
print(f"{self.CLASS_NAME}: KeyboardInterrupt in process joining, joining failed.")
raise e
elif self._process and not self._process.is_alive():
print(f"{self.CLASS_NAME}: Unexpected stop behavior - process was not alive "
f"on 'stop_process' ({self._process=}).")
else:
print(f"{self.CLASS_NAME}: Unexpected stop behavior - process was None.")
If it could help telling what's wrong with this, I can provide the test files, test logs or relevant parts of the class that defines 'self.camera'.
代码中似乎存在几个潜在问题,这些问题可能会导致相机进程无法正确加入:
-
相机捕获循环中的阻塞操作: 相机进程中的
self.camera.capture()
调用可能会阻塞,从而阻止进程在接收到停止事件后及时响应。这可能会导致join()
调用超时,即使工作线程实际上已收到停止信号。 -
队列中的大型数据对象: 提到相机进程通过队列发送“一个或两个
np.array
图像”。如果这些图像很大,则将它们放入队列和从队列中取出可能会很耗时。这可能会导致进程停止时出现延迟,从而导致join()
调用超时。 -
工作线程中的异常: 如果工作线程中的
try...except
块内发生异常,则该进程可能会以静默方式退出,而不会通知主进程。这会导致join()
调用永远挂起,因为它期望工作线程正常退出。
建议的解决方案:
-
使相机捕获非阻塞: 调查的相机库是否提供非阻塞方式来捕获图像。这可能涉及使用回调或事件来发出捕获图像的信号,而不是阻塞直到完成。
-
使用共享内存以获得更大的数据对象: 对于大型数据对象(如图像),请考虑使用共享内存机制,而不是通过队列发送它们。这可以显着提高性能并减少进程间通信的开销。Python 的
multiprocessing.shared_memory
模块可以帮助实现这一点。 -
在工作线程中实现适当的错误处理: 确保的工作线程中的
try...except
块捕获到所有潜在的异常,并记录或以其他方式向主进程报告任何错误。这将帮助诊断导致进程无法正确退出的任何问题。 -
在工作线程循环中更频繁地检查停止事件: 在工作线程循环中更频繁地检查
self._stop_event.is_set()
,尤其是在执行长时间运行的任务(如图像捕获)之后。这将允许工作线程更快地响应停止事件并更快地退出。 -
增加
join()
超时: 虽然这不是理想的解决方案,但可以尝试增加join()
调用的超时时间,以便为相机进程提供更多时间来干净地关闭。但是,这应该是一个临时解决方案,应该努力解决导致进程在第一时间无法及时退出的根本原因。
通过实施这些更改,可以提高相机进程正确响应停止事件并成功加入主进程的可能性,从而避免需要强制终止它。
标签:python,linux,windows,multiprocessing From: 78832175