celery介绍和安装
Celery 的作用:
-1 异步任务
-2 定时任务
-3 延迟任务
celery的运行原理:
1)可以不依赖任何服务器,通过自身命令,启动服务
2)celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
celery架构(Broker,backend 都用redis)
- 1 任务中间件 Broker(中间件),其他服务提交的异步任务,放在里面排队
-需要借助于第三方 redis rabbitmq - 2 任务执行单元 worker 真正执行异步任务的进程
-celery提供的 - 3 结果存储 backend 结果存储,函数的返回结果,存到 backend中
-需要借助于第三方:redis,mysql
使用场景 :
异步执行:解决耗时任务
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务
celery快速使用
第一步:安装celery模块
pip install celery
第二步:
在scripts文件夹中 创建 celery_m文件夹
在celery_m文件夹下创建 main.py文件
from celery import Celery
# 提交的异步任务在这里面 指定redis地址和库
broker = 'redis://127.0.0.1:6379/2'
# 完成的结果放在这里面 指定redis地址和库
backend = 'redis://127.0.0.1:6379/3'
app = Celery(main='celery_text', backend=backend, broker=broker)
# 生成一个对象
@app.task
def add(a, b):
import time
time.sleep(3)
return a + b
# 给需要用到异步执行的方法装上装饰器
第三步:
提交异步任务
可以再任意文件中执行
from main import add
把带异步执行的方法导入
# 异步调用
res = add.delay(2,3)
# 放入异步中待执行
print(res)
# 得到一个任务id 已经把这个任务提交到了异步待执行状态
# 9eb94989-ed8a-4d70-a349-658d026aa419
第四步:
开启异步任务执行 所有放在异步中的任务都会执行
在pycharm终端开启 路径要在 celery_m文件夹下
'''执行该命令需要到celery_mk这一级目录'''
执行命令
celery -A celery worker -l info
第五步:
根据待执行任务id 查看结果
# 查看异步提交的结果
from main import app
# 导入我们创建的Celery对象 名叫app
from celery.result import AsyncResult
id = '0af2910f-f8aa-4f70-9c4e-bf09b95c8672'
# 放入异步是就会返回一个id给你, 根据id来查看异步执行的结果
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
celery包结构
celery_pack # celery包
├── __init__.py # 包文件
├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
└── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果
执行异步任务/延迟任务
celery.py---创建app
# 1)创建app + 任务
# 2)启动celery(app)服务:
MAC:
命令:celery -A celery_pack worker -l info
要在celery_pack同级目录下执行命令/或上级目录
windows:
pip3 install eventlet
celery -A celery_pack worker -l info -P eventlet
# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
# 设置提交任务到redis的哪个库
backend = 'redis://127.0.0.1:6379/2'
# 设置结果存到到redis的哪个库
app = Celery(broker=broker, backend=backend, include=['celery_pack.tasks'])
include=[需要管理的任务函数文件]
tasks.py--设置任务
统一存放编写需要用到celery服务的方法
from .celery import app
import time
@app.task
def add(n, m):
time.sleep(10)
return n + m
@app.task
def low(n, m):
print(n)
print(m)
print('n-m的结果:%s' % (n - m))
return n - m
add_task.py--提交任务文件可单独放置
导入设置好的任务 通过.delay方法提交到worker中执行
得到一个任务id返回值,可以用此返回值查询异步worker执行好的结果
需要在父级路径下执行
celery -A celery_pack worker -l info
# 开始worker服务
#1 异步任务
任务.delay(参数)
# 延迟任务
任务.apply_async(args=[参数],eta=时间对象)
from celery_pack.tasks import add
# 传入需要异步执行的方法
# 添加立即执行任务
d1 = add.delay()
print(d1)
'''
设置延迟任务
生成一个时间对象,执行.apply_async方法,需要参数传入 args=(200, 50)中,
eta= 过期时间 配置一个时间对象
'''
# 添加延迟任务
eg:
from datetime import datetime, timedelta
time_obj=datetime.utcnow() + timedelta(seconds=30)
# 生成一个时间对象,延迟30秒执行(timedelta(seconds=30))
任务.apply_async(args=(200, 50), eta=time_obj)
该任务将会在运行后延迟30执行
get_result.py--获取结果
from scripts.celery_pack.celery import app
from celery.result import AsyncResult
id = '1937598a-f62a-4682-b93c-8a97a7e6e8fd'
if __name__ == '__main__':
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')
执行定时任务
celery.py文件中 ---添加定时任务配置
第一步:
from celery import Celery
broker = 'redis://127.0.0.1:6379/2'
# 设置提交任务到redis的哪个库
backend = 'redis://127.0.0.1:6379/3'
# 设置结果存到到redis的哪个库
app = Celery(broker=broker, backend=backend, include=['celery_pack.tasks'])
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from celery.schedules import crontab
app.conf.beat_schedule = {
'task_1': {
'task': 'celery_pack.tasks.bobo',
# 'schedule': timedelta(seconds=3),
# 时间对象 每3秒执行一次
'schedule': crontab(hour=23,minute=58), # 每天晚上23点58分执行
# 'args': (300, 150),
},
'task_2': {
'task': 'celery_pack.tasks.bobobo',
# 需要执行的函数
# 'schedule': timedelta(seconds=3),
# 时间对象 每3秒执行一次
'schedule': crontab(hour=8, day_of_week=1) # 每周一早八点执行
# 'args': (300, 150),
}
}
第二步:
tasks.py中 编写定时任务
from .celery import app
@app.task
def bobo():
print('我执行了')
return '定时任务'
@app.task
def bobobo():
print('我执行了')
return '定时任务'
第三步:
确定开始启动定时任务
切记 要在celery_pack包所在的父级路径下执行
1.启动beat
celery -A celery_pack beat -l info
2.启动worker
celery -A celery_pack worker -l info
# 只要任务的代码有改动都需要重启worker
django中使用celery
celery.py中
from celery import Celery
import os
broker = 'redis://127.0.0.1:6379/2'
# 设置提交任务到redis的哪个库
backend = 'redis://127.0.0.1:6379/3'
# 设置结果存到到redis的哪个库
app = Celery(broker=broker, backend=backend, include=['celery_pack.tasks'])
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'LufeiApi.settings.dev')
如果要在django中使用就一定要加这句话
tasks.py中
from .celery import app
from LufeiApi.apps.user.models import User
@app.task
def get_username(user_id):
user_obj = User.objects.filter(pk=user_id).first()
return user_obj.username
把需要异步执行的任务 编写到这里面
标签:异步,app,redis,celery,任务,执行
From: https://www.cnblogs.com/moongodnnn/p/17201063.html