我正在开发一个 FastAPI 应用程序,其中使用 WebSockets 从 Assistant 的 API 事件处理程序实时发送文本增量。但是,负责发送这些增量的任务仍处于 PENDING 状态并且永远不会运行。因此,客户端不会收到预期的数据。
症状
- WebSocket 连接打开并收到消息。
- 为处理 WebSocket 通信(发送文本增量)而创建的任务陷入 PENDING 状态。
- 没有通过 WebSocket 向客户端发送任何增量。
- 调试日志记录确认任务未运行并无限期地保持挂起状态。
import asyncio
from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Text, TextDelta
from starlette.websockets import WebSocket, WebSocketDisconnect
from logging_config import logger
class EventHandler(AssistantEventHandler):
    """Relay Assistant streaming events to a client over a WebSocket.

    This is the handler as posted in the question, with the formatting
    damage repaired (``__init__`` dunder names, straight quotes,
    indentation) so that it is valid Python again.

    NOTE(review): ``on_text_delta`` and ``on_message_done`` are declared
    ``async def`` although the base class is the synchronous
    ``AssistantEventHandler``. The sample log in this post reports
    ``coroutine 'EventHandler.on_message_done' was never awaited``, which
    suggests these bodies never run. That behaviour is the subject of the
    question and is intentionally left unchanged here.
    """

    def __init__(self, websocket: WebSocket):
        """Store the WebSocket that deltas should be written to.

        Args:
            websocket: The Starlette WebSocket used by ``send_text_delta``,
                ``send_message_done`` and ``close_websocket``.
        """
        super().__init__()
        self.websocket = websocket
        logger.info("EventHandler initialized")

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        """Log the event name and dump the not-yet-finished asyncio tasks."""
        logger.info(f"Event received: {event.event}")
        self.introspect_tasks()

    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        """Schedule ``send_text_delta`` for the new text fragment.

        Args:
            delta: The incremental text; only ``delta.value`` is used.
            snapshot: The accumulated text so far (unused).
        """
        logger.info(f"Received text delta.value: {delta.value}")
        self.introspect_tasks()
        asyncio.ensure_future(self.send_text_delta(delta.value))
        await asyncio.sleep(0)  # Yield control to ensure the task runs

    async def send_text_delta(self, value: str) -> None:
        """Send one ``text_delta`` JSON message; log instead of raising.

        Args:
            value: The text fragment placed in the ``content`` field.
        """
        try:
            logger.info(f"Attempting to send delta: {value}")
            await self.websocket.send_json({"type": "text_delta", "content": value})
            logger.info(f"Sent delta: {value}")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending delta")
        except Exception as e:
            logger.error(f"Error sending delta: {str(e)}")

    @override
    async def on_message_done(self, text: Text) -> None:
        """Schedule ``send_message_done`` once the message is complete.

        Args:
            text: The final text of the message (unused).
        """
        logger.info("The message is now done....")
        self.introspect_tasks()
        asyncio.ensure_future(self.send_message_done())
        await asyncio.sleep(0)  # Yield control to ensure the task runs

    async def send_message_done(self) -> None:
        """Send the ``message_done`` marker, then always close the socket."""
        try:
            logger.info("Attempting to send message_done")
            await self.websocket.send_json({"type": "message_done"})
            logger.info("Sent message_done")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending message_done")
        except Exception as e:
            logger.error(f"Error sending message_done: {str(e)}")
        finally:
            # Runs on success and on failure alike.
            await self.close_websocket()

    async def close_websocket(self) -> None:
        """Close the WebSocket, logging (not raising) any error."""
        try:
            logger.info("Attempting to close WebSocket")
            await self.websocket.close()
            logger.info("Closed WebSocket")
        except Exception as e:
            logger.error(f"Error closing WebSocket: {str(e)}")

    def introspect_tasks(self):
        """Log name and state of every asyncio task that is not done yet."""
        current_tasks = asyncio.all_tasks()
        for task in current_tasks:
            if not task.done():
                # ``_state`` is a private Task attribute, read for debugging only.
                logger.info(f"Running task: {task.get_name()}, state: {task._state}")
示例日志:
2024-07-24 16:37:12,708 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,749 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,841 - logging_config - INFO - Event received: thread.message.completed
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-9, state: PENDING
/********/envs/llms/lib/python3.10/site-packages/openai/lib/streaming/_assistants.py:337: RuntimeWarning: coroutine 'EventHandler.on_message_done' was never awaited
  self.on_message_done(event.data)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2024-07-24 16:37:12,939 - logging_config - INFO - Event received: thread.run.step.completed
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:13,028 - logging_config - INFO - Event received: thread.run.completed
2024-07-24 16:37:13,028 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:13,028 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:13,061 - logging_config - INFO - Finally close the web socket…
2024-07-24 16:37:13,062 - logging_config - INFO - Attempting to close WebSocket
已尝试的内容
- 任务自省:添加了详细的自省日志,以确认任务一直处于 PENDING 状态。
- ensure_future:使用 asyncio.ensure_future 来调度协程。
- 让出控制:添加了 await asyncio.sleep(0) 以让出控制权并确保任务运行。
- 事件循环问题:确认不存在引发问题的嵌套事件循环。
- 任务调度:验证任务是在活动事件循环上下文中调度的。
- 我的问题是:如何确保负责通过 WebSocket 发送文本增量的任务能够正确执行,而不会一直停留在 PENDING 状态?事件处理程序与 OpenAI Assistant API 之间是否存在某种我不了解的特殊交互?
我从事这方面工作的时间比我现在愿意承认的要长:)。对于可能导致这些任务悬而未决的原因以及如何解决它的任何见解或建议将不胜感激!
I’ve been at this for longer than I’d care to admit at this point :). Any insights or suggestions on what might be causing these tasks to remain pending and how to resolve it would be greatly appreciated!
代码中导致任务一直处于 PENDING 状态的原因是,你没有在正确的事件循环上下文中运行它们。虽然使用 `asyncio.ensure_future` 可以将协程调度为任务,但如果没有正在运行的事件循环,这些任务将永远不会被执行。
在你的代码中,`on_text_delta` 和 `on_message_done` 方法是 OpenAI 库的回调方法,它们**不是**在你的 FastAPI 应用程序的主事件循环中调用的。因此,你需要将创建的任务显式地放到主事件循环中运行。
以下是修改后的代码:
import asyncio

from openai import AssistantEventHandler
from openai import AsyncAssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Text, TextDelta
from starlette.websockets import WebSocket, WebSocketDisconnect
from typing_extensions import override

from logging_config import logger
class EventHandler(AsyncAssistantEventHandler):
    """Relay Assistant streaming events to a client over a WebSocket.

    The handler derives from ``AsyncAssistantEventHandler`` so that the
    ``async def`` callbacks below are actually awaited by the streaming
    helper. With the synchronous ``AssistantEventHandler`` base the
    callbacks are invoked without ``await`` (the sample log in this post
    shows ``coroutine 'EventHandler.on_message_done' was never awaited``),
    so their bodies, including the task scheduling, never execute.

    NOTE(review): an async handler has to be driven by the async client,
    presumably ``async with client.beta.threads.runs.stream(...,
    event_handler=EventHandler(ws)) as stream: await stream.until_done()``
    on an ``AsyncOpenAI`` instance; confirm against the caller.
    """

    def __init__(self, websocket: WebSocket):
        """Store the target WebSocket and prepare task bookkeeping.

        Args:
            websocket: The Starlette WebSocket the deltas are written to.
        """
        super().__init__()
        self.websocket = websocket
        # Strong references to in-flight send tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._send_tasks: set = set()
        logger.info("EventHandler initialized")

    def _track(self, task: "asyncio.Task") -> None:
        """Keep *task* referenced until it completes."""
        self._send_tasks.add(task)
        task.add_done_callback(self._send_tasks.discard)

    @override
    async def on_event(self, event: AssistantStreamEvent) -> None:
        """Log the event name and dump the not-yet-finished asyncio tasks.

        Declared ``async`` because the async base class awaits this hook.
        """
        logger.info(f"Event received: {event.event}")
        self.introspect_tasks()

    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        """Schedule ``send_text_delta`` for the new text fragment.

        Args:
            delta: The incremental text; only ``delta.value`` is used.
            snapshot: The accumulated text so far (unused).
        """
        logger.info(f"Received text delta.value: {delta.value}")
        self.introspect_tasks()
        # Get the currently running event loop.
        loop = asyncio.get_running_loop()
        # Submit the coroutine to that loop via loop.create_task.
        self._track(loop.create_task(self.send_text_delta(delta.value)))

    async def send_text_delta(self, value: str) -> None:
        """Send one ``text_delta`` JSON message; log instead of raising.

        Args:
            value: The text fragment placed in the ``content`` field.
        """
        try:
            logger.info(f"Attempting to send delta: {value}")
            await self.websocket.send_json({"type": "text_delta", "content": value})
            logger.info(f"Sent delta: {value}")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending delta")
        except Exception as e:
            logger.error(f"Error sending delta: {str(e)}")

    @override
    async def on_message_done(self, text: Text) -> None:
        """Schedule ``send_message_done`` once the message is complete.

        Args:
            text: The final text of the message (unused).
        """
        logger.info("The message is now done....")
        self.introspect_tasks()
        # Get the currently running event loop.
        loop = asyncio.get_running_loop()
        # Submit the coroutine to that loop via loop.create_task.
        self._track(loop.create_task(self.send_message_done()))

    async def send_message_done(self) -> None:
        """Send the ``message_done`` marker, then always close the socket."""
        try:
            logger.info("Attempting to send message_done")
            await self.websocket.send_json({"type": "message_done"})
            logger.info("Sent message_done")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending message_done")
        except Exception as e:
            logger.error(f"Error sending message_done: {str(e)}")
        finally:
            # Runs on success and on failure alike.
            await self.close_websocket()

    async def close_websocket(self) -> None:
        """Close the WebSocket, logging (not raising) any error."""
        try:
            logger.info("Attempting to close WebSocket")
            await self.websocket.close()
            logger.info("Closed WebSocket")
        except Exception as e:
            logger.error(f"Error closing WebSocket: {str(e)}")

    def introspect_tasks(self):
        """Log name and state of every asyncio task that is not done yet."""
        current_tasks = asyncio.all_tasks()
        for task in current_tasks:
            if not task.done():
                # ``_state`` is a private Task attribute, read for debugging only.
                logger.info(f"Running task: {task.get_name()}, state: {task._state}")
修改后的代码中,我们在 `on_text_delta` 和 `on_message_done` 方法中使用 `asyncio.get_running_loop()` 获取当前正在运行的事件循环,然后使用 `loop.create_task()` 将 `send_text_delta` 和 `send_message_done` 协程提交到主事件循环中。这样可以确保这些协程在 FastAPI 应用程序的主事件循环中运行,从而避免任务一直处于 PENDING 状态。