1. 什么是Celery
4.4.0 | Celery 中文手册 (celerycn.io)
1.1 介绍
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件
Celety本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括RabbitMQ、Redis等等。
任务执行单元
Worker是Celery提供的任务执行单元,Worker并发的运行在分布式的系统节点中。
任务结果存储
Task result store 用来存储Worker执行的任务结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等,另外Celery还支持不同的并发和系列化的手段。
-
并发: Prefork、Eventlet、 gevent、 threads/single threaded
-
序列化:pickle、json、yaml、msgpack、zlib、bzip2 compression、Cryptographic message signing等等
1.2 使用场景
celery是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
-
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
-
定时任务:定时执行某件事情,比如每天数据统计
1.3 Celery的优点
Simple(简单)Celery 使用和维护都非常简单,并且不需要配置文件。
-
Highly Available(高可用):woker和client会在网络连接丢失或者失败时,自动进行重试。并且有的brokers 也支持“双主”或者“主/从”的方式实现高可用。
-
Fast(快速):单个的Celery进程每分钟可以处理百万级的任务,并且只需要毫秒级的往返延迟(使用 RabbitMQ, librabbitmq, 和优化设置时)
-
Flexible(灵活):Celery几乎每个部分都可以扩展使用,自定义池实现、序列化、压缩方案、日志记录、调度器、消费者、生产者、broker传输等等。
2. celery 的基本使用
2.1 处理异步任务
消费者
import celery
import time
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
app = celery.Celery("test", backend=backend, broker=broker)
@app.task
def send_email(name):
print(f"向{name}发送邮件...")
time.sleep(5)
print(f"向{name}发送邮件完成")
return "ok"
"""
celery worker -A celery_task -l info -P gevent
"""
生成者
import time
from celery_task import send_email
from celery.result import AsyncResult
from celery_task import app
task_list = []
# 发布任务
result = send_email.delay("小明")
print(result.id)
task_list.append(result.id)
result = send_email.delay("小红")
print(result.id)
task_list.append(result.id)
# 异步监控任务
while task_list:
task_id = task_list[0]
async_result = AsyncResult(id=task_id, app=app)
if async_result.successful():
result = async_result.get()
# 会从数据库中删除
# async_result.forget()
print(result)
# 任务执行完成从任务列表中移除
task_list.remove(task_id)
elif async_result.failed():
print("执行失败")
elif async_result.status == "PENDING":
print("任务等待被执行")
elif async_result.status == "RETRY":
print("任务异常正在重试")
elif async_result.status == "STARTED":
print("任务正在执行")
else:
print("任务其他的状态")
time.sleep(1)
2.2 多任务执行
文件结构
| application.py │ product_task.py ├─celery_task │ │ task1.py │ │ task2.py
执行任务
# task1.py
import time
from application import app
@app.task
def send_email(name):
print(f"向{name}发送邮件...")
time.sleep(5)
print(f"向{name}发送邮件完成")
return "ok"
# task2.py
import time
from application import app
@app.task
def send_smg(name):
print(f"向{name}发送短信...")
time.sleep(5)
print(f"向{name}发送短信完成")
return "ok"
应用
# application.py
import celery
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
app = celery.Celery("test", backend=backend, broker=broker,
include=["celery_task.task1", "celery_task.task2"])
"""
celery -A application worker -l info -P gevent
"""
发布任务
# product_task.py
import time
from celery_task.task1 import send_email
from celery_task.task2 import send_smg
from celery.result import AsyncResult
from application import app
task_list = []
# 发布任务
result = send_email.delay("小明")
print(result.id)
task_list.append(result.id)
result = send_smg.delay("小红")
print(result.id)
task_list.append(result.id)
# 异步监控任务
while task_list:
task_id = task_list[0]
async_result = AsyncResult(id=task_id, app=app)
if async_result.successful():
result = async_result.get()
# 会从数据库中删除
# async_result.forget()
print(result)
# 任务执行完成从任务列表中移除
task_list.remove(task_id)
elif async_result.failed():
print("执行失败")
elif async_result.status == "PENDING":
print("任务等待被执行")
elif async_result.status == "RETRY":
print("任务异常正在重试")
elif async_result.status == "STARTED":
print("任务正在执行")
else:
print("任务其他的状态")
time.sleep(1)
3.3 定时任务
发布延时任务
文件结构
| application.py │ product_task.py ├─celery_task │ │ task1.py │ │ task2.py
执行任务
# task1.py
import time
from application import app
@app.task
def send_email(name):
print(f"向{name}发送邮件...")
time.sleep(5)
print(f"向{name}发送邮件完成")
return "ok"
# task2.py
import time
from application import app
@app.task
def send_smg(name):
print(f"向{name}发送短信...")
time.sleep(5)
print(f"向{name}发送短信完成")
return "ok"
应用
# application.py
import celery
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
app = celery.Celery("test", backend=backend, broker=broker,
include=["celery_task.task1", "celery_task.task2"])
"""
celery -A application worker -l info -P gevent
"""
发布任务
# product_task.py
import time
from datetime import datetime, timedelta
from celery.result import AsyncResult
from celery_task.task1 import send_email
from celery_task.task2 import send_smg
from application import app
task_list = []
now_time = datetime.now()
now_stamp = datetime.utcfromtimestamp(now_time.timestamp())
# 发布定时任务, 延时10秒钟执行
result = send_email.apply_async(args=["小明"], eta=now_stamp + timedelta(seconds=10))
print(result.id)
task_list.append(result.id)
result = send_smg.apply_async(args=["小红"], eta=now_stamp + timedelta(seconds=10))
print(result.id)
task_list.append(result.id)
# 异步监控任务
while task_list:
task_id = task_list[0]
async_result = AsyncResult(id=task_id, app=app)
if async_result.successful():
result = async_result.get()
# 会从数据库中删除
# async_result.forget()
print(result)
# 任务执行完成从任务列表中移除
task_list.remove(task_id)
elif async_result.failed():
print("执行失败")
elif async_result.status == "PENDING":
print("任务等待被执行")
elif async_result.status == "RETRY":
print("任务异常正在重试")
elif async_result.status == "STARTED":
print("任务正在执行")
else:
print("任务其他的状态")
time.sleep(1)
使用beat执行定时任务
文件结构
| application.py │ product_task.py ├─celery_task │ │ task1.py │ │ task2.py
执行任务
# task1.py
import time
from application import app
@app.task
def send_email(name):
print(f"向{name}发送邮件...")
time.sleep(5)
print(f"向{name}发送邮件完成")
return "ok"
# task2.py
import time
from application import app
@app.task
def send_smg(name):
print(f"向{name}发送短信...")
time.sleep(5)
print(f"向{name}发送短信完成")
return "ok"
应用
# application.py
import celery
broker = "redis://192.168.1.200:6379/1"
backend = "redis://192.168.1.200:6379/2"
app = celery.Celery("test", backend=backend, broker=broker,
include=["celery_task.task1", "celery_task.task2"])
"""
celery -A application worker -l info -P gevent
"""
发布任务
# product_task.py
from celery.schedules import crontab
from application import app
# 发布定时任务
app.conf.beat_schedule = {
# 10秒执行一次
"each10s_task": {
"task": "celery_task.task1.send_email",
"schedule": 10,
"args": ("小明", )
},
# 一分钟执行一次
"each1m_task": {
"task": "celery_task.task2.send_smg",
"schedule": crontab(minute="*/1"),
"args": ("小红", )
}
}
"""
celery -A product_task beat
"""
crotab语法规则
crontab的语法规则格式:
代表意义 | 分钟 | 小时 | 日期 | 月份 | 周 | 命令 |
---|---|---|---|---|---|---|
数字范围 | 0~59 | 0~23 | 1~31 | 1~12 | 0~7 | 需要执行的命令 |
周的数字为 0 或 7 时,都代表“星期天”的意思。
另外,还有一些辅助的字符,大概有下面这些:
-
每分钟定时执行一次规则: 每1分钟执行: */1 * * * 或者 * * * * 每5分钟执行: */5 * * * *
-
每小时定时执行一次规则: 每小时执行: 0 * * * *或者0 */1 * * * 每天上午7点执行:0 7 * * * 每天上午7点10分执行:10 7 * * *
-
每天定时执行一次规则: 每天执行 0 0 * * *
-
每周定时执行一次规则: 每周执行 0 0 * * 0
-
每月定时执行一次规则: 每月执行 0 0 1 * *
-
每年定时执行一次规则: 每年执行 0 0 1 1 *
-
其他例子 5 * * * * 指定每小时的第5分钟执行一次ls命令 30 5 * * * ls 指定每天的 5:30 执行ls命令 30 7 8 * * ls 指定每月8号的7:30分执行ls命令 30 5 8 6 * ls 指定每年的6月8日5:30执行ls命令 30 6 * * 0 ls 指定每星期日的6:30执行ls命令[注:0表示星期天,1表示星期1,以此类推,也可以用英文来表示,sun表示星期天,mon表示星期一等。] 30 3 10,20 * * ls 每月10号及20号的3:30执行ls命令[注:“,”用来连接多个不连续的时段] 25 8-11 * * * ls 每天8-11点的第25分钟执行ls命令[注:“-”用来连接连续的时段] */15 * * * * ls 每15分钟执行一次ls命令 [即每个小时的第0 15 30 45 60分钟执行ls命令 ] 30 6 */10 * * ls 每个月中,每隔10天6:30执行一次ls命令[即每月的1、11、21、31日是的6:30执行一次ls命令。 ]
4.3 配置文件配置参数
文件结构
| celeryApp.py │config.py |tasks.py
配置文件
# config.py
from kombu import Queue
from datetime import timedelta
broker_url = 'redis://192.168.1.200:6379/4' # 使用Redis作为消息代理
result_backend = 'redis://192.168.1.200:6379/5' # 把任务结果存在了Redis
task_serializer = 'msgpack' # 任务序列化和反序列化使用msgpack方案
result_serializer = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
result_expires = 60 * 60 * 24 # 任务过期时间
accept_content = ['json', 'msgpack'] # 指定接受的内容类型
# 使用beat进程自动生成任务
beat_schedule = {
'print_datetime': {
'task': 'proj.tasks.print_datetime',
'schedule': timedelta(seconds=10),
'args': ()
},
'add': {
'task': 'proj.tasks.add',
'schedule': timedelta(seconds=60),
'args': (1, 3)
}
}
应用
# celeryApp.py
from celery import Celery
app = Celery('celery', include=["proj.tasks"])
app.config_from_object("proj.config")
"""
1.启动worker任务
celery -A proj.celeryApp worker -l info -P gevent
2.启动beat 任务
celery -A proj.celeryApp beat
"""
任务
# tasks.py
import time
import datetime
from proj.celeryApp import app
@app.task
def add(x, y):
time.sleep(10)
print("{} + {} = {}".format(x, y, x+y))
return x + y
@app.task
def print_datetime():
print(datetime.datetime.now().strftime("%Y%m%d %H%M%S"))
4. 4 django 中使用celery
项目结构
│ manage.py ├─djang_demo │ │ settings.py │ │ urls.py │ │ wsgi.py └─my_celery │ config.py │ main.py │ __init__.py ├─sms │ │ tasks.py
# config.py
broker_url = 'redis://192.168.1.200:6379/4' # 使用Redis作为消息代理
result_backend = 'redis://192.168.1.200:6379/5' # 把任务结果存在了Redis
# main.py
import celery
app = celery.Celery("django")
app.config_from_object("my_celery.config")
# 加载任务
# 参数必须必须是一个列表,里面的每一个任务都是任务的路径名称
# app.autodiscover_tasks(["任务1","任务2"])
app.autodiscover_tasks(["my_celery.sms", ])
"""
celery -A my_celery.main worker -l info -P gevent
"""
# tasks.py
import time
from my_celery.main import app
@app.task
def send_email(name):
print(f"向{name}发送邮件...")
time.sleep(5)
print(f"向{name}发送邮件完成")
return "ok"
-
启动celery celery -A my_celery.main worker -l info -P gevent
-
启动django python manage runserver
-
访问127.0.0.1:8000/test/ 触发异步任务
使用可视化工具
pip install flower celery -A application.app flower # 访问 http://127.0.0.1:5555/ 标签:异步,task,celery,队列,py,Celery,result,print,import From: https://blog.csdn.net/weixin_43413871/article/details/136824962