1.celery:分步式异步任务框架
/1 异步任务
/2 延迟任务
/3 定时任务
/4 celery架构
消息中间件(broker):消息队列:可以使用redis,rabbitmq
任务执行单元(worker):执行单元 执行提交的任务
任务执行结果存储(banckend):可以使用mysql,redis
/5 安装celery模块:
cmd 中输入 -pip install Celery
celery不支持win,所以想再win上运行,需要额外安装eventlet
/6 安装eventlet : pip3 install eventlet
补充操作:退出虚拟环境 deactivate
卸载安装的模块:pip3 uninstall celery
2.celery执行异步任务
/1 新建py文件,注册celery任务
# 导入Celery类
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis
# 实例化得出对象
app = Celery(__name__,broker=broker, backend=backend,)
@app.task # 加上这个装饰器,让add函数变成了celery的任务
def add(a, b):
c = a + b
return c
/2 新建文件,在文件中写入异步调用操作
# 1 异步调用 : 执行下面的函数,把任务提交消息队列中,并没有执行,等待work的执行
# 返回给我们一个id号,我们之后通过id号来查结果
res = add.delay(2,3)
print(res) # 得到了一个ID号49e33190-3499-48a3-968b-5f1f59888011
/3 在文科路径下启动worker
cmd中进入到提交消息队列代码所在的路径中
celery -A demo worker -l info -P eventlet
启动worker和提交消息并没有先后顺序
/4 新建一个 文件来执行结果储存
# 执行结果存储
from demo import app
# 从celery的包下
from celery.result import AsyncResult
id = '49e33190-3499-48a3-968b-5f1f59888011'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 正常执行完成
result = a.get() # 任务返回的结果
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
3.celery的包结构使用
/1 创建包,在里面创建celery.py(名字固定)
from celery import Celery
broker = 'redis://127.0.0.1:6379/1' # 消息中间件 redis
backend = 'redis://127.0.0.1:6379/2' # 结果存储 redis
# 在括号中添加include参数,include的中括号中写入需要被app管理的文件,
# 这样写可以免去在上面加装饰器进行管理
app = Celery(__name__,broker=broker, backend=backend,
include=['celery_task.user_task','celery_task.home_task','celery_task.course_task'])
/2 在包中新建任务文件
例如user_task
# 新建任务文件
# 需要在celery前面加点,不加点就表示从第三方包导入app
# celery前面加点表示从当前路径下导入
from .celery import app
import time
@app.task
def send_sms(mobile,code):
time.sleep(3)
print('成功发送短信给%s,验证码为%s' % (mobile,code))
return True
/3 在包的外边创建提交任务的代码
# 提交任务
from celery_task.user_task import send_sms
res = send_sms.delay(88888888,77777)
print(res)
/4 启动worker
启动worker
要在包所在的路径下启动
celery -A 包名 worker -l info -P eventlet
/5 查看结果
# 查询结果
from celery_task.celery import app
# 从celery的包下
from celery.result import AsyncResult
id = '190ffcc5-9cb5-4781-978f-9d1949c68d86'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 正常执行完成
result = a.get() # 任务返回的结果
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
4.celery执行延时任务和定时任务
/1 定义:
延时任务:等待一定的时间再执行任务
定时任务:规定例如每天几点执行一次
/2 延时任务
先导入异步任务的函数
创建时间对象
例:eta=datetime.utcnow() + timedelta(seconds=5) # datetime.utcnow() 拿取的是UTC时间(是个对象),timedelta(seconds=5)(变为对象)可以和上面拿到的UTC时间相加
# 使用apply_async传参数加上时间对象
res = send_sms.apply_async(kwargs={'mobile':88888888,'code':77777},eta=eta)
/3 定时任务 (写在celery文件中)
# 配置时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务的配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send_sms': {
'task': 'celery_task.user_task.send_sms',
'schedule': timedelta(seconds=5),
'args': ('99999999', 8888),
},
'obtain_app': {
'task': 'celery_task.home_task.obtain_app',
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'schedule': crontab(hour=11, minute=38), # 每天11点35,执行
'args': (),
}
}
5.django中使用celery
/1 首先把建立的celery模块的包文件夹移动到根路径下
/2 增加新的任务文件banner_task,并添加新任务
from .celery import app
@app.task
def add_banner():
from home.models import Banner
Banner.objects.create(title='测试', image='/1.png', link='/banner', info='xxx',orders=99)
return 'banner增加成功'
/3 在celery.py 中加载django配置
# 注:只有在需要用到django中的
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_01.settings.dev")
/4 视图函数中添加接口,导入任务,提交即可
class CeleryView(APIView):
def get(self, request):
res = add_banner.delay()
return APIResponse(msg='新增banner的任务已经提交了')
/5 启动worker
在根路径下启动就行
6.接口的缓存
/1 双写一致性:
缓存数据和数据库的数据不一致了
写入数据库,删除缓存
写入数据库,更新缓存
/2 在轮播图接口下添加方法
def list(self, request, *args, **kwargs):
# 接口中接入缓存的逻辑
1 先去缓存中查一下有没有数据
2 如果有,直接返回,不走父类的list了(list在走数据库)
3 如果没有,走父类list,查询数据库
4 把返回的数据,放到缓存中
data = cache.get('home_banner_list')
if not data: # 缓存中没有
print('走了数据库')
res = super().list(request, *args, **kwargs) # 查询数据库
# 返回的数据,放到缓存中
data = res.data.get('data') # {code:100,msg:成功,data:[{},{}]}
cache.set('home_banner_list', data)
return APIResponse(data=data)