首页 > 其他分享 >fastapi 使用websocket协议

fastapi 使用websocket协议

时间:2024-08-02 18:39:41浏览次数:14  
标签:__ 协议 websocket fastapi self await ws WebSocket

  fastapi是一个高性能异步web框架,并且支持websocket协议。这是一个比较新的框架,但github上的Star数挺多,快追上flask了。

1、websocket服务端的简单实现:

@app.websocket('/test')
async def websocket_test(websocket: WebSocket):
    # 服务器接受客户端的WebSocket连接请求。
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"服务器返回:{data}")
    # 客户端断开连接,捕获WebSocketDisconnect异常
    except WebSocketDisconnect:
        pass

fastapi官方推荐使用uvicorn 作为ASGI服务器使用:

from fastapi import FastAPI
from fastapi import WebSocket,WebSocketDisconnect
import uvicorn

app = FastAPI()

@app.websocket('/test')
async def websocket_test(websocket: WebSocket):
    # 服务器接受客户端的WebSocket连接请求。
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"服务器返回:{data}")
    # 客户端断开连接,捕获WebSocketDisconnect异常
    except WebSocketDisconnect:
        pass
    
if __name__ == "__main__":
    uvicorn.run(app='main:app',host="0.0.0.0",port=8000,loop="asyncio")

客户端代码:

import websockets, asyncio

async def ws_client():
    async with websockets.connect('ws://127.0.0.1:8000/ai/test') as ws:
        await ws.send('this is a test')
        response = await ws.recv()
        print(response)

if __name__ == '__main__':
    asyncio.run(ws_client())

 

2、以下是官网的websocket服务端的demo,实现多人聊天。自定义了一个ConnectionManager类,和已建立连接的ws进行通信,我加了一些注释,方便查看

from fastapi import FastAPI
import uvicorn
from fastapi import WebSocket
from typing import List

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
           
manager = ConnectionManager()

@app.websocket('/object/{client_id}')
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 返回给当前ws发送消息的客户端
            await manager.send_personal_message(f"You wrote: {data}", websocket)
            # 返回给所有已连接状态的客户端
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        # 断开ws连接
        manager.disconnect(websocket)
        # 告诉剩余还在线客户端,Client是{client_id}的ws断开了
        await manager.broadcast(f"Client #{client_id} left the chat")
        
        
if __name__ == "__main__":
    uvicorn.run(app='main:app',host="0.0.0.0",log_level="debug",port=8000,loop="asyncio")

 

标签:__,协议,websocket,fastapi,self,await,ws,WebSocket
From: https://www.cnblogs.com/shenh/p/17478647.html

相关文章

  • 电子科技集团WJ20057型热网智能终端数据监测MODBUS通信协议
    WJ2007型热网智能终端数据监测MODBUS通信协议  1、 WJ2007终端增加数据通信协议,上传实时数据;采用MODBUS通信协议,格式见表1;   地址功能起始地址点数CRC校验0x010x030x000x000x000x420xC50xFB1byte1bytes2byt......
  • UFS4.0/UFS3.1/Unipro总线协议分析仪
    UFS4.0/UFS3.1/Unipro总线协议分析仪(Analyzer)&训练器(Exerciser):全球市场占有率排名第一的UFS/Unipro总线协议分析仪厂商,支持MIPIM-PHYv5.0GEAR5,UniProv2.0andUFSv4.0等规格测试并支持向下兼容。ProtocolInsight为开发移动设备的客户提供测试和测量工具,并为UFS的......
  • JavaWeb(10) HTTP协议
    一、HTTP协议1.定义        HTTP超文本传输协议(HTTP-HyperTexttransferprotocol),是一个属于应用层的面向对象的协议,由于其简捷、快速的方式,适用于分布式超媒体信息系统。它于1990年提出,经过十几年的使用与发展,得到不断地完善和扩展。它是一种详细规定了浏览器......
  • TCP/TP协议栈(逐渐更新版)
    TCP/IP协议栈应用层DNS协议传输层TCP协议TCP协议报文结构源端口目的端口序列号确认号头长度headerlengthordataoffset保留字段reserved状态字段URGACKPSHRSTSYNFIN窗口字段校验和紧急指针可选字段数据TCP协议三次握手TCP三次握手是一个老生常......
  • MQTT协议与中间件
    发布订阅模式:消费者(客户端)订阅服务器(作为代理Broker)上的主题,当有生产者(客户端)在主题中发布消息时,消费者可以收到。MQTT:基于发布订阅模式的轻量级通讯协议,可以以极少的代码和有限带宽,为连接远程设备提供实时可靠的消息服务。广泛应用于物联网,小型设备。MQTT传输的消息分为两......
  • 星塔链startowerchain的跳数网络协议
    星塔链StarTowerChain的跳数网络协议是其技术架构中的一个重要组成部分,它对于提升区块链网络的性能、降低能耗和增强安全性具有重要意义。以下是对星塔链跳数网络协议的详细分析:一、跳数网络协议的基本概念跳数网络协议是星塔链中用于实现节点间数据交换和交易处理的一种机制......
  • 传输层TCP协议
    传输层TCP协议1.TCP协议简介2.TCP特点3.TCP协议段格式4.确认应答(ACK)机制和序列号5.捎带应答6.标志位7.连接管理机制7.1三次握手7.2四次挥手7.3服务端状态转化8.超时重传机制9.流量控制10.滑动窗口11.拥塞控制12.延迟应答13.面向字节流14.粘包问题1......
  • 网络层IP协议,网段划分,NAT转换
    网络层IP协议1.IP协议的基本认识2.IP协议如何进主机定位和报文转发3.IP分片和组装4.IP协议报头格式5.网段划分如何进行网段划分(子网,公网)分类划分法子网掩码特殊的IP地址6.私有IP地址和公网IP地址7.IP地址数量限制问题8.NAT网络地址转换机制NAT简介工作原理......
  • Python中FastAPI项目使用 Annotated的参数设计
    在FastAPI中,你可以使用PEP593中的Annotated类型来添加元数据到类型提示中。这个功能非常有用,因为它允许你在类型提示中添加更多的上下文信息,例如描述、默认值或其他自定义元数据。FastAPI支持Annotated类型,这使得你可以为路径操作函数的参数提供额外的元数据,例如依赖项、查询参......
  • RPC和 HTTP协议
    RPC和HTTP的区别服务发现HTTP,知道服务域名,可以通过DNS解析得到服务的IP地址,从而进行访问RPC需要一个专门的中间服务去保存服务名和IP信息(注册中心,nacos、consul),想要访问某个服务,就得同时注册到中间服务,然后获取需要顶用服务的IP和端口信息底层链接形式HTTP/1.1......