首页 > 编程语言 >Python定时任务框架apscheduler

Python定时任务框架apscheduler

时间:2023-06-22 18:12:09浏览次数:48  
标签:Python 作业 调度 apscheduler job scheduler import 定时

Python定时任务框架apscheduler

目录

说到定时任务,你会想起 linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文介绍如何使用 apscheduler 实现你的定时任务。

apscheduler 使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。apscheduler 可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。注意,apscheduler 不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行,也就是说,apscheduler 为我们提供了构建专用调度器或调度服务的基础模块。

安装

安装非常简单,会用 pip 的人都知道

 pip install apscheduler

基本概念介绍

触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。

作业存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。

执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。

调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。

调度器的工作流程

图片

实例1 -间隔性任务----每 3 秒打印出了当前时间。

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=3)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))
try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    pass

实例2 - cron 任务

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'cron', hour=19,minute=23)
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C    '))

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass

定时 cron 任务也非常简单,直接给触发器 trigger 传入 ‘cron’ 即可。hour =19 ,minute =23 这里表示每天的19:23 分执行任务。这里可以填写数字,也可以填写字符串

hour ='19', minute ='23'
minute = '*/3' 表示每 5 分钟执行一次
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务

配置调度器

调度器的主循环其实就是反复检查是不是有到时需要执行的任务,分以下几步进行:

  1. 询问自己的每一个作业存储器,有没有到期需要执行的任务,如果有,计算这些作业中每个作业需要运行的时间点,如果时间点有多个,做 coalesce 检查。
  2. 提交给执行器按时间点运行。

在配置调度器前,我们首先要选取适合我们应用环境场景的调度器,存储器和执行器。下面是各调度器的适用场景:

  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
  • GeventScheduler:适用于使用gevent模块的应用程序。
  • TwistedScheduler:适用于构建Twisted的应用程序。
  • QtScheduler:适用于构建Qt的应用程序。

上述调度器可以满足我们绝大多数的应用环境,本文以两种调度器为例说明如何进行调度器配置。

作业存储器的选择有两种:

一、是内存,也是默认的配置

二、是数据库

具体选哪一种看我们的应用程序在崩溃时是否重启整个应用程序,如果重启整个应用程序,那么作业会被重新添加到调度器中,此时简单的选取内存作为作业存储器即简单又高效。但是,当调度器重启或应用程序崩溃时您需要您的作业从中断时恢复正常运行,那么通常我们选择将作业存储在数据库中,使用哪种数据库通常取决于为在您的编程环境中使用了什么数据库。我们可以自由选择,PostgreSQL 是推荐的选择,因为它具有强大的数据完整性保护。

同样的,执行器的选择也取决于应用场景。通常默认的 ThreadPoolExecutor 已经足够好。如果作业负载涉及CPU 密集型操作,那么应该考虑使用 ProcessPoolExecutor,甚至可以同时使用这两种执行器,将ProcessPoolExecutor 行器添加为二级执行器。

apscheduler 提供了许多不同的方法来配置调度器。可以使用字典,也可以使用关键字参数传递。首先实例化调度程序,添加作业,然后配置调度器,获得最大的灵活性。

如果调度程序在应用程序的后台运行,选择 BackgroundScheduler,并使用默认的 jobstore 和默认的executor,则以下配置即可:

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

假如我们想配置更多信息:设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。下述三个方法是完全等同的。
配置需求

  • 配置名为“mongo”的MongoDBJobStore作业存储器
  • 配置名为“default”的SQLAlchemyJobStore(使用SQLite)
  • 配置名为“default”的ThreadPoolExecutor,最大线程数为20
  • 配置名为“processpool”的ProcessPoolExecutor,最大进程数为5
  • UTC作为调度器的时区
  • coalesce默认情况下关闭
  • 作业的默认最大运行实例限制为3

方法一:

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExec    utor


jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,    job_defaults=job_defaults, timezone=utc) 

方法二:

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler({
   'apscheduler.jobstores.mongo': {
        'type': 'mongodb'
   },
   'apscheduler.jobstores.default': {
       'type': 'sqlalchemy',
       'url': 'sqlite:///jobs.sqlite'
   },
   'apscheduler.executors.default': {
       'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
       'max_workers': '20'
   },
   'apscheduler.executors.processpool': {
       'type': 'processpool',
       'max_workers': '5'
   },
   'apscheduler.job_defaults.coalesce': 'false',
   'apscheduler.job_defaults.max_instances': '3',
   'apscheduler.timezone': 'UTC',
})

方法三:

from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {
   'mongo': {'type': 'mongodb'},
   'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
   'default': {'type': 'threadpool', 'max_workers': 20},
   'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
   'coalesce': False,
   'max_instances': 3
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

以上涵盖了大多数情况的调度器配置,在实际运行时可以试试不同的配置会有怎样不同的效果。

启动调度器

启动调度器前需要先添加作业,有两种方法向调度器添加作业:

  1. 通过接口add_job():add_job() 返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。
  2. 通过使用函数装饰器

我们可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务将不会运行,并且第一次运行时间在调度器启动时计算。

注意:如果使用的是序列化作业的执行器或作业存储器,那么要求被调用的作业(函数)必须是全局可访问的,被调用的作业的参数是可序列化的,作业存储器中,只有 MemoryJobStore 不会序列化作业。执行器中,只有ProcessPoolExecutor 将序列化作业。

启用调度器只需要调用调度器的 start() 方法,下面分别使用不同的作业存储器来举例说明:

方法一:使用默认的作业存储器:

#coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

def my_job(id='my_job'):
   print (id,'-->',datetime.datetime.now())

jobstores = {
   'default': MemoryJobStore()

}
executors = {
   'default': ThreadPoolExecutor(20),
   'processpool': ProcessPoolExecutor(10)
}
job_defaults = {
   'coalesce': False,
   'max_instances': 3
}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval', seconds=5,replace_existing=True)
scheduler.add_job(my_job, args=['job_cron',],id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11', second='*/10', end_date='2018-05-30')
scheduler.add_job(my_job, args=['job_once_now',],id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once',],id='job_date_once',trigger='date',run_date='2018-04-05 07:48:05')
try:
   scheduler.start()
except SystemExit:
   print('exit')
   exit()

方法二:使用数据库作为存储器:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


def my_job(id='my_job'):
    print(id, '-->', datetime.datetime.now())


jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(10)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)

scheduler.add_job(my_job, args=['job_interval', ], id='job_interval', trigger='interval', seconds=5,
                  replace_existing=True)
scheduler.add_job(my_job, args=['job_cron', ], id='job_cron', trigger='cron', month='4-8,11-12', hour='7-11',
                  second='*/10', \
                  end_date='2018-05-30')
scheduler.add_job(my_job, args=['job_once_now', ], id='job_once_now')
scheduler.add_job(my_job, args=['job_date_once', ], id='job_date_once', trigger='date', run_date='2018-04-05 07:48:05')
try:
    scheduler.start()
except SystemExit:
    print('exit')
    exit()

说明,在第 6 行、第 10 行代码修改为数据库作为作业存储器

提示我们有作业本应在 2018-04-05 07:48:05 运行的作业没有运行,因为现在的时间为 2018-04-05 08:06:34,错过了 0:18:28 的时间。
如果将上术代码第 21-25 行注释掉,重新运行本程序,则四种类型的作业仍会运行,结果如下:

作业仍会运行,说明作业被添加到数据库中,程序中断后重新运行时会自动从数据库读取作业信息,而不需要重新再添加到调度器中,如果不注释 21-25 行添加作业的代码,则作业会重新添加到数据库中,这样就有了两个同样的作业,避免出现这种情况可以在 add_job 的参数中增加 replace_existing=True,如

scheduler.add_job(my_job, args=['job_interval',],id='job_interval',trigger='interval',seconds=3,replace_existing=True)

如果我们想运行错过运行的作业,使用 misfire_grace_time,如

scheduler.add_job(my_job,args = ['job_cron',] ,id='job_cron',trigger='cron',month='4-8,11-12',hour='7-11',second='*/15',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2018-05-30')

说明:misfire_grace_time,假如一个作业本来 08:00 有一次执行,但是由于某种原因没有被调度上,现在 08:01 了,这个 08:00 的运行实例被提交时,会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的 30 秒限制,那么这个运行实例不会被执行。最常见的情形是 scheduler 被 shutdown 后重启,某个任务会积攒了好几次没执行如 5 次,下次这个作业被提交给执行器时,执行 5 次。设置 coalesce=True 后,只会执行一次。
其他操作如下:

scheduler.remove_job(job_id,jobstore=None)#删除作业
scheduler.remove_all_jobs(jobstore=None)#删除所有作业
scheduler.pause_job(job_id,jobstore=None)#暂停作业
scheduler.resume_job(job_id,jobstore=None)#恢复作业
scheduler.modify_job(job_id, jobstore=None, **changes)#修改单个作业属性信息
scheduler.reschedule_job(job_id, jobstore=None, trigger=None,**trigger_args)#修改单个作业的触发器并更新下次运行时间
scheduler.print_jobs(jobstore=None, out=sys.stdout)#输出作业信息

调度器事件监听

scheduler 的基本应用,在前面已经介绍过了,但仔细思考一下:如果程序有异常抛出会影响整个调度任务吗?请看下面的代码,运行一下看看会发生什么情况:

# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime

def aps_test(x):
   print (1/0)
   print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)

scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定时任务',), trigger='cron', second='*/5')

scheduler.start()  

可能看出每 5 秒抛出一次报错信息。任何代码都可能抛出异常,关键是,发生导常事件,如何第一时间知道,这才是我们最关心的,apscheduler 已经为我们想到了这些,提供了事件监听来解决这一问题。
将上述代码稍做调整,加入日志记录和事件监听,如下所示。

   # coding:utf-8
   from apscheduler.schedulers.blocking import BlockingScheduler
   from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
   import datetime
   import logging

   logging.basicConfig(level=logging.INFO,
                       format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                       datefmt='%Y-%m-%d %H:%M:%S',
                       filename='log1.txt',
                       filemode='a')


   def aps_test(x):
       print (datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)


   def date_test(x):
       print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
       print (1/0)


   def my_listener(event):
       if event.exception:
           print ('任务出错了!!!!!!')
       else:
           print ('任务照常运行...')

   scheduler = BlockingScheduler()
   scheduler.add_job(func=date_test, args=('一次性任务,会出错',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
   scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
   scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
   scheduler._logger = logging

   scheduler.start()  

说明:
第 7-11 行配置日志记录信息,日志文件在当前路径,文件名为 “log1.txt”。
第 33 行启用 scheduler 模块的日记记录。
第 23-27 定义一个事件监听,出现意外情况打印相关信息报警。

在生产环境中,可以把出错信息换成发送一封邮件或者发送一个短信,这样定时任务出错就可以立马就知道。

转自http://https://blog.csdn.net/somezz/article/details/83104368

标签:Python,作业,调度,apscheduler,job,scheduler,import,定时
From: https://www.cnblogs.com/starkzz/p/17498078.html

相关文章

  • APScheduler+MySQL实现定时任务及其持久化存储
    APScheduler+MySQL实现定时任务及其持久化存储转自:APScheduler+MySQL实现定时任务及其持久化存储-简书(jianshu.com)最近,想要实现一个功能,就是添加定时任务的同时记录其任务信息到数据库中,可以通过接口查看设定了哪些定时任务,经过调研,发现大家公认的Python最好用的定时任务框......
  • python练习-爬虫
    场景:1、网址hppt://xxx.yyy.zzz.cn2、打开网页后显示:  3、填上姓名身份证和验证码,点击查询后,返回查询结果。 4、页面有cookie。方案一:程序中嵌入浏览器根据网址打开得到页面,然后程序读取记录自动填写数据,程序截取验证码图片,然后解析,并且填入验证码然后程序点击查......
  • python 将中文数字转换成阿拉伯数字
    日常遇到的中文数字主要有两种情况:1.“二零零一”这种类型,只包含[0-9]对应的十个中文字,需要转换成数字:2001。这种情况的转换十分简单。2.“三百二十一”这种类型,还有“十百千万亿……”等汉字,需要转换成数字:321,这种情况比较复杂。考虑:1.“四千三百二十一”,即万以内的数......
  • python通过字典生成随机城市
    先把省和城市存入一个字典中用到random模块中的choice方法,在列表中随机选一个元素。 importrandom省=random.choice(list(city.keys()))#把字典的键表转换成列表,再随机选一个市=random.choice(city[省])#指定省后,在城市列表中随机选一个print(省,市) ......
  • Python | import json模块详解
    json是Python内置的一个用于处理JSON数据的模块。JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式,常用于Web应用程序之间的数据传输。json模块提供了四个主要的方法:json.dumps()-将Python对象转换为JSON格式的字符串。json.loads()-将JSON格式的字符串转换为Py......
  • 数据分享|PYTHON用决策树分类预测糖尿病和可视化实例|附代码数据
    全文下载链接:http://tecdat.cn/?p=23848最近我们被客户要求撰写关于决策树的研究报告,包括一些图形和统计输出。在本文中,决策树是对例子进行分类的一种简单表示。它是一种有监督的机器学习技术,数据根据某个参数被连续分割。决策树分析可以帮助解决分类和回归问题决策树算法将数......
  • 蔬菜识别系统Python+TensorFlow+Django+卷积神经网络算法
    一、介绍蔬菜识别系统,使用Python作为主要开发语言,基于深度学习TensorFlow框架,搭建卷积神经网络算法。并通过对数据集进行训练,最后得到一个识别精度较高的模型。并基于Django框架,开发网页端操作平台,实现用户上传一张图片识别其名称。二、效果图片三、演示视频+代码视频+完整......
  • 鸟类识别系统Python+Django+TensorFlow+卷积神经网络算法【完整代码】
    一、介绍鸟类识别系统,使用Python作为主要开发语言,基于深度学习TensorFlow框架,搭建卷积神经网络算法。并通过对数据集进行训练,最后得到一个识别精度较高的模型。并基于Django框架,开发网页端操作平台,实现用户上传一张图片识别其名称。数据集选自加州理工学院200种鸟类数据集二、......
  • 蔬菜识别系统Python+TensorFlow+Django+卷积神经网络算法
    一、介绍蔬菜识别系统,使用Python作为主要开发语言,基于深度学习TensorFlow框架,搭建卷积神经网络算法。并通过对数据集进行训练,最后得到一个识别精度较高的模型。并基于Django框架,开发网页端操作平台,实现用户上传一张图片识别其名称。二、效果图片三、演示视频+代码视频+完整代码:http......
  • 手写数字识别系统Python+CNN卷积神经网络算法【完整代码】
    一、介绍手写数字识别系统,使用Python语言,基于TensorFlow搭建CNN卷积神经网络算法对数据集进行训练,最后得到模型,并基于FLask搭建网页端界面,基于Pyqt5搭建桌面端可视化界面。二、效果展示三、演示视频+完整代码视频+代码:https://www.yuque.com/ziwu/yygu3z/tb1mzqi847daqkru......