一、celery介绍
Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。
它是一个任务队列,专注于实时处理,同时还支持任务调度。
# celery是独立的服务
1. 可以不依赖任何服务器,通过自身命令,启动服务
2. celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
# celery 的优点
简单:celery的 配置和使用还是比较简单的, 非常容易使用和维护和不需要配置文件
高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
如果连接丢失或发生故障,worker和client 将自动重试,并且一些代理通过主/主或主/副本复制方式支持HA。
快速:一个单进程的celery每分钟可处理上百万个任务
灵活: 几乎celery的各个组件都可以被扩展及自定制
# celery有什么用?
1. 完成异步任务:可以提高项目的并发量,之前开启线程做,现在使用celery做
2. 完成延迟任务
3. 完成定时任务
# celery的架构
消息中间件:broker 提交的任务(函数)都放在这里,celery本身不提供消息中间件,需要借助于第三方:redis,rabbitmq
任务执行单元:worker,真正执行任务的地方,一个个进程,执行函数
结果存储:backend,函数return的结果存储在这里,celery本身不提供结果存储,借助于第三方:redis,数据库,rabbitmq
# 官方地址:https://docs.celeryq.dev/en/latest/getting-started/first-steps-with-celery.html
二、celery快速使用
# 安装celery
pip install celery
# 安装eventlet
pip install eventlet
2.1 test_celery/main.py
import time
from celery import Celery
backend = 'redis://127.0.0.1:6379/1' # 结果存储
broker = 'redis://127.0.0.1:6379/2' # 消息中间件
app = Celery('test', backend=backend, broker=broker) # 实例化得到app对象
@app.task() # 写个函数,并使用装饰器装饰为celery的任务
def add(x, y):
time.sleep(3)
print(x + y)
return x + y
2.2 test_celery/test1.py
from main import add
print('hello word')
res = add.delay(4, 4)
print(res)
2.3 test_celery/get_result.py
from main import app
from celery.result import AsyncResult
task_id = '1256a2d2-777c-4838-924f-f40ebe36ae2f' # 执行任务,需要有任务id
if __name__ == '__main__':
res = AsyncResult(id=task_id, app=app)
if res.successful():
result = res.get() # 7
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
2.4 启动worker测试
启动worker,从broker中取任务执行,执行完放到backend中
win:
celery worker -A main -l info -P eventlet # 4.x及之前用这个
celery -A main worker -l info -P eventlet # 5.x及之后用这个
lin,mac:
celery worker -A main -l info
celery -A main worker -l info
再backend中查看任务执行的结果
三、celery的包结构
project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ └── user_task.py # 任务函数
| └── home_task.py
├── add_task.py # 添加任务
└── get_result.py # 获取结果
3.1 celery_task/celery.py
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/0'
app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.home_task', 'celery_task.user_task'])
3.2 celery_task/user_task.py
import time
from .celery import app
@app.task
def send_sms(mobile, code):
time.sleep(3)
print('短信发送成功:%s,验证码是%s' % (mobile, code))
return True
3.3 celery_task/home_task.py
from .celery import app
import time
@app.task
def add(a, b):
time.sleep(3)
print('计算结果是:%s' % (a + b))
return a + b
3.4 add_task.py
from celery_task.user_task import send_sms
# 提交了一个发送短信异步任务
res = send_sms.delay('18723345455', '9999')
print(res) # 672237ce-c941-415e-9145-f31f90b94627
# 任务执行,要启动worker celery -A celery_task worker -l info -P eventlet
3.4 get_result.py
# 查询执行完的结果
from celery_task.celery import app
from celery.result import AsyncResult
task_id = '8bb50a78-7af8-4152-9d44-566432cb1277' # 执行任务,需要有任务id
if __name__ == '__main__':
res = AsyncResult(id=task_id, app=app)
if res.successful():
result = res.get()
print(result)
elif res.failed():
print('任务失败')
elif res.status == 'PENDING':
print('任务等待中被执行')
elif res.status == 'RETRY':
print('任务异常后正在重试')
elif res.status == 'STARTED':
print('任务已经开始被执行')
四、celery异步任务、延迟任务、定时任务
# 添加异步任务
任务名.delay(参数,参数)
# 添加延迟任务
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(seconds=10) # eta时间对象
res = add.apply_async(args=(200, 50), eta=eta)
print(res)
# 添加定时任务
1. 在celery.py中添加如下配置:
from datetime import timedelta
from celery.schedules import crontab
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 定时任务
app.conf.beat_schedule = {
'send_sms_task': {
'task': 'celery_task.user_task.send_sms',
'schedule': timedelta(seconds=5),
# 'schedule': crontab(hour=20, day_of_week=3), # 每周三晚八点
'args': ('1538680821', '7777'),
},
'add_task': {
'task': 'celery_task.home_task.add',
'schedule': crontab(hour=20, minute=20, day_of_week=3), # 每周三晚八点
'args': (10, 20),
}
}
2. 启动worker
celery -A celery_task worker -l info -P eventlet
3. 启动beat
celery -A celery_task beat -l info
五、Django集成celery
# 使用步骤:
1. 把写好的celery_task包复制到项目路径下
2. 在包内的celery.py中的最上面加入以下代码
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
import django
django.setup()
3. 在视图类中写一个发送短信的任务函数
from celery_task.user_task import send_sms
def index(request):
mobile = request.GET.get('mobile')
res = send_sms.delay(mobile, '9999')
print(res)
return HttpResponse('短信发送成功')
4. 在user_task.py中加入以下代码
from .celery import app
from libs.tx_sms import send_sms_by_phone
from user.models import UserInfo
@app.task
def send_sms(mobile, code):
send_sms_by_phone(mobile, code)
user = UserInfo.objects.all().filter(mobile=mobile).first()
print('给用户%s发送短信成功,手机号为:%s,验证码为%s' % (user.username, mobile, code))
return True
5. 把路由配置好,然后启动worker测试即可
六、秒杀逻辑
6.1 seckill.vue
<template>
<div>
<img
src="https://gimg2.baidu.com/image_search/src=http%3A%2F%2Fpic.616pic.com%2Fys_bnew_img%2F00%2F65%2F81%2FEQq9UR90Hr.jpg"
alt="" height="300px" width="300px">
<el-button type="danger" plain @click.once="handlerClick">点击秒杀</el-button>
</div>
</template>
<script>
export default {
name: "Seckill",
methods: {
handlerClick() {
this.$axios.get(this.$settings.BASE_URL + 'userlogin/seckill/').then(res => {
if (res.data.code == 200) {
let task_id = res.data.id
this.$message({
message: res.data.msg,
type: 'error'
});
// 写一个定时任务,每隔5秒向后端查询一下是否秒杀成功
let t = setInterval(() => {
this.$axios.get(this.$settings.BASE_URL + 'userlogin/get_result/?id=' + task_id).then(
res => {
if (res.data.code == 200 || res.data.code == 201) {
alert(res.data.msg)
// 销毁掉定时任务
clearInterval(t)
} else if (res.data.code == 202) {
}
}
)
}, 5000)
}
})
}
}
}
</script>
<style scoped>
</style>
6.2 index.js
import Vue from 'vue'
import VueRouter from 'vue-router'
import HomeView from '../views/HomeView.vue'
import Seckill from "@/views/Seckill";
Vue.use(VueRouter)
const routes = [
{
path: '/',
name: 'home',
component: HomeView
},
{
path: '/seckill',
name: 'seckill',
component: Seckill
},
]
const router = new VueRouter({
mode: 'history',
base: process.env.BASE_URL,
routes
})
export default router
6.3 celery_task/user_task.py
import time
import random
from .celery import app
@app.task
def seckill_task():
time.sleep(8) # 模拟秒杀需要5秒
res = random.choice([True, False])
if res:
return '恭喜您,秒杀成功'
else:
return '秒杀结束,很遗憾您没有秒杀到'
6.4 user/views.py
from celery_task.user_task import seckill_task
# 1.提交一个秒杀任务
def seckill(request):
res = seckill_task.delay()
return JsonResponse({'code': 200, 'msg': '秒杀进行中,您正在排队', 'id': str(res)})
# 2.查询是否秒杀成功
from celery.result import AsyncResult
from celery_task.user_task import app
def get_result(request):
task_id = request.GET.get('id')
res = AsyncResult(id=task_id, app=app)
if res.successful():
result = res.get()
return JsonResponse({'code': 200, 'msg': str(result)})
elif res.failed():
print('任务失败')
return JsonResponse({'code': 201, 'msg': '秒杀失败'})
elif res.status == 'PENDING':
print('任务等待中被执行')
return JsonResponse({'code': 202, 'msg': '还在排队'})
七、双写一致性
7.1 接口加缓存
# 首页轮播图接口加缓存,提高了接口的响应速度和并发量
class BannerView(GenericViewSet, CommonListModelMixin):
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
serializer_class = BannerSerializer
def list(self, request, *args, **kwargs):
result = cache.get('banner_list')
if result: # 去redis缓存拿
return APIResponse(result=result)
else:
# 去mysql数据库拿
res = super().list(request, *args, **kwargs)
result = res.data.get('result') # {code:100,msg:成功,result:[{},{}]}
cache.set('banner_list', result)
return res
# 增加缓存,如果mysql数据变了,那么由于请求的都是缓存的数据,就会导致mysql和redis的数据不一致,这就是双写一致性问题
# 双写一致性问题,解决方法
1 修改mysql数据库,删除缓存,缓存的修改是在后
2 修改数据库,修改缓存,缓存的修改是在后
3 定时更新缓存,针对于实时性不是很高的接口适合定时更新
# 给首页轮播图接口加入了缓存,出现了双写一致性问题,使用定时更新来解决双写一致性的问题,起一个celery的定时任务
7.2 celery定时任务实现双写一致性
celery_task/home_task.py
from home.models import Banner
from home.serializer import BannerSerializer
from django.conf import settings
from django.core.cache import cache
@app.task
def update_banner():
# 更新缓存
# 查询出现在轮播图的数据
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
ser = BannerSerializer(instance=queryset, many=True)
for item in ser.data:
item['image'] = settings.HOST_URL + item['image']
cache.set('banner_list', ser.data)
return True
celery.py
app.conf.beat_schedule = {
'update_banner': {
'task': 'celery_task.home_task.update_banner',
'schedule': timedelta(seconds=50),
'args': (),
}
}
settings/dev.py
HOST_URL = 'http://127.0.0.1:8000'
标签:11,task,16,res,app,py,celery,2022,import
From: https://www.cnblogs.com/dy12138/p/16897780.html