首页 > 数据库 >FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态

FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态

时间:2024-01-16 16:26:52浏览次数:31  
标签:universities celery task get FastAPi Celery Flower import

FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态

本文介绍了Windows 下 FastAPi Celery 使用 RabbitMQ 与 Redis 做代理的使用方法,本文参考了国外大佬的文章,并做了修改与补充原文见这里,Suman Das,他文章中的完整代码,见这里,GitHub

RabbitMQ 与 Redis 的主要优缺点正如 Celery 官网介绍的那样:

  • RabbitMQ 功能齐全、稳定、耐用且易于安装。它是生产环境的绝佳选择。
  • Redis 的功能也很齐全,但在突然终止或断电时更容易丢失数据。

同时你也可以看这篇文章,里面详细的列出了使用 RabbitMQ 与 Redis 的差异,RabbitMQ 與 Redis 之間有何差異?文章见这里


环境准备

# windows 10
# Python 3.10.11 X64
fastapi==0.109.0
uvicorn[standard]==0.25.0
flower==2.0.1
httpx==0.26.0
celery[redis]==5.3.6
gevent==23.9.1
kombu==5.3.5
pydantic==2.5.3
starlette==0.35.1

代码准备

结构:

--- test_celery
    |--- celery_utils.py
    |--- main.py
    |--- router.py
    |--- schemas.py
    |--- tasks.py
    |--- universities.py
  1. 新建 main.py用于作为 tAPI 入口
"""
@ File        : main.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : API 入口
"""
import uvicorn as uvicorn
from fastapi import FastAPI

import router
from celery_utils import create_celery

app = FastAPI(version="1.0.0", )
app.celery = create_celery() # 将celery实例挂到app全局实例中
app.include_router(router.router)

celery = app.celery

# 设置celery的代理路径与结果存储路径,此处均使用 RabbitMQ
celery.conf.update(broker_url="amqp://guest:guest@localhost:5672//")  # 代理
celery.conf.update(result_backend="rpc://")  # 结果存储

# 设置celery的代理路径与结果存储路径,此处均使用 Redis
# 默认用户可省略:redis://:<You Passowrd>@localhost:6379/0
# celery.conf.update(broker_url="redis://default:<You Passowrd>@localhost:6379/0")  # 代理
# celery.conf.update(result_backend="redis://default:<You Passowrd>@localhost:6379/1")  # 结果存储



if __name__ == "__main__":
    uvicorn.run("main:app", port=9000, reload=True)
  1. 添加文件 celery_utils.py 配置并获取 celery实例
"""
@ File        : celery_utils.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 配置并获取 celery 实例
"""
from celery import Celery
from celery.result import AsyncResult


def create_celery():
    celery = Celery(__name__)
    celery.conf.update(timezone="Asia/Shanghai")  # 时区
    celery.conf.update(enable_utc=False)  # 关闭UTC时区。默认启动
    celery.conf.update(task_track_started=True)  # 启动任务跟踪
    celery.conf.update(task_serializer='pickle')  # 任务序列化方式
    celery.conf.update(result_serializer='pickle')  # 结果序列化方式
    celery.conf.update(accept_content=['pickle', 'json'])  # 接受的类型
    celery.conf.update(result_expires=200)  # 结果过期时间,200s
    celery.conf.update(result_persistent=True)
    celery.conf.update(worker_send_task_events=False)
    celery.conf.update(worker_prefetch_multiplier=1)
    celery.conf.update(broker_connection_retry_on_startup=False)  # 关闭启动时重试代理连接,默认启动

    return celery


def get_task_info(task_id):
    """返回给定task_id的任务信息"""
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return result
  1. 文件 tasks.py 添加 Celery 任务
"""
@ File        : tasks.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 添加 Celery 任务
"""
from typing import List
from celery import shared_task
import universities

# 使用 @shared_task 来复用 celery 实例,也就是 create_celery() 中的实例,而不需要每次都新建 celery 实例
# 并开启 celery 内置的失败重试功能
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5},
             name='universities:get_all_universities_task')
def get_all_universities_task(self, countries: List[str]):
    data: dict = {}
    for cnt in countries:
        data.update(universities.get_all_universities_for_country(cnt))
    return data


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5},
             name='university:get_university_task')
def get_university_task(self, country: str):
    university = universities.get_all_universities_for_country(country)
    return university
  1. 文件 router.py 添加 API 路由
"""
@ File        : router.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : API 路由
"""

from celery import group
from fastapi import APIRouter
from starlette.responses import JSONResponse

import universities
from celery_utils import get_task_info
from schemas import Country
from tasks import get_all_universities_task, get_university_task

router = APIRouter(prefix='/universities', tags=['University'], responses={404: {"description": "Not found"}})


# 该 API 是普通的 API,与 Celery 无关
@router.post("/")
def get_universities(country: Country) -> dict:
    """返回提供的国家/地区的大学列表,例如 [“土耳其”、“印度”、“澳大利亚”],以同步方式输入"""
    data: dict = {}
    for cnt in country.countries:
        data.update(universities.get_all_universities_for_country(cnt))
    return data


# 该 API 用于演示如何使用 Celery 异步执行长时间运行的任务。
# 当我们调用此 API 时,它会向代理提交一条任务消息并返回该消息的任务 ID。
@router.post("/async")
async def get_universities_async(country: Country):
    """
    返回提供的国家/地区的大学列表,例如 [“土耳其”、“印度”、“澳大利亚”],
    以异步方式输入。它只返回任务 ID,稍后可用于获取结果。
    """
    task = get_all_universities_task.apply_async(args=[country.countries])
    return JSONResponse({"task_id": task.id})


# 该 API 用于演示通过任务 ID 获取任务运行状态
@router.get("/task/{task_id}")
async def get_task_status(task_id: str) -> dict:
    """返回已提交任务的状态"""
    return get_task_info(task_id)


# 该 API 用于演示如何使用 Celery 将大型任务拆分为较小的子任务并并行执行
# 当我们调用此 API 时,我们会为作为输入提供的每个国家/地区创建一个任务。一旦所有任务完成,小组就完成了,我们将得到结果。
@router.post("/parallel")
async def get_universities_parallel(country: Country) -> dict:
    """
    返回提供的国家/地区的大学列表,例如 [“土耳其”、“印度”、“澳大利亚”],
    以同步方式输入。这将使用 Celery 以并行方式执行子任务
    """
    data: dict = {}
    tasks = []
    for cnt in country.countries:
        tasks.append(get_university_task.s(cnt))
    # create a group with all the tasks
    job = group(tasks)
    result = job.apply_async()
    ret_values = result.get(disable_sync_subtasks=False)
    for result in ret_values:
        data.update(result)
    return data
  1. 文件 universities.py 获取大学列表
"""
@ File        : universities.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 获取大学列表
"""
import json
import httpx
from schemas import University


def get_all_universities_for_country(country: str) -> dict:
    url = 'http://universities.hipolabs.com/search'
    params = {'country': country}
    client = httpx.Client()
    response = client.get(url, params=params)
    response_json = json.loads(response.text)
    universities = []
    for university in response_json:
        university_obj = University.model_validate(university)
        universities.append(university_obj)
    return {country: universities}
  1. 文件 schemas.py 用于数据校验的 pydantic模型
"""
@ File        : schemas.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 数据校验
"""
from typing import Optional, List
from pydantic import BaseModel


class University(BaseModel):
    country: Optional[str] = None
    web_pages: List[str] = []
    name: Optional[str] = None
    alpha_two_code: Optional[str] = None
    domains: List[str] = []


class Country(BaseModel):
    countries: List[str] = ['turkey', 'india']

启动 API 与 Celery

  1. 启动 api:启动后访问 http://127.0.0.1:9000/docs查看 API 文档
# 项目根目录下执行
Python main.py

image

image

  1. 启动 Celery:
# 项目根目录下执行,-P:Windos下必须用,否则会报错,-c:windows下是协程数。默认是10
celery -A main.celery worker -l info -P gevent -c 2

image


执行任务API,查看任务结果

  1. 在 API 文档中 执行 http://127.0.0.1:9000/universities/async你分别会看到下面这样的结果,就表示成功了

image

image


启动 Flower 监控 Celery 的状态

  1. 启动 Flower(注意:Celery也要启动)
# 项目根目录下执行,-P:Windos下必须用,否则会报错,-c:windows下是协程数。默认是10
celery -A main.celery flower --port=5555

image

  1. 访问Flower:浏览器打开 http://127.0.0.1:5555/

image

  1. 在 API 文档中 执行 http://127.0.0.1:9000/universities/async后会看到这样的结果,点击可查看任务执行详情

image

image

Flower 文档
Celery 配置文档

本文章的原文地址
GitHub主页

标签:universities,celery,task,get,FastAPi,Celery,Flower,import
From: https://www.cnblogs.com/yqbaowo/p/17967525

相关文章

  • FastAPI学习-29 log_config 设置 logger 日志格式
    前言FastAPI服务是通过uvicorn来提供的,日志都是uvicorn里配置的。官方文档地址:https://www.uvicorn.org/settings/#logginguvicorn的logging日志我们可以通过uvicorn.run()方式启动服务uvicorn.run("example:app",port=5000,reload=True,access_log=False)于......
  • FastAPI学习-28 alembic数据迁移报错:Target database is not up to date 报错解决办法
    前言当表结构有变更,数据迁移时,出现报错:Targetdatabaseisnotuptodate遇到的问题执行迁移命令alembicrevision--autogenerate-m"testv4"出现如下报错>alembicrevision--autogenerate-m"testv4"INFO[alembic.runtime.migration]ContextimplMySQLImpl.INF......
  • fastapi项目 08-持久化APScheduler
    前言在上一篇的中,我们写到可以根据APScheduler第三方库,创建定时任务,但是主程序直接创建完后,定时任务只是存在内存中,如果重启启动主程序,那么我们创建的任务就会消失,需要重新创建,这显然是不行的。我们需要的是不管程序是否启动,我们创建的任务都存在,而不会被删除。于是我们就引入了......
  • celery 任务
    一、周期性任务示例代码fromdjango.core.mailimportsend_mailfromcelery.task.baseimportperiodic_taskfromcelery.schedulesimportcrontabfromcelery.exceptionsimportSoftTimeLimitExceeded#@periodic_task(run_every=crontab(minute=1,hour='0,7'))......
  • fastapi项目 04-JWT-Token
    前言对于flask,有 flask-jwt-extended插件快速实现生成登录token。fastapi-jwt-auth .它的灵感来自于flask-jwt-extended。官网教程地址https://indominusbyte.github.io/fastapi-jwt-auth/usage/basic/。1.fastapi-jwt-auth演示首先需要通过fastapi库生成登录token:pipi......
  • fastapi项目 03-注册,密码加密
    1.前沿一般对于后端的接口,特别是注册接口而言,密码都不是明文存储的,而是通过加密的方式,存储加密后的密码的。1.1环境准备我们需要下载第三方加密库:>pipinstallpasslibpasslib库里面会用到2个方法encrypt()-生成新的值,返回密码哈希verify()-根据现有哈希验证密码.......
  • docker部署fastapi
    使用Docker部署FastAPI应用程序可以提供更好的可移植性和隔离性。以下是使用Docker部署FastAPI应用程序的一般步骤:创建Dockerfile:在项目的根目录下创建一个名为Dockerfile的文件,用于定义Docker镜像的构建过程。在Dockerfile中,你需要指定基础镜像、复制应用程序......
  • 饮冰十年-人工智能-FastAPI-01- 深入理解 Python 协程
    Python协程是一种强大的异步编程工具,可以有效地处理并发任务,提高程序性能。在这篇博客中,我们将深入探讨协程的概念、用法以及如何在Python中使用它们。一、什么是协程协程定义协程(Coroutine)是一种特殊的函数,它可以在执行中暂停并在稍后的时间点继续执行。这种能力使得我们能......
  • FastAPI 中设置定时任务的方法:从入门到精通
    Web应用程序开发中,及时高效处理常规任务至关重要,包括定时收集数据或管理任务计划。针对强大且性能卓越的 FastAPI 框架,我们可以通过几种策略来管理这些必要的定时任务。实现FastAPI中的定时任务本指南将探讨在FastAPI环境中管理定时任务的三种实用方法:使用APScheduler,利用......
  • python celery的使用
    celery本生就不介绍了感兴趣的看https://c.biancheng.net/view/s0j4eth.html这个人家介绍的挺好的1.安装部署Celery涉及任务队列和结果存储,我们使用Redis,做例子前要先安装好redis。我们可以通过命令行方式下载和安装指定版本:#安装Celerypipinstallcelery==5.2.3#......