首页 > 其他分享 >fastapi_socketio

fastapi_socketio

时间:2023-01-12 19:33:34浏览次数:55  
标签:socketio fastapi data sid import sio

代码逻辑图

image

E:\song\下载的文件\fastapi-socketio-example-master\backend\asgi.py

import uvicorn
from api import config, app


if __name__ == '__main__':
    uvicorn.run('asgi:app', host=config.HOST, port=config.PORT, reload=True)

E:\song\下载的文件\fastapi-socketio-example-master\backend\config.py

import os

LOCALHOST = '127.0.0.1'


class BaseConfig:

    def __init__(self, host=LOCALHOST, port=5000):
        self.HOST = host
        self.PORT = port

E:\song\下载的文件\fastapi-socketio-example-master\backend\api\__init__.py

from fastapi import FastAPI
from config import BaseConfig


config = BaseConfig()

app = FastAPI(title='TeamChat clone')

from . import routes
from . import sockets
app.mount('/ws', sockets.sio_app)

E:\song\下载的文件\fastapi-socketio-example-master\backend\api\routes\hello_world.py

from fastapi import APIRouter


router = APIRouter()

@router.get('/hello')
async def hello():
    return {'hello': 'world'}

@router.get('/ping')
async def ping():
    return {'ping': 'pong'}

E:\song\下载的文件\fastapi-socketio-example-master\backend\api\routes\__init__.py

from api import app
from . import hello_world


app.include_router(hello_world.router, tags=['Test Route'], prefix='/hello_world')

E:\song\下载的文件\fastapi-socketio-example-master\backend\api\sockets\index.py

from . import sio


# Temporary data
current_active_users = []

@sio.event
def connect(sid, environ):
    print(f"{sid } is connected.")

@sio.on('message')
async def broadcast(sid, data: object):
    print(f'sender-{sid}: ', data)
    await sio.emit('response', data)

@sio.on('update_status')
async def broadcast_status(sid, data: object):
    print(f'status {data["presence"]}')
    data['sid'] = sid

    if data not in current_active_users:
        current_active_users.append(data)
    
    if data['presence'] == 'offline':
        for user in current_active_users:
            if user['name'] == data['name']:
                current_active_users.remove(user)

    await sio.emit('status', current_active_users)

@sio.event
async def disconnect(sid):
    print('disconnected from front end', sid)
    for user in current_active_users:
        if user['sid'] == sid:
            user['presence'] = 'offline'
    print(current_active_users)
    await sio.emit('re_evaluate_status')


E:\song\下载的文件\fastapi-socketio-example-master\backend\api\sockets\__init__.py

import socketio

sio = socketio.AsyncServer(
    async_mode='asgi',
    cors_allowed_origins='*'
)
sio_app = socketio.ASGIApp(sio)

from . import index

标签:socketio,fastapi,data,sid,import,sio
From: https://www.cnblogs.com/zhuoss/p/17047734.html

相关文章

  • Vue3请求Fastapi接口提示No 'Access-Control-Allow-Origin' header is present...
    问题出现在使用Vue3和Fastapi做前后端分离项目时,前端调用接口console报错:No'Access-Control-Allow-Origin'headerispresentontherequestedresource解决方法在......
  • FastAPI学习
      fromfastapiimportFastAPIfromenumimportEnumfrompydanticimportBaseModelfromtypingimportUnionapp=FastAPI()classModelName(str,Enum):......
  • fastapi
    fastapiFastAPI是一个用于构建API的现代、快速(高性能)的web框架,使用Python3.6+并基于标准的Python类型提示。关键特性:快速:可与NodeJS和Go比肩的极高性......
  • FastAPI 记录笔记
    https://fastapi.tiangolo.com/安装pipinstallfastapipipinstall"uvicorn[standard]"基本代码main.pyfromtypingimportUnionfromfastapiimportFastAPI......
  • agv_fastapi_socket_mock
    E:\song\agv_fastapi_socket_v4_mock\app.pyimporttimefromfastapiimportFastAPI,WebSocket,Request,WebSocketDisconnectfromfastapi.responsesimportRedire......
  • supervisor+gunicorn+uvicorn部署fastapi项目
    一、编写一个项目本项目是在虚拟环境下的:先启动虚拟环境:source.venv/bin/activate。(创建虚拟环境自己去找) 项目用于演示,所以非常简单,......
  • FastAPI + Vue
    相当于这篇文章的翻译:https://testdriven.io/blog/developing-a-single-page-app-with-fastapi-and-vuejs/源码地址:https://github.com/testdrivenio/fastapi-vue一、后......
  • 【Web开发】Python实现Web服务器(FastAPI)
    文章目录​​1、简介​​​​2、安装​​​​3、官方示例​​​​3.1入门示例​​​​3.2跨域CORS​​​​3.3文件操作​​​​3.4WebSocket​​​​结语​​1、简介Fas......
  • fastapi身份认证
    官方文档FastApi提供了OAuth2PasswordBearer类对OAuth2中的password授权模式提供了支持。一、实现逻辑创建OAuth2PasswordBearer实例并指明tokenurl(认证用户获取tok......
  • 基于 Serverless一键体验FastAPI
    基于Serverless一键体验FastAPI​​活动地址​​​​服务搭建​​​​服务使用​​​​ServerlessDevs介绍​​​​什么是ServerlessDevs​​​​ServerlessDevs优势​......