05-01 celery
一. 官方
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
注意:
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform. Celery是一个资金很少的项目,所以我们不支持微软的Windows。请不要打开任何与该平台相关的问题。
二. Celery异步任务框架
1) 可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2) celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求 人是一个独立运行的服务 | 医院也是一个独立运行的服务 正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
三. Celery架构
Celery的架构由三部分组成:
# 1. 消息中间件(message broker) Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等 # 2. 任务执行单元(worker)和 Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。 # 3. 任务执行结果存储(task result store)组成。 Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
三. 使用场景
异步执行:解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
延迟执行:解决延迟任务
定时执行:解决周期(周期)任务,比如每天数据统计
四. Celery安装
pip install celery==4.4.6 # 匹配django2.2
五. 两种celery任务结构
1. 如果 Celery对象:Celery(...) 是放在一个模块下的
1)终端切换到该模块所在文件夹位置:scripts 2)执行启动worker的命令:celery worker -A 模块名 -l info -P eventlet 注意: 1. windows系统需要eventlet支持: pip insall eventlet 2. Linux与MacOS直接执行:celery worker -A 模块名 -l info
2. 如果 Celery对象:Celery(...) 是放在一个包下的
1)必须在这个包下建一个celery.py的文件,将Celery(...)产生对象的语句放在该文件中 2)执行启动worker的命令:celery worker -A 包名 -l info -P eventlet 注意: 1. windows系统需要eventlet支持: pip insall eventlet 2. Linux与MacOS直接执行:celery worker -A 模块名 -l info
如果报错,可以在dev中配置
# 配置celery CELERY_IMPORTS =['comm.tasks']
ctr+c 停止celery任务
六. Celery执行异步任务
1. 模块结构
t_celery/celery_task.py
from celery import Celery broker = 'redis://127.0.0.1:6379/1' # broker任务队列 backend = 'redis://127.0.0.1:6379/2' # 结构存储,执行完的结果存在这 # include=[被管理的任务文件路径, ] app = Celery(__name__, broker=broker, backend=backend,, include=('t_celery', )) # 放在模块下的启动celery命令 ''' windows中: celery worker -A 模块名 -l info -P eventlet linux中: clery worker -A 模块名 -l info -A: 表示被执行的模块路径. 如果是相对, 那么就需要cd到该模块下执行. -l: 表示展示日志 ''' # 添加任务(使用这个装饰器装饰,@app.task) @app.task def add(x, y): print(x, y) return x+y
windows中启动命令:
celery worker -A celery_task -l info -P eventlet
2) t_celery/add_task.py
from celery_task import add # add(3,4) # 直接执行,不会被添加到broker中 # 往broker中添加一个任务. ''' 只要是worker一直是在启动的状态, 一旦往broker中添加了任务. 那么这个任务就会立刻被worker执行, 执行的结果存储到backend中 ''' res = add.delay(3, 4) # 想broker中添加一个任务 print(res) # 34c1a6f2-16f7-47f1-ba76-57051c52650c
3) t_celery/get_result.py
from celery.result import AsyncResult # from scripts.celery_task.celery import app from scripts.t_celery.celery_task import app task_id = '884d9540-ec97-4b07-aacf-7ea9a69ee375' if __name__ == '__main__': async_obj = AsyncResult(id=task_id, app=app) if async_obj.successful(): result = async_obj.get() print(result) elif async_obj.failed(): print('任务失败') elif async_obj.status == 'PENDING': print('任务等待中被执行') elif async_obj.status == 'RETRY': print('任务异常后正在重试') elif async_obj.status == 'STARTED': print('任务已经开始被执行')
2. 包结构
1) scripts/celery_task
__init.py__
celery.py 包里面必须含有名叫celery.py的文件,django的放射机制决定的
from celery import Celery broker = 'redis://127.0.0.1:6379/1' # broker任务队列 backend = 'redis://127.0.0.1:6379/2' # 结构存储,执行完的结果存在这 app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.task1', 'celery_task.task2']) # 定时任务 from datetime import timedelta from celery.schedules import crontab app.conf.timezone = 'Asia/Shanghai' # 默认是UTC. 这里切换到了上海的时区. app.conf.enable_utc = False # False表示禁用默认的UTC时间作为当前的定时时间, 而是以上面指定的上海的的时区作为定时开始时间 app.conf.beat_schedule = { 'add-task': { 'task': 'celery_task.task1.add', # 配置定时任务执行task1任务的路径 'schedule': timedelta(seconds=3), # 每3秒种执行一次 # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (300, 150), # 给task1进行传递的参数. 指定kwargs就可以传递关键字参数. }, # 'task2': { # 'task': 'celery_task.task2.mutile', # 'schedule': timedelta(seconds=3), # # 'schedule': crontab(hour=8, day_of_week=1), # 'args': (300, 150), # } } # 一定要启动beat ''' celery beat -A celery_task -l info --pidfile= 如果报错,就添加--pidfile= celery beat在运行时会生成一个pidfile文件用于记录当前的pid。且该文件不会因进程的结束而自动删除。当再次创建celery beat进程的时候,会因为已存在这个文件而启动失败。 解决方式有两种: 在启动前检测是否存在该文件,若存在删除后再启动 通过启动时指定空参数的方式规避: '''
异步任务和延时任务,执行celery worker -A celery_task -l info -P eventlet
命令,任务起来后,手动提交t_celery_add_task.py。
定时任务,先在local终端执行celery worker -A celery_task -l info -P eventlet
,
再另起一个终端,执行celery beat -A celery_task -l info --pidfile=
task1.py
from .celery import app @app.task def task1(x, y): return x * y
task2.py
from .celery import app @app.task def task2(x, y): return x + y
2) scripts/t_celery_add_task.py
from celery_task.task1 import add from celery_task.task2 import mutile # 提交任务,异步任务 res = add.delay(8, 8) print(res) res1 = mutile.delay(8, 8) print(res1) # 执行命令: celery worker -A celery_task -l info -P eventlet # 执行延迟任务 # from datetime import datetime, timedelta # # eta = datetime.utcnow() + timedelta(seconds=10) # res = add.apply_async(args=(10, 9), eta=eta) # 延迟10秒执行add任务 # print(res) # res是id号 # 执行命令: celery worker -A celery_task -l info -P eventlet
3)scripts/t_get_result.py
from celery.result import AsyncResult # from scripts.celery_task.celery import app from celery_task.celery import app task_id = '501451f8-b15e-42be-9ac8-71274fc841e2' if __name__ == '__main__': async_obj = AsyncResult(id=task_id, app=app) if async_obj.successful(): result = async_obj.get() print(result) elif async_obj.failed(): print('任务失败') elif async_obj.status == 'PENDING': print('任务等待中被执行') elif async_obj.status == 'RETRY': print('任务异常后正在重试') elif async_obj.status == 'STARTED': print('任务已经开始被执行')
3. 关于celery执行任务的坑
# from scripts.celery_task import task2 # from scripts.celery_task import task1 ''' celery_task作为包, 被命令执行时. 如果不将celery_task作为定级导入, 那么任务的执行将会是未被注册的. 任务将不会被运行的worker获取, 将会抛出异常. [2020-07-26 19:40:56,983: ERROR/MainProcess] Received unregistered task of type 'scripts.celery_task.task2.task2'. 因此, 为了在项目中能够在任意位置都可以执行任务. 因此celery_task必定要放在项目的根目录下的. 那么无论在任何位置导入, 都是没有问题的. 如下: from celery_task.a.b.c.d import xxx 如果不是在项目的根目录下, 你的导入也许是这样的. from scripts.celery_task import task 那么worker将找不到你指定的任务的路径. 本质: 就是由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下. '''
总结
# 1 celery定时更新首页轮播图接口(只要涉及到加入缓存的接口,一定涉及到更新---(双写一致性),执行异步任务,定时任务(apscheduler)和延时任务 # 2 课程群查接口 -视图类: 如果想改路由(自动生成路由:ViewSetMixin), 如果想跟数据库和序列化类打交道:GenericAPIView 增删改查(5个视图扩展类) # 3 排序和过滤(一定要继承:GenericAPIView,ListModelMixin)
标签:Celery,task,app,celery,任务,import From: https://www.cnblogs.com/coderxueshan/p/17854755.html