第一步:安装APScheduler
pip install apscheduler
第二步:配置APScheduler
# 导入模块
from apscheduler.schedulers.background import BackgroundScheduler
# 自定义定时启动的任务
def my_job():
print("Hello World")
# 创建调度器实例
scheduler = BackgroundScheduler()
# 添加任务,指定任务执行的方式
scheduler.add_job(my_job, 'interval', hours=1)
# 启动调度器
scheduler.start()
add_job的详细配置案例
1.每隔一定时间运行job —— interval
from apscheduler.schedulers.blocking import BlockingScheduler
def my_interval_job():
print("Running my interval job!")
scheduler = BlockingScheduler()
# 每隔2分钟运行一次作业
scheduler.add_job(my_interval_job, 'interval', minutes=2, id='my_interval_job')
scheduler.start()
2.根据Cron时间表运行job —— interval
from apscheduler.schedulers.blocking import BlockingScheduler
def my_cron_job():
print("Running my cron job!")
scheduler = BlockingScheduler()
# 每天上午10:15运行作业
scheduler.add_job(my_cron_job, 'cron', hour=10, minute=15, id='my_cron_job')
scheduler.start()
3.限制同时运行的job数量 —— max_instances
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print("Running my job!")
scheduler = BlockingScheduler()
# 同时只允许有3个实例运行
scheduler.add_job(my_job, 'interval', seconds=5, max_instances=3, id='my_job_with_limited_instances')
scheduler.start()
案例:指定时间段内间隔固定时间执行job
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def my_complex_job():
print("Running my complex job!")
scheduler = BlockingScheduler()
scheduler.add_job(
my_complex_job,
'cron',
day_of_week='mon-fri', # 周一至周五
hour=10,
minute=15,
start_date='2024-06-01', # 开始日期
end_date='2024-12-31', # 结束日期
week='*/2', # 每隔2周
misfire_grace_time=3600, # 1小时的宽限期
coalesce=True, # 合并错过的作业
max_instances=2, # 最多2个实例
id='my_complex_cron_job'
)
scheduler.start()
查看job状态信息
# 获取所有作业
jobs = scheduler.get_jobs()
job = scheduler.get_job('my_job_id') # 获取指定job信息
# 打印每个作业的状态
for job in jobs:
print(f"Job ID: {job.id}")
print(f"Name: {job.name}")
print(f"Next Run Time: {job.next_run_time}")
print(f"Trigger: {job.trigger}")
print("-" * 20)
# 监听job执行状态
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
def my_listener(event):
if event.exception:
print(f"The job crashed : {event.job_id}")
else:
print(f"The job worked : {event.job_id}")
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
标签:基本,interval,APScheduler,job,scheduler,使用,print,my,BlockingScheduler
From: https://www.cnblogs.com/wellplayed/p/18206820