以前简单介绍过基于faststream + python rq callback 进行job 状态标记,以下是结合源码说明下处理
参考玩法
代码实现
- faststream 参考代码
from faststream.redis import RedisBroker
from faststream.asgi import AsgiFastStream,get,AsgiResponse
from typing import Any
broker = RedisBroker("redis://localhost:6379",validate=True)
@get
async def liveness_ping(scope):
return AsgiResponse(b"is liveness ping", status_code=200)
app = AsgiFastStream(broker,asgi_routes=[
("/",liveness_ping),
],
asyncapi_path="/docs",
)
# 为了演示此处通过response channel 标记为job 回调的消息处理,利用faststream 方便的消息处理我们开发起来很方便
@broker.subscriber("response")
async def msgv2(msg: Any):
print("response",msg)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)
- job 处理
入队, 为了方便都使用了字符串,worker 具体执行的时候会自动进行模块方法的加载
result = q.enqueue(infos.get("module"),**infos,on_success="callback.report_success")
- callback 定义
import json
def report_success(job, connection, result, *args, **kwargs):
print("job success",job.id,job.kwargs,result,args,kwargs)
info = {
"data": {
"name":"dalong",
"age":11,
"job_id":job.id
}
}
# 此处使用json 格式,因为faststream默认的消息格式就是json的,我们只需要返回包含job id 信息的json 内容就可以了,实际faststream 订阅装饰器方法会基于job id 进行数据处理
connection.publish('response', json.dumps(info))
说明
因为redis 的发布订阅并不是很可靠的服务,如果我们比较的时候服务不稳定或者异常了,可能会有不正常的问题,实际上基于stream 模式或者按需基于job api 查询也是可能需要的,属于一种灾备能力(补偿机制)
参考资料
https://python-rq.org/
https://faststream.airt.ai/latest/redis/