首页 > 其他分享 >APScheduler的基本使用

APScheduler的基本使用

时间:2024-05-22 18:10:13浏览次数:27  
标签:基本 interval APScheduler job scheduler 使用 print my BlockingScheduler

第一步:安装APScheduler

pip install apscheduler

第二步:配置APScheduler

# 导入模块
from apscheduler.schedulers.background import BackgroundScheduler

# 自定义定时启动的任务
def my_job():
    print("Hello World")

# 创建调度器实例
scheduler = BackgroundScheduler()

# 添加任务,指定任务执行的方式
scheduler.add_job(my_job, 'interval', hours=1)

# 启动调度器
scheduler.start()

add_job的详细配置案例

1.每隔一定时间运行job —— interval

from apscheduler.schedulers.blocking import BlockingScheduler

def my_interval_job():
    print("Running my interval job!")

scheduler = BlockingScheduler()

# 每隔2分钟运行一次作业
scheduler.add_job(my_interval_job, 'interval', minutes=2, id='my_interval_job')

scheduler.start()

2.根据Cron时间表运行job —— interval

from apscheduler.schedulers.blocking import BlockingScheduler

def my_cron_job():
    print("Running my cron job!")

scheduler = BlockingScheduler()

# 每天上午10:15运行作业
scheduler.add_job(my_cron_job, 'cron', hour=10, minute=15, id='my_cron_job')

scheduler.start()

3.限制同时运行的job数量 —— max_instances

from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():
    print("Running my job!")

scheduler = BlockingScheduler()

# 同时只允许有3个实例运行
scheduler.add_job(my_job, 'interval', seconds=5, max_instances=3, id='my_job_with_limited_instances')

scheduler.start()

案例:指定时间段内间隔固定时间执行job

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def my_complex_job():
    print("Running my complex job!")

scheduler = BlockingScheduler()

scheduler.add_job(
    my_complex_job,
    'cron',
    day_of_week='mon-fri',  # 周一至周五
    hour=10,
    minute=15,
    start_date='2024-06-01',  # 开始日期
    end_date='2024-12-31',    # 结束日期
    week='*/2',               # 每隔2周
    misfire_grace_time=3600,  # 1小时的宽限期
    coalesce=True,            # 合并错过的作业
    max_instances=2,          # 最多2个实例
    id='my_complex_cron_job'
)

scheduler.start()

查看job状态信息

# 获取所有作业
jobs = scheduler.get_jobs()
job = scheduler.get_job('my_job_id')  # 获取指定job信息
# 打印每个作业的状态
for job in jobs:
    print(f"Job ID: {job.id}")
    print(f"Name: {job.name}")
    print(f"Next Run Time: {job.next_run_time}")
    print(f"Trigger: {job.trigger}")
    print("-" * 20)

# 监听job执行状态
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

def my_listener(event):
    if event.exception:
        print(f"The job crashed : {event.job_id}")
    else:
        print(f"The job worked : {event.job_id}")

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

标签:基本,interval,APScheduler,job,scheduler,使用,print,my,BlockingScheduler
From: https://www.cnblogs.com/wellplayed/p/18206820

相关文章

  • 如何全程使用docker部署jeecg平台,无需安装开发环境(主要是如何使用Docker来进行Maven打
    在部署jeecg平台时,文档中即使通过docker部署,也需要安装开发环境编译一部分内容,本文记录使用docker替代安装环境的过程。使用docker的目的是在平台选型的过程中,不希望麻烦的安装环境,同时如果选型不满意,无需卸载环境就能恢复一个干净的系统。部署环境:UbuntuServer20.04docker,......
  • hidet使用rule based调度
    定义computation整体流程类似于tvm的计算描述定义输入、输出tensor,指定名称、数据类型和shapea=tensor_input('a',dtype='float32',shape=[10])b=tensor_input('b',dtype='float32',shape=[])b=tensor_input('data',dtype='float16&......
  • 使用本地MSSQL管理器链接本地/服务器上的MySql
    使用本地MSSQL管理器链接本地/服务器上的MySql步骤如下:第一步,安装ODBC数据源驱动打开控制面板》管理工具》找到ODBC数据源 第二步、添加数据源驱动程序打开ODBC数据源后点击添加》选择MySQL数据源的驱动程序,然后点击完成第三步、填写MySQL连接信息按照弹出的页面。填......
  • 经常出差用哪些办公软件记录工作?可多设备同步使用的便签笔记软件
    对于许多职场人士来说,出差已成为工作常态。在旅途中,如何高效处理工作,确保信息不遗漏,成为了一个不小的挑战。那么,对于经常需要移动办公的我们,哪款办公软件才是最佳选择呢?可多设备同步使用的便签笔记软件是哪款?答案就是——敬业签,一款强大而便捷的便签笔记软件。它的强大之处在于其......
  • Python使用Mypy进行静态变量审查
    python作为一个动态语言,可以对变量不进行类型定义就使用,这就导致随着项目的增大,我们很难回顾以前某些模块中每一个变量是什么类型(特别是在函数定义里,如果不进行类型检查,很可能对一个str类型进行了()调用等等),为了在编码时能够让编译器提醒我们,也防止在运行时出现问题,采用静态变......
  • PHP函数 三角函数的基本使用
    直角三角函数的定义:正弦(sin)等于对边比斜边;sinA=a/c;余弦(cos)等于邻边比斜边;cosA=b/c;正切(tan)等于对边比邻边;tanA=a/b;余切(cot)等于邻边比对边;cotA=b/a;<?phpheader('Content-Type:text/html;charset=utf-8');define('ROOT',$_SERVER['DOCUMENT_ROOT']);include......
  • PHP函数 explode和eval的使用
    <?phpheader('Content-Type:text/html;charset=utf-8');define('ROOT',$_SERVER['DOCUMENT_ROOT']);includeROOT.'/assets/php/head.php';/***explode()函数把字符串打散为数组。*eval()函数把字符串按照PHP代码来计算。**/$val=&#......
  • debug技巧之使用Arthes调试
    一、前言大家好啊,我是summo,今天给大家分享一下我平时是怎么调试代码的,不是权威也不是教学,就是简单分享一下,如果大家还有更好的调试方式也可以多多交流哦。前面我介绍了本地调试和远程调试,今天再加一种:利用Arthes进行调试。二、Arthes是什么?以下是Arthes官网原文:通常,本地开发......
  • identityserver4使用
    参考网站https://blog.csdn.net/weixin_43847283/article/details/125708613请求路径请求参数client_id:simple_clientclient_secret:simple_client_secretgrant_type:client_credentials源码https://pan.baidu.com/s/1aWXzzl2fJ9ZeqAdXQfRCBA提取码:r6ef......
  • .NET 8 使用官方OpenXml SDK,替换Word中的文字和图片
    安装好DocumentFormat.OpenXml后,准备好一个docx文件usingDocumentFormat.OpenXml.Drawing.Wordprocessing;usingDocumentFormat.OpenXml.Packaging;usingDocumentFormat.OpenXml.Wordprocessing;usingSystem.Text.RegularExpressions;usingA=DocumentFormat.OpenXm......