Python 实现定时任务有以下几种思路
- 使用子进程(现成)+ time.sleep 间隔执行
- 使用现有的库管理定时任务如,celery, tornado等
- 使用系统的机制执行linux 下面 crontab ,windows 下面taskschd.msc
本次调查 celery 这个常用的异步任务管理框架,它有一下好处
- 支持分布式
- 支持任务确认,即如果worker 重启了, 没有被确认的任务还是会被执行。
windows 下面会需要一些特定的前置条件
- 需要安装 eventlet 没有这个lib task 不执行。
依赖
pip install eventlet==0.38.2
pip install celery==5.4.0
pip install redis==5.2.1
pip install flower==2.0.1
Demo 代码
from celery import Celery from celery.schedules import crontab import time from loguru import logger import os CUR_DIR = os.path.dirname(os.path.abspath(__file__)) app = Celery('HW', broker='redis://:4NqTyeVc9duG%jha@ys-syn-store.redis.rds.aliyuncs.com:6379/12') @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('hello') every 30 seconds. # It uses the same signature of previous task, an explicit name is # defined to avoid this task replacing the previous one defined. sender.add_periodic_task(30.0, test.s('hello'), name='add every 30') # Calls test('world') every 30 seconds sender.add_periodic_task(5.0, add.s(1,2), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): with open(os.path.join(CUR_DIR, 'test.log'), 'a+', encoding='utf-8') as wh: wh.write(f"{arg}\n") print(arg) logger.info(arg) @app.task def add(x, y): z = x + y data = f"{x}+{y}={z}" with open(os.path.join(CUR_DIR, 'add.log'), 'a+', encoding='utf-8') as wh: wh.write(f"{data}\n") print(data) logger.info(data)
运行方式
创建 worker
celery -A tasks worker -c 4 -P eventlet -l INFO
创建任务分发器,一个计时器。
celery -A tasks beat -l INFO
创建任务管理界面
celery -A tasks flower
效果
beat worker flower对比当前的 tornado 实现方式
如果使用celery 实际上会有两个问题:- worker 需要后台运行(服务的方式)
- beat 也需要后台运行(服务的方式)
- fower 也需要一个后台运行