简介
celery 是一个分布式任务调度框架,由 python 编写。它专注于实时处理,在任务发布后,管理分配任务到不同的服务器,并取得结果。在执行任务分配时需要一个消息中间件(Broker),在 客户端 和 Worker 之间进行协调,比如:MQ/redis等。最后将任务返回的结果存储到数据库(Backend)
Celery 使用
1、安装
使用 pip 命令安装
pip install celery
2、初始化
我的文件目录如下:
2.1 创建 celery_myself.py 文件,实例化 Celery 类
from celery import Celery
# celery_myself是当前模块的名称,可以省略,建议以当前模块名为名称
celery = Celery('celery_myself',
# 选择中间件redis
# broker='redis://sy-suz-test03:6379/0'
# 选择中间件 rabbitmq
broker='amqp://账号:密码@127.0.0.1:5672/tob')
# 注意这里,必须导入自定义的tasks任务到实例对象celery
# celery.conf['imports'] = ['core.tasks', ]
celery.conf.update(imports = ['core.tasks',])
2.2 创建 tasks.py 文件,定义任务函数
from core.celery_myself import celery
@celery.task
def fn(a, b):
data = a*b
print(f'参数{a}、{b}。计算结果{data}')
return data
3、运行 worker
CD 到 celery_myself.py 目录下,执行以下命令:
celery -A celery_myself worker --pool=solo --loglevel=INFO
看到以下信息成功运行:
遇到一个问题,写这篇博客的时候,新建了一个虚拟环境运行demo,但运行命令时报错:ImportError: cannot import name 'Celery' from 'celery'
,排查了好久,我的另一个python虚拟环境是没问题的,网上也搜不到解决方法,后来google外网看到了一篇和我同样问题的帖子。原因是库 importlib-metadata 版本太高了和 celery 不兼容,将 importlib-metadata 降成 4.13.0 就OK了。
4、调用任务
创建 run.py 文件,调用任务函数
from core.tasks import fn
fn.delay(2,3)
通过调用任务的 delay 来执行对应的任务。celery 会把执行命令发送到 broker,broker 再将消息发送给 worker 服务来执行,如果一切正常你将会在 worker 服务的日志中看到接收任务和执行任务的日志。
5、保存结果
修改 celery_myself.py 文件,增加 backend 参数,比如设置 redis 存储
from celery import Celery
# celery_myself是当前模块的名称,可以省略,建议以当前模块名为名称
celery = Celery('celery_myself',
# 指定中间件 rabbitmq
broker='amqp://账号:密码@127.0.0.1:5672/tob',
# 指定存储数据库 redis
backend='redis://127.0.0.1:6379/0')
再调用任务函数
from core.tasks import fn
res = fn.delay(2,3)
# 查看任务是否执行完成了。返回True表示执行完成,False表示未完成
print(res.ready())
查看 redis 数据,result 值为 6 。
6、在 Flask 框架下运用
先安装pip install flask-celery-helper
,辅助处理 Celery 的初始化,运用 init_app() 方法。Flask 官方文档说是现在不需要再扩展了,但如果使用 工厂模式 创建app的时候 实例化Celery 的参数配置在app对象里的话,就得扩展使用 init_app() 来初始化参数。
标签:Celery,myself,异步,python,redis,celery,任务,import From: https://www.cnblogs.com/shenh/p/16788709.html