celery介绍和安装
celery是什么
翻译过来就是 芹菜
的意思,跟芹菜没有关系
框架:服务,python的框架,跟django无关
能用来做什么
异步任务
定时任务
延迟任务
理解celery的运行原理
1.可以不依赖任何服务器,通过自身命令,启动服务
2.celcry服务为其他项目提供异步解决任务需求的会有两个服务为同时运行,一个是项目服务,一个是celery服务为,项目服务将需要异步处理的任务交给celery服务,celery就会再需要使用异步完成项目的需求
例如:人是一个独立的服务|医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与,但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
celery架构(broker,backend,都用redis)
1.任务中间件broker,其他服务提交的异步任务,放在里面需要排队
需要借助于第三方redis rabbitmq
2.任务执行单元 worker 真正执行异步任务的进程
celery提供的
3.结果存储 backend 结果存储,函数的返回结果,存到backend中
需要借助于第三方redis,mysql
使用场景
异步执行:解决耗时任务
延迟执行:解决延迟任务
定时执行:解决周期任务
celery快速使用
安装模块安装完成,会有一个可执行文件celery
pip install celery
windwos还有额外安装一个模块
pip install eventlet
1.第一步新建一个main.py
from celery import Celery
# 提交的异步任务,放在里面
broker = 'redis://127.0.0.1:6379/1'
# 执行完的结果,放在这里
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)
@app.task
def add(a, b):
import time
time.sleep(3)
print('------',a + b)
return a + b
放在一起就可以了
2.其他程序,提交任务add.py
res = add.delay(5,6) #原来add的参数,直接放在delay中传入即可
print(res) # f150d8a5-c955-478d-9343-f3b60d0d5bdb
3.启动worker
启动worker命令 win需要安装eventlet
win:
4.x之前版本
celery worker -A main(与新建的一样) -l info -P eventlet
4.x之后
celery -A main worker -l info -P eventlet
mac:
celery -A main worker -l info
4.worker会执行消息中间件中的任务,把结果存起来
5.我们要看执行结果,拿到执行的结果
from main import app
from celery.result import AsyncResult
id = '51611be7-4914-4bd2-992d-749008e9c1a6'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 执行完了
result = a.get() #
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery包结构
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
1.新建包celery_taks(自己起什么就行)
再包下新建【必须叫celery】的py文件celery.py写代码
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])
2.再包内部,写task,任务异步任务
order_task.py
from .celery import app
import time
@app.task
def add(a, b):
print('-----', a + b)
time.sleep(2)
return a + b
user_task.py
from .celery import app
import time
@app.task
def send_sms(phone, code):
print("给%s发送短信成功,验证码为:%s" % (phone, code))
time.sleep(2)
return True
3.启动worker,包所在目录下
celery -A celery_task(就是自己起的包名) worker -l info -P eventlet
4.其他程序,提交任务,被提交到中间件,等待worker执行,因为worker启动了,就会被worker执行
from celery_task import send_sms
res=send_sms.delay('1999999', 8888)
print(res) # 7d39033c-4cc7-4af2-8d78-e62c277db183
5.worke执行完毕,结果存到backend中
6.查看结果
from celery_task import app
from celery.result import AsyncResult
id = '7d39033c-4cc7-4af2-8d78-e62c277db183'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful(): # 执行完了
result = a.get() #
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery执行异步任务,延迟任务,定时任务
异步任务
任务.delay(参数)
延迟任务
任务.apply_async(args=[参数],eta=时间对象)
如果没有修改时区,需要使用utc时间
定时任务
需要启动beat 和 worker
beat 定时提交任务的进程》配置在
app.conf.beat_schedule的任务
worker 执行任务的
使用步骤
第一步:在celery的py文件中写入
# 第一步:在celery的py文件中写入
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# celery的配置文件#####
# 任务的定时配置
app.conf.beat_schedule = {
'send_sms': {
'task': 'celery_task.user_task.send_sms',
# 'schedule': timedelta(seconds=3), # 时间对象
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'schedule': crontab(hour=9, minute=43), # 每天9点43
'args': ('18888888', '6666'),
},
}
第二步:启动beat
记得在包所在的目录运行
celery -A celery_test beat -l info
第三步:启动worker
同样需要在包所在目录运行
celery -A celery_test worker -l info -P eventlet
注意点
1.启动命令的执行位置,如果是包结构,一定要在包这一层
2.include=['celery_task.order_task'],路径从包名下开始导入,因为我们在包这层执行的命令
django种使用celery
补充如果在公司中,只做定时任务,还有一个更简单的框架
APSchedule:https://blog.csdn.net/qq_41341757/article/details/118759836
使用步骤:
1.把我们写的包,复制到项目目录下
-luffy_api
-celery_task #celery的包路径
-luffy_api #源代码路径
2.在使用提交异步任务的位置,导入使用即可
视图函数种,导入任务
任务.delay() # 提交任务
3.启动worker,如果有定时任务,启动beat
4.等待任务被worker执行
5.在视图函数中,查询任务执行的结果
秒杀功能
秒杀逻辑分析
1.前端秒杀按钮,用户点击,发送ajax请求到后端
2.视图函数》提交秒杀任务》借助于celery,提交到中间件中了
3.当次秒杀的请求,就回去了,携带则任务id号在前端
4.前端开启定时任务,每隔3秒钟带着任务,向后端发送请求,查看是否秒杀成功
5.后端情况
1任务还在等待被执行》返回给前端,前端继续每隔3s发送一次请求
2任务执行完了,秒杀成功了》返回给前端,恭喜您秒杀成功》关闭前端任务定时器
3任务执行完了,秒杀失败了》返回给前端,秒杀失败了》关闭前端任务定时器
视图
from celery_task.sckill_task import sckill
from celery.result import AsyncResult
from celery_task.celery import app
class SckillView(GenericViewSet):
"""秒杀视图类"""
def list(self, request, *args, **kwargs):
"""使用定时任务,不等待直接返回任务id"""
user_id = request.query_params.get('id')
#异步提交任务返回任务id号
task_id = sckill.delay(user_id)
return APIResponse(task_id=task_id.id)
@action(methods=['GET'], detail=False)
def get_result(self, request, *args, **kwargs):
task_id = request.query_params.get('task_id')
res = AsyncResult(id=task_id, app=app)
if res.successful(): # 执行完了
result = res.get() #
return APIResponse(data=result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
return APIResponse(code=101, msg=res.status)
任务sckill.py
from .celery import app
import random
import time
@app.task
def sckill(id):
print(f'用户{id},正在秒杀')
time.sleep(random.choice([6, 7, 8, 9]))
print(f'用户{id},秒杀成功')
return random.choice([True, False])
celery.py
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 写了一个任务不要忘记在这里注册
app = Celery('test', broker=broker, backend=backend,
include=[
'celery_task.order_task',
'celery_task.user_task',
'celery_task.sckill_task'
])
前端Sckill.vue
<template>
<button @click="handlerSckill">点击开始秒杀</button>
</template>
<script>
export default {
name: "Sckill",
data() {
return {
task_id: '',
t: ''
}
},
methods: {
handlerSckill() {
this.$axios({
url: this.$setting.BASE_URL + '/user/sckill/',
method: 'get',
params: {id: 1},
}).then(res => {
this.$message({
message: '正在秒杀中,请稍等',
duration: 2000,
type: 'success'
})
this.task_id = res.data.task_id
this.t = setInterval(() => {
this.$axios({
url: this.$setting.BASE_URL + '/user/sckill/get_result/',
method: 'get',
params: {task_id: this.task_id}
}).then(res => {
let msg
let type
if (res.data.code == 100) {
if (res.data.data == true) {
msg = '秒杀成功'
type = 'success'
} else {
msg = '秒杀失败'
type = 'error'
}
clearInterval(this.t)
} else {
msg = res.data.msg
type = 'info'
}
this.$message({
message: msg,
duration: 1000,
type: type
})
})
}, 3000)
})
}
}
}
</script>
<style scoped>
</style>
django中使用celery
1把我们写的包,复制到项目下
-luffy_api
-celery_task #celery的包路径
celery.py # 一定不要忘了一句话
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
-luffy_api #源代码路径
2在使用提交异步任务的位置,导入使用即可
视图函数中使用,导入任务
任务.delay() 提交任务
3启动worker,如果有定时任务,启动beat
4等待任务被worker执行
5在视图函数中,查询任务执行的结果
重点celery中使用django,有时候,任务中会使用django的orm缓存,表模型。。。一定要加
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
轮播图接口加缓存
1.网站首页被访问的频率很高瞬间1w个人在访问,首页的轮播图接口会执行1w次,1w次查询轮播图标的sql在执行,轮播图基本不变
2.想一种方式,让这1w个访问,效率更高一些,不查数据库了,直接走缓存》redis》效率高
3现在的轮播图逻辑变成了
1轮播图接口请求来了,先去缓存中看,如果有,直接返回缓存中的数据,如果没有,查数据库,然后把轮播图数据,放到redis中,缓存起来
1改接口
# 加入缓存的轮播图接口
def list(self, request, *args, **kwargs):
# 查看缓存有没有数据,如果没有,再走数据库
banner_list = cache.get('banner_list')
if banner_list:
print('走了缓存')
return APIResponse(data=banner_list)
else: # 走数据库
print('走了数据库')
res = super().list(request, *args, **kwargs)
# 把序列化后的数据存到缓存中,redis中
cache.set('banner_list', res.data)
return APIResponse(data=res.data)
2采用继承的方式
common_cache
from rest_framework.mixins import ListModelMixin
from django.core.cache import cache
from utils.common_response import APIResponse
class CacheListModelMixin(ListModelMixin):
cache_key = None
def list(self, request, *args, **kwargs):
assert self.cache_key, Exception('没有设置缓存值')
res = cache.get(self.cache_key)
# 首先获取redis中缓存的数据如果获取到了直接返回
if res:
data = res
else:
# 如果获取不到查询数据库中的数据放到缓存中
res = super().list(request, *args, **kwargs)
cache.set(self.cache_key, res.data)
data = res.data
return APIResponse(data=data)
views.py使用
# 采用继承的方式进行缓存数据
class BannerView(GenericViewSet, CacheListModelMixin):
queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
serializer_class = BannerSerializer
cache_key = 'banner_list'
双写一致性
加入缓存后,缓存中有数据,先去缓存中国拿,但是如果mysql中数据变了,缓存不会自动变化,出现数据不一致的问题
mysql和缓存数据库 数据不一直
双写一致性
写入mysql,redis没动,数据不一致存在问题
如何解决
1.修改数据,删除缓存
2.修改数据,更新缓存
3.定时更新缓存,实时性差
定时任务:celery
首页轮播图定时任务
第一步:在celery中配置定时任务
from celery import Celery
import os
# 因为要使用django的orm所有必须要加这个
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# 写了一个任务不要忘记在这里注册
app = Celery('test', broker=broker, backend=backend,
include=[
'celery_task.order_task',
'celery_task.user_task',
'celery_task.sckill_task',
'celery_task.banner_task'
])
from datetime import timedelta
# 第一步:在celery的py文件中写入
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
app.conf.beat_schedule = {
'cache_banner': {
'task': 'celery_task.banner_task.banner',
'schedule': timedelta(seconds=3), # 时间对象
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
# 'schedule': crontab(hour=9, minute=43), # 每天9点43
# 'args': ('18888888', '6666'),
},
}
第二步启动worker,启动beat记得在包所在的目录启动
banner代码
from .celery import app
from home.models import Banner
from django.core.cache import cache
from home.serializer import BannerSerializer
from django.conf import settings
@app.task
def banner():
# 获取轮播图数据
banner_list = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')
res = BannerSerializer(instance=banner_list, many=True)
# 存到缓存中
# 如果有request对象了他会在rest_framework>fields文件中的FileFields中的to_representation调用request.build_absolute_uri(url)
# 对这个文件url进行修改,如果没有则不会修改
# 这里因为是没有request对象所以不会自动帮我们进行拼接url地址要我们自己进行拼接
# 所以这里需要我们自己进行拼接
for data in res.data:
data['image'] = settings.BASE_URL + data['image']
cache.set('banner_list', res.data)
return True
标签:异步,task,res,app,celery,路飞,任务,import
From: https://www.cnblogs.com/clever-cat/p/17206873.html