1.celery介绍
celery是python一个框架,与django无关,可以用在django中,也能用在flask中,运行起来就是一个服务。
它的功能:
1.异步任务
2.定时任务
3.延迟任务
celery的运行原理:
1.可以不依赖任何服务器,通过自身命令启动服务
2.celery服务为其他项目提供异步解决任务需求,比如有两个服务同时运行,一个是服务项目,一个是celery服务,项目服务有需求,celery就会在需要的时候异步完成项目的需求
"""
一个很形象的例子:
人是一个独立运行的服务,医院也是一个独立运行的服务。正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题。
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
"""
celery由以下几部分构成(broker和backend都使用redis):
1.任务中间件(broker),其他服务提交的异步任务,放在里面排队。Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等。
2.执行任务单元(worker)
3.结果存储(backend),对应到函数中就是函数的返回结果存储到backend中。Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
django就相当于生产者,worker就相当于消费者
使用场景:
1.异步执行:解决耗时任务
2.延迟执行:解决延迟任务
3.定时执行:解决周期任务
2.celery快速使用
安装:
pip install celery
安装完成会有一个可执行文件:celery.exe
我们首先演示一个同步调用的效果,同步调用因为有time.sleep,所以需要等三秒之后才能拿到结果:
main.py:
from celery import Celery
'''提交的异步任务放在redis中的第1个库'''
broker = 'redis://127.0.0.1:6379/1'
'''执行结果放在redis中的第2个库'''
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)
@app.task
def add(a, b):
import time
time.sleep(3)
return a + b
s1.py:
from main import add
res = add(2,3)
print(res) # 5
接下来我们演示异步操作:
1.main.py和之前代码没有变过:
from celery import Celery
'''提交的异步任务放在redis中的第1个库'''
broker = 'redis://127.0.0.1:6379/1'
'''执行结果放在redis中的第2个库'''
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)
@app.task
def add(a, b):
import time
time.sleep(3)
return a + b
2.在s1.py中需要将代码修改为异步执行,然后执行该代码,就是将该任务提交到broker:
s1.py:
from main import add
res = add.delay(5,6)
print(res)
执行完毕之后命令行会弹出一个随机字符串:
3.启动work之前win系统需要安装一个模块(linux和mac不需要):
pip install eventlet
4.启动worker:将路径切到main.py所在的目录下(D:\上海python金牌班\20230306 路飞\day08\luffy\luffy_api\scripts\celery_t),执行操作(启动worker操作只需要做一次就好,之后可以不用做):
win:
celery4.x之前版本
celery worker -A main -l info -P eventlet
celery4.x之后
celery -A main worker -l info -P eventlet
mac:
celery -A main worker -l info
然后查看backend指定的数据库查看,可以看到执行结果在result中:
但是我们查看不应该在redis中查看,而是在pycharm中查看,这是我们再建一个py文件s2.py:
s2.py
from main import app
from celery.result import AsyncResult
id = 'aa6dc05c-1108-48de-8a8d-0870eff9c25b'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 执行完了
result = a.get() #
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
执行这个文件就可以在终端看到执行结果:
3.celery包结构
以上代码可以执行任务,但是我们象吧celery做成一个包结构,在需要的时候直接导入这个包就可以。我们首先需要建立这样的目录结构:
project
├── celery_task # celery包
│ ├── init.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数(之前写在main中的任务)
├── add_task.py # 添加任务
└── get_result.py # 获取结果
我们需要按照步骤执行一下操作:
1.首先需要新建一个包:celery_task,在该包下新建celery.py,该文件下生成了一个Celery对象app,app内需要指定broker和backend。
celery_task/celery.py:
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test',broker=broker,backend=backend,include=['celery_task.order_task','celery_task.user_task'])
2.之前在main.py中我们生成了Celery对象并且写了方法。但是由于实际业务中方法较多,我们需要分类写在不同的py文件中(也就是目录结构中的task.py):
celery_task/user_task.py:
from .celery import app
import time
@app.task
def send_msg(phone,code):
'''模拟发送短信'''
print('给%s发送短信成功,验证码为:%s'%(phone,code))
time.sleep(3)
return True
celery_task/order_task.py:
from .celery import app
import time
@app.task
def add(price, count):
print('总价格为%s' % price * count)
time.sleep(2)
return price * count
3.在celery_task同级别新建一个py文件:add_task.py:
add.task.py:
from celery_task.user_task import send_msg
res = send_msg.delay('18888',5678)
print(res) # 735ba869-41fa-4af6-aa6e-d3f8c1d34787
这时任务已经提交上去:
4.启动worker:
celery -A celery_task worker -l info -P eventlet
'''执行该命令需要到celery_task这一级目录'''
5.在scripts中新建一个文件:get_result.py
get_result.py(和之前s2代码一致)
from main import app
from celery.result import AsyncResult
id = 'aa6dc05c-1108-48de-8a8d-0870eff9c25b'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 执行完了
result = a.get() #
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
标签:task,app,py,介绍,celery,print,import,安装
From: https://www.cnblogs.com/ERROR404Notfound/p/17196570.html