celery是什么
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
另外, Celery还支持不同的并发和序列化的手段
并发:Prefork, Eventlet, gevent, threads/single threaded
序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等
使用场景
celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
安装
pip install -U Celery
或着:
sudo easy_install Celery
基本使用
单文件夹下使用
创建celery_test文件夹,pro写生产端,tasks写消费端,result用来获取请求结果
- 创建消费端代码 tasks.py
import celery
import time
backend = "redis://:密码@ip:port/14"
broker = "redis://:密码@ip:port/15"
cel = celery.Celery('test', backend=backend, broker=broker)
# 使用装饰器 加载任务
@cel.task()
def send_sms(name):
print("开始发送")
time.sleep(3)
return "发送成功%s" % name
在tasks.py的当前目录运行celery的worker
celery -A tasks worker -l info
5版本命令和之前略有不同
出现下面信息表示成功运行
- 创建生产端代码 Pro.py
运行后返回一个id值
from tasks import send_sms
ret = send_sms.delay("hahaha ")
print(ret)
- 创建异步获取结果 result.py
from tasks import cel
from celery.result import AsyncResult
# id是pro运行后获取的id值,app是生产端的实例对象
asyncresult = AsyncResult(id="4b235142-1921-4eff-9e41-27886b273a8e",app=cel)
print(asyncresult.status)
ret = asyncresult.get()
asyncresult.forget()
print(ret)
多文件夹下使用
文件结构如下, mycelery用来保存celery实例的配置
- mycelery.py 注意此时需要用include关键字,指定异步的任务路径
import celery
backend='redis://:password@ip:port/14'
broker='redis://:password@ip:port/15'
cel=celery.Celery('test',backend=backend,broker=broker,include=[
'celery_tasks.task01',
'celery_tasks.task02'
])
# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
- task01/02.py
写入任务函数,注意导包路径
from celery_tasks.mycelery import cel
import time
# 使用装饰器 加载任务
@cel.task()
def send_sms(name):
print("短信开始发送")
time.sleep(3)
return "短信发送成功%s" % name
- 启动worker,注意启动路径和参数里包含的相对路径
celery_tasks.mycelery
建议后续都从根路径启动workder,并用相对路径指定配置文件的位置,
这里是在celery_test2的根路径下启动的worker
young_shi@MacBook-Air-2 celery_test2 % ls
celery_tasks pro.py result.py
young_shi@MacBook-Air-2 celery_test2 % celery -A celery_tasks.mycelery worker -l info
- 生产端代码
from celery_tasks.task01 import send_sms
result = send_sms.delay("yuan")
print(result.id)
result2 = send_sms.delay("alex")
print(result2.id)
celery执行定时任务
celey的delay方法可以异步执行,而定时任务要用到apply_async
方法
- 简单结构下的定时任务 在pro文件给apply_async添加延时
from celery_task import send_email
from datetime import datetime
# 方式一
# v1 = datetime(2020, 3, 11, 16, 19, 00)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = send_email.apply_async(args=["egon",], eta=v2)
# print(result.id)
# 方式二
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# 使用apply_async并设定时间
result = send_email.apply_async(args=["egon"], eta=task_time)
print(result.id)
- 多任务结构中