首页 > 其他分享 >Celery 任务路由的使用,在多任务时,实现分组管理任务

Celery 任务路由的使用,在多任务时,实现分组管理任务

时间:2024-01-16 17:36:40浏览次数:39  
标签:universities celery task update Celery 任务 result conf 多任务

Celery 任务路由的使用,在多任务时,实现分组管理任务

Celery 任务路由的使用,本文参考了国外大佬的文章,并做了修改与补充原文见这里,Bjoern Stiel

Celery 官方文档:Routing Tasks

任务路由的主要作用:把任务分类,让他们在不同队列中运行,互不干扰,同时方便管理

本文介绍 3 种任务路由添加方式,如不分组,则全部任务都执行在 celery 默认队列中


代码

本文的代码 基于 FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态 这篇文章修改


第1种

使用指定格式的 字典对象来实现,其中的字典 key main.*,表示匹配所有 main.开头的任务,universities:*同理

修改文件 celery_utils.py

"""
@ File        : celery_utils.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 配置并获取 celery 实例
"""
from celery import Celery
from celery.result import AsyncResult


def create_celery():
    celery = Celery(__name__)

    celery.conf.update(timezone="Asia/Shanghai")  # 时区
    celery.conf.update(enable_utc=False)  # 关闭UTC时区。默认启动

    # 任务路由,为任务分配不同的队列(可理解为分组)
    # 方案1
    # main.*,表示匹配所有 main.开头的任务,匹配支持正则表达式
    # {'queue': 'celery'},中,celery 表示队列名称
    celery.conf.update(task_routes={
        'main.*': {'queue': 'celery'},
        'universities:*': {'queue': 'universities'},
        'university:*': {'queue': 'university'}
    })

    celery.conf.update(task_track_started=True)  # 启动任务跟踪
    celery.conf.update(task_serializer='pickle')  # 任务序列化方式
    celery.conf.update(result_serializer='pickle')  # 结果序列化方式
    celery.conf.update(accept_content=['pickle', 'json'])  # 接受的类型
    celery.conf.update(result_expires=200)  # 结 果过期时间,200s
    celery.conf.update(result_persistent=True)
    celery.conf.update(worker_send_task_events=False)
    celery.conf.update(worker_prefetch_multiplier=1)
    celery.conf.update(broker_connection_retry_on_startup=True)  # 启动时重试代理连接

    return celery


def get_task_info(task_id):
    """返回给定task_id的任务信息"""
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return result

第2种

使用 Queue 实例的包含列表的元组

修改文件 celery_utils.py

"""
@ File        : celery_utils.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 配置并获取 celery 实例
"""
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue


def create_celery():
    celery = Celery(__name__)

    celery.conf.update(timezone="Asia/Shanghai")  # 时区
    celery.conf.update(enable_utc=False)  # 关闭UTC时区。默认启动

    # 任务路由,为任务分配不同的队列(可理解为分组)
    # 方案2
    # main.*,表示匹配所有 main.开头的任务,匹配支持正则表达式
    celery.conf.update(task_routes=(
        Queue(name='celery', routing_key='main.*'),
        Queue(name='universities', routing_key='universities:*'),
        Queue(name='university', routing_key='university:*'),
    ))

    celery.conf.update(task_track_started=True)  # 启动任务跟踪
    celery.conf.update(task_serializer='pickle')  # 任务序列化方式
    celery.conf.update(result_serializer='pickle')  # 结果序列化方式
    celery.conf.update(accept_content=['pickle', 'json'])  # 接受的类型
    celery.conf.update(result_expires=200)  # 结 果过期时间,200s
    celery.conf.update(result_persistent=True)
    celery.conf.update(worker_send_task_events=False)
    celery.conf.update(worker_prefetch_multiplier=1)
    celery.conf.update(broker_connection_retry_on_startup=True)  # 启动时重试代理连接

    return celery


def get_task_info(task_id):
    """返回给定task_id的任务信息"""
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return result

第3种

使用任务名称,自动分配队列

  • 修改文件 celery_utils.py
"""
@ File        : celery_utils.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 配置并获取 celery 实例
"""
from celery import Celery
from celery.result import AsyncResult


def route_for_ask(name, *args, **kwargs):
    if ":" not in name:
        return {"queue": "celery"}
    queue, _ = name.split(":")
    return {"queue": queue}


def create_celery():
    celery = Celery(__name__)

    celery.conf.update(timezone="Asia/Shanghai")  # 时区
    celery.conf.update(enable_utc=False)  # 关闭UTC时区。默认启动

    # 任务路由,为任务分配不同的队列(可理解为分组)
    # 方案3
    celery.conf.update(task_routes=(route_for_ask,))

    celery.conf.update(task_track_started=True)  # 启动任务跟踪
    celery.conf.update(task_serializer='pickle')  # 任务序列化方式
    celery.conf.update(result_serializer='pickle')  # 结果序列化方式
    celery.conf.update(accept_content=['pickle', 'json'])  # 接受的类型
    celery.conf.update(result_expires=200)  # 结 果过期时间,200s
    celery.conf.update(result_persistent=True)
    celery.conf.update(worker_send_task_events=False)
    celery.conf.update(worker_prefetch_multiplier=1)
    celery.conf.update(broker_connection_retry_on_startup=True)  # 启动时重试代理连接

    return celery


def get_task_info(task_id):
    """返回给定task_id的任务信息"""
    task_result = AsyncResult(task_id)
    result = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return result
  • 任务名称的设置,见 tasks.py文件中的name
"""
@ File        : tasks.py
@ Author      : yqbao
@ Version     : V1.0.0
@ Description : 添加 Celery 任务
"""
from typing import List
from celery import shared_task
import universities

# name='universities:get_all_universities_task',冒号前的值,决定此任务被分配到那个队列中执行
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5},
             name='universities:get_all_universities_task')
def get_all_universities_task(self, countries: List[str]):
    data: dict = {}
    for cnt in countries:
        data.update(universities.get_all_universities_for_country(cnt))
    return data


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={"max_retries": 5},
             name='university:get_university_task')
def get_university_task(self, country: str):
    university = universities.get_all_universities_for_country(country)
    return university

启动 API 与 Celery:

  1. 启动 api:启动后访问 http://127.0.0.1:9000/docs查看 API 文档

  2. 添加队列分组后启动命令,与不分组有所不同

# -Q:启动队列,celery为默认队列,不需要则不加
celery -A main.celery worker -l info -Q universities,university -P gevent -c 2

image


执行任务API,查看任务结果

  1. 在 API 文档中 分别执行 /universities/async /universities/parallel你分别会看到下面这样的结果

image

本文章的原文地址
GitHub主页

标签:universities,celery,task,update,Celery,任务,result,conf,多任务
From: https://www.cnblogs.com/yqbaowo/p/17968110

相关文章

  • FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态
    FastAPiCeleryRabbitMQ与Redis的使用,并使用Flower监控Celery状态本文介绍了Windows下FastAPiCelery使用RabbitMQ与Redis做代理的使用方法,本文参考了国外大佬的文章,并做了修改与补充,原文见这里,SumanDas,他文章中的完整代码,见这里,GitHubRabbitMQ与Redis的......
  • sqlserver查询最近失败的任务
    selectjob_id,step_name,message,cast((cast(LEFT(run_date,4)ASVARCHAR)+'-'+SUBSTRING(cast(run_dateASVARCHAR),5,2)+'-'+cast(RIGHT(run_date,2)ASVARCHAR))+'......
  • 时光机启动:Spring中如何巧妙实现定时任务?
    嗨,亲爱的小伙伴们!小米在这里又来和大家分享一些技术干货啦!今天我们要探讨的话题是关于Spring框架中如何实现定时任务。对于我们这些热爱技术的小伙伴来说,定时任务可是一个非常有趣而且实用的话题哦!引子首先,让我们简单了解一下什么是定时任务。在软件开发中,定时任务就是按照一定的时......
  • C#线程篇---Task(任务)和线程池不得不说的秘密
    C#线程篇---Task(任务)和线程池不得不说的秘密 我们要知道的是,QueueUserWorkItem这个技术存在许多限制。其中最大的问题是没有一个内建的机制让你知道操作在什么时候完成,也没有一个机制在操作完成是获得一个返回值,这些问题使得我们都不敢启用这个技术。Microsoft为了克服......
  • 定时任务及异步,自定义注解进行参数校验
    简单来说:浅拷贝:对基本数据类型进行值传递,对引用数据类型进行引用传递般的拷贝,此为浅拷贝深拷贝:对基本数据类型进行值传递,对引用数据类型,创建一个新的对象,并复制其内容,此为深拷贝。 如何在Spring/SpringBoot中优雅地做参数校验?springboot项目使用validation-api......
  • JUC并发编程 用CompletableFuture 创建异步任务
    1CompletableFuture对Future的改进1.1CompletableFuture为什么会出现get()方法在Future计算完成之前会一直处在阻塞状态下,阻塞的方式和异步编程的设计理念相违背。isDene()方法容易耗费cpu资源(cpu空转),对于真正的异步处理我们希望是可以通过传入回调函数,在Future结束时自动......
  • Controller(StatefulSet)-部署有状态应用,部署守护进程,一次任务和定时任务
    Controller(StatefulSet)-部署有状态应用在Kubernetes中,StatefulSet是一种用于部署有状态应用的控制器。与无状态应用不同,有状态应用需要保持持久性和可识别的网络标识。在有状态应用中,每个Pod都有一个唯一的标识符,并且Pod的创建和删除顺序是有序的。在StatefulSet中创建的Pod具有以......
  • crontab 任务报错生成小文件及邮件告警处理
    CRONTAB服务不断发邮件问题解决问题背景操作系统:LinuxSuSe12ps5规格:4C8G现在描述:操作系统异常卡慢,甚至无法正常登录到服务器,服务器/var/log/messages的日志过大,撑爆了/var目录磁盘空间。/var/spool/postfix/maildrop目录不存在会导致一起调起sendmail与postdrop的......
  • celery 任务
    一、周期性任务示例代码fromdjango.core.mailimportsend_mailfromcelery.task.baseimportperiodic_taskfromcelery.schedulesimportcrontabfromcelery.exceptionsimportSoftTimeLimitExceeded#@periodic_task(run_every=crontab(minute=1,hour='0,7'))......
  • 地推拉新任务管理助手程序开发
    拉新地推任务管理分销助手程序开发,旨在为企业提供一套专业、高效的管理工具,以优化地推任务分配、提高销售业绩。以下是该程序的核心功能说明:任务管理模块:允许管理员创建、分配和管理地推任务。管理员能够精确地指派任务给特定的地推人员,确保工作的高效执行。地推人员则可通过程序接......