今日学习内容
celery介绍
它是一个异步任务提交的框架
作用:
- 完成异步任务:提高项目的并发量。之前开启线程做,现在用celery做。
- 完成延迟任务
- 完成定时任务
架构:
- 消息中间件:
- broker提交任务也就是函数都放在消息中间件中,celery本身不提供消息中间件,需要借助于第三方的redis、rabbitmq。
- 任务执行单位:
- worker,真正执行任务的地方,一个个进程执行函数。
- 结果存储:
- backend,函数return的结果都在这里,celery本身不提供结果存储,借助于第三方redis、数据库、rabbitmq。
celery快速使用
-
celery
官网:http://www.celeryproject.org/
-
介绍:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform
-
celery 是独立的服务
-
可以不依赖任何服务器,通过自身命令,启动服务
-
celery
服务为其他服务提供异步
解决任务需求的-
注意:会有两个服务同时运行,一个是项目服务,一个
celery
服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要异步完成项目的需求。 -
比如:人相当于一个独立运行的服务(django),医院也是一个独立运行服务(celery)
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但人生病时,就会被医院接收,解决人生病问题。
人生病的处理方案交给医院来解决,所有人不生病时,医院正常独立运行,人生病时,医院就来解决人生病的需求。
-
-
-
安装:
pip3 install celery
-
使用步骤:
-
写一个main.py:实例化得到app对象,写函数,任务,注册成celery的任务
-
别的程序中:提交任务---》提交到broker中去
-
启动worker从broker中取出任务执行,执行完放到backend中
-
win: celery worker -A main -l info -P eventlet # 4.x及之前用这个 celery -A main worker -l info -P eventlet # 5.x及之后用这个 lin, mac: celery worker -A main -l info celery -A main worker -l info 注意:main 是我们当前跑项目的包名 celery_task 对于 执行 worker命令 需要在该项目文件夹下执行。
-
再
backend
中查看任务执行的结果 -
直接在
redis desktop manager
看 -
from main import app from celery.result import AsyncResult id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39' if __name__ == '__main__': res = AsyncResult(id=id, app=app) if res.successful(): result = res.get() print(result) elif res.failed(): print('任务失败') elif res.status == 'PENDING': print('任务等待中被执行') elif res.status == 'RETRY': print('任务异常后正在重试') elif res.status == 'STARTED': print('任务已经开始被执行')
-
-
celery包结构
写一个celery
的包,在任意项目中,把包copy
进去,导入使用即可。
项目:
celery_task
-__init__.py
-celery.py
-user_task.py
-home_task.py
add_task.py
get_result.py
使用步骤:
-
新建包:
celery_task
-
包里新建一个
celery.py
-
包里写
app的初始化
-
包里新建
user_task.py
编写用户相关任务 -
包里新建
home_task.py
编写首页相关任务 -
其他程序,提交任务
-
启动
worker
---》它可以先启动,在提交任务之前 ---》包所在的目录下celery -A celery_task worker -l info -P eventlet
-
查看任务执行的结果
celery_task/celery.py
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
# 一定不要忘了include
app = Celery(__name__, broker=broker, backend=backend,include=['celery_task.home_task','celery_task.user_task'])
celery_task/home_task.py
from .celery import app
@app.task
def add(a, b):
time.sleep(3)
print('计算结果是:%s' % (a + b))
return a + b
celery_task/user_task.py
import time
from .celery import app
@app.task
def send_sms(mobile, code):
time.sleep(1)
print('短信发送成功:%s,验证吗是%s' % (mobile, code))
return True
add_task.py
from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res=send_sms.delay('18723345455','9999')
print(res) # 672237ce-c941-415e-9145-f31f90b94627
# 任务执行,要启动worker
# 查看任务执行的结果
get_result.py
# 查询执行完的结果
from celery_task.celery import app
from celery.result import AsyncResult
id = '672237ce-c941-415e-9145-f31f90b94627'
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get() #7
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
celery异步任务,延迟任务,定时任务
异步任务:
任务.delay(参数, 参数)
延迟任务:
任务.apply_async(args=[参数, 参数], eta=时间对象(utc时间))
定时任务:
1.定时任务配置
2.启动worker:真正干活的人
celery -A celery_task worker -l info -P eventlet
3.启动beat:提交任务的人
celery -A celery_task beat -l info
django中使用celery
使用步骤
-
把写好的包赋值到项目路径下
-
把包内的celery.py的上面加入代码
import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api1.settings.dev') import django django.setup()
-
在django视图类中导入,提交任务。
-
启动worker,beat
秒杀逻辑
前端
-
秒杀按钮
-
事件:想后端秒杀接口发送请求,发送完立马起了一个定时任务,每隔5s,向后端查看一下是否秒杀成功。如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗提示。
handleClick() { this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => { if (res.data.code == 100) { let task_id = res.data.id this.$message({ message: res.data.msg, type: 'error' }); // 起个定时任务,每隔5s向后端查询一下是否秒杀成功 let t = setInterval(() => { this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then( res => { if (res.data.code == 100 || res.data.code == 101) { //秒杀结束了,要么成功,要么失败了 alert(res.data.msg) // 销毁掉定时任务 clearInterval(t) } else if (res.data.code == 102) { //什么事都不干 } } ) }, 5000) } }) }
后端
-
秒杀接口
-
提交秒杀任务
def seckill(request): # 提交秒杀任务 res = seckill_task.delay() return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
-
-
查询是否秒杀成功接口
-
根据用户传入的id,查询任务是否成功
def get_result(request): task_id = request.GET.get('id') res = AsyncResult(id=task_id, app=app) if res.successful(): result = res.get() # 7 return JsonResponse({'code': 100, 'msg': str(result)}) elif res.failed(): print('任务失败') return JsonResponse({'code': 101, 'msg': '秒杀失败'}) elif res.status == 'PENDING': print('任务等待中被执行') return JsonResponse({'code': 102, 'msg': '还在排队'})
-
双写一致性
接口加缓存
- 首页轮播图接口,加缓存
- 提交了接口的响应速度
- 提高并发量
class BannerView(GenericViewSet, CommonListModelMixin):
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
serializer_class = BannerSerializer
def list(self, request, *args, **kwargs):
result = cache.get('banner_list')
if result: # 缓存里有
print('走了缓存,速度很快')
return APIResponse(result=result)
else:
# 去数据库拿
print('走了数据库,速度慢')
res = super().list(request, *args, **kwargs)
result = res.data.get('result') # {code:100,msg:成功,result:[{},{}]}
cache.set('banner_list', result)
return res
- 加了缓存,如果mysql数据变了,由于请求的是缓存的数据,导致mysql和redis的数据不一致。
- 双写一致性问题
- 修改mysql数据库,删除缓存===》缓存的修改在后
- 修改数据库,修改缓存===》缓存的修改在后
- 定时更新缓存 ===》针对于实时性不是很高的接口适合定时更新
- 给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题
- 如果存在不一致的情况,可以先忽略。执行celery的定时任务
celery定时任务实现双写一致性
home_task.py
@app.task
def update_banner():
# 更新缓存
# 查询出现在轮播图的数据
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
ser = BannerSerializer(instance=queryset, many=True)
# ser 中得图片,没有前面地址
for item in ser.data:
item['image'] = settings.HOST_URL + item['image']
cache.set('banner_list', ser.data)
return True
celery.py
app.conf.beat_schedule = {
'update_banner': {
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=50),
'args': (),
}
}
步骤:
- 启动django=》启动worker=》启动beat
- 第一次访问,先查数据库,然后放入缓存
- 以后再访问,就先走缓存,一旦mysql数据改了,缓存可能就不一样了。
- 定时更新可以始终保持一致。