This is a follow-up to this question. The code works fine in pure Python. When I tried the cythonized version, I started getting "Can't pickle <cyfunction init_worker_processes at 0x7fffd7da5a00>", even though init_worker_processes is defined at the top level. So I moved it into another module and used the imported init_worker_processes. Now I get the following error:
error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork
Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching. Some resources might leak.
warnings.warn('resource_tracker: process died unexpectedly, '
I am not explicitly using the -s or -c options reported in the error. The error is raised from the following code in the multiprocessing library (in the ensure_running method):
warnings.warn('resource_tracker: process died unexpectedly, '
              'relaunching. Some resources might leak.')
How can I fix this?
# python code
import logging
import multiprocessing as mp
import signal
import time
from dataclasses import dataclass
from typing import Dict, NoReturn

import numpy as np


@dataclass
class TmpData:
    name: str
    value: int


def worker(name: str, data: TmpData) -> NoReturn:
    # get existing logger which already has a stdout StreamHandler
    logger = mp.get_logger()
    logger.info(f"name: {name}; value: {data.value}")
    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")
    time.sleep(data.value)


def init_worker_processes() -> None:
    # this runs once per worker process
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }
    main_logger = logging.getLogger()
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()
        pool.close()
        pool.join()
    for result in results:
        try:
            result.get()
        except Exception as err:
            main_logger.error(f"{err}")
Note that the main problem seems to be the unrecognized -s and -c options; I am not sure where those come from.
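For what it's worth, those flags appear to come from multiprocessing itself: the spawn start method re-launches the interpreter with a generated command line. A quick sketch to inspect it (multiprocessing.spawn.get_command_line is an internal helper, so treat this as a debugging aid only):

# sketch: print the command line the "spawn" start method uses for children
import multiprocessing.spawn as spawn

if __name__ == "__main__":
    # which flags appear (-s, -E, ...) mirrors the parent interpreter's flags
    print(spawn.get_command_line(tracker_fd=8, pipe_handle=16))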
The problem is that you are trying to use a Cython function in a multiprocessing context, and Cython functions cannot be serialized between processes directly. Python's multiprocessing module relies on the pickle protocol to pass data between processes, and Cython functions are not picklable by default.
To work around this, you need to wrap the Cython function in a Python function that can be pickled. You can use functools.partial for this.
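As a minimal sketch of why the wrapping helps (toy function names, unrelated to the code below): a partial over a picklable module-level function is itself picklable, which is what lets the pool ship it to a worker process.

# sketch: a partial over a picklable function round-trips through pickle
import pickle
from functools import partial

def add(a, b):
    return a + b

bound = partial(add, 1, 2)
restored = pickle.loads(pickle.dumps(bound))
assert restored() == 3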
Here is the modified code:
cython_code.pyx:
import logging
import time
from dataclasses import dataclass

from multiprocessing import log_to_stderr


# @dataclass on a cdef class requires Cython 3+
@dataclass
cdef class TmpData:
    name: str
    value: int


# "except *" lets the RuntimeError propagate out of a cdef void function
cdef void worker_cython(str name, TmpData data) except *:
    logger = log_to_stderr()
    logger.setLevel(logging.INFO)
    logger.info(f"name: {name}; value: {data.value}")
    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")
    time.sleep(data.value)


def worker(name: str, data: TmpData) -> None:
    worker_cython(name, data)
setup.py:
from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize("cython_code.pyx"),
)
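Optionally, you can pin the language level when building; a variant of the same setup.py (the compiler_directives argument is standard Cython.Build API):

from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize(
        "cython_code.pyx",
        compiler_directives={"language_level": "3"},
    ),
)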
python_code.py:
import logging
import signal
from functools import partial
from multiprocessing import Pool, log_to_stderr

from cython_code import worker, TmpData


def init_worker_processes() -> None:
    # this runs once per worker process
    logger = log_to_stderr()
    logger.setLevel(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    import numpy as np

    map_data = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }
    main_logger = logging.getLogger()
    with Pool(
        processes=2,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        for key in map_data:
            try:
                # wrap the worker call using partial so a single picklable
                # callable is submitted to the pool
                func = partial(worker, key, map_data[key])
                results.append(pool.apply_async(func))
            except KeyboardInterrupt:
                pool.terminate()
        pool.close()
        pool.join()
    for result in results:
        try:
            result.get()
        except Exception as err:
            main_logger.error(f"{err}")
Steps:
1. Compile the Cython code: run python setup.py build_ext --inplace.
2. Run the Python code: run python python_code.py.
With this change, worker is a Python-level function that can be pickled and handed to the pool workers, which avoids the "process died unexpectedly" error.
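Before running the full program, you can sanity-check that the submitted callable actually pickles; a short sketch using the same names as above (if this still raises with your Cython version, move the worker wrapper into a plain .py module instead of the .pyx):

# sketch: round-trip the submitted callable through pickle
import pickle
from functools import partial

from cython_code import worker, TmpData

payload = partial(worker, "ABC", TmpData(name="ABC", value=5))
pickle.dumps(payload)  # should not raise if the wrapper is picklable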