首页 > 其他分享 >faststream 自己的asgi 实现

faststream 自己的asgi 实现

时间:2024-09-23 10:50:42浏览次数:7  
标签:faststream asgi app broker 实现 import router

faststream 目前自己包含了一个asgi 的实现,可以快速实现api 能力,同时也可以与其他web 框架集成,以下是一个简单试用

参考代码

  • demo.py
from faststream.redis import RedisBroker
 
from faststream.asgi import AsgiFastStream,get,AsgiResponse
 
from typing import Any
 
from pydantic import BaseModel
 
class Msg(BaseModel):
    name: str
    age: int
 
 
broker = RedisBroker("redis://localhost:6379",validate=True)
 
 
@get
async def liveness_ping(scope):
    return AsgiResponse(b"is liveness ping", status_code=200)
 
app = AsgiFastStream(broker,asgi_routes=[
    ("/",liveness_ping),
],
asyncapi_path="/docs",
)
 
 
@broker.publisher("response")
@broker.subscriber("test")
async def msg(msg: Msg):
    print("test",msg)
    return {"name":"dalong","age":18}
 
 
@broker.subscriber("response")
async def msgv2(msg: Any):
    print("response",msg)
 
if  __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
fastapi 集成
一种是基于asgi 兼容,还有就是基于扩展的router 实现
from contextlib import asynccontextmanager
 
from fastapi import FastAPI
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.asgi import make_ping_asgi, make_asyncapi_asgi
 
broker = NatsBroker()
 
@asynccontextmanager
async def start_broker(app):
    """Start the broker with the app."""
    async with broker:
        await broker.start()
        yield
 
app = FastAPI(lifespan=start_broker)
 
app.mount("/health", make_ping_asgi(broker, timeout=5.0))
app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker)))
基于router实现

from fastapi import Depends, FastAPI
from pydantic import BaseModel
from faststream.redis.fastapi import RedisRouter, Logger
from typing import Any
router = RedisRouter("redis://localhost:6379")
 
class Incoming(BaseModel):
    m: dict
 
def call():
    return True
 
@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return  {"name":"dalong","age":18}  
 
 
@router.subscriber("response")
async def appdemo(m: Any,logger: Logger):
    logger.info(m)
    return {"response": "Hello, response!"}
 
@router.get("/")
async def hello_http():
    return "Hello, HTTP!"
 
@router.post("/send")
async def hello_http(msg: Incoming):
    await router.broker.publish(msg,"test")
    return "Hello, HTTP!"
 
 
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
 
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

参考资料

https://faststream.airt.ai/latest/getting-started/asgi/

标签:faststream,asgi,app,broker,实现,import,router
From: https://blog.51cto.com/rongfengliang/12087662

相关文章

  • lazarus使用中文拼音首字母实现中文变量等快速代码补全
    在lazarus使用中文变量等代码补全功能基础上,按以下方法就可以实现输入中文拼音首字母就可以快速代码补全功能。代码补全功能:Ctrl+w 打开\lazarus\ide\wordcompletion.pp找到 procedureAddIfMatch(constALine,ALineUp:string;constAFirstPos,ALength:Integer);(lazarus......
  • SpringBoot + Disruptor 实现特快高并发处理,支撑每秒 600 万订单无压力!
    01、背景02、Disruptor介绍03、Disruptor的核心概念04、RingBuffer05、SequenceDisruptor06、Sequencer07、SequenceBarrier08、WaitStrategy09、Event10、EventProcessor11、EventHandler12、Producer13、案例-demo14、总结01、背景工作中遇到项目使用Di......
  • Spring Boot集成OpenPDF实现PDF导出功能
    如果你想要在SpringBoot项目中使用OpenPDF来生成PDF文件,而不是iText,你可以通过将HTML转换成PDF的方式来实现。OpenPDF是一个开源的JavaPDF库,它基于iText5.x版本,但是它主要提供了HTML到PDF的转换能力。下面是如何在SpringBoot项目中设置并使用OpenPDF来生成PDF文件的一个简单......
  • SpringBoot+mail 轻松实现各类邮件自动推送
    在实际的项目开发过程中,经常需要用到邮件通知功能。例如,通过邮箱注册,邮箱找回密码,邮箱推送报表等等,实际的应用场景非常的多。早期的时候,为了能实现邮件的自动发送功能,通常会使用JavaMail相关的api来完成。后来Spring推出的JavaMailSender工具,进一步简化了邮件的自动发送......
  • 从理论到实践:全面指导企业实现数字化转型的战略路径
    全球企业数字化转型的必然性在全球范围内,数字化转型成为了企业战略中的核心命题。随着云计算、大数据、人工智能等新兴技术的快速发展,企业的运营模式、管理体系及客户体验正在发生深刻的变革。数字技术不仅为企业带来了新的商业机会,还使其有能力通过敏捷的战略调整提高市场竞......
  • 【C++驾轻就熟】string类以及string类的模拟实现
    目录一、为什么学习string类?二、标准库中的string类 2.1string类(了解)2.2string类的常用接口说明 1.string类对象的常见构造 2.string类对象的容量操作3.string类对象的访问及遍历操作 4.string类对象的修改操作5.string类非成员函数 三、 string类的......
  • 毕业设计选题|基于微信小程序实现戏曲文化苑系统
    作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待与各位高校教师、企业......
  • 毕业设计选题|基于微信小程序实现戏曲文化苑系统
    作者主页:编程千纸鹤作者简介:Java领域优质创作者、CSDN博客专家、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验,被多个学校常年聘为校外企业导师,指导学生毕业设计并参与学生毕业答辩指导,有较为丰富的相关经验。期待与......
  • DataGridView DataGridViewCheckBoxColumn 实现禁用效果,因为默认的不带禁用效果
    ///<summary>///DataGridViewDisableCheckBoxColumn///自定义disablecheckbox列实现禁用效果///</summary>publicclassDataGridViewDisableCheckBoxColumn:DataGridViewCheckBoxColumn{publicDataGridViewDisableCheckBoxColumn(){this.......
  • 【开题报告+文档+源码】基于springboot的旅游路线推荐系统的设计与实现
    项目背景与意义随着互联网和移动互联网的普及,人们获取信息的渠道变得更加便利和多样化。旅游者不再满足于传统的旅游指南和旅行社推荐,他们更倾向于通过网络平台获取个性化、多样化的旅游推荐信息。因此,旅游推荐管理系统应运而生,为旅游者提供了更加精准、全面的旅游推荐服务......