首页 > 其他分享 >使用Django-Channels实现websocket通信+大模型对话

使用Django-Channels实现websocket通信+大模型对话

时间:2024-08-14 17:06:58浏览次数:9  
标签:channels websocket self Django Channels WebSocket user import

前言

最近一直在做这个大模型项目,我选了 Django 作为框架(现在很多大模型应用都用的 FastAPI,不过我已经用习惯 Django 了)

之前使用 AspNetCore 作为后端的时候,我先后尝试了 Blazor Server,WebAPI SSE(Server Sent Event)等方案来实现大模型对话,目前好像 SSE 是用得比较多的,ChatGPT 也是用的这个。

自从进入 3.x 时代,Django 开始支持异步编程了,所以也能实现 SSE (不过我在测试中发现有点折腾),最终还是决定使用 WebSocket 来实现,Django 生态里的这套东西就是 channels 了。(话说 FastAPI 好像可以直接支持 SSE 和 WebSocket )

先看效果

放一张聊天界面

聊天界面

我还做了个对话历史页面

对话历史页面

关于 django-channels

搬运一下官网的介绍,https://channels.readthedocs.io/en/latest/introduction.html

Channels wraps Django’s native asynchronous view support, allowing Django projects to handle not only HTTP, but protocols that require long-running connections too - WebSockets, MQTT, chatbots, amateur radio, and more.

OK,Channels 将 Django 从一个纯同步的 HTTP 框架扩展到可以处理异步协议如 WebSocket。原本 Django 是基于 WSGI 的,channels 使用基于 ASGI 的 daphne 服务器,而且不止能使用 WebSocket ,还能支持 HTTP/2 等新的技术。

几个相关的概念:

  • Channels: 持久的连接,如 WebSocket,可用于实时数据传输。
  • Consumers: 处理输入事件的异步功能,类似于 Django 的视图,但专为异步操作设计。
  • Routing: 类似于 Django 的 URL 路由系统,Channels 使用 routing 来决定如何分发给定的 WebSocket 连接或消息到相应的 Consumer。

与传统的 Django 请求处理相比,Django Channels 允许开发者使用异步编程模式,这对于处理长时间运行的连接或需要大量并发连接的应用尤其有利。这种架构上的变化带来了更高的性能和更好的用户体验,使 Django 能够更好地适应现代互联网应用的需求。

通过引入 Channels, Django 不再只是一个请求/响应式的 Web 框架,而是变成了一个真正意义上能够处理多种网络协议和长时间连接的全功能框架。这使得 Django 开发者可以在不离开熟悉的环境的情况下,开发出更加丰富和动态的应用。

使用场景

先介绍下使用场景

这个 demo 项目的后端使用 StarAI 和 LangChain 调用 LLM 获取回答,然后通过 WebSocket 与前端通信,前端我选了 React + Tailwind

安装

以 DjangoStarter 项目为例(使用 pdm 作为包管理器)

pdm add channels[daphne]

然后修改 src/config/settings/components/common.py

把 daphne 添加到注册Apps里,注意要放在最前面

# 应用定义
INSTALLED_APPS: Tuple[str, ...] = (
    'daphne',
)

之后使用 runserver 时,daphne 会代替 Django 内置的服务器运行

接着,channels 还需要一个 channel layer,这可以让多个消费者实例相互通信以及与 Django 的其他部分通信,这个 layer 可以选 Redis

pdm add channels_redis

配置

ASGI

OK,接下来得修改一下 src/config/asgi.py

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

django_asgi_app = get_asgi_application()

from apps.chat.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    # Just HTTP for now. (We can add other protocols later.)
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
    ),
})

除了官网文档说的配置之外,我这里已经把聊天应用的路由加进来了

因为这个 demo 项目只有这一个 app 使用了 WebSocket ,所以这里直接把 chat 里的 routing 作为 root URLRouter

如果有多个 WebSocket app 可以按需修改

channel layer

修改 src/config/settings/components/channels.py 文件

from config.settings.components.common import DOCKER

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis" if DOCKER else "127.0.0.1", 6379)],
        },
    },
}

其他的配置就不赘述了,DjangoStarter 里都配置好了

编写后端代码

channels 的使用很简单,正如前面的介绍所说,我们只需要完成 consumer 的逻辑代码,然后配置一下 routing 就行。

那么开始吧

consumer

创建 src/apps/chat/consumers.py 文件

身份认证、使用大模型生成回复、聊天记录等功能都在这了

先上代码,等下介绍

import asyncio
import json

from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

from .models import Conversation, Message

class LlmConsumer(AsyncWebsocketConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(args, kwargs)
        self.chat_id = None
        self.conversation = None

    @database_sync_to_async
    def get_conversation(self, pk, user):
        obj, _ = Conversation.objects.get_or_create(id=pk, user=user)
        return obj

    @database_sync_to_async
    def add_message(self, role: str, content: str):
        return Message.objects.create(
            conversation=self.conversation,
            role=role,
            content=content,
        )

    async def connect(self):
        self.chat_id = self.scope["url_route"]["kwargs"]["chat_id"]
        await self.accept()

        # 检查用户是否已登录
        user = self.scope["user"]
        if not user.is_authenticated:
            await self.close(code=4001, reason='Authentication required. Please log in again.')
            return
        else:
            self.conversation = await self.get_conversation(self.chat_id, user)

    async def disconnect(self, close_code):
        ...

    async def receive(self, text_data=None, bytes_data=None):
        text_data_json: dict = json.loads(text_data)
        history: list = text_data_json.get("history")

        user = self.scope["user"]
        if not user.is_authenticated:
            reason = 'Authentication required. Please log in again.'
            await self.send(text_data=json.dumps({"message": reason}))
            await asyncio.sleep(0.1)
            await self.close(code=4001, reason=reason)
            return

        await self.add_message(**history[-1])

        llm = ChatOpenAI(model="gpt4")
        resp = llm.stream(
            [
                *[
                    HumanMessage(content=e['content']) if e['role'] == 'user' else AIMessage(content=e['content'])
                    for e in history
                ],
            ]
        )

        message_chunks = []
        for chunk in resp:
            message_chunks.append(chunk.content)
            await self.send(text_data=json.dumps({"message": ''.join(message_chunks)}))
            await asyncio.sleep(0.1)

        await self.add_message('ai', ''.join(message_chunks))

工作流程

  • 前端发起ws连接之后,执行 connect 方法的代码,身份认证没问题的话就调用 self.accept() 方法接受连接
  • 接收到前端发来的信息,会自动执行 receive 方法,处理之后调用 self.send 可以发送信息

身份认证

虽然在 asgi.py 里已经配置了 AuthMiddlewareStack

但还是需要在代码里自行处理认证逻辑

这个中间件的作用是把 header 里的 authorization 信息变成 consumer 里的 self.scope["user"]

要点

  • consumer 有同步版本和异步版本,这里我使用了异步版本 AsyncWebsocketConsumer
  • async consumer 里访问 Django ORM 需要使用 database_sync_to_async 装饰器
  • connect 里面,身份验证失败后调用 self.close 关闭连接,里面的参数 code 不能和 HTTP Status Code 冲突,我一开始用的 401 ,结果前端接收时变成其他 code ,换成自定义的 4001 才可以接收到。但 reason 参数是一直接收不到的,不知道为啥,可能跟浏览器的 WebSocket 实现有关?
  • receive 方法里,将大模型生成的内容使用流式输出发送给客户端时,一定要在 for 循环里加上 await asyncio.sleep 等待一段时间,这是为了留出时间让 WebSocket 发送消息给客户端,不然会变成全部生成完再一次性发给客户端,没有流式输出的效果。
  • receive 方法里,接收到消息后先判断当前是否有登录(或者登录是否过期,表现和未登录一致),如果未登录则发送信息 "Authentication required. Please log in again." 给客户端,然后再关闭连接。

routing

写完了 consumer ,配置一下路由

编辑 src/apps/chat/routing.py 文件

from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r"ws/chat/demo/(?P<room_name>\w+)/$", consumers.ChatConsumer.as_asgi()),
    re_path(r"ws/chat/llm/(?P<chat_id>\w+)/$", consumers.LlmConsumer.as_asgi()),
]

这里注意只能使用 re_path 不能 path (官方文档说的)

客户端开发

OK,后端部分到这就搞定了,接下来写一下客户端的代码

我选择了 React 来实现客户端

无关代码就不放了,直接把关键的代码贴上来

function App() {
  const [prompt, setPrompt] = useState('')
  const [messages, setMessages] = useState([])
  const [connectState, setConnectState] = useState(3)
  const [conversation, setConversation] = useState()

  const chatSocket = useRef(null)
  const messagesRef = useRef(null)
  const reLoginModalRef = useRef(null)

  React.useEffect(() => {
    openWebSocket()
    getConversation()

    return () => {
      chatSocket.current.close();
    }
  }, [])

  React.useEffect(() => {
    // 自动滚动到消息容器底部
    messagesRef.current.scrollIntoView({behavior: 'smooth'})
  }, [messages]);

  const getConversation = () => {
    // ... 省略获取聊天记录代码
  }

  const openWebSocket = () => {
    setConnectState(0)
    chatSocket.current = new WebSocket(`ws://${window.location.host}/ws/chat/llm/${ConversationId}/`)
    chatSocket.current.onopen = function (e) {
      console.log('WebSocket连接建立', e)
      setConnectState(chatSocket.current.readyState)
    };

    chatSocket.current.onmessage = function (e) {
      const data = JSON.parse(e.data)
      setMessages(prevMessages => {
        if (prevMessages[prevMessages.length - 1].role === 'ai')
          return [
            ...prevMessages.slice(0, -1), {role: 'ai', content: data.message}
          ]
        else
          return [
            ...prevMessages, {role: 'ai', content: data.message}
          ]
      })
    };

    chatSocket.current.onclose = function (e) {
      console.error('WebSocket 链接断开。Chat socket closed unexpectedly.', e)
      setConnectState(chatSocket.current.readyState)
      if (e.code === 4001) {
        // 显示重新登录对话框
        new Modal(reLoginModalRef.current, options).show()
      }
    };
  }

  const sendMessage = () => {
    if (prompt.length === 0) return

    const newMessages = [...messages, {
      role: 'user', content: prompt
    }]
    setMessages(newMessages)
    setPrompt('')
    chatSocket.current.send(JSON.stringify({
      'history': newMessages
    }))
  }
}

前端代码没啥好说的,简简单单

我用了 messagesRef.current.scrollIntoView({behavior: 'smooth'}) 来实现消息自动滑动到底部,之前用 Blazor 开发 AIHub 的时候好像是用了其他实现,我记得不是这个

不过这个方法挺丝滑的

WebSocket 地址直接写在这里面感觉没那么优雅,而且后面部署后得改成 wss:// 也麻烦,得研究一下有没有更优雅的实现。

小结

这个只是简单的demo,实际上生产还得考虑很多问题,本文就是为 channels 的应用开了个头,后续有新的研究成果会持续更新博客~

标签:channels,websocket,self,Django,Channels,WebSocket,user,import
From: https://www.cnblogs.com/deali/p/18359353

相关文章

  • django 基础指令
    1、创建django项目#pip下载djangopipinstallDjango#查看下载的django库版本python-mdjango--version#创建一个名为HelloWorld的django项目django-adminstartprojectHelloWorld2、django项目添加一个app视图#进入项目目录cd.../HelloWorld#创建一个名......
  • 基于django+vue基于微信小程序的社区物资订购系统【开题报告+程序+论文】计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景在城市化进程加速的今天,社区作为居民生活的基本单元,其内部物资供应与需求的高效匹配显得尤为重要。特别是在特殊时期,如疫情期间,社区物资供......
  • 基于django+vue基于微信小程序的母支组新农改系统【开题报告+程序+论文】计算机毕设
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着农村电商的蓬勃发展,如何有效整合农村资源、促进农产品上行成为乡村振兴的重要议题。传统的农村市场存在着信息不对称、销售渠道有限等......
  • 基于django+vue基于微信小程序的垃圾分类系统演示录像22023【开题报告+程序+论文】计
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景在环境保护意识日益增强的今天,垃圾分类作为促进资源循环利用、减少环境污染的关键举措,受到了社会各界的广泛关注。然而,垃圾分类知识的普及......
  • 基于django+vue基于微信小程序的垃圾分类系统演示录像12023【开题报告+程序+论文】计
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着城市化进程的加速,生活垃圾产量急剧增加,垃圾分类已成为城市管理和环境保护的重要议题。然而,传统的垃圾分类方式存在效率低、准确性差、......
  • 初始Django之宿舍管理系统(ORM基础入门篇)
    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档初始Django之宿舍管理系统(ORM入门篇)项目部分功能展示:**管理员登录:****宿管登录:****学生登录:**前言宿舍管理系统是一种能够提高学生宿舍管理效率的工具。随着大学生群体的不断增长,宿舍管理工......
  • django常用的组合搜索组件
    文章目录django常用的组合搜索组件快速使用配置信息1.视图函数2.前端模板3.css样式代码实现django常用的组合搜索组件在项目开发中,如果有大量数据就要用到组合搜索,通过组合搜索对大块内容进行分类筛选。快速使用三步走:(其实主要就是传入配置信息)创建组合搜索......
  • 【Django开发】django美多商城项目完整开发4.0第2篇:项目准备【附代码文档】
    本教程的知识点为:美多商城项目准备项目准备配置1.修改settings/dev.py文件中的路径信息2.INSTALLED_APPS3.数据库用户部分图片验证码1.后端接口设计:视图原型2.具体视图实现用户部分使用Celery完成发送短信判断帐号是否存在1.判断用户名是否存在后端接口设......
  • Django-rest-framework(DRF)怎么使用celery
    目录一、什么是celery1、celery简介2、celery的使用场景3、celery的架构二、Django使用celery1、安装celery2、Django配置三、定时任务和异步任务一、什么是celery1、celery简介Celery是一个基于Python开发的分布式异步消息任务队列,它专注于实时处理的异步任务队......
  • WebSockets:原理、握手及代码实现
    1.WebSockets原理WebSockets是HTML5标准的一部分,旨在为Web应用提供全双工通信能力。与传统的HTTP请求不同,WebSockets连接一旦建立,就可以在客户端和服务器之间自由传输数据,而不再需要每次通信都重新建立连接。工作流程:建立连接:客户端通过HTTP协议发起WebSocket握......