首页 > 其他分享 >faststream 自己的asgi 实现

faststream 自己的asgi 实现

时间:2024-09-15 08:55:41浏览次数:1  
标签:faststream asgi app broker 实现 import router

faststream 目前自己包含了一个asgi 的实现,可以快速实现api 能力,同时也可以与其他web 框架集成,以下是一个简单试用

参考代码

  • demo.py
from faststream.redis import RedisBroker
 
from faststream.asgi import AsgiFastStream,get,AsgiResponse
 
from typing import Any
 
from pydantic import BaseModel
 
class Msg(BaseModel):
    name: str
    age: int
   
broker = RedisBroker("redis://localhost:6379",validate=True)
   
@get
async def liveness_ping(scope):
    return AsgiResponse(b"is liveness ping", status_code=200)
 
app = AsgiFastStream(broker,asgi_routes=[
    ("/",liveness_ping),
],
asyncapi_path="/docs",
)
   
@broker.publisher("response")
@broker.subscriber("test")
async def msg(msg: Msg):
    print("test",msg)
    return {"name":"dalong","age":18}
   
@broker.subscriber("response")
async def msgv2(msg: Any):
    print("response",msg)
 
if  __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
  • fastapi 集成
    一种是基于asgi 兼容,还有就是基于扩展的router 实现
from contextlib import asynccontextmanager
 
from fastapi import FastAPI
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.asgi import make_ping_asgi, make_asyncapi_asgi
 
broker = NatsBroker()
 
@asynccontextmanager
async def start_broker(app):
    """Start the broker with the app."""
    async with broker:
        await broker.start()
        yield
 
app = FastAPI(lifespan=start_broker)
 
app.mount("/health", make_ping_asgi(broker, timeout=5.0))
app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker)))

基于router实现

from fastapi import Depends, FastAPI
from pydantic import BaseModel
from faststream.redis.fastapi import RedisRouter, Logger
from typing import Any
router = RedisRouter("redis://localhost:6379")
 
class Incoming(BaseModel):
    m: dict
 
def call():
    return True
 
@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return  {"name":"dalong","age":18}  
   
@router.subscriber("response")
async def appdemo(m: Any,logger: Logger):
    logger.info(m)
    return {"response": "Hello, response!"}
 
@router.get("/")
async def hello_http():
    return "Hello, HTTP!"
 
@router.post("/send")
async def hello_http(msg: Incoming):
    await router.broker.publish(msg,"test")
    return "Hello, HTTP!"
   
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
 
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

参考资料

https://faststream.airt.ai/latest/getting-started/asgi/

标签:faststream,asgi,app,broker,实现,import,router
From: https://www.cnblogs.com/rongfengliang/p/18350351

相关文章

  • 单片机毕业设计——基于物联网系统的防汛监测系统 要怎么设计与实现呢 要怎么设计与实
    基于物联网的智能教室设计与实现是通过集成多个传感器和控制设备,利用云平台进行数据管理和远程监控,以实现教室环境的自动化管理。以下是根据功能需求分步骤的具体实现方案:一、系统规划与设计需求分析:确定教室需要实现的功能,如温湿度检测、风扇控制、光照检测、人体感应、设......
  • MySQL 主从复制:实现数据同步与高可用
    在数据库系统中,数据的可靠性和可用性至关重要。MySQL的主从复制功能为我们提供了一种有效的方式来实现数据同步和高可用。今天,我们就来深入探讨一下MySQL中如何实现主从复制。一、主从复制的概念与作用主从复制是指将一个MySQL数据库服务器(主服务器)的数据复制到一个或多个M......
  • C语言实现三子棋(N子棋)
    目录1.游戏规则2.游戏实现3.游戏测试4.代码链接---------------------------------------------------------------------------------------------------------------------------------1.游戏规则    三子棋又称井字棋,一般来说是在一个3×3的棋盘中,双方执不同......
  • Python实现牛顿法 目录
    博客:Python实现牛顿法目录引言什么是牛顿法?牛顿法的历史与背景牛顿法的应用场景牛顿法的原理牛顿法的基本思想导数与泰勒展开的概念公式推导收敛性分析Python实现牛顿法面向对象的设计思路代码实现示例与解释牛顿法应用实例:求解非线性方程场景描述算法实现结果......
  • Python实现梯度下降法
    博客:Python实现梯度下降法目录引言什么是梯度下降法?梯度下降法的应用场景梯度下降法的基本思想梯度下降法的原理梯度的定义学习率的选择损失函数与优化问题梯度下降法的收敛条件Python实现梯度下降法面向对象的设计思路代码实现示例与解释梯度下降法应用实例:线......
  • 前端实现DNS解析和优化
            作为前端开发者如何针对DNS做优化?众所周知,这个DNS是做域名解析的,输入一个域名,经过DNS解析之后就可以得到一个IP地址,互联网中做请求都要把域名转成IP,这个转换过程是比较复杂的,就比较耗时,所以说有优化的空间,它转换之后是会做本地缓存。所以我们优化的目标主要是......
  • python实现插入排序算法
    插入排序是指,在已经排序过的列表,将需要添加的数据从开头依次进行比较,找到保存的位置,并将数据进行插入排序的方法。比如列表6,15,4,2,8,5,11,9,7,13第一步6和15比较,15大,不用比较。第二步4和前面两个数比较,就是6和15,4最小,将4插入到最前面。编程语言如何实现这个过程,将4和前......
  • 纯前端实现语音文字互转
            在现代互联网的发展中,语音技术正逐渐成为改变用户体验的重要一环。WebSpeechAPI的引入使得开发者能够在浏览器中轻松实现语音识别和语音合成功能,为用户带来更加直观和便捷的操作体验。本文将介绍WebSpeechAPI的基本概念、功能特性以及如何利用它来构建......
  • python+flask计算机毕业设计基于数据加密的高校奖学金评定系统的设计与实现(程序+开题+
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高校规模的不断扩大和学生数量的激增,奖学金评定工作逐渐成为一项复杂而繁重的任务。传统的奖学金评定方式往往依赖于人工收集、整理和......
  • 【2025】“急救课堂”微信小程序的设计与实现, 在线急救培训与模拟演练、在线急救教育
    博主介绍:  ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W+粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台的优质作者。通过长期分享和实战指导,我致力于帮助更多学生......