介绍
# celery是什么?
分布式异步任务框架:第三方框架,celery翻译过来是芹菜,吉祥物就是芹菜
项目中使用异步任务的场景,可以使用它
之前做异步,如何做? 异步发送短信---》开启多线程---》不便于管理
# celery有什么作用?
-执行异步任务
-执行延迟任务
-执行定时任务
# celery原理
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
django如果不用异步,正常运行即可,如果想做异步,就借助于 celery来完成
# celery架构
-broker:消息中间件,任务中间件(消息队列:redis,rabbitmq)
django要做异步,提交任务到 任务中间件中(redis),存储起来
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
-worker:任务执行者,任务执行单元
不停的从任务中间件中取任务,执行
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中
-backend:结果存储,任务结果存储
把任务执行结果(函数返回值),存放到结果存储中(redis)
用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
### 任务中间件:redis
### 结果存储:redis
celery的快速使用
# 0 开源的,小组织,不支持win,不要就win的问题展开讨论了
win上:需要借助于第三方
1 安装:
pip install celery # 最新 5.3.4
2 写代码
* scripts\t_celery\main.py
import time
from celery import Celery
# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis,1表示第一个db库
backend = 'redis://127.0.0.1:6379/2' # 结果存,用redis,2表示第二个db库
# broker和backend不要用同一个库,否则数据容易乱
app = Celery('app', broker=broker, backend=backend)
# 编写任务,必须用app.task 装饰才变成了celery的任务,否则就是普通函数
@app.task
def send_sms():
time.sleep(1)
print('短信发送成功')
return '手机号短信发送成功'
3 提交任务,使用别的进程
* scripts\t_celery\add_task.py
from main import send_sms
res=send_sms.delay() #就能在resp中看到celery提交的任务
print(res) #cdcea241-2353-439f-8069-539147f4361a
4 执行任务
启动worker---》可以在3之前
celery -A tasks worker --loglevel=INFO
#Windows下启动Worker :
pip3 install eventlet
#终端执行命令,注意路径切换cd scripts >cd t_celery
celery -A main worker -l info -P eventlet
# main为包含app代码py文件名
# mac\linux下启动Worker :
celery -A main worker -l info
5 worker就会执行任务,把执行的结果,放到结果存储中
6 查看结果
* scripts\t_celery\get_result.py
from celery.result import AsyncResult
from main import app
id = '92987636-ae9e-4be9-828b-8c2d10fe066a'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
** 常用命令**
# mac\linux启动Worker
celery -A tasks worker --loglevel=INFO
# Windows下启动Worker
celery -A tasks worker --loglevel=INFO -P eventlet
# 关闭Worker
Ctrl + C,按两次关闭终端
#启动Beat程序 可以帮我们定时发送任务到消息队列
celery -A tasks beat --loglevel=INFO
- scripts\t_celery\main.py
import time
from celery import Celery
# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis,1表示第一个db库
backend = 'redis://127.0.0.1:6379/2' # 结果存,用redis,2表示第二个db库
# broker和backend不要用同一个库,否则数据容易乱
app = Celery('app', broker=broker, backend=backend)
# 编写任务,必须用app.task 装饰才变成了celery的任务,否则就是普通函数
@app.task
def send_sms():
time.sleep(1)
print('短信发送成功')
return '手机号短信发送成功'
- scripts\t_celery\add_task.py
from main import send_sms
#1 同步执行
# res = send_sms()
# print(res)
# 2 异步发送短信
res=send_sms.delay() #返回结果不是send_sms的返回值,它是一个任务id号, 其实这个任务还没执行呢,只是提交到了任务中间件中了(redis)
print(res) #cdcea241-2353-439f-8069-539147f4361a
终端执行celery -A main worker -l info -P eventlet
- scripts\t_celery\get_result.py
from celery.result import AsyncResult
from main import app
id = '92987636-ae9e-4be9-828b-8c2d10fe066a'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery 包结构
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
1 创建 celery_task 包,包内部有celery.py和一堆task-->['celery_task.home_task','celery_task.user_task']
2 celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2' # 结果存,用redis
app = Celery('app', broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])
3 每个task,写自己相关的任务
4 启动worker
celery -A celery_task worker -l info -P eventlet
5 提交任务
from celery_task.home_task import add
res=add.delay(3,4)
print(res)
6 查看结果
from celery_task.celery import app
from celery.result import AsyncResult
id = 'e31441d9-e9a6-4d70-9a66-a9227a6bc273'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
- luffy_api\celery_task\celery.py
from celery import Celery
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=Celery('app',broker=broker,backend=backend,include=['celery_task.home_task','celery_task.user_task'])
- luffy_api\celery_task\home_task.py
import time
from .celery import app
@app.task
def add(a, b):
time.sleep(2)
return a + b
- luffy_api\celery_task\user_task.py
import time
from .celery import app
@app.task
def send_sms(phone):
time.sleep(3)
return f'{phone}发送成功'
- add_task.py
from celery_task.home_task import add
from celery_task.user_task import send_sms
res=add.delay(3,4)
# print(res)
send_sms.delay(13788938684)
- get_result.py
from celery_task.celery import app
from celery.result import AsyncResult
id = 'e31441d9-e9a6-4d70-9a66-a9227a6bc273'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery 延迟任务和定时任务
异步任务
### 提交任务,使用delay提交即可
from celery_task.home_task import add
res=add.delay(3,4)
print(res)
延迟任务
## 提交延迟任务 apply_async
from datetime import datetime, timedelta
print(datetime.utcnow()) # utc 时间,跟咱们差8个小时
# eta 就是 10s 后的时间
eta = datetime.utcnow() + timedelta(seconds=30)
res = send_sms.apply_async(args=(18953675221,), eta=eta)
print(res)
"""
eta时间对象
apply()方法是阻塞的,等待当前子进程执行完毕后,再执行下一个进程。
apply_async()是异步非阻塞式,即多个进程并行执行,提高程序的执行效率。
"""
定时任务
1 在celery.py中
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
#任务add
'add': {
'task': 'celery_task.home_task.add',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (5, 6),
},
#任务send_sms
'send_sms': {
'task': 'celery_task.user_task.send_sms',
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'schedule': crontab(hour=9, minute=39), # 每天9点39,执行
'args': (18923748221,),
},
}
2 终端一启动worker
celery -A celery_task worker -l info -P eventlet
3 终端二启动beat(它来定时提交任务)
celery -A celery_task beat -l info
- celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2' # 结果存,用redis
app = Celery('app', broker=broker, backend=backend, include=['celery_task.home_task', 'celery_task.user_task'])
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add': {
'task': 'celery_task.home_task.add',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (5, 6),
},
'send_sms': {
'task': 'celery_task.user_task.send_sms',
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'schedule': crontab(hour=9, minute=39), # 每天9点39,执行
'args': (18923748221,),
},
}
django中使用celery
# 通用方案
1 把咱们之前写的包,放到项目路径下
2 提交异步或延迟任务,导入直接提交即可
from celery_task.user_task import cache_demo
res = cache_demo.delay('phone','1872324242')
3 只要启动worker,这些任务就会被执行
4 如果要使用django中的东西(配置文件,缓存,orm。。。),都需要在celery.py中写
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
5 使用django内置东西的任务
@app.task
def cache_demo(key, value):
cache.set(key, value)
return '缓存成功'
- luffy_api\celery_task\celery.py
from celery import Celery
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=Celery('app',broker=broker,backend=backend,include=['celery_task.home_task','celery_task.user_task'])
标签:task,app,celery,任务,print,import
From: https://www.cnblogs.com/Super-niu/p/17829452.html