前言
官方文档
中文官方文档
基于 python 开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,但本身不提供消息服务。
Celery 通过消息机制进行通信,通常使用中间人(Broker)作为客户端和职程(Worker)调节。
- 中间件 (Broker):接收和发送消息,通常以独立的服务形式出现。常用 Redis 或 RabbitMQ
使用
启动方式
- 直接运行文件
python app.py
示例
from celery import Celery
broker = ...
backend = ...
app = Celery('tasks', broker=broker, backend=backend)
if __name__ == '__main__':
args = ['worker', '--loglevel=INFO']
# app.worker_main(argv=args)
app.start(args)
- 命令行启动
示例
# module/app.py
celery worker -A module.app -l INFO -c 4
# 后台启动
celery multi start worker -A module.app -l INFO
- -c:进程数
- -l:日志级别
- 重启:
celery multi restart ...
- 停止:
celery multi stop ...
- 等待任务完成再停止:
celery multi stopwait ...
基础使用
单文件任务
创建实例
import time
import celery
# 'amqp://USER:PASSWORD@HOST:PORT/QUEUE' # RabbitMQ
# 'redis://PASSWORD@HOST:PORT/DB' # redis
broker = 'amqp://guest:guest@127.0.0.1'
backend = 'redis://localhost:6379/1'
# 创建实例
# tasks:项目名
app = celery.Celery('tasks', backend=backend, broker=broker)
@app.task
def add(*args):
return sum(args)
@app.task
def multi(*args):
result = 1
for i in args:
result *= i
return result
if __name__ == '__main__':
args = ['worker', '--loglevel=INFO']
# app.worker_main(argv=args)
app.start(args)
生产者(发布消息)
from app import add, multi
result_add = add.delay(2, 3) # delay()是apply_async()的快捷方法
print(f'add的任务id:{result_add.id}')
result_multi = multi.delay(2, 3)
print(f'multi的任务id:{result_multi.id}')
任务结果校验
import time
from celery.result import AsyncResult
from app import app
result_id = '' # 填入某个任务的id
async_result = AsyncResult(id=result_id, app=app)
while True:
if async_result.successful():
result = async_result.get()
print(f'结果:{result}')
# result.forget() # 将结果删除。执行完成后,结果不会自动删除
# result.revoke(terminate=True) # 无论现在是什么时候,都终止
# result.revoke(terminate=False) # 如果任务还没开始执行,则终止
break
elif async_result.failed():
print('执行失败')
break
elif async_result.status == 'PENDING':
print('等待执行')
elif async_result.status == 'RETRY':
print('重试中')
elif async_result.status == 'STARTED':
print('执行中')
time.sleep(3)
多任务
通过 include 参数指定任务模块
创建实例
# module/app.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/1'
app = Celery(
'celery',
broker=broker,
backend=backend,
include=[
'task01',
'task02',
]
)
if __name__ == '__main__':
args = ['worker', '--loglevel=INFO']
app.start(args)
任务1
# module/task01.py
from app import app
@app.task
def add(*args):
return sum(args)
任务2
# module/task02.py
from app import app
@app.task
def multi(x):
return x * x
生产者(producer.py)
import task01, task02
result_add = task01.add.delay(3)
result_multi= task02.multi.delay(3)
print(f'add的任务id:{result_add.id}')
print(f'multi的任务id:{result_multi.id}')
定时任务
创建实例
# 参考“基础使用”
import time
import celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://localhost:6379/1'
app = celery.Celery('tasks', backend=backend, broker=broker)
@app.task(ignore_result=True) # 不需要结果
def send_email(name):
print(f'Start send email to {name}')
time.sleep(5)
print(f'Over')
return {'code': 200}
生产者
import datetime
from app import add
now = datetime.datetime.now()
send_time = now + datetime.timedelta(seconds=10)
send_time = datetime.datetime.strftime(send_time, '%Y-%m-%d %H:%M:%S')
print(f'now:{now}')
result = send_email.apply_async(args=['abc', ], eta=send_time)
print(result.id)
进阶
TODO
标签:multi,app,args,celery,result,使用,import From: https://www.cnblogs.com/FevolQ/p/17577383.html