celery
介绍
https://github.com/celery/celery/
https://docs.celeryq.dev/en/stable/
-
celery是一个分布式异步任务框架,是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务,是一个专注于实时处理的任务队列,支持任务调度,所以 celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow,只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。
-
celery能做什么
- 定时任务
- 异步任务
- 延迟任务
celery的使用场景
# 1 异步任务
-一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。
-视频转码、邮件发送、消息推送等等
# 2 定时任务
-定时推送消息、定时爬取数据、定时统计数据等
# 3 延迟任务
-提交任务后,等待一段时间再执行某个任务
1 Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:
2 Celery Beat,任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
3 Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
4 Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。
5 Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。
6 Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
快速使用
# 0 创建Python项目
# 1 创建虚拟环境
# 2 安装celery
pip install celery
# 3 安装redis(消息队列和结果存储使用redis)
pip install redis
# 4 安装eventlet(win 平台,如果是mac,linux不需要)
pip install eventlet
celery_demo/main.py 主文件
from celery import Celery
import time
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
# 创建app对象
app = Celery("test", broker=broker, backend=backend)
@app.task
def add(x, y):
time.sleep(2)
return x + y
@app.task
def send_msg(mobile, code):
print(f"手机号:{mobile},发送短信{code}成功")
return "发送成功!"
celery_demo/task_add.py
from main import add, send_msg
# 同步执行任务
res = add(4, 5)
print(res)
# 提交到broker消息队列中,异步执行
res = add.delay(4, 5)
print(res) # c1b10e79-37a5-41c5-8fc5-5a189bce1951
# 返回的是任务的id号,任务被提交到消息中间件 broker redis
查看提交的任务
让worker执行任务
# win启动
celery -A main worker -l info -P eventlet
# mac linux
celery -A main worker -l info
结果存储查看结果
# 1 直接看redis 有数据
# 2 通过代码,拿到结果
from main import app
from celery.result import AsyncResult
id = 'c1b10e79-37a5-41c5-8fc5-5a189bce1951'
if __name__ == '__main__':
result = AsyncResult(id=id, app=app)
if result.successful():
result = result.get()
print(result) # 9 因为之前执行的是add
elif result.failed():
print('任务失败')
elif result.status == 'PENDING':
print('任务等待中被执行')
elif result.status == 'RETRY':
print('任务异常后正在重试')
elif result.status == 'STARTED':
print('任务已经开始被执行')
celery包结构
目录结构
celery_demo
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py
│ ├── user_tasks.py # 所有任务函数
│ └── order_tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
celery.py
from celery import Celery
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
app = Celery(
"testcelery",
backend=backend,
broker=broker,
include=["celery_task.order_tasks", "celery_task.user_tasks"],
)
user_tasks.py
from .celery import app
import time
# 用户相关任务
@app.task
def add(x, y):
time.sleep(3)
return x + y
order_task.py
from .celery import app
# 订单相关任务
# 下单成功,发送短信
@app.task
def send_sms(mobile, code):
print(f"手机号:{mobile},发送成功{code}")
return "ok,发送成功!"
add_task.py 提交任务
from celery_task.order_tasks import send_sms
# 1 同步调用
# res=send_sms('111111',888)
# print(res)
# 2 提交到任务队列被 worker执行
res = send_sms.delay("15666777888", 9999)
print(res) # 39a9a1f8-9907-4589-a5ba-6b1b291b42ab
get_result.py 查看任务结果
from celery_task.celery import app
from celery.result import AsyncResult
id = "39a9a1f8-9907-4589-a5ba-6b1b291b42ab"
if __name__ == "__main__":
result = AsyncResult(id=id, app=app)
if result.successful():
result = result.get()
print(result)
elif result.failed():
print("任务失败")
elif result.status == "PENDING":
print("任务等待中被执行")
elif result.status == "RETRY":
print("任务异常后正在重试")
elif result.status == "STARTED":
print("任务已经开始被执行")
celery异步-延迟-定时任务
异步任务
任务名.delay(参数)
延迟任务
任务名.apply_async(args=[参数],eta=时间对象)
from celery_task.user_tasks import add
from datetime import datetime, timedelta
# datetime.utcnow() 取utc时间---》默认使用utc时间
# 当前时间加了30s
eta = datetime.utcnow() + timedelta(seconds=30)
# eta 要放时间对象
res = add.apply_async(args=[5, 6], eta=eta)
print(res) # f2dc3e99-4232-48dd-860b-cfe4bb2fe8b7
等了30秒返回结果
定时任务
用beat启动
要写配置文件 在celery中修改
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-task': {
'task': 'celery_task.user_task.add',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (300, 150),
},
'send-sms-task': {
'task': 'celery_task.order_task.send_sms',
# 'schedule': timedelta(seconds=30),
'schedule': crontab(hour=11,minute=20), # 每天11点20执行
'args': ('189232222',888),
},
}
启动beat
只要启动了beat就自动会按照设置的时间提交任务
celery -A celery_task beat -l debug
启动worker
celery -A celery_task worker -l info -P eventlet
django中使用celery
通用方案
1 把项目结构的包直接放到项目根路径中
2 在视图函数中提交任务
3 启动worker
4 运行django,正常使用接口
from celery_task.user_tasks import add
class CeleryView(GenericViewSet):
def list(self, request, *args, **kwargs):
res = add.delay(4, 5)
return APIResponse(msg=str(res))
# 3 启动worker
celery -A celery_task worker -l info -P eventlet
# 4 运行django,正常使用接口即可
python manage.py runserver
注意:
要在celery.py中配置django的环境变量让celery能识别到
from celery import Celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
app = Celery(
"testcelery",
backend=backend,
broker=broker,
include=["celery_task.order_tasks", "celery_task.user_tasks"],
)
启动的时候直接用包名启动即可,在根路径
(luffy) PS D:\2023propygo\luffy_api>
celery -A celery_task worker -l info -P eventlet
celery官方方案
# 1 安装模块
pip install Django==3.2.22
pip install celery
pip install redis
pip install eventlet #在windows环境下需要安装eventlet包
luffy_api/celery.py
from celery import Celery
import django
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
django.setup()
app = Celery('testcelery')
app.config_from_object("django.conf:settings",namespace="CELERY")
app.autodiscover_tasks()
common_settings.py/dev.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/1'
# BACKEND配置,使用redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# 结果序列化方案
CELERY_RESULT_SERIALIZER = 'json'
# 任务结果过期时间,秒
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
# 时区配置
CELERY_TIMEZONE = 'Asia/Shanghai'
luffy_api/user/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
views.py
class CeleryTestView(GenericViewSet):
def list(self, request, *args, **kwargs):
res = add.delay(1,2)
return APIResponse(msg=str(res))
启动的时候要使用app名字.celery,比如:
(luffy) PS D:\2023propygo\luffy_api> celery -A luffy_api.celery worker -l info -P eventlet
并发量和qps
# 1 并发量是此刻有多少并发--》请求
# 2 qps:每秒钟响应的数量
# 并发量定了:10,这个接口2s钟响应
-2s能处理10个用户请求
-1s能处理5个用户请求
-qps就是5
-提高qps,如何做?
-1 提高并发--》提不了了
-2 提供响应速度--》0.5s响应回去
-3 1s钟就能处理20请求--》qps就是20
# 使用异步,提高项目的qps
定时更新缓存
加入缓存之后,数据库改了,缓存中也要改,缓存双写一致性
celery_task/home_tasks.py
查询出来的数据不带前缀,需要自己手动拼
from .celery import app
from home.models import Banner
from home.serializer import Bannerserializer
from django.conf import settings
from django.core.cache import cache
@app.task
def update_banner():
queryset = (
Banner.objects.all()
.filter(is_delete=False, is_show=True)
.order_by("orders")[0 : settings.BANNER_COUNT]
)
serializer = Bannerserializer(instance=queryset, many=True)
for i in serializer.data:
i["image"] = "http://127.0.0.1:8000" + i["image"]
cache.set("banner_list", serializer.data)
return "轮播图缓存更新成功"
celery_task/celery.py
定时任务
包结构要在app的include中注册,才能被检索到
from celery import Celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
app = Celery(
"testcelery",
backend=backend,
broker=broker,
include=["celery_task.home_tasks"],
)
# 时区
app.conf.timezone = "Asia/Shanghai"
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
"banner-task": {
"task": "celery_task.home_tasks.update_banner",
"schedule": timedelta(seconds=30),
"args": (),
},
}
启动任务
celery -A celery_task beat -l info
celery -A celery_task worker -l info
task和share_task的区别
django-celery中有两个装饰函数。一个是@task,另一个是@share_task。两者区别在于,前者只能自己这个APP使用。后者是一个全局的配置,多个初始化的APP都可以使用。
task
装饰函数,将函数当成celery的任务函数
import time
from celery import Celery
broker = "redis://127.0.0.1:6379/1"
backend = "redis://127.0.0.1:6379/2"
app = Celery(
"testcelery",
backend=backend,
broker=broker,
)
@app.task
def add(x, y):
time.sleep(10)
return x + y
share_task
-
装饰函数,将函数当成celery的任务函数
-
不依赖某个celery对象,而是加载到内存之后自动添加到celery对象中
-
与多个celery对象进行关联
from celery import shared_task
@shared_task
def add(x, y):
return x + y
官方方案配置定时任务
dev.py
一定要是tasks才行
CELERY_BEAT_SCHEDULE = {
'every_1_minutes': {
'task': 'home.tasks.add',
'schedule': timedelta(seconds=2),
'args': (1,2)
},
}
tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
celery -A luffy_api.celery worker -l info -P eventlet
celery -A luffy_api.celery beat -l debug
admin配置定时任务(手动)
pip install django-celery-beat
dev.py
INSTALLED_APPS = [
....
'django_celery_beat',
]
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_TZ = True
celery.py
# 配置和django设置中一样的时区
from django.conf import settings
app.conf.timezone = settings.TIME_ZONE
# 迁移数据库
python manage.py migrate django_celery_beat
#在两个控制台分别启动woker和beat
celery -A luffy_api.celery worker -l debug -P eventlet
celery -A luffy_api.celery beat -l debug
访问admin
admin监视任务
- 在控制台监控任务执行情况,还不是很方便,最好是能够通过web界面看到任务的执行情况,如有多少任务在执行,有多少任务执行失败了等
- 这个Celery也是可以做到了,就是将任务执行结果写到数据库中,通过web界面显示出来。
- 这里要用到django-celery-results插件。
- 通过插件可以使用Django的orm作为结果存储,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作
pip install django-celery-results
INSTALLED_APPS = (
...,
'django_celery_results',
)
# 使用django-orm 作为结果存储
CELERY_RESULT_BACKEND = 'django-db'
# 迁移数据库
python manage.py migrate django_celery_results
访问admin
- 后期后台管理可以自己写
- 可以直接使用django的orm取数据
- 也可以放到redis中,自己写接口处理
Flower监控celery任务
如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全
Flower 是一个用于监控和管理 Celery 集群的开源 Web 应用程序。它提供有关 Celery workers 和tasks状态的实时信息
Flower可以:
1 实时监控celery的Events
-查看任务进度和历史记录
-查看任务详细信息(参数、开始时间、运行时间等)2 远程操作
-查看workers 状态和统计数据
-关闭并重新启动workers 实例
-控制工作池大小和自动缩放设置
-查看和修改工作实例消耗的队列
-查看当前正在运行的任务
-查看计划任务(预计到达时间/倒计时)
-查看保留和撤销的任务
-应用时间和速率限制
-撤销或终止任务3 Broker 监控
-查看所有 Celery 队列的统计信息
pip install flower
# 方式一
celery -A celery_demo flower --port-5555
# 方式二
celery --broker=redis://127.0.0.1:6379/1 flower
# 浏览器访问
http://127.0.0.1:5555/
任务异常自动告警
虽然可以通过界面来监控了,但是我们想要得更多,人不可能天天盯着界面看吧,如果能实现任务执行失败就自动发邮件告警就好了。这个Celery当然也是没有问题的。
通过钩子程序在异常的时候触发邮件通知
tasks.py
from celery import shared_task
import time
from celery import Task
from django.core.mail import send_mail
from django.conf import settings
# 成功失败邮件告警
class SendEmailTask(Task):
def on_success(self, retval, task_id, args, kwargs):
info = f'任务成功-- 任务id是:{task_id} , 参数是:{args} , 执行成功 !'
send_mail('celery任务监控成功告警', info, settings.EMAIL_HOST_USER, ["ssrheart@outlook.com",])
print('------------成功')
def on_failure(self, exc, task_id, args, kwargs, einfo):
info = f'任务失败-- 任务id为:{task_id} , 参数为:{args} , 失败 ! 失败信息为: {exc}'
send_mail('celery任务监控失败告警', info, settings.EMAIL_HOST_USER, ["ssrheart@outlook.com",])
print('------------失败')
def on_retry(self, exc, task_id, args, kwargs, einfo):
print(f'任务id位::{task_id} , 参数为:{args} , 重试了 ! 错误信息为: {exc}')
@shared_task(base=SendEmailTask, bind=True)
def add(a,b):
time.sleep(1)
return a+b
@shared_task()
def send_email(mail):
print(f'给{mail}发送邮件了')
return '成功'
# celery -A celery_demo worker -l debug -P eventlet
# celery -A celery_demo beat -l debug
# celery -A celery_demo flower --port-5566
django发送邮件
# 1 邮箱开启smtp
# 2 django配置文件配置
### 发送邮件
EMAIL_HOST = 'smtp.qq.com' # 如果是 qq 改成 smtp.qq.com
EMAIL_PORT = 465
EMAIL_HOST_USER = '446367977@qq.com' # 帐号
EMAIL_HOST_PASSWORD = '' # 密码
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
#这样收到的邮件,收件人处就会这样显示
#DEFAULT_FROM_EMAIL = 'heart<'446367977@qq.com>'
EMAIL_USE_SSL = True #使用ssl
#EMAIL_USE_TLS = False # 使用tls
#EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
# 3 发送邮件
class EmailView(ViewSet):
def list(self, request, *args, **kwargs):
to_user = request.query_params.get('to_user')
send_mail('test1', 'test', settings.EMAIL_HOST_USER, [to_user, ])
return APIResponse(msg=f'邮件已经发送:{str(to_user)}')
异步秒杀方案
# 秒杀功能
- qps要高:承载住很多用户1s内把功能完成
-创建订单
-扣减库存
- 效率要高
# 同步秒杀
-假设秒杀需要10s钟,项目并发量是3,总共5件商品要秒杀
-10s内,只有3个人能进入到系统,并且开始秒杀
# 异步秒杀
-假设秒杀需要10s,项目并发量是3,总共5个商品要秒杀
-使用异步,用户提交后,立马返回
-10s内,可以响应很多很多用户提交秒杀任务:假设提交了100个用户
-这100个用户中只有5个成功
同步秒杀
前端
<template>
<div>
<Header></Header>
<div style="padding: 50px;margin-left: 100px">
<h1>Go语言课程</h1>
<img src="http://blog.ssrheart.top/img/202405171633804.png"
height="300px"
width="300px">
<br>
<el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程
</el-button>
</div>
<br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>
<Footer></Footer>
</div>
</template>
<script setup>
import Header from "../components/Headers.vue"
import Footer from "../components/Footer.vue"
import {ref} from "vue"
import axios from "axios";
const fullscreenLoading = ref(false)
const task_id = ref('')
let t = null
const handleSeckill = () => {
//同步秒杀
fullscreenLoading.value = true;
axios.post('http://127.0.0.1:8000/api/v1/home/seckill/seckill/', {
course_id: '99',
}).then(res => {
fullscreenLoading.value = false;
alert(res.data.msg)
}).catch(err => {
this.fullscreenLoading = false;
alert(err)
})
}
</script>
后端
class SeckillView(ViewSet):
@action(methods=['POST'], detail=False)
def seckill(self, request, *args, **kwargs):
'''
#1 取出传入的 课程id
#2 查询课程 是否还有剩余 1s
#2.1 有剩余,开始下单扣减库存 1s
#2.2,在订单表中生成一条记录 2s
#2.3 秒杀成功返回给前端
#3 课程没有剩余,秒杀失败,返回给前端
'''
course_id = request.data.get('course_id')
#
print('根据课程id:%s,查询课程是否还有剩余,耗时3s' % course_id)
time.sleep(1)
res = random.choice([True, False])
if res: # 库存够
print('扣减库存,耗时3s')
time.sleep(1)
print('下单,耗时4s')
time.sleep(2)
return APIResponse(msg='恭喜您秒到了')
else:
return APIResponse(code=101, msg='库存不足,秒杀失败')
异步秒杀
前端
<template>
<div>
<Header></Header>
<div style="padding: 50px;margin-left: 100px">
<h1>Go语言课程</h1>
<img src="http://blog.ssrheart.top/img/202405171633804.png"
height="300px"
width="300px">
<br>
<el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程
</el-button>
</div>
<br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>
<Footer></Footer>
</div>
</template>
<script setup>
import Header from "../components/Headers.vue"
import Footer from "../components/Footer.vue"
import {ref} from "vue"
import axios from "axios";
const fullscreenLoading = ref(false)
const task_id = ref('')
let t = null
const handleSeckill = () => {
// 异步秒杀
fullscreenLoading.value = true;
axios.post('http://127.0.0.1:8000/api/v1/home/seckill/seckill/',{
course_id: '99'
}).then(res => {
// 在排队,转圈的,还需要继续显示
alert(res.data.msg)
task_id.value = res.data.task_id
// 继续发送请求---》查询是否秒杀成功:1 成功 2 没成功 3 秒杀任务还没执行
// 启动定时任务,每隔1s,向后端发送一次请求
t = setInterval(() => {
axios.get('http://127.0.0.1:8000/api/v1/home/seckill/get_result/', {
params:{
task_id: task_id.value
}
}).then(res => {
// 100 成功,success : 1 成功 0 失败 2 还没开始
if (res.data.success == '1') {
// 转圈框不显示
fullscreenLoading.value = false;
// 停止定时任务
clearInterval(t)
t = null
alert(res.data.msg)
} else if (res.data.success == '0') {
// 转圈框不显示
fullscreenLoading.value = false;
// 停止定时任务
clearInterval(t)
t = null
alert(res.data.msg)
} else {
// alert(res.msg)
console.log(res.msg)
}
})
}, 1000)
}).catch(err => {
fullscreenLoading.value = false;
alert(err)
})
}
</script>
<style scoped>
</style>
后端
class SeckillView(ViewSet):
@action(methods=['POST'], detail=False)
def seckill(self, request, *args, **kwargs):
course_id = request.data.get('course_id')
task_id = seckill.delay(course_id)
return APIResponse(msg='您正在排队', task_id=str(task_id))
@action(methods=['GET'], detail=False)
def get_result(self, request, *args, **kwargs):
task_id = request.query_params.get('task_id')
a = AsyncResult(id=task_id)
if a.successful():
result = a.get() # True 和 False
if result:
return APIResponse(success='1', msg='秒杀成功')
else:
return APIResponse(success='0', msg='秒杀失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
return APIResponse(success='2', msg='任务等待中被执行')
else:
return APIResponse(success='3', msg='秒杀任务正在执行')
tasks.py
from celery import shared_task
import time,random
@shared_task
def seckill(course_id):
print('根据课程id:%s,查询课程是否还有剩余,耗时2s' % course_id)
time.sleep(2)
res = random.choice([True, False])
if res: # 库存够
print('扣减库存,耗时1s')
time.sleep(1)
print('下单,耗时2s')
time.sleep(2)
return True
else:
return False
标签:异步,task,框架,app,celery,任务,import,id
From: https://www.cnblogs.com/ssrheart/p/18198440