首页 > 其他分享 >开源模型应用落地-FastAPI-助力模型交互-进阶篇(一)

开源模型应用落地-FastAPI-助力模型交互-进阶篇(一)

时间:2024-07-11 17:28:41浏览次数:21  
标签:websocket FastAPI 模型 userid 进阶篇 WebSocket model self def

一、前言

   FastAPI 的高级用法可以为开发人员带来许多好处。它能帮助实现更复杂的路由逻辑和参数处理,使应用程序能够处理各种不同的请求场景,提高应用程序的灵活性和可扩展性。

    在数据验证和转换方面,高级用法提供了更精细和准确的控制,确保输入数据的质量和安全性。它还能更高效地处理异步操作,提升应用程序的性能和响应速度,特别是在处理大量并发请求时优势明显。

    此外,高级用法还有助于更好地整合数据库操作、实现数据的持久化和查询优化,以及实现更严格的认证和授权机制,保护应用程序的敏感数据和功能。总之,掌握 FastAPI 的高级用法可以帮助开发人员构建出功能更强大、性能更卓越、安全可靠的 Web 应用程序。

    本篇学习FastAPI的生命周期事件,示例均在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(二)基础上进行扩展,建议有需要的老铁们,先去学习。


二、术语

2.1. Lifespan Events(生命周期事件)

    通过生命周期事件,可以更好地管理应用的整个生命周期中的资源和操作,确保资源的正确初始化和释放,提高应用的性能、可靠性和可维护性。

    Lifespan Events主要有以下作用:

  1. 资源初始化与释放:可以在应用启动时执行一些初始化操作,例如创建数据库连接池、加载共享的机器学习模型等需要在整个应用中使用且可在请求间共享的资源。在应用关闭时,执行清理和释放资源的操作,例如关闭数据库连接、释放内存或其他相关资源。
  2. 避免不必要的操作:如果某些资源的初始化成本较高(如加载大型模型),使用 Lifespan Events 可以避免在每次请求时都进行初始化,仅在应用启动后且接收请求之前执行一次。同时,也可以防止在一些不需要处理实际请求的情况下(如运行简单的自动化测试)进行不必要的资源加载,从而提高性能和效率。
  3. 分离启动和关闭逻辑:将与应用启动和关闭相关的逻辑集中在一个地方进行管理,使代码更加清晰和可维护。
     

三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search以及langchainhub的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn transformers==4.32.0 accelerate tiktoken einops transformers_stream_generator==0.0.4 scipy


3.2. 下载Qwen-1_8B-Chat模型

huggingface:

https://huggingface.co/Qwen/Qwen-1_8B-Chaticon-default.png?t=N7T8https://huggingface.co/Qwen/Qwen-1_8B-Chat

​魔搭:

魔搭社区汇聚各领域最先进的机器学习模型,提供模型探索体验、推理、训练、部署和应用的一站式服务。icon-default.png?t=N7T8https://modelscope.cn/models/qwen/Qwen-1_8B-Chat


四、技术实现

4.1. startup & shutdown event

# -*- coding: utf-8 -*-
import traceback

from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import GenerationConfig

import torch
import uvicorn

from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)


model_path = "E:/model/qwen-1_8b-chat"

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

async def chat(query):
    position = 0
    try:
        for response in model.chat_stream(tokenizer, query, history = None):
            result = response[position:]
            position = len(response)
            yield result

    except Exception:
        traceback.print_exc()

@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not None and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")


def loadTokenizer():
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    return tokenizer

def loadModel(config):
    model = AutoModelForCausalLM.from_pretrained(model_path, device_map="cpu", trust_remote_code=True).eval()
    model.generation_config = config
    return model

@app.on_event("startup")
async def startup_event():
    global model,tokenizer
    config = GenerationConfig.from_pretrained(model_path, trust_remote_code=True, top_p=0.9, temperature=0.45,repetition_penalty=1.1, do_sample=True, max_new_tokens=8192)
    tokenizer = loadTokenizer()
    model = loadModel(config)

@app.on_event("shutdown")
def shutdown_event():
    torch.cuda.empty_cache()

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0',port=7777)

调用结果:

用户输入:你好

模型输出:你好!有什么我能帮助你的吗?

说明:

  1. 在startup事件函数中加载模型资源
  2. 在shutdown时间函数中释放资源
  3. startup & shutdown event已过期,后面可能会被移除,建议使用lifespan event代替

4.2. lifespan event

import traceback
from contextlib import asynccontextmanager

from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers import GenerationConfig

import torch
import uvicorn

from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)


model_path = "E:/model/qwen-1_8b-chat"

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()


def loadTokenizer():
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    return tokenizer


def loadModel(config):
    model = AutoModelForCausalLM.from_pretrained(model_path, device_map="cpu", trust_remote_code=True).eval()
    model.generation_config = config
    return model


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 加载模型
    global model, tokenizer
    config = GenerationConfig.from_pretrained(model_path, trust_remote_code=True, top_p=0.9, temperature=0.45,
                                              repetition_penalty=1.1, do_sample=True, max_new_tokens=8192)
    tokenizer = loadTokenizer()
    model = loadModel(config)
    yield
    # 释放资源
    torch.cuda.empty_cache()



app = FastAPI(lifespan=lifespan)

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

async def chat(query):
    position = 0
    try:
        for response in model.chat_stream(tokenizer, query, history = None):
            result = response[position:]
            position = len(response)
            yield result

    except Exception:
        traceback.print_exc()

@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not None and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")


if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0',port=7777)

调用结果:

没有输出警告信息

用户输入:你好,广州有什么好玩的地方推荐?

模型输出:广州有很多值得一去的景点,比如白云山、长隆野生动物园、陈家祠、珠江夜游等。此外,你还可以去逛逛上下九步行街,品尝当地的美食,或者参观广州塔等高楼大厦。


五、附带说明

5.1. 测试界面

<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
            <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
            <br/>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var userid = document.getElementById("userid")
                var secret = document.getElementById("secret")
                ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>

标签:websocket,FastAPI,模型,userid,进阶篇,WebSocket,model,self,def
From: https://blog.csdn.net/qq839019311/article/details/140270641

相关文章

  • 大模型开发入门必读资料
    随着GPT的爆红,“AI大模型”已成为技术圈最火的话题。华为、阿里、腾讯、字节等大厂纷纷加大对AI技术和市场的投资,许多AI创业公司也如雨后春笋般涌现。这些公司都在以高薪资争夺AI大模型人才。作为普通工程师,我们要及时抓住机会,才能享受AI技术带来的红利。要抓......
  • 大模型应用元年,到底有哪些场景可以实际落地?
    很多企业和个人都号称自己打造了AI大模型实际落地场景,其中有噱头、蹭热点,也有真实落地应用的。下面我将聊聊有哪些应用是真实落地可执行的。大模型写作生成式大语言大模型的看家本领非写作莫属。大模型输出logits的基础上加上top_p、top_k、temperature等随机采样策略,是生......
  • 大模型关键技术与应用
    2022年底,OpenAI发布了跨时代的ChatGPT应用。这是第一个具有流畅的多轮对话体验、渊博的通识知识,并能够深刻理解人类意图的生成式人工智能(AI)应用。它的成功使大模型成为AI的主旋律,在极短的时间内改变了AI产业的格局。尽管距离ChatGPT的发布仅过去一年多,但大模型技术已经取得......
  • T5架构和主流llama3架构有什么区别和优缺点、transformer中encoder 和decoder的不同、
    T5架构和主流llama3架构有什么区别和优缺点T5和LLaMA是两种在自然语言处理(NLP)领域广泛应用的大型语言模型,它们在架构和应用上有显著的区别和各自的优缺点。T5架构架构特点:Encoder-Decoder结构:T5(Text-to-TextTransferTransformer)采用了经典的Encoder-DecoderTransform......
  • text2speech文生音频模型XTTS-V2部署带UI
    text2speech文生音频模型XTTS-V2部署带UI模型下载链接,及前端代码效果链接见个人博客:https://pylzzz.online效果图:python后端代码flask框架由于使用的是自己电脑的gpu运算,所以中间有转发的过程,利用内网穿透和虚拟局域网通信。内网穿透教程可见个人博客所需依赖tts......
  • 合合信息“大模型加速器”亮相2024世界人工智能大会
    文章目录......
  • AI推介-大语言模型LLMs之RAG(检索增强生成)论文速览(arXiv方向):2024.06.20-2024.07.01
    文章目录~1.AStudyonEffectofReferenceKnowledgeChoiceinGeneratingTechnicalContentRelevanttoSAPPhIREModelUsingLargeLanguageModel2.FromRAGtoRICHES:RetrievalInterlacedwithSequenceGeneration3.SK-VQA:SyntheticKnowledgeGeneration......
  • 【大模型应用开发 动手做AI Agent】什么是Function Calling
    【大模型应用开发动手做AIAgent】什么是FunctionCalling1.背景介绍1.1问题的由来在人工智能和机器学习领域,函数调用(FunctionCalling)是一个基础且核心的概念。它指的是程序中一个函数被另一个函数、程序或库调用的过程。函数调用允许我们组织代码结构,复用代码片段,以......
  • 如何检测一个大模型是否为套壳chatGPT
    如何检测一个大模型是否为套壳chatGPT相关时事截至目前,OpenAI的ChatGPT在以下国家和地区不受支持:中国俄罗斯朝鲜古巴伊朗叙利亚乌克兰(有特定例外)——ChatGPT不受支持的国家和地区引言在当前快速发展的人工智能和自然语言处理领域,语言模型的使用变得日益普遍,特别是像GP......
  • 上交2024最新-动手学大模型
    介绍  今天分享一个上海交大的免费的大模型,有相关文档和Slides,目前是2.2K星标,还是挺火的!获取:上交2024最新-《动手学大模型》实战分享!  《动手学大模型》系列编程实践,由上海交通大学2024年春季《人工智能安全技术》(NIS3353)讲义拓展而来(教师:张倬胜),旨在提供大模型相......