首页 > 系统相关 >fastapi系列之-在多进程使用websocket一些问题细节的小结

fastapi系列之-在多进程使用websocket一些问题细节的小结

时间:2022-11-28 00:45:25浏览次数:44  
标签:websocket fastapi self await async 小结 app def

由于之前一直未深入去了解过关于fastapi中websocket多进程问题,由于之前的测试有可能都是但进程的方式进行启动测试,即便有时候是多进程的方式启动,但是巧合的是估计刚好用户都注册到同一个进程上面了,所以两户之间通信是没啥问题。

刚好一位“老友粉”遇到这种情况问题,索性抽空实践一番。

1 老友的问题描述:

  • 线上生产环境使用的多进程的方式部署启动fastapi服务
  • 然后需要需要通过一个后台发送HTTP请求之后处理后通过服务端webscoket向客户端websocket进行一次发送数据。

2 问题现象:

  • 通过后台发送HTTP请求之后处理后端服务端webscoket发送数据,但是客户端没收到?

3 问题的分析

起初我以为fock出来的子进程内部会通过某种机制进行内部的通信,结果自己太嫩了!理解错了!悲催~哈哈

我们都知道多进程的情况下,每个启动的进程有自己独立的存储空间。所以此时我们的某个进程下的连接的管理对象,其实是不存在数据。

4 问题的图解

 

 

5 问题原因

基于独立内存空间下的,我们的每个进程中的保存的客户端连接的对象也是独享的,所以只能另辟蹊径!

6 使用消息队列机制

PS:为了简单使用了redis的消息队列机制

一开始我所能想到就是利于消息的发布和订阅机制处理,确保所有的进程都会进行消息订阅,这样就可以达到每个进行收到消息的时候都会执行相关的消息了!

其实思路是对滴!自己实践一番之后,所有有了以下的一些总结:

主要处理思路:

1:每个进行启动的时候,都进行一个消息的机制的订阅 2:http进行接口请求的时候,进行消息发布 3:再消息消费的时候,进行调用进行自身的消息广播机制 4:如果你想延伸的,还可以跨进行的进行连接的同时备份(但是这个还没实践,仅仅是想法,想来这样肯定会有问题,所以不推荐)

7 最终的代码示例:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.endpoints import WebSocketEndpoint
from fastapi.responses import HTMLResponse
from enum import Enum
from typing import Any, Dict, List, Optional
import asyncio

app = FastAPI()


import redis
import aioredis
import os

class ConnectionManager:
    def __init__(self):
        # 保存当前所有的链接的websocket对象
        # self.active_connections: List[WebSocket] = []
        self.active_connections = []

    async def connect(self, websocket: WebSocket):


        client = str(websocket)[1:-1].split(' ')[3]
        print("是后端还是兑换",client)
        await websocket.accept()
        # 添加到当前已链接成功的队列中进行管理
        self.active_connections.append(websocket)

    async def close(self, websocket: WebSocket):
        # 主动的断开的客户端的链接,不是抛出异常的方式断开
        await websocket.close()
        self.active_connections.remove(websocket)

    async def disconnect(self, websocket: WebSocket):
        # 从队列里面删除我们的已经断开链接的websocket对象
        self.active_connections.remove(websocket)
        # await websocket.close()

    async def send_personal_message(self, message: str, websocket: WebSocket):
        # 发现消息
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # 循环变量给所有在线激活的链接发送消息-全局广播
        print("当前的用户链接数,",len(self.active_connections))
        for connection in self.active_connections:
            await connection.send_text(message)




@app.get("/test")
async def get34545():
    print("全局广播!!!PID", os.getpid())
    app.state.pubmessage.publish('message_channel_http', "我要全局广播!!!!!!!!!!")
    return '我要全局广播!'





@app.websocket_route("/ws/{user_id}", name="ws")
class EchoSever(WebSocketEndpoint):
    encoding: str = "text"
    session_name: str = ""
    count: int = 0

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 从args中提取对应的输入的参数信息
        print(args[0]['endpoint'])
        print(args[0]['path_params'])
        self.user_id: Optional[str] = args[0]['path_params'].get('user_id')

    # 开始有链接上来的时候对应的处理
    async def on_connect(self, websocket):
        await  app.state.manager.connect(websocket)
        print("进入房间的时候的pid",  os.getpid())
        await  app.state.manager.broadcast(f"游客: {self.user_id}进入了房间!")
        # await self.daojishi(websocket)

    # 客户端开始有数据发送过来的时候的处理
    async def on_receive(self, websocket, data):
        # timeout_count = getattr(websocket, 'timeout_count')
        # setattr(websocket, 'timeout_count', 0)
        print("说话时候的PID", os.getpid())
        await  app.state.manager.broadcast(f"游客:{self.user_id} 说》{data}")

    # 客户端断开链接的时候
    async def on_disconnect(self, websocket, close_code):
        # 进行全局的广播所有的在线链接的所有用户消息
        try:
            await  app.state.manager.disconnect(websocket)
            # 广播给其他所有在线的websocket
            await  app.state.manager.broadcast(f"游客: {self.user_id} 离开了聊天室")
        except ValueError:
            # 倒计时自动结束的之后,客户端再点击一次断开的时候异常处理!
            pass



@app.on_event('startup')
async def on_startup():

    # 异步redis消息的队列的处理机制
    # https://aioredis.readthedocs.io/en/v1.2.0/start.html
    pubmessage = await aioredis.create_redis( 'redis://localhost')
    await pubmessage.set("ceshi","我是测试数据")
    sadsa = await pubmessage.get("ceshi")
    print('读取测试数据,验证redis链接情况:',sadsa)
    print("读取测试数据,验证redis链接情况!!!PID", os.getpid())

    app.state.pubmessage = pubmessage

    # 执行消息订阅机制
    loop = asyncio.get_event_loop()
    loop.create_task(register_pubsub())



async def register_pubsub():
    pool = await aioredis.create_pool( 'redis://localhost',minsize=5, maxsize=10)
    async def reader(channel):
        # 进行消息的消费
        while (await channel.wait_message()):
            msg = await channel.get(encoding='utf-8')
            print("========================================>")
            print("全局的广播信息!!!essage in {}: {}".format(channel.name, msg))
            # 执行全局的消息广播
            await app.state.manager.broadcast(f"HTTP游客:接收到全局的广播信息!")

    with await pool as conn:
        # 执行消息注册
        await conn.execute_pubsub('subscribe', 'message_channel_http')
        channel = conn.pubsub_channels['message_channel_http']
        await reader(channel)  # wait for reader to complete
        await conn.execute_pubsub('unsubscribe', 'message_channel_http')

    # 加下面的的话就会容易断开!傻叉了!
    # pool.close()
    # await pool.wait_closed()



@app.on_event('startup')
async def on_startup():

    manager = ConnectionManager()
    # 设置发布者属性对象
    app.state.manager = manager
    # 设置任务渠道消费者


if __name__ == '__main__':
    import uvicorn
    # import threading
    # kkl =threading.Thread(target=doresubscribe)
    # kkl.start()


    uvicorn.run('wstest:app', host='0.0.0.0', port=9082, access_log=False, workers=2, use_colors=True)
    # uvicorn.run(app='wstest:app', host="127.0.0.1", port=8000, workers =5, reload=True, debug=True)

 

 

8 演示:

1:使用客户端连接我们的服务端上:

 

 

2:请求到我们的HTTP接口上进行广播处理:

http://127.0.0.1:9082/test

 

 

3:官网我们处于不同进行的情况的客户端接收信息的情况:

 

 

 

这样就可以完成跨进程的之间的处理了!打完收工!

转发自https://www.modb.pro/db/185487

 

标签:websocket,fastapi,self,await,async,小结,app,def
From: https://www.cnblogs.com/a00ium/p/16931133.html

相关文章

  • Fastapi微服务系列(1)-之GRPC入门篇
    一些微服务说明前言在转回python之前,其实就对微服务有所尝试,不过当时使用的是go-micro-v2来进行了解,当时也只是浅尝辄止,没深入继续深究~其实微服务这东西没必要为了微服......
  • CSS错题小结
    一.列出所知的CSS选择器1.基本选择器:id选择器,类选择器,标签选择器,通配符选择器(*)复合选择器:后代选择器(.box1.box2),并集选择器(.box1,.box2),交集选择器(.box1.bo......
  • nginx反向代理websocket wss或ws端口
    1.直接上nginx配置文件注意:后端端口使用wss:proxy_passhttps://wss_8;后端端口使用ws:proxy_passhttp://wss_8;upstreamwss_8{server127.0.0.1:8004;}#......
  • FastAPI使用typing类型提示
    typing是Python标准库,用来做类型提示。FastAPI使用typing做了:编辑器支持;类型检查;定义类型,requestpathparameters,queryparameters,headers,bodies,dependencies等等;......
  • websocket 测试
    npminstall-gwscatwscat-l8888wscat-cws://127.0.0.1:8888  constWebSocket=require('ws');constws=newWebSocket.Server({port:8777});ws.on('connection......
  • 研究光度立体法阶段性小结和优化(可20ms获取4个2500*2000灰度图的Normal Map)。
     这个东西是我接触的第一个非2D方面的算法,到目前为止其实也没有完全搞定,不过可能短时间内也无法突破。先把能搞定的搞定吧。 这个东西也有一大堆参考资料,不过呢,搜来搜......
  • Spreadsheet样式设置小结(一)
    1.字体类:第1行代码将A7至B7两单元格设置为粗体字,Arial字体,10号字;第2行代码将B1单元格设置为粗体字。$spreadsheet->getActiveSheet()->getStyle('A7:B7')->getFont()......
  • 简单小结类与对象
    /*1.类与对象类是一个模版,抽象;对象是一个具体的实例2.方法定义、调用3.对应的引用引用类型:基本类型(8)对象是通过引用来操作的:栈--->堆4.属性:字段Field......
  • JFinal整合spring的websocket
    在使用JFinal整合spring使用spring的websocket的时候,遇到了很多问题,下面介绍整合的全过程和要注意的点。(整个项目使用maven进行搭建,服务器用的是eclipse自带的jetty)1.整个po......
  • spring配置websocket并实现群发/单独发送消息
    spring框架中自带了websocket的jar包,利用它可以实现与H5中WebSocket的对接,甚至websocket还可以通过依赖注入与http请求一同工作,详细配置实现过程如下文件目录结构如下,主要是......