Celery
-
Celery 是基于Python实现的模块, 用于执行异步定时周期任务的
-
celery组成结构
# 1.app 应用 任务 # 2.缓存 存放任务的 Broker - Backend 缓存任务和缓存任务结果 redis or RabbitMQ # 3.工人 Worker 执行任务
1.Celery简单示例
-
s1.py
from celery import Celery # 创建一个Celery实例,这就是我们用户的应用app my_task = Celery('tasks', backend='redis://127.0.0.1:6379', broker='redis://127.0.0.1:6379') # 为应用创建任务,my_func1 import time @my_task.task def my_func1(a, b, i): time.sleep(i) return a + b 15180400904
-
s2.py
from s1 import my_func1 # 将任务交给Celery的Worker执行 # 4.0版本之后不支持window,需要下载模块eventlet # celery worker -A s1 -l INFO -P eventlet 版本celery 4.4.7, async为python3.7关键字,在4.4.7版本中已修复 # celery worker -A s1 -c 4 -l INFO -P eventlet # 创建一个worker组,里面有4个员工,只能执行s1中的任务,打印 -l 中的info信息 # c:几个员工 -l INFO: 打印log中INFO信息 -P 更改任务处理的方式,eventlet方式运行 res_id_list = [] for i in range(10): # 发布任务 res = my_func1.delay(2, 3, i) # print(res) res_id_list.append(res) print(res_id_list) # 任务发布出去了,将发布任务的id存入列表(实际上是一个对象) # 在等待任务完成前 result.get()处于堵塞状态 # time.sleep()设定堵塞延时 待任务完成,result.get()也完成,此时将会将结果打印 for result in res_id_list: print(f'{result}的值为{result.get()}')
-
监听任务:
celery worker -A s1 -c 4 -l INFO -P eventlet
- 发布任务
-
worker监听到发布的任务并执行
2.Celery项目目录
- 在实际项目中我们运用的Celery是有规则的
要满足这样的条件才可以哦,目录celery_tasks这个名字可以随意起,但是一定要注意在这个目录下一定要有一个celery.py这个文件
-
celery.py
from celery import Celery # 创建一个Celery实例 my_task = Celery('my_tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379', include=['celery_tasks.task_one'] # include 这个参数适用于寻找目录中所有的task )
-
task_one.py
from Celery_task.celery import my_task # 为我们的Celery实例创建任务 @my_task.task def task_one(a, b): return a * b
-
my_celery.py
from Celery_tasks.task_one import task_one # 发布任务 res = task_one.delay(3, 2) print(res)
-
启动Worker
启动Worker的时候无需再使用文件启动,直接启动你的celery_tasks目录就行了 它会自动找到celery文件并开始执行,这也就是为什么要创建celery名的文件 celery worker -A celery_tasks -l INFO -P eventlet
3.Celery定时任务
- celery.py
from celery import Celery
my_task = Celery('my_tasks', broker='redis://127.0.0.1:6379',
backend='redis://127.0.0.1:6379',
include=['celery_tasks.task_one', 'celery_tasks.task_two']
)
-
task.two.py
from .celery import my_task @my_task.task def task_two(): return "task_two - 完成定时任务"
-
my_celery.py
from celery_tasks import task_one, task_two import time, datetime # 定时任务我们不在使用delay这个方法了,delay是立即交给 task 去执行 # 现在我们使用apply_async定时执行 now_time = time.time() # 将当前的东八区时间改为 UTC时间 注意这里一定是UTC时间,没有其他说法 utc_now = datetime.datetime.utcfromtimestamp(now_time) # 为当前时间增加 1 天 add_time = datetime.timedelta(days=1) action_time = utc_now + add_time # action_time 就是当前时间未来1天之后的时间 # 现在我们使用apply_async定时执行 res = task_two.task_two.apply_async(args=(), eta=action_time) print(res)
-
发布任务后,启动worker:
celery worker -A celery_tasks -l INFO -P eventlet
4.Celery周期任务
-
celery.py
from celery import Celery from celery.schedules import crontab # 做时间转换用的 my_task = Celery('my_tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379', include=['celery_tasks.task_one', 'celery_tasks.task_two'] ) # 对beat任务生产做一个配置 my_task.conf.beat_schedule = { "each10s_task": { "task": "task": "celery_tasks.task_one.task_one", # 具体哪个任务 "schedule": 10, # 每10秒钟执行一次 "args": (10, 10) # 函数接收的参数值 }, "each1m_task": { "task": "celery_tasks.task_two.task_two", "schedule": crontab(minute=1), # 每一分钟执行一次 }, }
-
#以上配置完成之后,还有一点非常重要 # 不能直接创建Worker了,因为我们要执行周期任务,所以首先要先有一个任务的生产方beat # 该生产方用于每隔一段时间创建一个任务然后交给worker去执行 # celery beat -A Celery_task # celery worker -A Celery_task -l INFO -P eventlet