首页 > 编程问答 >将 Streaming Assistants API 与 Websocket 结合使用

将 Streaming Assistants API 与 Websocket 结合使用

时间:2024-07-25 05:03:13浏览次数:11  
标签:python websocket openai-api

我正在开发一个 FastAPI 应用程序,其中使用 WebSockets 从 Assistant 的 API 事件处理程序实时发送文本增量。但是,负责发送这些增量的任务仍处于 PENDING 状态并且永远不会运行。因此,客户端不会收到预期的数据。

症状

  • WebSocket 连接打开并收到消息。
  • 为处理 WebSocket 通信(发送文本增量)而创建的任务陷入 PENDING 状态。
  • 没有通过 WebSocket 向客户端发送任何增量。
  • 调试日志记录确认任务未运行并无限期地保持挂起状态。
import asyncio
from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Text, TextDelta
from starlette.websockets import WebSocket, WebSocketDisconnect
from logging_config import logger

class EventHandler(AssistantEventHandler):
def init(self, websocket: WebSocket):
super().init()
self.websocket = websocket
logger.info(“EventHandler initialized”)

@override
def on_event(self, event: AssistantStreamEvent) -> None:
    logger.info(f"Event received: {event.event}")
    self.introspect_tasks()

@override
async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
    logger.info(f"Received text delta.value: {delta.value}")
    self.introspect_tasks()
    asyncio.ensure_future(self.send_text_delta(delta.value))
    await asyncio.sleep(0)  # Yield control to ensure the task runs

async def send_text_delta(self, value: str) -> None:
    try:
        logger.info(f"Attempting to send delta: {value}")
        await self.websocket.send_json({"type": "text_delta", "content": value})
        logger.info(f"Sent delta: {value}")
    except WebSocketDisconnect:
        logger.warning("WebSocket disconnected while sending delta")
    except Exception as e:
        logger.error(f"Error sending delta: {str(e)}")

@override
async def on_message_done(self, text: Text) -> None:
    logger.info("The message is now done....")
    self.introspect_tasks()
    asyncio.ensure_future(self.send_message_done())
    await asyncio.sleep(0)  # Yield control to ensure the task runs

async def send_message_done(self) -> None:
    try:
        logger.info("Attempting to send message_done")
        await self.websocket.send_json({"type": "message_done"})
        logger.info("Sent message_done")
    except WebSocketDisconnect:
        logger.warning("WebSocket disconnected while sending message_done")
    except Exception as e:
        logger.error(f"Error sending message_done: {str(e)}")
    finally:
        await self.close_websocket()

async def close_websocket(self) -> None:
    try:
        logger.info("Attempting to close WebSocket")
        await self.websocket.close()
        logger.info("Closed WebSocket")
    except Exception as e:
        logger.error(f"Error closing WebSocket: {str(e)}")

def introspect_tasks(self):
    current_tasks = asyncio.all_tasks()
    for task in current_tasks:
        if not task.done():
            logger.info(f"Running task: {task.get_name()}, state: {task._state}")

示例日志:

024-07-24 16:37:12,708 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,709 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,727 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,728 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,749 - logging_config - INFO - Event received: thread.message.delta
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,750 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:12,841 - logging_config - INFO - Event received: thread.message.completed
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,842 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,843 - logging_config - INFO - Running task: Task-9, state: PENDING
/********/envs/llms/lib/python3.10/site-packages/openai/lib/streaming/_assistants.py:337: RuntimeWarning: coroutine ‘EventHandler.on_message_done’ was never awaited
self.on_message_done(event.data)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
2024-07-24 16:37:12,939 - logging_config - INFO - Event received: thread.run.step.completed
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:12,940 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:13,028 - logging_config - INFO - Event received: thread.run.completed
2024-07-24 16:37:13,028 - logging_config - INFO - Running task: Task-2, state: PENDING
2024-07-24 16:37:13,028 - logging_config - INFO - Running task: Task-10, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-7, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-8, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-6, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-1, state: PENDING
2024-07-24 16:37:13,029 - logging_config - INFO - Running task: Task-9, state: PENDING
2024-07-24 16:37:13,061 - logging_config - INFO - Finally close the web socket…
2024-07-24 16:37:13,062 - logging_config - INFO - Attempting to close W

已尝试的内容

  • 任务自省:添加了详细信息内省日志记录以确认任务仍处于 PENDING 状态。
  • 确保未来:使用 asyncio.ensure_future 来调度协程。
  • 让出控制:添加了等待 asyncio.sleep(0) 以让出控制并确保任务运行。| ||事件循环问题:确保没有导致问题的嵌套事件循环。
  • 任务调度:验证任务是在活动事件循环上下文中调度的。
  • 我的问题是:如何确保负责的任务通过 WebSocket 发送文本增量是否正确执行并且不会保持在 PENDING 状态?是否有一些我不知道的事件处理程序与 OpenAI Assistant 的 API 进行了奇怪的交互?

我从事这方面工作的时间比我现在愿意承认的要长:)。对于可能导致这些任务悬而未决的原因以及如何解决它的任何见解或建议将不胜感激!

I’ve been at this for longer than I’d care to admit at this point :). Any insights or suggestions on what might be causing these tasks to remain pending and how to resolve it would be greatly appreciated!


代码中导致任务一直处于 PENDING 状态的原因是,你没有在正确的事件循环上下文中运行它们。虽然使用 asyncio.ensure_future 可以将协程调度为任务,但如果没有正在运行的事件循环,这些任务将永远不会被执行。

在你的代码中, on_text_delta on_message_done 方法是 OpenAI 库的回调方法,它们 不是 在你的 FastAPI 应用程序的主事件循环中调用的。因此,你需要将创建的任务显式地放到主事件循环中运行。

以下是修改后的代码:

import asyncio
from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta import AssistantStreamEvent
from openai.types.beta.threads import Text, TextDelta
from starlette.websockets import WebSocket, WebSocketDisconnect
from logging_config import logger

class EventHandler(AssistantEventHandler):
    def __init__(self, websocket: WebSocket):
        super().__init__()
        self.websocket = websocket
        logger.info("EventHandler initialized")

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        logger.info(f"Event received: {event.event}")
        self.introspect_tasks()

    @override
    async def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None:
        logger.info(f"Received text delta.value: {delta.value}")
        self.introspect_tasks()
        # 获取当前正在运行的事件循环
        loop = asyncio.get_running_loop() 
        # 使用 loop.create_task 将协程放到主事件循环中
        loop.create_task(self.send_text_delta(delta.value))

    async def send_text_delta(self, value: str) -> None:
        try:
            logger.info(f"Attempting to send delta: {value}")
            await self.websocket.send_json({"type": "text_delta", "content": value})
            logger.info(f"Sent delta: {value}")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending delta")
        except Exception as e:
            logger.error(f"Error sending delta: {str(e)}")

    @override
    async def on_message_done(self, text: Text) -> None:
        logger.info("The message is now done....")
        self.introspect_tasks()
        # 获取当前正在运行的事件循环
        loop = asyncio.get_running_loop()
        # 使用 loop.create_task 将协程放到主事件循环中
        loop.create_task(self.send_message_done())

    async def send_message_done(self) -> None:
        try:
            logger.info("Attempting to send message_done")
            await self.websocket.send_json({"type": "message_done"})
            logger.info("Sent message_done")
        except WebSocketDisconnect:
            logger.warning("WebSocket disconnected while sending message_done")
        except Exception as e:
            logger.error(f"Error sending message_done: {str(e)}")
        finally:
            await self.close_websocket()

    async def close_websocket(self) -> None:
        try:
            logger.info("Attempting to close WebSocket")
            await self.websocket.close()
            logger.info("Closed WebSocket")
        except Exception as e:
            logger.error(f"Error closing WebSocket: {str(e)}")

    def introspect_tasks(self):
        current_tasks = asyncio.all_tasks()
        for task in current_tasks:
            if not task.done():
                logger.info(f"Running task: {task.get_name()}, state: {task._state}")

修改后的代码中,我们在 on_text_delta on_message_done 方法中使用 asyncio.get_running_loop() 获取当前正在运行的事件循环,然后使用 loop.create_task() send_text_delta send_message_done 协程提交到主事件循环中。这样可以确保这些协程在 FastAPI 应用程序的主事件循环中运行,从而避免任务一直处于 PENDING 状态。

标签:python,websocket,openai-api
From: 78790537

相关文章

  • Jupyter Notebook 环境中的 Python 版本不匹配
    我遇到Jupyter笔记本启动横幅中报告的Python版本与我在笔记本中查询python--version时显示的版本之间的差异。启动横幅指示Python3.11.9,但是当我运行!python--version时,它返回Python3.11.7。我所做的步骤:basecondahas3.11.7versio......
  • Python XML 解析:字符串中的“<”被阻塞
    我有一个使用ET.XMLParser来解析CppCheckXML报告文件的Python模块。当尝试解析字符串中包含“<”的XML元素中的属性之一时,它会令人窒息,它会将其解释为格式错误的XML,例如:<errormsg="Includefile<iostream>notfound.">(注意字符和“iostream”之间的空格必须放......
  • 任意几行代码要成为Python中的函数需要什么?
    我正在上一门计算机科学课,我的任务是创建一个程序来实现一个带有参数的函数。我的老师告诉我,下面的代码不是一个函数,这让我很困惑,对于将某些代码行归类为“函数”所需的条件,我感到很困惑。defgame(numbers,max_turns,pfl,tgl):turns=0flag=Falseprint("You......
  • 如何使用 Python 创建新的 Azure 订阅?
    我正在尝试使用PythonSDK以编程方式创建新的Azure订阅。我发现的对AzurePythonSDK的唯一引用是这个这是我最终得到的结果:importazure.mgmt.billingimportazure.mgmt.subscriptioncreds=AzureCliCredential()client_name='test'defcreat......
  • 用于打印脚本输出的 Python 实用程序
    我可以发誓有一个实用程序可以打印一个python脚本,其输出交织在一起。例如,给定一个脚本:a=2b=3print(a+b)print(a*b)该实用程序将输出a=2b=3print(a+b)#>5print(a*b)#>6有人知道该实用程序的名称吗?我最难找到它。谢谢你!描述的实用程序没有标......
  • a method to make some handy tools with python
    Inmyworkingofcomputer,therearealotofsimplejobsthatarefrequentlyrepeated.Itriedtofindawaytomakethesejobbeenprocessedeasily.Method1:Themethodiswritingascripttodothejob,andexecutingthescriptbyutoolsextensionuto......
  • Python网络爬虫详解:实战豆瓣电影信息采集
    文章目录前言一、爬虫是什么?二、常用库及其作用1.Requests2.BeautifulSoup3.lxml4.Scrapy5.Selenium6.PyQuery7.Pandas8.JSON9.Time三、实现步骤步骤一:环境准备步骤二:数据采集步骤三:数据处理步骤四:数据存储总结前言随着互联网的迅猛发展和数据分析需求的不......
  • python学习之内置函数
    Python拥有许多内置函数,这些函数是Python的一部分,不需要额外导入即可直接使用。这些函数提供了对Python解释器功能的直接访问,涵盖了从数学计算到类型检查、从内存管理到异常处理等各个方面。下面是一些常用的Python内置函数及其简要说明:一、Printprint函数大家都不会......
  • Java中的WebSocket编程:实时通信实现
    Java中的WebSocket编程:实时通信实现大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!实时通信在现代应用中变得越来越重要,从即时聊天到实时数据更新,WebSocket提供了一种高效的解决方案。本文将详细讲解如何在Java中使用WebSocket进行实时通信,涵盖基本的WebS......
  • Python中以函数为作用域
    点击查看代码#第一题foriteminrange(10):#不报错,没有函数,所有操作在全局作用域里面执行,item最后赋值为:9,此时item在缩进与全局都可以使用passprint(item)#第二题item=10deffunc():foriteminrange(10):#优先在本地查找,找不到在到全局查找p......