celery
介绍
是一个python的框架。
主要用来解决 定时任务
异步任务
延迟任务
的框架
官方文档
https://docs.celeryq.dev/en/stable/
运行原理
1 可以不依赖任何服务器,通过自身命令,启动服务
2 celery服务为为其他项目服务提供异步解决任务需求的
注: 会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,ceTery就会在需要时异步完成项目的需求
人(django)是一个独立运行的服务 医院(celery)也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与:但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
celery架构
任务中间件 Broker
, 其他服务提交的异步任务,放在中间件里面排队。
任务中间件是需要借助于第三方redis
任务执行单元 worker
执行异步任务的进程
celery 提供的
结果存储 backend
在项目中就是函数的返回结果,存到backend中
也需要借助第三方 redis, mysql
安装
pip install celery
windos启动worker需要下载pip install eventlet 这个来支持
快速使用
第一步
新建main,py设置中间件与执行结果库,写任务
from celery import Celery
"""
参数
中间件 broker
执行结果 backend
"""
# 中间件 用本地的redis 1库,提交的任务放在里面
broker ='redis://127.0.0.1:6379/1'
# 执行结果 用本地的redis 2库,执行完的结果放在里面
backend ='redis://127.0.0.1:6379/2'
# 实例化得到对象,并起个名字可以__name__获得这个文件的名字
app = Celery('test',broker=broker,backend=backend)
# 写任务 需要使用装饰器的形式
@app.task
def add(a,b):
import time
time.sleep(3)
return a+b
提交任务 s1.py
# 导入任务提交使用
from main import add
# 同步调用
# res = add(5,6)
# print(res)
# 异步调用
res = add.delay(2,3)
print(res) # 4ebd35bc-4dc4-4c96-9af7-81649ce607f7 返回了该任务的id值可以根据这个拿到任务结果
"这个时候任务还没有执行,worker没启动,他才是真正干活的人"
启动worker
windos启动worker需要下载pip install eventlet 这个来支持
# 启动worker命令
"""需要切换到任务所在目录下
main 任务文件名
"""
4.x之前版本
celery worker -A main -l info -P eventlet
4.x之后
celery -A main worker -l info -P eventlet
mac
celery -A main worker -l info
"""敲完命令会一直运行,只要提交任务就会自动执行"""
获取执行结果s2.py
# 把main的里面实例对象app导进来
from main import app
# 导入异步结果模块
from celery.result import AsyncResult
# 放入提交任务得到的id
id='4ebd35bc-4dc4-4c96-9af7-81649ce607f7'
if __name__=='__main__':
# 实例化得到对象,传id与执行app
a=AsyncResult(id=id,app=app)
# 判断是否成功
if a.successful():
# 拿到结果
result = a.get()
print(result)
# 执行失败
elif a.failed():
print('任务失败')
# 执行等待
elif a.status == 'PENDING':
print('任务等待着被执行')
elif a.status == 'RETRY':
print('任务异常正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery包的使用
celery_task # celery包
__init__.py # 包文件
celery.py # celery连接和配置相关文件,且名字必须交celery.py
tasks.py # 所有任务函数
add_task.py # 添加任务
get_result.py # 获取结果
第一步
新建一个celery_task包
第二步
在包的下面新建celery.py存放
from celery import Celery
# 中间件 用本地的redis 1库,提交的任务放在里面
broker ='redis://127.0.0.1:6379/1'
# 执行结果 用本地的redis 2库,执行完的结果放在里面
backend ='redis://127.0.0.1:6379/2'
# 实例化得到对象,并起个名字可以__name__获得这个文件的名字 include 指定管理的任务
app = Celery('test',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task'])
第三步
在celery_task下新建user_task.py存放用户任务
# 编写任务
from .celery import app
import time
@app.task
def send_sms(phone,code):
print("给%s发送短信成功,验证码为:%s" % (phone, code))
time.sleep(2)
return True
在celery_task下新建order_task.py 存放订单任务
from .celery import app
import time
@app.task
def add(a, b):
print('-----', a + b)
time.sleep(2)
return a + b
第四步
提交任务和包没关系在外面创建
send_sms_task.py
from celery_task.user_task import send_sms
# 同步调用
# res = send_sms('18888888',8888)
# 异步调用
res = send_sms.delay('18888888',8888)
print(res) #f6e913b7-da1a-462c-b5f4-6a8211d1d565
第五步
启动worker
到包的同级目录下执行命令
celery -A celery_task worker -l info -P eventlet
第六步
获取结果
包的同级目录下新建get_result.py
# 把main的里面实例对象app导进来
from celery_task.celery import app
# 导入异步结果模块
from celery.result import AsyncResult
# 放入提交任务得到的id
id='f6e913b7-da1a-462c-b5f4-6a8211d1d565'
if __name__=='__main__':
# 实例化得到对象,传id与执行app
a=AsyncResult(id=id,app=app)
# 判断是否成功
if a.successful():
# 拿到结果
result = a.get()
print(result)
# 执行失败
elif a.failed():
print('任务失败')
# 执行等待
elif a.status == 'PENDING':
print('任务等待着被执行')
elif a.status == 'RETRY':
print('任务异常正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
标签:task,app,py,celery,任务,print
From: https://www.cnblogs.com/LiaJi/p/17196262.html