前言
Celery是一个开源的分布式任务队列系统,用于处理异步任务和分布式任务调度。使用消息代理(如RabbitMQ、Redis)来实现任务队列和消息传递。
在使用Python开发web应用过程中,经常使用Celery完成两种任务需求:
- 异步任务。将任务提交到任务队列中,然后继续处理其他任务,而不必等待任务完成。
- 定时任务。根据需求设置任务的执行时间和频率。
本文的目的就是通过讲解Celery的相关信息,来实现计划任务与异步任务的操作。
Celery的组成
Celery由以下几个核心组件组成:
- Celery应用程序(Celery Application):Celery应用程序是整个Celery系统的核心。它负责任务的创建、调度和分发。应用程序通常在项目的入口处初始化,包括配置Celery的消息代理、结果存储等参数。
- 任务(Tasks):任务是要执行的操作或函数,可以是任何Python可调用对象。任务函数使用装饰器@task进行修饰,将其注册到Celery应用程序中。任务可以同步执行或异步执行,可以根据需要设置任务的参数、执行时间和其他属性。
- 消息代理(Message Broker):消息代理是任务队列的基础,负责接收和存储任务消息,并将它们传递给工作节点进行处理。Celery支持多种消息代理,如RabbitMQ、Redis、Amazon SQS等。消息代理负责确保任务的可靠传递和分发。
- 工作节点(Worker Nodes):工作节点是实际执行任务的计算节点。它们连接到消息代理,接收任务消息并执行任务函数。一个Celery应用程序可以有多个工作节点,允许任务在分布式环境中并行执行。工作节点可以水平扩展,以适应任务负载的增加。
- 结果存储(Result Backend):结果存储是用于存储任务执行结果的地方。当任务完成后,工作节点将结果存储在结果存储中,供应用程序查询和获取。常见的结果存储包括数据库、缓存、消息队列等。
- 调度器(Scheduler):调度器负责定时触发任务的执行。Celery中的调度器组件称为Celery Beat(简称beat)。它允许您定义定时任务,指定任务执行的时间和频率。beat会根据您定义的调度配置将任务发送到消息代理中的任务队列。
通过这些组件的协作,Celery实现了异步任务处理、分布式任务调度和执行,并提供了灵活的定时任务功能。开发人员可以使用这些组件来构建可靠、高性能的分布式应用程序。
Celery即相关服务的安装
本文中的代码部署在home/hero/celery_test
目录下,在虚拟环境下运行。
cd /home/hero/celery_test
virtualenv celery_test
source venv/bin/activate
使用pip可以直接安装Celery模块
pip install Celery
一般情况下,Celery需要使用一些消息代理服务如redis来实现队列机制。作为示例在本机启动一个redis-server即可。
Celery的使用
文件结构
Celery实例
首先定义一个Celery实例,通过此实例来实现Celery的功能。
文件名为celeryobj.py。这段代码主要是定义了一个celery对象,并向其注册了两个包含异步任务的模块以及两个定时任务。
from celery import Celery
from celery.schedules import crontab
from datetime import timedelta
def init_celery():
broker_url = 'redis://127.0.0.1:6379/1' # 指定 Broker消费者,我们使用redis 1号数据库
result_backend = 'redis://127.0.0.1:6379/2' # 指定 Backend,最终消费结果,我们使用redis 2号数据库
timezone = 'Asia/Shanghai' # 指定时区,默认是 UTC
# 用于注册异步任务
# 注册内容为包含celery装饰器异步任务函数的文件名径字符串列表,
# 示例内容为 test目录下的work和
include = ['test.work',
'test.beat']
# 用于注册定时任务
# 字典格式,键为定时任务的名称,值为任务信息。
# 任务信息依旧字典格式。 键task为包含Celery装饰器定时任务函数模块引入的文件路径函数字符串。键schedule为定时任务执行周期频率
beat_schedule = {
'cron_1min': {
'task': 'test.beat.cron_1min',
'schedule': crontab(minute='*/1')
},
'timedelta_5s': {
'task': 'test.beat.timedelta_5s',
'schedule': timedelta(seconds=5)
}
}
# 初始化Celery对象
cel = Celery('CeleryOBJ', backend=result_backend, broker=broker_url, include=include) # 创建 Celery 实例
cel.conf.timezone = timezone
cel.conf.enable_utc = False
cel.conf.beat_schedule = beat_schedule
return cel
c = init_celery()
任务的编写
在test目录下work和beat文件中编写使用celery装饰器的函数代码。
test/work.py
from celeryobj import c
import time
from datetime import datetime
@c.task # celery 装饰器
def work_test(user, second):
"""
测试异步任务
:param user:
:param second:
:return:
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print('当前时间:{}'.format(now))
print('{} 发起了任务,需要执行{}s'.format(user, second))
time.sleep(second) # 模拟串行任务长时间处理
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print('当前时间:{}'.format(now))
print('{} 发起的任务完成'.format(user))
test/beat.py
from celeryobj import c
from datetime import datetime
@c.task # celery 装饰器
def cron_1min():
"""
1min 计划任务 crontab方式
:return:
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print('1min', now)
@c.task
def timedelta_5s():
"""
5s 计划任务 timedelte方式
:return:
"""
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print('5s', now)
运行Celery
若只是为了调试,可以使用命令行直接启动,但是后续进入正式环境,为方便启停推荐使用supervisord进行操作,以下是配置文件。
异步任务守护
[program:celery_worker]
user=hero
directory=/home/hero/celery_test
command=/home/hero/celery_test/venv/bin/celery -A celeryobj.c worker -l info --concurrency=4 --logfile=/home/hero/log/celery_work.log
定时任务守护
[program:celery_beat]
user=hero
directory=/home/hero/celery_test
command=/home/hero/celery_test/venv/bin/celery -A celeryobj.c beat -l info --logfile=/home/hero/log/celery_beat.log
启动Celery
supervisorctl reload
supervisorctl restart all
在supervisord的守护下,前文定义的celery_work 和 celery_beat可以正常运行。
使用命令supervisorctl status
可以查看运行状态,并可以通过配置文件中的日志文件查看具体运行状态。
测试Celery
定时任务
celery_beat根据celeryobj.py中beat_schedule定义的计划周期将任务发送给celery_work。从日志上看就是这样的:
celery_beat的日志
celery_work 的日志
异步任务
测试任务使用一下代码
celery_run.py
from test import work
if __name__ == "__main__":
work.work_test.delay("GuoGuo", 10)
work.work_test.delay("Guo", 3)
work.work_test.delay("G", 5)
使用celery装饰器的函数可以通过delay方法来启用celery的功能,将其交给celery worker来执行。如果不加delay仍可以作为普通函数来使用。
如果不使用celery可以使用一下代码来进行对比
run.py
from test import work
if __name__ == "__main__":
work.work_test("GuoGuo", 10)
work.work_test("Guo", 3)
work.work_test("G", 5)
先执行run.py
任务是逐个串行完成的。
运行celery_run.py,同时观察日志可以直观的感受到celery是如何异步执行的。
同样是三个任务,几乎同时开始。