首页 > 编程语言 >flask 异步任务celery中运行ipython或python repl出现阻塞

flask 异步任务celery中运行ipython或python repl出现阻塞

时间:2024-12-26 11:21:32浏览次数:3  
标签:info queue flask multiprocessing celery python time table id

问题场景:上传文件

调用上传文件接口,异步任务解析文件,解析中需要执行python代码,此时会出现阻塞

启动celery命令

 celery -A app.celery worker -P gevent -c 1 --loglevel INFO   -Q nltable 

代码:

import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService
import asyncio


logger = logging.getLogger(__name__)


@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):
    """
    异步添加读取表格信息任务到队列
    Usage: async_read_table_info_task.delay(enterprise_id, table_id)
    """
    start_at = time.perf_counter()
    current_time = datetime.datetime.now()
    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
    logger.info(
        f"开始添加读取表格信息任务到队列, [{formatted_time}] enterprise_id: {enterprise_id}, table_id: {table_id} ")
    TableAsyncService.sync_update_table_infos(enterprise_id, table_id)

    end_at = time.perf_counter()

    logger.info(f"读取表格信息任务添加到队列成功, 耗时: {end_at - start_at:.2f}秒")
class TableAsyncService:

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """
        同步更新表格信息
        """
        logger.info("start")
        from langchain_experimental.utilities import PythonREPL
        logger.info("2222")
        python_repl = PythonREPL()
        logger.info("111")
        res = python_repl.run("print(1)")
        logger.info(res)

执行结果

[2024-12-25 18:17:32,517: INFO/MainProcess] Task tasks.read_table_tasks.async_read_table_info_task[92ae44b2-d791-4706-960a-477ef70206d3] received
[2024-12-25 18:17:32,518: INFO/MainProcess] 开始添加读取表格信息任务到队列, [2024-12-25 18:17:32] enterprise_id: 1750727272898039810, table_id: eb9d87a6-6bac-47ec-8e7d-5598259aa760    
[2024-12-25 18:17:32,518: INFO/MainProcess] start
[2024-12-25 18:17:32,518: INFO/MainProcess] 2222
[2024-12-25 18:17:32,518: INFO/MainProcess] 111
[2024-12-25 18:17:32,519: WARNING/MainProcess] Python REPL can execute arbitrary code. Use with caution.

参考说明文档
https://docs.celeryq.dev/en/stable/userguide/application.html

    @classmethod
    def worker(
        cls,
        command: str,
        globals: Optional[Dict],
        locals: Optional[Dict],
        queue: multiprocessing.Queue,
    ) -> None:
        old_stdout = sys.stdout
        sys.stdout = mystdout = StringIO()
        try:
            logger.info("self.worker")
            cleaned_command = cls.sanitize_input(command)
            exec(cleaned_command, globals, locals)
            sys.stdout = old_stdout
            logger.info(f"sys.stdout {sys.stdout}")
            logger.info(f"{mystdout.getvalue()}")
            queue.put(mystdout.getvalue())
            logger.info(f"put")

        except Exception as e:
            sys.stdout = old_stdout
            queue.put(repr(e))

    def run(self, command: str, timeout: Optional[int] = None) -> str:
        """Run command with own globals/locals and returns anything printed.
        Timeout after the specified number of seconds."""

        # Warn against dangers of PythonREPL
        warn_once()

        queue: multiprocessing.Queue = multiprocessing.Queue()
        logger.info(f"langchain 123 {timeout}  {queue}")
        # Only use multiprocessing if we are enforcing a timeout
        if timeout is not None:
            # create a Process
            p = multiprocessing.Process(
                target=self.worker, args=(command, self.globals, self.locals, queue)
            )

            # start it
            p.start()

            # wait for the process to finish or kill it after timeout seconds
            p.join(timeout)

            if p.is_alive():
                p.terminate()
                return "Execution timed out"
        else:
            self.worker(command, self.globals, self.locals, queue)
        # get the result from the worker function
        logger.info(f"queue.get {queue.get()}")
        return queue.get()


执行结果

[2024-12-26 09:29:57,229: INFO/MainProcess] langchain 123 None  <multiprocessing.queues.Queue object at 0x0000021FB5C92BD0>
[2024-12-26 09:29:57,229: INFO/MainProcess] self.worker
[2024-12-26 09:29:57,230: INFO/MainProcess] sys.stdout <celery.utils.log.LoggingProxy object at 0x0000021FB5B37E50>
[2024-12-26 09:29:57,230: INFO/MainProcess] 1

[2024-12-26 09:29:57,230: INFO/MainProcess] put

无法执行

        logger.info(f"queue.get {queue.get()}")
        return queue.get()
  1. 尝试设置超时时间
        res = python_repl.run("print(1)",timeout=10)
        logger.info(f"res {res}")

此时能执行run方法中的日志

[2024-12-26 09:57:00,765: INFO/MainProcess] queue.get 1

但return语句出现阻塞

  1. 测试去掉方法,直接在任务中执行

class TableAsyncService:

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """
        同步更新表格信息
        """
        from langchain_experimental.utilities import PythonREPL
        from gevent.queue import Queue
        from gevent import spawn

        queue: multiprocessing.Queue = multiprocessing.Queue()
        python_repl = PythonREPL()
        p = multiprocessing.Process(
            target=python_repl.worker, args=("print(1)", {}, {}, queue)
        )

        # start it
        p.start()

        # wait for the process to finish or kill it after timeout seconds
        p.join(10)
        print(f"res {queue.get()}")

此时能够正常运行

  1. 进一步封装成方法
class TableAsyncService:
    @staticmethod
    def run_command(command:str,queue: multiprocessing.Queue):
        from langchain_experimental.utilities import PythonREPL

        python_repl = PythonREPL()
        p = multiprocessing.Process(
            target=python_repl.worker, args=("print(1)", {}, {}, queue)
        )

        # start it
        p.start()

        # wait for the process to finish or kill it after timeout seconds
        p.join(10)
        print("return")
        return queue.get()
	  @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """
        同步更新表格信息
        """
        queue: multiprocessing.Queue = multiprocessing.Queue()
        res = TableAsyncService.run_command("print(1)",queue)
        print(f"res {res}")

此时也可以正常运行

其他方案:
尝试使用spawn+multiprocessing queue

class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """
        同步更新表格信息
        """
        from gevent import spawn

        # queue: multiprocessing.Queue = multiprocessing.Queue()
        # res = TableAsyncService.run_command("print(1)",queue)
        # print(f"res {res}")

        from langchain_experimental.utilities import PythonREPL

        python_repl = PythonREPL()
        queue = multiprocessing.Queue()
        command = "print(1)"
        greelet = spawn(python_repl.worker,command,{},{},queue)
        greelet.join(10)
        result = queue.get()
        logger.info(f"res {result}")

能够正常运行

测试发现,如果queue在内置里初始化,return queue.get无法输出

两种方式的代码集合:

class TableAsyncService:
    @staticmethod
    def run_command(command:str,queue: multiprocessing.Queue):
        from langchain_experimental.utilities import PythonREPL

        python_repl = PythonREPL()
        p = multiprocessing.Process(
            target=python_repl.worker, args=(command, {}, {}, queue)
        )

        # start it
        p.start()

        # wait for the process to finish or kill it after timeout seconds
        p.join(10)

        return queue.get()

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """
        同步更新表格信息
        """
        start_time = time.time()
        queue: multiprocessing.Queue = multiprocessing.Queue()
        res = TableAsyncService.run_command("print(1)",queue)
        print(f"res {res}")
        end_time = time.time()
        print(f" 耗时: {end_time - start_time}")

        from langchain_experimental.utilities import PythonREPL
        from gevent import spawn
        python_repl = PythonREPL()
        queue = multiprocessing.Queue()
        command = "print(1)"
        start_time = time.time()
        greelet = spawn(python_repl.worker,command,{},{},queue)
        greelet.join(10)
        result = queue.get()
        logger.info(f"res {result}")
        end_time = time.time()
        print(f" 耗时: {end_time - start_time}")
		
		
import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService


logger = logging.getLogger(__name__)


@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):
    """
    异步添加读取表格信息任务到队列
    Usage: async_read_table_info_task.delay(enterprise_id, table_id)
    """
    start_at = time.perf_counter()
    current_time = datetime.datetime.now()
    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
    logger.info(
        f"开始添加读取表格信息任务到队列, [{formatted_time}] enterprise_id: {enterprise_id}, table_id: {table_id} ")
    TableAsyncService.sync_update_table_infos(enterprise_id, table_id)

    end_at = time.perf_counter()

    logger.info(f"读取表格信息任务添加到队列成功, 耗时: {end_at - start_at:.2f}秒")

标签:info,queue,flask,multiprocessing,celery,python,time,table,id
From: https://www.cnblogs.com/Gimm/p/18631651

相关文章

  • 49、Python入门 Python与AJAX:构建高效Web交互体验
             在现代Web开发中,Python作为后端语言以其简洁高效和丰富的库支持而广受欢迎,而AJAX(AsynchronousJavaScriptandXML)技术则为前端与后端的交互带来了革命性的变化。二者的结合能够构建出高效、流畅且具有卓越用户体验的Web应用。 一、AJAX技术概述AJAX不是......
  • 计算机毕业设计Python+Spark知识图谱酒店推荐系统 酒店价格预测系统 酒店可视化 酒店
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO......
  • 计算机毕业设计Python+卷积神经网络租房推荐系统 租房大屏可视化 租房爬虫 hadoop spa
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO......
  • Python中一切皆为对象,这样理解!
    """在python中一切皆为对象,变量是对象,函数是对象,类也是对象。对象(object)是指在内存中具有唯一标识符(id)、类型(type)和值(value)的实例。换句话说,对象是一个具有属性和方法的实体,这些属性和方法可以被访问和操作。(1)唯一标识符:其实就是对象在计算机内存中的地址。可使用内置......
  • 为孩子准备的 第一个python编程学习案例-pygame小游戏
    为孩子准备的第一个python编程学习案例python安装IDE安装thonny开发第一个小游戏-避坑指南最终运行通过的小游戏参考想指导孩子进行python编程启蒙,自己研究了一下如何从零搭建python开发环境、安装配置基本库并运行一个游戏示例.python安装安装最新版本的python,......
  • Python-流量分析常用工具脚本(Tshark,pyshark,scapy)
    免责声明:本文仅作分享~目录wiresharkscapy例:分析DNS流量检查数据包是否包含特定协议层(过滤)获取域名例:提取HTTP请求中的Host信息pyshark例:解析HTTP请求和响应例:分析DNS查询和响应tsahrk.exe在读此文章前,请确保您会使用wireshark并具备一些流量协议的......
  • 【最新原创毕设】基于PPH的花涧订购系统+00332(免费领源码)可做计算机毕业设计JAVA、PHP
    摘 要近年来,电子商务的快速发展引起了行业和学术界的高度关注。花涧订购系统旨在为用户提供一个简单、高效、便捷的花卉购物体验,它不仅要求用户清晰地查看所需信息,而且还要求界面设计精美,使得功能与页面完美融合,从而提升系统的可操作性。因此,我们需要深入研究信息内容,并利用......
  • 图像边缘检测与轮廓提取详解及python实现
    目录图像边缘检测与轮廓提取详解第一部分:图像边缘检测与轮廓提取概述1.1什么是边缘检测和轮廓提取?1.2边缘检测与轮廓提取的应用领域1.3为什么需要边缘检测和轮廓提取?第二部分:常见的图像边缘检测算法2.1Sobel算子2.2Canny边缘检测2.3拉普拉斯算子(LaplacianofGaus......
  • 华为机试:仿 LISP 运算 - Python实现之篇3
    篇1中可以将字符串解析成Python的list的形式,用编程术语叫做:解析出语法树.篇2中可以实现表达式的求值.根据操作符,跳转到相应的求值分支.以上功能,仅仅实现了一个计算器的功能.离变成编程语言还差了:函数定义和调用.那么,篇3来实现函数定义,即lambda的定义与解......
  • 将Python模块打包为可直接运行的ZIP文件
    要使用zipapp将Python模块(例如位于E:\py\abc.py)打包为可直接运行的ZIP文件,你需要按照以下步骤进行操作:一、准备环境确保Python安装:你需要有Python解释器安装在你的系统上,因为zipapp是Python的一个标准库模块。准备项目文件:确保你的Python模块(如abc.py)以及任何依赖项都位于同一......