首页 > 其他分享 >Celery with FastAPI and TortoiseORM

Celery with FastAPI and TortoiseORM

时间:2023-06-08 15:24:27浏览次数:30  
标签:__ celery run FastAPI app Celery TortoiseORM import def

API server using: fastapi+tortoise-orm+postgresql+redis+supervisor+nginx
There are some period tasks and async tasks that will run in celery+rabbitmq

Demo

  1. init.py
from .celery import app as celery_app  # NOQA

__all_ = ("celery_app",)
  1. celery.py
import asyncio
import os
from pathlib import Path
from typing import Any, Coroutine

from celery import Celery
from celery.schedules import crontab
from celery.signals import worker_ready
from dotenv import load_dotenv

from .settings import CELERY_ROOT, broker_url

load_dotenv()

try:
    FTP_PASSWD = os.environ["FTP_PASSWD"]
except KeyError as e:
    print(e, "请先配置环境变量或.env文件")
    raise

CALLBACK_KEY_PREFIX = "celery__callback__"
ORDER_NUM_NOTIFIED_KEY = "__notify__order_num="
IS_TEST_SERVER = str(CELERY_ROOT).startswith("/home/ubuntu")
host_name = CELERY_ROOT.name
app = Celery(host_name, broker=broker_url)
app.config_from_object(f"{host_name}.settings")



def get_result(task_id: str) -> Any:
    """获取celery任务ID对应的执行结果"""
    return app.AsyncResult(task_id).result


def run_async(coro: Coroutine, initdb: bool = True) -> Any:
    """同步的方式运行异步函数"""
    if initdb:
        # 当时celery还是4.x版本,不太支持异步
        # 所以db操作是通过发HTTP请求给fastapi来实现的
        raise Exception("Not support db operation yet!")
    return asyncio.run(coro)


def run_raw_async(coro: Coroutine) -> Any:
    """同步的方式运行异步函数, 但不初始数据库, 适用于无SQL操作的情况"""
    return run_async(coro, False)


@app.task
def do_query(mchid: str, order_num: str, cached_key: str) -> None:
    """循环询问结果"""
    run_raw_async(query_pay_result(mchid, order_num, cached_key))


@app.task
def do_transfer_notify(r: dict, cached_key: str) -> None:
    """解析回调通知数据并做转发"""
    run_raw_async(transfer_notify(r, cached_key, is_notified=True))


def run_shell(cmd, echo: bool = True):
    """打印要执行的命令, 并返回命令执行的结果"""
    if echo:
        print("--> " + cmd)
    with os.popen(cmd) as p:
        return p.read()


@worker_ready.connect
def print_git_info(**kwargs):
    """确认celery执行的是哪一个commit的代码"""
    cmd = "git log -1 --stat"
    print(run_shell(cmd))  # 因为配置了supervisor,所以直接用print即可(当然也可以用logger.info)
    if "nothing to commit, working tree clean" not in run_shell("git status", False):
        print("Code already changed after last commit...\n")
        print(run_shell("git diff"))


app.conf.update(
    beat_schedule={
        "run-everyday": {
            "task": f"{Path(__file__).parent.name}.celery.update_check_data",
            # 每天09:20(生产服是9:25)启动定时任务
            "schedule": crontab(minute=20 if IS_TEST_SERVER else 25, hour=9),
        }
    }
)


@app.task
def update_check_data() -> None:
    """更新对账单"""
    # This is a period task, and also can be run by xxx.delay()
    pass
  1. settings.py
from pathlib import Path

CELERY_ROOT = Path(__file__).parent.resolve()
# celery
broker_url = f"amqp://waket:123456@localhost:5672/{CELERY_ROOT.name}"
result_backend = "redis://localhost:6379/1"
task_serializer = "msgpack"
result_serializer = "json"
task_result_expires = 60 * 60 * 24
accept_content = ["json", "msgpack"]

timezone = "Asia/Shanghai"
enable_utc = False

REDIS_HOST = "localhost"
REDIS_DB = 2
REDIS_URL = f"redis://{REDIS_HOST}/{REDIS_DB}"
  1. routers/foo.py
from <project_name>.celery import do_transfer_notify

@router.post("", include_in_schema=False)
async def fine_thank_you(request: Request):
    """接收通知,然后启动celery任务转发给对应的项目"""
    data = await request.json()
    rv = do_transfer_notify.delay(data)
    return dict(data=data, rv=rv, ip=get_client_ip(request))

标签:__,celery,run,FastAPI,app,Celery,TortoiseORM,import,def
From: https://www.cnblogs.com/waketzheng/p/17466570.html

相关文章

  • fastapi 异步应用
    #定义一个专门创建事件循环loop的函数,在另一个线程中启动它defstart_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()@app.get("/addr")defindex(adcode:str=None,address:str=None,level:str=None):importtimemessage={"......
  • Celery
    Celery1、简介Celery是一个python第三方模块,是一个功能完备即插即用的分布式异步任务队列框架。它适用于异步处理问题,当大批量发送邮件、或者大文件上传,批图图像处理等等一些比较耗时的操作,我们可将其异步执行,解决了项目程序在执行过程中因为耗时任务而形成阻塞,导致出现请求......
  • Python|通过FastAPI开发一个快速的WebAPI项目
    前言Python如此受欢迎的众多原因之一是Python有大量成熟和稳定的库可供选择:网页开发有:Django和Flask,提供了很好的网络开发体验和大量的有用文档机器学习有:scikit-learn、Keras等,提供了丰富的机器学习的包和数据处理和可视化工具。FastAPI是一个快速、轻量级的现代A......
  • 【Python】如何在FastAPI中使用UUID标记日志,以跟踪一个请求的完整生命周期
    为什么要使用uuid标记日志?在分布式系统中,一个请求可能会经过多个服务,每个服务都会生成自己的日志。如果我们只使用普通的日志记录,那么很难将这些日志串联在一起,以至难以跟踪一个请求的完整生命周期。如果能够使用uuid标记日志,为每个请求生成一个唯一的uuid,且这个日志可以在不同......
  • Celery框架
    Celery框架1.什么是celerycelery是一个简答,灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度.这里面提到了一个概念:分布式系统一个系统应用(网站),会有相关组件(web服务器,web应用,数据库,消息中间件),将它们架构在不同......
  • celery笔记三之task和task的调用
    本文首发于公众号:Hunter后端原文链接:celery笔记三之task和task的调用这一篇笔记介绍task和task的调用。以下是本篇笔记目录:基础的task定义方式日志处理任务重试忽略任务运行结果task的调用1、基础的task定义方式前面两篇笔记中介绍了最简单的定义方式,使用@......
  • celery笔记一之celery介绍、启动和运行结果跟踪
    本文首发于公众号:Hunter后端原文链接:celery笔记一之celery介绍、启动和运行结果跟踪本篇笔记内容如下:celery介绍celery准备celery启动和异步任务的运行运行结果跟踪1、celery介绍celery大致有两种应用场景,一种是异步任务,一种是定时任务。比如说在一个接口请求中,......
  • fastapi
    FastAPI是一个基于Python的现代、快速(高性能)的Web框架,用于构建WebAPI。它具有简洁的语法、自动的API文档生成和交互式测试界面,以及高性能的异步支持  ......
  • VUE+FastAPI结合高德地图API做一个旅游推荐系统
    最近一个月没有更新博客,去写了一个系统,包含前台和后台,还有后端的API。前后台使用VUE+高德地图API,后台使用FastAPI,实现了一些基本的功能。前台部分因为我也是前端初学者,写的比较乱基础功能用户登陆注册功能搜索功能主页照片墙跳转对应详情页打卡功能评论功能(没接......
  • fastapi最简单使用示例
    直接上代码了fromfastapiimportFastAPI,Requestimportuvicornapp=FastAPI()@app.post("/")asyncdefcreate_item(request:Rquest):json_post_raw=awaitrequest.json()#下面的代码就是根据取得的数据进行自己的相应解析answer={这里自己填......