首页 > 其他分享 >使用Celery实现计划任务与异步任务

使用Celery实现计划任务与异步任务

时间:2023-05-19 20:32:53浏览次数:47  
标签:异步 celery beat work Celery 任务 test

前言

Celery是一个开源的分布式任务队列系统,用于处理异步任务和分布式任务调度。使用消息代理(如RabbitMQ、Redis)来实现任务队列和消息传递。

在使用Python开发web应用过程中,经常使用Celery完成两种任务需求:

  1. 异步任务。将任务提交到任务队列中,然后继续处理其他任务,而不必等待任务完成。
  2. 定时任务。根据需求设置任务的执行时间和频率。

本文的目的就是通过讲解Celery的相关信息,来实现计划任务与异步任务的操作。

Celery的组成

使用Celery实现计划任务与异步任务_异步任务

Celery由以下几个核心组件组成:

  • Celery应用程序(Celery Application):Celery应用程序是整个Celery系统的核心。它负责任务的创建、调度和分发。应用程序通常在项目的入口处初始化,包括配置Celery的消息代理、结果存储等参数。
  • 任务(Tasks):任务是要执行的操作或函数,可以是任何Python可调用对象。任务函数使用装饰器@task进行修饰,将其注册到Celery应用程序中。任务可以同步执行或异步执行,可以根据需要设置任务的参数、执行时间和其他属性。
  • 消息代理(Message Broker):消息代理是任务队列的基础,负责接收和存储任务消息,并将它们传递给工作节点进行处理。Celery支持多种消息代理,如RabbitMQ、Redis、Amazon SQS等。消息代理负责确保任务的可靠传递和分发。
  • 工作节点(Worker Nodes):工作节点是实际执行任务的计算节点。它们连接到消息代理,接收任务消息并执行任务函数。一个Celery应用程序可以有多个工作节点,允许任务在分布式环境中并行执行。工作节点可以水平扩展,以适应任务负载的增加。
  • 结果存储(Result Backend):结果存储是用于存储任务执行结果的地方。当任务完成后,工作节点将结果存储在结果存储中,供应用程序查询和获取。常见的结果存储包括数据库、缓存、消息队列等。
  • 调度器(Scheduler):调度器负责定时触发任务的执行。Celery中的调度器组件称为Celery Beat(简称beat)。它允许您定义定时任务,指定任务执行的时间和频率。beat会根据您定义的调度配置将任务发送到消息代理中的任务队列。

通过这些组件的协作,Celery实现了异步任务处理、分布式任务调度和执行,并提供了灵活的定时任务功能。开发人员可以使用这些组件来构建可靠、高性能的分布式应用程序。


Celery即相关服务的安装

本文中的代码部署在home/hero/celery_test目录下,在虚拟环境下运行。

cd /home/hero/celery_test
virtualenv celery_test
source venv/bin/activate

使用pip可以直接安装Celery模块

pip install Celery

一般情况下,Celery需要使用一些消息代理服务如redis来实现队列机制。作为示例在本机启动一个redis-server即可。

使用Celery实现计划任务与异步任务_linux_02

Celery的使用

文件结构

使用Celery实现计划任务与异步任务_异步任务_03

Celery实例

首先定义一个Celery实例,通过此实例来实现Celery的功能。

文件名为celeryobj.py。这段代码主要是定义了一个celery对象,并向其注册了两个包含异步任务的模块以及两个定时任务。

from celery import Celery
from celery.schedules import crontab
from datetime import timedelta


def init_celery():
    broker_url = 'redis://127.0.0.1:6379/1'  # 指定 Broker消费者,我们使用redis 1号数据库
    result_backend = 'redis://127.0.0.1:6379/2'  # 指定 Backend,最终消费结果,我们使用redis 2号数据库
    timezone = 'Asia/Shanghai'  # 指定时区,默认是 UTC
    # 用于注册异步任务
    # 注册内容为包含celery装饰器异步任务函数的文件名径字符串列表,
    # 示例内容为 test目录下的work和
    include = ['test.work',
               'test.beat']
    # 用于注册定时任务
    # 字典格式,键为定时任务的名称,值为任务信息。
    # 任务信息依旧字典格式。 键task为包含Celery装饰器定时任务函数模块引入的文件路径函数字符串。键schedule为定时任务执行周期频率
    beat_schedule = {
        'cron_1min': {
          'task': 'test.beat.cron_1min',
          'schedule': crontab(minute='*/1')
        },
        'timedelta_5s': {
            'task': 'test.beat.timedelta_5s',
            'schedule': timedelta(seconds=5)
        }
    }
    # 初始化Celery对象
    cel = Celery('CeleryOBJ', backend=result_backend, broker=broker_url, include=include)  # 创建 Celery 实例
    cel.conf.timezone = timezone
    cel.conf.enable_utc = False
    cel.conf.beat_schedule = beat_schedule
    return cel


c = init_celery()

任务的编写

在test目录下work和beat文件中编写使用celery装饰器的函数代码。

test/work.py

from celeryobj import c
import time
from datetime import datetime


@c.task   # celery 装饰器
def work_test(user, second):
    """
    测试异步任务
    :param user:
    :param second:
    :return:
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('当前时间:{}'.format(now))
    print('{} 发起了任务,需要执行{}s'.format(user, second))
    time.sleep(second)  # 模拟串行任务长时间处理
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('当前时间:{}'.format(now))
    print('{} 发起的任务完成'.format(user))

test/beat.py

from celeryobj import c
from datetime import datetime


@c.task   # celery 装饰器
def cron_1min():
    """
    1min 计划任务 crontab方式
    :return:
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('1min', now)

@c.task
def timedelta_5s():
    """
    5s 计划任务 timedelte方式
    :return:
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print('5s', now)

运行Celery

若只是为了调试,可以使用命令行直接启动,但是后续进入正式环境,为方便启停推荐使用supervisord进行操作,以下是配置文件。

异步任务守护

[program:celery_worker]
user=hero
directory=/home/hero/celery_test
command=/home/hero/celery_test/venv/bin/celery -A celeryobj.c worker -l info  --concurrency=4  --logfile=/home/hero/log/celery_work.log

定时任务守护

[program:celery_beat]
user=hero
directory=/home/hero/celery_test
command=/home/hero/celery_test/venv/bin/celery -A celeryobj.c beat -l info --logfile=/home/hero/log/celery_beat.log

启动Celery

supervisorctl reload
supervisorctl restart all

在supervisord的守护下,前文定义的celery_work 和 celery_beat可以正常运行。

使用命令supervisorctl status可以查看运行状态,并可以通过配置文件中的日志文件查看具体运行状态。

使用Celery实现计划任务与异步任务_异步任务_04

测试Celery

定时任务

celery_beat根据celeryobj.py中beat_schedule定义的计划周期将任务发送给celery_work。从日志上看就是这样的:

celery_beat的日志

使用Celery实现计划任务与异步任务_linux_05

celery_work 的日志

使用Celery实现计划任务与异步任务_linux_06

异步任务

测试任务使用一下代码

celery_run.py

from test import work

if __name__ == "__main__":
    work.work_test.delay("GuoGuo", 10)
    work.work_test.delay("Guo", 3)
    work.work_test.delay("G", 5)

使用celery装饰器的函数可以通过delay方法来启用celery的功能,将其交给celery worker来执行。如果不加delay仍可以作为普通函数来使用。

如果不使用celery可以使用一下代码来进行对比

run.py

from test import work

if __name__ == "__main__":
    work.work_test("GuoGuo", 10)
    work.work_test("Guo", 3)
    work.work_test("G", 5)

先执行run.py

使用Celery实现计划任务与异步任务_异步任务_07

任务是逐个串行完成的。

运行celery_run.py,同时观察日志可以直观的感受到celery是如何异步执行的。

使用Celery实现计划任务与异步任务_linux_08

同样是三个任务,几乎同时开始。




标签:异步,celery,beat,work,Celery,任务,test
From: https://blog.51cto.com/quietguoguo/6314854

相关文章

  • 单片机的裸机系统和多任务系统总结
    一、裸机系统1.1轮询系统 轮询系统是裸机编程时,先初始化好相关硬件,然后让主程序在一个死循环内不断循环,顺序完成各种事情。伪代码如下所示:1intmain(void)2{3/*硬件相关初始化*/4HardWareInit();56/*无限循环*/7for(;;){8......
  • 异步CDC及异步FIFO分享
    分享两篇很棒的论文:1.《ClockDomainCrossing(CDC)Design&VerificationTechniquesUsingSystemVerilog》http://www.sunburst-design.com/papers/CummingsSNUG2008Boston_CDC.pdf2.《SimulationandSynthesisTechniquesforAsynchronousFIFO......
  • C# Async异步
    原文链接:https://blog.csdn.net/zuheyawen/article/details/99863588转载连接:https://www.cnblogs.com/wcrBlog/p/11690460.html前言C#异步编程有几种实现方式,异步方法就是其中的一种。异步方法是C#5.0才有的新特性,主要采用async、await关键字声明为异步方法,完成对方法的异......
  • 异步爬虫
    异步爬虫多线程多进程协程多线程与多进程进程:运行中的程序,每次我们执行一个程序,操作系统对自动的为这个程序准备一些必要的资源(如:分配内存,创建一个能够执行的线程)线程:程序内,可以直接被CPU调度的执行过程,是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的......
  • LeetCode/完成任务的最少工作时间段
    一个工作时间段可以连续工作sessiontime个小时给你任务列表task,task[i]表示第i项任务花费时间求完成全部工作所需最小时间段(可以按任意顺序完成任务)1.回溯法回溯时按任务下标推进,边界条件为任务下标等于任务长度同时要记录回溯几个状态,分别是当前任务下标、已用时间段、各......
  • Centos7 设置定时任务
    原文链接:https://blog.csdn.net/weixin_38565317/article/details/127039873配置定时任务1.下载定时任务依赖yuminstallcrontabs12.设置为可用状态并启动systemctlenablecrondsystemctlstartcrond12如图所示,定时任务程序安装过程:3.配置定时任务文件vim/etc/crontab1如......
  • STI比赛任务一:【智能问答baseline】
    比赛简介百度搜索首届技术创新挑战赛:赛道一答案抽取STI比赛任务一:【比赛数据分析与长尾发现】STI比赛任务一:【NLP常见优化算法和上分Trick】STI比赛任务一:【智能问答baseline】任务定义本赛题任务是:给定一个用户搜索问题集合Q,基于每个搜索问题q,给定搜索引擎检索得到的网页文档集合......
  • distcp任务超时(Time out after 300 secs)的原因及优化方案
    distcp使用MapReduce执行数据复制操作时也可能会出现超时的情况,其可能的原因与普通MapReduce任务相似,包括以下几点:数据量过大:如果您的复制任务的数据量非常大,MapReduce任务可能需要更多时间来处理,从而导致超时。硬件不足:如果您的硬件资源不足以支持MapReduce任务,例如内存、C......
  • 关于razor 异步调用的一些新鲜点,记录
    很久没有写razor了,今天在做一个小工具的时候,通过查资料等,学习了新东西。关于razor通过js异步提交的问题。(不是访问特定的webapi)1.razro自带防 XSRF攻击,因而,调用后台的OnGet或者Onpost方法的时候,都会返回400错误。第一,razor页面,带上 @Html.AntiForgeryToken()第二,在startu......
  • 工作任务不再遗忘,好用的待办事项APP
    在快速发展的现代职场中,很多职场人士表示自己每天要记住、要完成的工作任务非常多,并且这些任务错综复杂,一不留神就会忘记今天待办的工作任务,这应该怎么办呢?其实每个人的记忆力都是有限的,如果想要不再遗忘每项工作任务,借助一款待办事项提醒APP就可以了。那么有没有一款好用的待办......