首页 > 编程问答 >使用多处理代码的 cythonized 版本“进程意外死亡”

使用多处理代码的 cythonized 版本“进程意外死亡”

时间:2024-07-31 06:52:14浏览次数:7  
标签:python python-3.x multiprocessing cython python-multiprocessing

这是 这个问题 的一个分支。 python 中的代码运行良好。当我尝试 cythonized 版本时,我开始收到“Can't pickle <cyfunction init_worker_processes at 0x7fffd7da5a00>”,即使我在顶层定义了 init_worker_processes 。因此,我将其移至另一个模块并使用导入的 init_worker_processes。现在,我收到以下错误:

error: unrecognized arguments: -s -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=8, pipe_handle=16) --multiprocessing-fork
Python3/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching.  Some resources might leak.
  warnings.warn('resource_tracker: process died unexpectedly, '

我没有明确使用错误中报告的 -s -c 该错误来自多处理库中的以下代码(方法 - ensure_running

warnings.warn('resource_tracker: process died unexpectedly, '
                              'relaunching.  Some resources might leak.')

如何解决此问题?

# python code
import numpy as np


@dataclass
class TmpData:
    name: str
    value: int

def worker(name: str, data: TmpData) -> NoReturn:
    # get existing logger which already has a stdout StreamHandler
    logger = mp.get_logger() 
    logger.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(data.value)


def init_worker_processes() -> None:
    # this only runs single time per each thread
    logger = mp.log_to_stderr() 
    logger.setLevel(logging.INFO)

    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }

    main_logger = logging.getLogger()
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                main_logger.error(f"{err}")

请注意,主要问题似乎是“-s”和“-c”无法识别选项;不确定那些来自哪里。


问题在于你试图在多进程环境中使用 Cython 函数,而 Cython 函数不能直接在进程之间进行序列化。Python 的 multiprocessing 模块依赖于 pickle 协议来在进程之间传递数据,而 Cython 函数默认情况下并不可序列化。

为了解决这个问题,你需要将 Cython 函数包装在一个可以被序列化的 Python 函数中。你可以使用 functools.partial 函数来实现这一点。

以下是修改后的代码:

cython_code.pyx:

import logging
import time
from dataclasses import dataclass

import numpy as np

from multiprocessing import log_to_stderr

@dataclass
cdef class TmpData:
    name: str
    value: int

cdef void worker_cython(str name, TmpData data):
    logger = log_to_stderr()
    logger.setLevel(logging.INFO)
    logger.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(data.value)

def worker(name: str, data: TmpData) -> None:
    worker_cython(name, data)

setup.py:

from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules=cythonize("cython_code.pyx"),
)

python_code.py:

import logging
import signal
from functools import partial
from multiprocessing import Pool

from cython_code import worker, TmpData

def init_worker_processes() -> None:
    # this only runs single time per each thread
    logger = log_to_stderr() 
    logger.setLevel(logging.INFO)

    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    import numpy as np
    map_data = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }

    main_logger = logging.getLogger()
    with Pool(
        processes=2,
        initializer=init_worker_processes,
    ) as pool:
        results = []
        for key in map_data:
            try:
                # Wrap the worker function using partial
                func = partial(worker, key, map_data[key])
                results.append(pool.apply_async(func))
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                main_logger.error(f"{err}")

步骤:

  1. 编译 Cython 代码: 运行 python setup.py build_ext --inplace 来编译 Cython 代码。
  2. 运行 Python 代码: 运行 python python_code.py 来执行程序。

这样修改后, worker 函数就变成了一个可序列化的 Python 函数,可以被传递给多进程工作者,从而避免了 "process died unexpectedly" 错误。

标签:python,python-3.x,multiprocessing,cython,python-multiprocessing
From: 78813844

相关文章

  • 如何遍历Python字典同时避免KeyErrors?
    解析大型JSON时,某些键可能仅在某些情况下存在,例如出现错误时。从服务器的API获取200OK的情况并不少见,但是您得到的响应包含应检查的错误。处理此问题的最佳方法是什么?我知道使用类似||之类的东西。|是处理KeyError的一种方法。get()但是如果......
  • Python 中的递归数据类型
    Python中最接近Haskell中的递归数据类型的是什么?(即在定义自身时使用类型自己的定义。)编辑:为了给出递归类型的更具体定义,下面是Haskell中的二叉树:dataTreea=Leafa|Branch(Treea)(Treea)我的阅读方式如下:二叉树可以是叶子,也可以包含两......
  • 如何在Python中平滑相邻的多边形?
    我正在寻找一种平滑多边形的方法,以便相邻/接触的多边形保持接触。单个多边形可以轻松平滑,例如使用PAEK或Bezier插值(https://pro.arcgis.com/en/pro-app/latest/tool-reference/cartography/smooth-polygon.htm),这自然会改变它们的边界边缘。但是如何平滑所有多边形......
  • Python多处理池不启动多个进程
    我正在尝试使用多处理池来创建多个进程。我有一个工作函数dummy_proc定义如下:importrefrommultiprocessingimportPooldefregex_check(input_string):#Patterntomatchboth"pm_lat"and"pm_lon_coslat"followedbytwofloatspattern=r"(c......
  • 迟滞建模作为 Python GEKKO 中 MPC 的控制约束
    我试图使用PythonGEKKO在用于控制信号调度的MPC优化问题中引入滞后约束。这已成为一项艰巨的任务,因为我无法将以下问题转换为GEKKO理解的方程。问题:如果开启时间<最短开启时间,则给定资产的控制调度不应将其关闭。如果关闭时间<最小关闭时间......
  • 在 Lambda Python 中获取 errorMessage": "期望值: 第 1 行第 1 列 (char 0)"
    我正在尝试使用slackapi和awslambda函数创建一个slack机器人。现在我只希望每当用户说“你好”时它就响应“你好”。当我在Lambda代码编辑器中测试代码时,出现此错误。我对Lambda很陌生,并且已经被困在这个问题上有一段时间了。非常感谢任何帮助!完整错误:Response......
  • 具有 Python lambda 函数的 QTimer 使用先前的数据运行
    我有一个GUI项目,它使用PySide2和Python3.8,它在QThread中执行一些后台任务。在该QThread中,我有QTimer成员对象,该对象必须定期运行一个函数,每次向其传递不同的数据。我没有使用QTimer.singleShot静态函数,因为如果需要某些特定场景,我需要......
  • 我如何在 python 上使用 spire.pdf 修复此错误
    我使用spirepdf,但出现以下错误:“DllNotFound_Windows,libSkiaSharp,Nosepuedeencontrarelmóduloespecificado.:....”defextract_text_from_pdf(file_path,output_file):#LoadaPDFdocumentdoc=PdfDocument()doc.LoadFromFile(file_path)ex......
  • C++ 函数返回极其缓慢,远慢于功能等效的 python 代码
    我有一个在我编写的脚本中使用的函数,用于从列表中删除多余的阻塞关键字。基本上,输入(以任何顺序):{"apple","bapple","banana","cherry","bananaman","sweetherrypie","sweet","b"}它应该输出一个缩小的字符串数组(以任何顺序):......
  • 在预定时间从 python telegram bot 发起对话
    对于没有提供代码,我提前表示歉意。我明天会尝试添加它,但我现在还没有接近它,思考如何解决这个问题让我一直在思考。我已经为一个机器人创建了一个程序,该程序的数据帧充满了之前请求的用户添加到机器人的消息列表中。现在,机器人使用job_queue在一天中的随机时间向每个用户发......