celery快速使用
1.celery是独立的访问
-官网
http://www.celeryproject.org/
'''
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
'''
2.安装
pip3 install celery
3.使用
3.1写一个main.py,实例化得到app对象,写函数,任务,注册成celery的任务
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery('test', backend=backend, broker=broker)
@app.task
def add(a, b):
time.sleep(2)
print(a)
return a + b
3.2在其他文件中提交任务到broker中
from main import add
print('1')
# 执行同步任务
# res = add(3, 4)
# print(res)
# 1 3 7
# 执行异步任务
res = add.delay(3, 4)
print(res)
3.3启动worker,从broker中取任务执行,执行完放到backend中
'''
win:
celery worker -A main -l info -P eventlet # 4.x及之前用这个
celery -A main worker -l info -P eventlet # 5.x及之后用这个
lin,mac:
celery worker -A main -l info
celery -A main worker -l info
'''
3.4再backend中查看任务执行的结果(可以通过代码查看)
from main import app
from celery.result import AsyncResult
id = '7bef14a0-f83e-4901-b4d8-e47aaac09b39'
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get() # 7
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
celery包结构
1.写一个celery的包,再任意项目中,把包copy进去,导入使用即可
celery_task
__init__.py
celery.py
user_task.py
home_task.py
add_task.py
get_result.py
2.使用
-新建包 celery_task
-在包下新建一个 celery.py
-在里面写app的初始化
-在包里新建user_task.py编写用户相关任务
-在包里新建home_task.py编写首页相关任务
-其它程序,提交任务
-启动worker ---》它可以先启动,在提交任务之前-->包所在的目录下
celery -A celery_task worker -l info -P eventlet
-查看任务执行的结果了
celery_task/celery.py
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery('test', backend=backend, broker=broker, include=['celery_task.home_task', 'celery_task.user_task'])
celery_task/home_task.py
import time
from .celery import app
@app.task
def add(a, b):
time.sleep(2)
print(f'计算结果为{a + b}')
return a + b
celery_task/user_task.py
import time
from .celery import app
@app.task
def send_sms(mobile, code):
time.sleep(1)
print(f'短信发送成功{mobile},验证码{code}')
return True
add_task.py
from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res = send_sms.delay('15976725488', '6666')
print(res) # 13c2208a-07fd-4f27-9691-ea96836a6f66
# 任务执行,要启动worker
get_result.py
from celery_task.celery import app
from celery.result import AsyncResult
id = '13c2208a-07fd-4f27-9691-ea96836a6f66'
if __name__ == '__main__':
res = AsyncResult(id=id, app=app)
if res.successful():
result = res.get() # 7
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
celery异步任务,延迟任务,定时任务
1.异步任务
res = add.delay(3, 4)
print(res)
2.提交延时任务
from celery_task.home_task import add
from datetime import datetime, timedelta
# 得到10秒后的utc时间
eta = datetime.utcnow() + timedelta(seconds=10)
# args内是传给函数的参数,eta是时间对象
res = add.apply_async(args=(200, 50), eta=eta)
print(res)
3.定时任务
3.1启动worker(执行任务)
celery -A celery_task worker -l info -P eventlet
3.2启动beat(提交任务)
celery -A celery_task beat -l info
3.3在app的配置文件中配置
from celery.schedules import crontab
app.conf.beat_schedule = {
'send_sms_task': {
'task': 'celery_task.user_task.send_sms',
'schedule': timedelta(seconds=5), # 每5秒提交
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': ('1897334444', '7777'),
},
'add_task': {
'task': 'celery_task.home_task.add',
'schedule': crontab(hour=12, minute=10, day_of_week=3), # 每周三12点10分提交
'args': (10, 20),
}
}
django中使用celery
1.把写好的包复制到项目路径下
2.在包内celery.py中加入代码
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()
3.在django的视图类中,导入,提交任务
from .celery import app
from libs.send_tx_sms import send_sms_by_phone
from apps.user.views import UserInfo
@app.task
def send_sms(mobile, code):
send_sms_by_phone(mobile, code)
user = UserInfo.objects.all().filter(mobile=mobile).filter()
print(f'给{user.username}发送短信发送成功:{mobile},验证码{code}')
return True
4.启动worker,beat
秒杀逻辑
1.秒杀事件
向后端秒杀接口发送请求,发送完立马起了一个定时任务,每个5s,向后端查看一下是否秒杀成功,如果秒杀没成功,定时任务继续执行,如果秒杀成功了,清空定时任务,弹窗告诉他
2.前端
<template>
<div>
<el-button type="danger" plain @click="handleClick">一键秒杀</el-button>
</div>
</template>
<script>
export default {
name: "Seckill",
methods: {
handleClick() {
this.$axios.get(this.$settings.BASE_URL + 'userinfo/seckill/').then(res => {
if (res.data.code == 100) {
let task_id = res.data.id
this.$message({
message: res.data.msg,
type: 'error'
});
// 起个定时事件,每隔5秒向后端查询一下是否秒杀成功
let t = setInterval(() => {
this.$axios.get(this.$settings.BASE_URL + 'userinfo/get_result/?id=' + task_id).then(res => {
// 秒杀结束了,要么成功,要么失败了
if (res.data.code == 100 || res.data.code == 101) {
alert(res.data.msg)
// 销毁掉定时任务
clearInterval(t)
} else if (res.data.code == 102) {
alert(res.data.msg)
}
}
)
}, 5000)
}
})
}
}
}
</script>
3.后端
3.1秒杀接口
from celery_task.user_task import seckill_task
from django.http import JsonResponse
def seckill(request):
res = seckill_task.delay()
return JsonResponse({'code': 100, 'msg': '正在排队', 'id': str(res)})
3.2查询是否秒杀成功接口
from celery_task.celery import app
from celery.result import AsyncResult
def get_result(request):
task_id = request.GET.get('id')
res = AsyncResult(id=task_id, app=app)
if res.successful():
result = res.get()
return JsonResponse({'code': 100, 'msg': str(result)})
elif res.failed():
return JsonResponse({'code': 101, 'msg': '秒杀失败'})
elif res.status == 'PENDING':
return JsonResponse({'code': 100, 'msg': '还在排队'})
3.3秒杀任务
@app.task
def seckill_task():
time.sleep(3) # 秒杀需要3秒
res = random.choice([True, False]) # 可能成功,可能不成功
if res:
return '秒杀成功'
return '很遗憾,你没有秒到'
双写一致性
接口加缓存
首页轮播图接口,加缓存,提交了接口的响应速度、并发量
from utils.response import APIResponse
from django.core.cache import cache
class BannerView(GenericViewSet, CommonListModelMixin):
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:BANNER_COUNT]
serializer_class = BannerSerializer
def list(self, request, *args, **kwargs):
result = cache.get('banner_list')
if result:
print('走缓存,速度快')
return APIResponse(result=result)
else:
print('走数据库,速度慢')
res = super().list(request, *args, **kwargs)
result = res.data.get('result')
cache.set('banner_list', result)
return res
celery定时任务实现双写一致性
1.在数据加到缓存中后,如果mysql数据变了,由于请求的都是缓存的数据,导致mysql和redis的数据不一致
2.双写一致性问题
1 修改mysql数据库,删除缓存 (缓存的修改是在后)
2 修改数据库,修改缓存 (缓存的修改是在后)
3 定时更新缓存 (针对于实时性不是很高的接口适合定时更新)
'给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题【会存在不一致的情况,我们可以忽略】---》定时任务,celery的定时任务'
- home_task.py
from celery import Celery
from datetime import timedelta
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery('test', backend=backend, broker=broker, include=['celery_task.home_task'])
from celery.schedules import crontab
app.conf.beat_schedule = {
'update_banner': {
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=20), # 20秒更新轮播图
'args': (),
}
}
- celery.py
from .celery import app
from home.models import Banner
from django.conf import settings
from settings.common_settings import *
from django.core.cache import cache
from home.serializer import BannerSerializer
@app.task
def update_banner():
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[
:BANNER_COUNT]
ser = BannerSerializer(instance=queryset, many=True)
for item in ser.data:
item['image'] = settings.HOST_URL + item['image']
cache.set('banner_list', ser.data)
return True
标签:task,res,app,celery,缓存,print,import,双写
From: https://www.cnblogs.com/riuqi/p/16897765.html