1 celery介绍和安装
# Celery 是什么
-翻译过来是 芹菜 的意思,跟芹菜没有关系
-框架:服务,python的框架,跟django无关
-能用来做什么
-1 异步任务
-2 定时任务
-3 延迟任务
# 理解celery的运行原理
"""
1) 可以步依赖任何服务器,通过自身命令,启动服务
2) celery服务为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;单当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
# celery架构(Broker, backend 都用redis)
-1 任务中间件 Broker(中间件), 其他服务提交的异步任务,放在里面排队
-徐来借助第三方 redis rabbitmq
-2 任务执行单元 worker 真正执行异步任务的进程
-celery提供的
-3 结果存储 backend 结果存储,函数的返回结果,存到 backend中
-需要借助于第三方:redis,mysql
# 使用场景
异步执行:解决耗时任务
延迟执行;解决延迟任务
定时执行:解决周期(周期)任务
# celery 不支持win,通过eventlet支持在win上运行
2 celery快速使用
# 安装---》安装完成,会有一个可执行文件 celery
pip install celery
win: pip install eventlet
# 快速使用
####### 第一步:新建 main.py########
from celery import Celery
# 提交的异步任务,放在里面
broker = 'redis://127.0.0.1:6379/1'
# 执行完的结果 放在这里
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)
@app.task
def add(a, b):
import time
time.sleep(3)
print('-------', a+b)
return a + b
######### 第二步:其他程序,提交任务#########
res = add.delay(5,6) # 原来add的参数,直接放在delay中传入即可
print(res) # f150d8a5-c955-478d-9343-f3b60d0d5bdb
######## 第三步:启动worker #########
# 启动worker命令,win需要安装 eventlet
win:
-4.x之前的版本
celery worker -A main -l info -P eventlet
-4.x之后
celery -A main worker -l info -P eventlet
mac:
celery -A main worker -l info
####### 第四步: worker会执行消息中间件中的任务,把结果存起来######
####### 第五步:咱们要看执行结果,拿到执行的结果#######
from main import app
from celery.result import AsyncResult
id = '51611be7-4914-4bd2-992d-749008e9c1a6'
if __name__ = '__main__':
a = AsyncResult(id=id, app=app)
if a.successful() # 执行完了
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
3 celery包结构
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
########## 第一步:新建包 celery_task ##########
# 在包下新建[必须叫celery]的py文件,celery.py 写代码
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])
######### 第二步:在包内部,写task,任务异步任务#######
# order_task
from .celery import app
import time
@app.task
def add(a, b)
print('-----', a + b)
time.sleep(2)
return a + b
# user_task
from .celery import app
import time
@app.task
def send_sms(phone, code):
print("给%s发送短信成功,验证码为:%s" % (phone, code))
time.sleep(2)
return True
######第三步:启动worker,包所在目录下
celery -A celery_task worker -l info -P eventlet
#####第四步:其他程序 提交任务,被提交到中间件中,等待worker执行,因为worker启动了,就会被worker执行
from celery_task import send_sms
res = send_sms.delay('199999999', 8888)
print(res) # 7d39033c-4cc7-4af2-8d78-e62c277db183
### 第五步:worker执行完,结果存到backend中
### 第六步:我们查看结构
from celery_task import app
from celery.result import AsyncResult
id = '7d39033c-4cc7-4af2-8d78-e62c277db183'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 执行完了
result = a.get() #
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
标签:基本,task,app,worker,用法,celery,任务,print
From: https://www.cnblogs.com/DragonY/p/17196403.html