问题场景:上传文件
调用上传文件接口,异步任务解析文件,解析中需要执行python代码,此时会出现阻塞
启动celery命令
celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q nltable
代码:
import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService
import asyncio
logger = logging.getLogger(__name__)
@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):
    """Celery task: read and update one table's info asynchronously.

    Usage: async_read_table_info_task.delay(enterprise_id, table_id)
    """
    begin = time.perf_counter()
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(
        f"开始添加读取表格信息任务到队列, [{stamp}] enterprise_id: {enterprise_id}, table_id: {table_id} ")
    # Delegate the actual work to the service layer.
    TableAsyncService.sync_update_table_infos(enterprise_id, table_id)
    elapsed = time.perf_counter() - begin
    logger.info(f"读取表格信息任务添加到队列成功, 耗时: {elapsed:.2f}秒")
class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Update the table's info synchronously.

        Debug version: runs a trivial snippet through PythonREPL to
        reproduce the blocking issue; the numbered log lines trace progress.
        """
        logger.info("start")
        # Import inside the method to keep module import lightweight.
        from langchain_experimental.utilities import PythonREPL
        logger.info("2222")
        repl = PythonREPL()
        logger.info("111")
        output = repl.run("print(1)")
        logger.info(output)
执行结果
[2024-12-25 18:17:32,517: INFO/MainProcess] Task tasks.read_table_tasks.async_read_table_info_task[92ae44b2-d791-4706-960a-477ef70206d3] received
[2024-12-25 18:17:32,518: INFO/MainProcess] 开始添加读取表格信息任务到队列, [2024-12-25 18:17:32] enterprise_id: 1750727272898039810, table_id: eb9d87a6-6bac-47ec-8e7d-5598259aa760
[2024-12-25 18:17:32,518: INFO/MainProcess] start
[2024-12-25 18:17:32,518: INFO/MainProcess] 2222
[2024-12-25 18:17:32,518: INFO/MainProcess] 111
[2024-12-25 18:17:32,519: WARNING/MainProcess] Python REPL can execute arbitrary code. Use with caution.
参考说明文档
https://docs.celeryq.dev/en/stable/userguide/application.html
@classmethod
def worker(
    cls,
    command: str,
    globals: Optional[Dict],
    locals: Optional[Dict],
    queue: multiprocessing.Queue,
) -> None:
    """Execute *command* and put whatever it printed (or, on failure, the
    repr of the raised exception) onto *queue*.

    Exactly ONE item is put on the queue per call, on both the success and
    the error path — callers must therefore call queue.get() at most once.
    NOTE(review): this is vendored langchain PythonREPL code with extra
    logging added for debugging.
    """
    # Redirect stdout so we can capture what exec() prints.
    old_stdout = sys.stdout
    sys.stdout = mystdout = StringIO()
    try:
        logger.info("self.worker")
        cleaned_command = cls.sanitize_input(command)
        # SECURITY: exec() runs arbitrary code — only safe for trusted input.
        exec(cleaned_command, globals, locals)
        sys.stdout = old_stdout
        logger.info(f"sys.stdout {sys.stdout}")
        logger.info(f"{mystdout.getvalue()}")
        queue.put(mystdout.getvalue())
        logger.info(f"put")
    except Exception as e:
        # Restore stdout before reporting the failure through the queue.
        sys.stdout = old_stdout
        queue.put(repr(e))
def run(self, command: str, timeout: Optional[int] = None) -> str:
    """Run command with own globals/locals and return anything printed.
    Timeout after the specified number of seconds.

    NOTE(review): vendored langchain PythonREPL.run with debug logging.
    """
    # Warn against dangers of PythonREPL
    warn_once()
    queue: multiprocessing.Queue = multiprocessing.Queue()
    logger.info(f"langchain 123 {timeout} {queue}")
    # Only use multiprocessing if we are enforcing a timeout
    if timeout is not None:
        # create a Process
        p = multiprocessing.Process(
            target=self.worker, args=(command, self.globals, self.locals, queue)
        )
        # start it
        p.start()
        # wait for the process to finish or kill it after timeout seconds
        p.join(timeout)
        if p.is_alive():
            p.terminate()
            return "Execution timed out"
    else:
        # No timeout: run the worker inline in this process.
        self.worker(command, self.globals, self.locals, queue)
    # BUG FIX: the original did `logger.info(f"queue.get {queue.get()}")`
    # followed by `return queue.get()`. The worker put()s exactly ONE item,
    # so the logging call consumed it and the second get() blocked forever —
    # the hang described in this post. Fetch the result once and reuse it.
    result = queue.get()
    logger.info(f"queue.get {result}")
    return result
执行结果
[2024-12-26 09:29:57,229: INFO/MainProcess] langchain 123 None <multiprocessing.queues.Queue object at 0x0000021FB5C92BD0>
[2024-12-26 09:29:57,229: INFO/MainProcess] self.worker
[2024-12-26 09:29:57,230: INFO/MainProcess] sys.stdout <celery.utils.log.LoggingProxy object at 0x0000021FB5B37E50>
[2024-12-26 09:29:57,230: INFO/MainProcess] 1
[2024-12-26 09:29:57,230: INFO/MainProcess] put
此时以下两行代码无法执行完成(卡在第二次 queue.get() 处阻塞):
logger.info(f"queue.get {queue.get()}")
return queue.get()
- 尝试设置超时时间
res = python_repl.run("print(1)",timeout=10)
logger.info(f"res {res}")
此时 run 方法中 queue.get 的日志能够正常输出
[2024-12-26 09:57:00,765: INFO/MainProcess] queue.get 1
但 return queue.get() 再次取值时出现阻塞——worker 只向队列 put 了一条结果,已被上面日志语句中的 queue.get() 取走
- 测试去掉方法,直接在任务中执行
class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Run a snippet through PythonREPL.worker in a child process.

        This variant works under gevent because the parent only join()s the
        child and reads a single item from the multiprocessing queue.
        """
        from langchain_experimental.utilities import PythonREPL
        # Removed unused imports (gevent.queue.Queue / gevent.spawn) —
        # nothing in this method referenced them.
        queue: multiprocessing.Queue = multiprocessing.Queue()
        python_repl = PythonREPL()
        p = multiprocessing.Process(
            target=python_repl.worker, args=("print(1)", {}, {}, queue)
        )
        # start it
        p.start()
        # wait for the process to finish or kill it after timeout seconds
        p.join(10)
        if p.is_alive():
            # Child outlived the timeout; kill it so it cannot linger.
            # (If it never put() a result, the get() below can still block —
            # acceptable for this debug snippet, flagged for awareness.)
            p.terminate()
        print(f"res {queue.get()}")
此时能够正常运行
- 进一步封装成方法
class TableAsyncService:
    @staticmethod
    def run_command(command: str, queue: multiprocessing.Queue, timeout: int = 10):
        """Execute *command* in a child process via PythonREPL.worker and
        return what it printed.

        The queue must be created by the CALLER — creating it inside this
        method triggers the gevent blocking issue described in this post.

        :param command: Python source to execute.
        :param queue: caller-created multiprocessing queue for the result.
        :param timeout: seconds to wait for the child (default 10,
            preserving the original behavior).
        """
        from langchain_experimental.utilities import PythonREPL
        python_repl = PythonREPL()
        # BUG FIX: the original hard-coded "print(1)" in args, silently
        # ignoring the `command` parameter.
        p = multiprocessing.Process(
            target=python_repl.worker, args=(command, {}, {}, queue)
        )
        # start it
        p.start()
        # wait for the process to finish or kill it after timeout seconds
        p.join(timeout)
        print("return")
        return queue.get()

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Update the table's info synchronously (debug variant that runs a
        trivial snippet through the child-process helper above)."""
        # Create the queue here, outside the helper — see run_command's note.
        queue: multiprocessing.Queue = multiprocessing.Queue()
        res = TableAsyncService.run_command("print(1)", queue)
        print(f"res {res}")
此时也可以正常运行
其他方案:
尝试使用spawn+multiprocessing queue
class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Run a snippet via gevent.spawn plus a multiprocessing queue.

        Alternative to forking a child process: the REPL worker runs as a
        greenlet inside this process and reports its output on the queue.
        """
        from gevent import spawn
        from langchain_experimental.utilities import PythonREPL

        repl = PythonREPL()
        queue = multiprocessing.Queue()
        # Run the worker cooperatively; allow it up to 10 seconds.
        greenlet = spawn(repl.worker, "print(1)", {}, {}, queue)
        greenlet.join(10)
        result = queue.get()
        logger.info(f"res {result}")
能够正常运行
测试发现,如果 queue 在方法内部初始化,return queue.get() 将无法返回结果
两种方式的代码集合:
class TableAsyncService:
    @staticmethod
    def run_command(command: str, queue: multiprocessing.Queue):
        """Execute *command* in a child process and return what it printed.

        The queue must be supplied by the caller; creating it inside this
        method reproduces the gevent blocking issue described above.
        """
        from langchain_experimental.utilities import PythonREPL

        repl = PythonREPL()
        child = multiprocessing.Process(
            target=repl.worker, args=(command, {}, {}, queue)
        )
        # Launch the child, then wait at most 10 seconds for it to finish.
        child.start()
        child.join(10)
        return queue.get()

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Benchmark both working approaches back to back:

        1. multiprocessing.Process via run_command (queue created here);
        2. gevent.spawn running the REPL worker as a greenlet.
        """
        # --- approach 1: child process --------------------------------
        start_time = time.time()
        queue: multiprocessing.Queue = multiprocessing.Queue()
        res = TableAsyncService.run_command("print(1)", queue)
        print(f"res {res}")
        end_time = time.time()
        print(f" 耗时: {end_time - start_time}")

        # --- approach 2: gevent greenlet ------------------------------
        from langchain_experimental.utilities import PythonREPL
        from gevent import spawn

        repl = PythonREPL()
        queue = multiprocessing.Queue()
        start_time = time.time()
        greenlet = spawn(repl.worker, "print(1)", {}, {}, queue)
        greenlet.join(10)
        result = queue.get()
        logger.info(f"res {result}")
        end_time = time.time()
        print(f" 耗时: {end_time - start_time}")
import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService
logger = logging.getLogger(__name__)
@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):
    """Celery entry point that reads table info for one table.

    Usage: async_read_table_info_task.delay(enterprise_id, table_id)
    """
    t0 = time.perf_counter()
    now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(
        f"开始添加读取表格信息任务到队列, [{now_str}] enterprise_id: {enterprise_id}, table_id: {table_id} ")
    # Hand off to the service layer for the actual work.
    TableAsyncService.sync_update_table_infos(enterprise_id, table_id)
    t1 = time.perf_counter()
    logger.info(f"读取表格信息任务添加到队列成功, 耗时: {t1 - t0:.2f}秒")
标签:info,queue,flask,multiprocessing,celery,python,time,table,id
From: https://www.cnblogs.com/Gimm/p/18631651