首页 > 编程语言 >关于结合faststream + python rq callback 进行job 状态标记的简单说明

关于结合faststream + python rq callback 进行job 状态标记的简单说明

时间:2024-10-07 08:51:05浏览次数:1  
标签:faststream rq python json callback job import id

以前简单介绍过基于faststream + python rq callback 进行job 状态标记,以下是结合源码说明下处理

参考玩法

代码实现

  • faststream 参考代码
from faststream.redis import RedisBroker
 
from faststream.asgi import AsgiFastStream,get,AsgiResponse
 
from typing import Any
 
broker = RedisBroker("redis://localhost:6379",validate=True)
 
@get
async def liveness_ping(scope):
    return AsgiResponse(b"is liveness ping", status_code=200)
 
app = AsgiFastStream(broker,asgi_routes=[
    ("/",liveness_ping),
],
asyncapi_path="/docs",
)
 
# 为了演示此处通过response channel 标记为job 回调的消息处理,利用faststream 方便的消息处理我们开发起来很方便
@broker.subscriber("response")
async def msgv2(msg: Any):
    print("response",msg)
if  __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)
  • job 处理
    入队, 为了方便都使用了字符串,worker 具体执行的时候会自动进行模块方法的加载
result = q.enqueue(infos.get("module"),**infos,on_success="callback.report_success")
  • callback 定义
import json 
def report_success(job, connection, result, *args, **kwargs):
    print("job success",job.id,job.kwargs,result,args,kwargs)
    info = {
        "data": {
            "name":"dalong",
            "age":11,
            "job_id":job.id
        }
    }
    # 此处使用json 格式,因为faststream默认的消息格式就是json的,我们只需要返回包含job id 信息的json 内容就可以了,实际faststream 订阅装饰器方法会基于job id 进行数据处理
    connection.publish('response', json.dumps(info))

说明

因为redis 的发布订阅并不是很可靠的服务,如果我们比较的时候服务不稳定或者异常了,可能会有不正常的问题,实际上基于stream 模式或者按需基于job api 查询也是可能需要的,属于一种灾备能力(补偿机制)

参考资料

https://python-rq.org/
https://faststream.airt.ai/latest/redis/

标签:faststream,rq,python,json,callback,job,import,id
From: https://www.cnblogs.com/rongfengliang/p/18355959

相关文章

  • CherryPy :一个轻量级的 Python Web 框架
    CherryPy是一个用于构建Web应用的微框架。它通过简洁的API和强大的功能,使开发者能够快速创建高性能的Web应用。在生物信息开发中,数据库搭建和模型部署等场景可能会用到,因此记录下。学习资源官方文档:https://docs.cherrypy.dev/en/latest/GitHub项目:https://github.......
  • 使用schedule定时执行python程序
    可以在电脑上定时执行Python程序,例如每隔一段时间自动运行一次。不同操作系统提供了不同的定时任务管理工具:1.Windows:使用任务计划程序Windows的“任务计划程序”可以用来定时运行Python程序。步骤:打开任务计划程序(可以通过Windows搜索"任务计划程序"找到)。点击......
  • Python 循环语句的高级应用与深度探索
    在Python中,循环语句是实现重复操作的重要工具。本文将深入探讨Python循环语句的高级应用。 for 循环的高级用法#遍历字典并同时获取键和值my_dict={'a':1,'b':2,'c':3}forkey,valueinmy_dict.items():print(f'Key:{key},Value:{value}')#......
  • Python 高级绘图:探索数据可视化
    在Python中,我们可以使用多种库来进行绘图,实现数据可视化。以下将介绍一些高级的绘图技巧和代码实现: 使用 matplotlib 库进行高级绘图 matplotlib 是一个广泛使用的绘图库,提供了丰富的绘图功能。importmatplotlib.pyplotaspltimportnumpyasnp#生成示例......
  • Python 高级编程:高效读取 txt 文件的技巧与实践
    在Python中,读取txt文件是一项常见的操作。以下将介绍一些高级的方法来实现这一功能:使用with语句自动管理文件资源withopen('file.txt','r')asfile:content=file.read()print(content)with语句可以确保在代码块执行完毕后,文件会被正确地关闭,避免了资源泄漏......
  • Python 语言学习——应用1.1 数字图像处理(第一节,颜色)
    目录1.基础知识2.实战演示1.基础知识:        1.图像的表示.函数表示:图像是二维信号,定义为二维函数f(x,y),其中,x、y是空间坐标,f(x,y)是点(x,y)的幅值。拓展看,视频,又称动态图像,是多帧位图的有序组合,用三维函数f(x,y,t)表示,其中,t为时间变量,f(x,y,t)是t时刻那一帧点(x......
  • Python-初识Python
    前言:在这篇博客当中,我们将步入Python知识的殿堂,Python以其简单、易学、开发效率高在近些年的发展可谓是迅猛,在许多领域都可以见到它的场景,例如:人工智能/机器学习、大数据开发、后端开发等都会用到。1.认识Python 1.1什么是编程语言Python和我们的C语言一样也是一门编程......
  • 学习python第一天(滞留的第6天)
    本来是好好的学习第一天,但是因为我遇到了接踵而至的困难(虽然都是无足轻重的小问题,但是这足以打败一个纯纯新手了),然后根本找不到方法解决,就懈怠了,疲倦了,懒惰了,松懈了……(这里我想展开说一下,就是关于搭建python坏境,需要下载python和编译器或者编辑器,网上有很多教程,但是他们只是“喂......
  • python: sort
     table=[['1','Du','GeovinDu','13824350518',92],['2','Rose','Tom','1882458888',38],['3','Lin','bo','......
  • 分析、总结Python使用列表、元组、字典的场景
    分析、总结Python使用列表、元组、字典的场景在Python中,列表、元组和字典是最常用的数据结构,每种数据结构都有其特定的应用场景。以下是对这三种数据结构的分析与总结:1.列表(List)特点:可变性:可以动态地修改、添加、删除元素。顺序性:保持元素的插入顺序。允许重复:可以包含相......