APScheduler+MySQL实现定时任务及其持久化存储
转自:APScheduler+MySQL实现定时任务及其持久化存储 - 简书 (jianshu.com)
最近,想要实现一个功能,就是添加定时任务的同时记录其任务信息到数据库中,可以通过接口查看设定了哪些定时任务,经过调研,发现大家公认的Python最好用的定时任务框架是APScheduler。
Python Scheduler(APScheduler)是一个Python库,它允许您安排稍后要执行的Python代码,只需一次或定期执行。 您可以在您的同时添加新的作业或删除旧的旧作业。 如果在数据库中存储工作,他们还将存活调度程序重新启动并保持状态。 调度程序重新启动时,它将运行它在脱机时应该运行的所有作业
在网上找了一堆资料,包括官方教程,但对于和MySQL协作使用的方面语焉不详,因此,这里做个记录。
项目使用的环境如下:
- Python 3.7
- MySQL
- APScheduler
- Navicat for MySQL
APScheduler Startup
首先,安装APScheduler:
pip install apscheduler
然后就可以编写简单的demo了:
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
# 输出当前时间
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(job, 'cron', day_of_week='0-4', hour=6, minute=30)
这个demo表示每周的星期一到星期五的六点半报时
APScheduler基础教程
具体见大佬2的博客,这里不做赘述
这里总结一下常用的任务模式。
interval间隔时间任务
即每隔一段时间执行一次,可能的字段为:
字段 | 值类型 | 说明 |
---|---|---|
weeks | int | number of weeks to wait |
days | int | number of days to wait |
hours | int | number of hours to wait |
minutes | int | number of minutes to wait |
seconds | int | number of seconds to wait |
start_date | datetime/str | starting point for the interval calculation |
end_date | datetime/str | latest possible date/time to trigger on |
timezone | datetime.tzinfo/str | time zone to use for the date/time calculations |
例子:
#表示每隔3秒执行一次任务
scheduler.add_job(tok, 'interval', seconds=3)
#表示每隔3天17时19分07秒执行一次任务
sched.add_job(my_job, 'interval',days = 03,hours = 17,minutes = 19,seconds = 07)
date定点任务
即在某个时刻运行,但仅执行一次,可能的字段为:
字段 | 值类型 | 说明 |
---|---|---|
run_date | datetime/str | the date/time to run the job at -(任务开始的时间) |
timezone | datetime.tzinfo/str | time zone for run_date if it doesn’t have one already |
栗子:
# 任务将在2009年11月6日00:00:00执行
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# 任务将在2009年11月6日16:30:05执行
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
cron某一时刻任务
可能的字段为:
字段 | 值类型 | 值 | 说明 |
---|---|---|---|
year | int/str | XXXX | 表示四位数的年份,如2008年 |
month | int/str | 1-12 | 表示取值范围为1-12月 |
day | int/str | 1-31 | 表示取值范围为1-31日 |
week | int/str | 1-53 | 格里历2006年12月31日可以写成2006年-W52-7 扩展形式或2006W527 紧凑形式 |
day_of_week | int/str | 0-6 或 mon,tue,wed,thu,fri,sat,sun | 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示 |
hour | int/str | 0-23 | 表示取值范围为0-23时 |
minute | int/str | 0-59 | 表示取值范围为0-59分 |
second | int/str | 0-59 | 表示取值范围为0-59秒 |
start_date | datetime/str | XXXX-XX-XX hh:mm:ss | 表示开始时间 |
end_date | datetime/str | XXXX-XX-XX hh:mm:ss | 表示结束时间 |
timezone | datetime.tzinfo/str | time zone to use for the date/time calculations | 表示时区取值 |
其中,int/str
表示参数既可以是int类型,也可以是str类型;datetime/str
表示参数既可以是datetime类型,也可以是str类型
例子:
#表示2017年3月22日17时19分07秒执行该程序
sched.add_job(my_job, 'cron', year=2017,month = 03,day = 22,hour = 17,minute = 19,second = 07)
#表示任务在6,7,8,11,12月份的第三个星期五的00:00,01:00,02:00,03:00 执行该程序
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
#表示从星期一到星期五5:30(AM)直到2014-05-30 00:00:00
sched.add_job(my_job(), 'cron', day_of_week='mon-fri', hour=5, minute=30,end_date='2014-05-30')
#表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5
sched.add_job(my_job, 'cron',second = '*/5') 作者:燕洼仙草 https://www.bilibili.com/read/cv7868920/ 出处:bilibili
APScheduler + MySQL
不过我们肯定并不满足于实现一个小小的demo。
如果我们想要连接MySQL
实现持久化存储应该怎么做呢?
这里需要配置调度器:
scheduler = BackgroundScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'mysql+pymysql://username:[email protected]:3306/dbname?charset=utf8',
'tablename': 'api_job'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '10'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '10',
'apscheduler.timezone': 'UTC',
})
其中,apscheduler.jobstores.default
字段表示存储器的配置,字段说明如下:
-
type
表示数据库的类型,MySQL
属于SQLAlchemy
,因此这里值填写sqlalchemy
-
url
表示存储器访问呢的地址,其中的这些字段需要配置:
username
为你的数据库的用户名password
为对应的账号的密码dbname
为数据库名称
-
tablename
表示将定时任务数据存储的表名
然后就可以使用配置好的scheduler
添加任务了:
scheduler.add_job(tok, 'interval', seconds=3, id=job_id, coalesce=True, replace_existing=True)
其中id
表示任务的唯一标识符,coalesce
表示忽略服务器宕机时间段内的任务执行(否则就会出现服务器恢复之后一下子执行多次任务的情况),replace_existing
表示如果有重名的任务,直接覆盖
添加任务后就可以看到数据库多出了内容:
添加任务后的数据库
这里使用了中文字符串作为id,可以语义地表示任务
然鹅,要想任务存储其他信息怎么办,这里思来想去,决定在数据库创建一个新的表,用于记录任务的详细信息,方便后续查看,每次添加任务时往该表插值即可,删除任务时也删除对应的数据,由此实现任务数据的同步。
不过需要注意的是,在添加重复名称的定时任务时,设置replace_existing=True
,直接覆盖已有的任务;相应地,在对其任务信息表,我们采用REPLACE
替代INSERT
语句来执行插入操作,其使用方法和INSERT
完全一样,但如果表中有重复的数据,则会直接覆盖,其优势不言而喻。由此,实现了协同操作。
其数字孪生任务信息表的字段可为:
定时任务详情表
作者:走错说爱你
链接:https://www.jianshu.com/p/e36236c1df08
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。