首页 > 编程语言 >python apscheduler 定时任务的基本使用-8-线程执行器ThreadPoolExecutor

python apscheduler 定时任务的基本使用-8-线程执行器ThreadPoolExecutor

时间:2023-03-28 11:38:34浏览次数:50  
标签:执行器 name python job apscheduler 任务 线程 time import

python apscheduler 定时任务的基本使用-8-线程执行器ThreadPoolExecutor

1、线程执行器ThreadPoolExecutor

先说个人总结

  • 假设启动线程数为N,任务数为M,misfire_grace_time为F,则执行的逻辑是这样子的:

    • 先启动一个线程执行一个任务,如果还有任务,则再启动一个线程去执行,直到没有线程或者没有任务。
    • 当某个线程执行完成任务后,则会再去找是否还有任务,如果有,则判断超出任务执行的时间为多少秒,记为Q,如果Q大于F,则任务Miss,否则,继续执行该任务
    • 重复上一点,直到没有任务
    • 当下一轮任务到达时,如果上一轮任务还没结束,即没有空闲线程,则下一轮任务均skip。
    • 当下一轮任务到达时,如果上一轮任务有空闲线程,则顺序执行任务,任务还没结束的,直接skip,结束了的,直接就开始进行任务
  • 每次最多起N个线程去执行任务,如果任务数M大于线程数,且不设置misfire_grace_time参数,则每次执行任务必Miss,且个位为M-N

  • 执行的情况一般是一个线程一个任务,(除非任务很快结束,比如print一下就退出)当任务结束时,查看是否还有任务需要执行,有,就去执行,没有就结束。释放出线程。

  • 判断是否还有任务需要执行,主要看misfire_grace_time参数,根据该参数判断是否任务还在合理的执行时间

  • 当下一轮的触发器时间到了,如果上一轮任务还在执行,且没有空闲线程,则下一轮的任务均SKIP

  • 出现Miss情况过多,则该考虑提高misfire_grace_time参数,或设置更大的线程池

例子

(1)定义使用5个线程,任务数为4个

  • 由于任务数总是少于线程数,肯定不会出现miss任务的情况

  • 最多会启动5个线程来执行任务,如果任务执行过快,可能一个线程就执行完所有任务,如果任务执行过慢,则一个线程一个任务,执行完的线程又执行另一个任务

import datetime
import random
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler


def log(msg):
    t = threading.currentThread()
    name = t.name
    ident = t.ident
    print(f"[{ident}][{name}]{msg}")  # 打印线程号和线程名称


def my_job(name):
    time.sleep(random.randint(1, 5))
    log(f'我是{name}')


def main():
    log(f'我是主线程, 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    job_stores = {
        'default': SQLAlchemyJobStore(url='sqlite:///sqlite.sqlite3'),
    }
    executors = {
        "default": ThreadPoolExecutor(5),
    }
    shanghai = 'Asia/Shanghai'
    scheduler = BlockingScheduler(jobstores=job_stores, executors=executors)

    for i in range(4):
        scheduler.add_job(my_job, trigger='interval', args=[f'任务{i + 1}'],
                          name=f'任务{i + 1}', seconds=10, timezone=shanghai,
                          id=f'任务{i + 1}', replace_existing=True)

    scheduler.start()
    # 使用了BlockingScheduler 则无法执行start后面的代码


if __name__ == '__main__':
    main()

运行

image-20230324163703930

(2)定义使用5个线程,任务数为8个

  • 由于任务数总是多余于线程数,且未设置misfire_grace_time参数,因此,每次都会有Miss任务

  • 最多会启动5个线程来执行任务,一个线程一个任务,共8个任务,因此每次都有3个任务Miss

import datetime
import random
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler


def log(msg):
    t = threading.currentThread()
    name = t.name
    ident = t.ident
    print(f"[{ident}][{name}]{msg}")  # 打印线程号和线程名称


def my_job(name):
    time.sleep(random.randint(1, 5))
    log(f'我是{name}')


def main():
    log(f'我是主线程, 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    job_stores = {
        'default': SQLAlchemyJobStore(url='sqlite:///sqlite.sqlite3'),
    }
    executors = {
        "default": ThreadPoolExecutor(5),
    }
    shanghai = 'Asia/Shanghai'
    scheduler = BlockingScheduler(jobstores=job_stores, executors=executors)

    for i in range(8):
        scheduler.add_job(my_job, trigger='interval', args=[f'任务{i + 1}'],
                          name=f'任务{i + 1}', seconds=10, timezone=shanghai,
                          id=f'任务{i + 1}', replace_existing=True)

    scheduler.start()
    # 使用了BlockingScheduler 则无法执行start后面的代码


if __name__ == '__main__':
    main()

运行

image-20230324164025022

(3)定义使用3个线程,任务数为5个,设置misfire_grace_time=10

  • 由于任务数总是多余于线程数,则先启动3个线程,执行三个任务,当三个任务执行完成后,空出了3个线程,此时看到还有两个任务需要执行(因为设置了misfire_grace_time=10,且任务时间总是少于10秒,所以任务不会Miss,)所以任务都能执行完成
  • N个线程,M个任务数,M>N,misfire_grace_time=Q,每个任务执行时间总是小于Q,因此,不会出现Miss任务,除非M远远大于N,才会出现Miss任务
import datetime
import random
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler


def log(msg):
    t = threading.currentThread()
    name = t.name
    ident = t.ident
    print(f"[{ident}][{name}]{msg}")  # 打印线程号和线程名称


def my_job(name):
    log(f'我是{name} 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    time.sleep(random.randint(1, 5))


def main():
    log(f'我是主线程, 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    job_stores = {
        'default': SQLAlchemyJobStore(url='sqlite:///sqlite.sqlite3'),
    }
    executors = {
        "default": ThreadPoolExecutor(3),
    }
    shanghai = 'Asia/Shanghai'
    scheduler = BlockingScheduler(jobstores=job_stores, executors=executors)

    for i in range(5):
        scheduler.add_job(my_job, trigger='interval', args=[f'任务{i + 1}'],
                          name=f'任务{i + 1}', seconds=25, timezone=shanghai,
                          id=f'任务{i + 1}', replace_existing=True,misfire_grace_time=10)

    scheduler.start()
    # 使用了BlockingScheduler 则无法执行start后面的代码


"""

"""

if __name__ == '__main__':
    main()

(4)定义使用3个线程,任务数为5个,设置misfire_grace_time=10,但10小于任务执行时间

  • 首先启动3个线程执行任务,执行完三个任务后,空出的第一个线程去执行下一个任务,但发现此时已经超过任务执行时间12秒了,且12秒大于misfire_grace_time,因此,认为该任务错过了12秒。
  • 此时第一个线程又接着看下一个任务,看到任务类似于上一个,错过了12秒,且不在misfire_grace_time范围内,因此认为任务Miss
import datetime
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler


def log(msg):
    t = threading.currentThread()
    name = t.name
    ident = t.ident
    print(f"[{ident}][{name}]{msg}")  # 打印线程号和线程名称


def my_job(name):
    log(f'我是{name} 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    time.sleep(12)


def main():
    log(f'我是主线程, 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    job_stores = {
        'default': SQLAlchemyJobStore(url='sqlite:///sqlite.sqlite3'),
    }
    executors = {
        "default": ThreadPoolExecutor(3),
    }
    shanghai = 'Asia/Shanghai'
    scheduler = BlockingScheduler(jobstores=job_stores, executors=executors)

    for i in range(5):
        scheduler.add_job(my_job, trigger='interval', args=[f'任务{i + 1}'],
                          name=f'任务{i + 1}', seconds=20, timezone=shanghai,
                          id=f'任务{i + 1}', replace_existing=True, misfire_grace_time=10)

    scheduler.start()
    # 使用了BlockingScheduler 则无法执行start后面的代码


"""

"""

if __name__ == '__main__':
    main()

(5)定义使用3个线程,任务数为5个,设置misfire_grace_time=1,但1小于任务执行时间,且任务时间大于下一轮时间

  • 先启动3个线程跑3个任务,
  • 任务还没跑完,到下一轮了,下一轮任务看到上一轮还在跑,且max_instances=1(默认最多同时存在一个任务在跑),认为上一次任务还没结束,还在跑,因此,直接跳过该次所有的任务,不管上一次任务真实跑了没,都直接SKIP掉了
  • 对于上一轮任务,也有还没跑的,也直接跳过
  • 上一轮任务跑完了,空出了线程,去看上一轮任务还有需要跑的,看到了,且超过了40秒,远大于misfire_grace_time,则任务Miss
  • 以后每次都一样,skip一轮,且每次Miss两个任务
import datetime
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler


def log(msg):
    t = threading.currentThread()
    name = t.name
    ident = t.ident
    print(f"[{ident}][{name}]{msg}")  # 打印线程号和线程名称


def my_job(name):
    log(f'我是{name} 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    time.sleep(40)


def main():
    log(f'我是主线程, 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    job_stores = {
        'default': SQLAlchemyJobStore(url='sqlite:///sqlite.sqlite3'),
    }
    executors = {
        "default": ThreadPoolExecutor(3),
    }
    shanghai = 'Asia/Shanghai'
    scheduler = BlockingScheduler(jobstores=job_stores, executors=executors)

    for i in range(5):
        scheduler.add_job(my_job, trigger='interval', args=[f'任务{i + 1}'],
                          name=f'任务{i + 1}', seconds=30, timezone=shanghai,
                          id=f'任务{i + 1}', replace_existing=True, misfire_grace_time=2)

    scheduler.start()
    # 使用了BlockingScheduler 则无法执行start后面的代码


"""

"""

if __name__ == '__main__':
    main()

(6)定义使用3个线程,任务数为4个,设置misfire_grace_time=10

  • 开始3个线程去执行,线程1执行后,释放出来,继续执行任务4,且执行完后,也释放了线程1,因此线程1是空闲的
  • 第二轮任务来了,线程1空闲,执行任务1,执行完,执行任务2,任务2还在上一轮,则skip跳过,执行任务3,任务上也还在上一轮执行,则跳过任务3,执行任务4
  • 第三轮来了,现在都空闲,则类似上面两点,形成循环
import datetime
import threading
import time

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler


def log(msg):
    t = threading.currentThread()
    name = t.name
    ident = t.ident
    print(f"[{ident}][{name}]{msg}")  # 打印线程号和线程名称


def my_job(name):
    log(f'我是{name} 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    if name=="任务1" or name=="任务4":
        time.sleep(5)
    else:
        time.sleep(40)


def main():
    log(f'我是主线程, 现在时间:{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    job_stores = {
        'default': SQLAlchemyJobStore(url='sqlite:///sqlite.sqlite3'),
    }
    executors = {
        "default": ThreadPoolExecutor(3),
    }
    shanghai = 'Asia/Shanghai'
    scheduler = BlockingScheduler(jobstores=job_stores, executors=executors)

    for i in range(4):
        scheduler.add_job(my_job, trigger='interval', args=[f'任务{i + 1}'],
                          name=f'任务{i + 1}', seconds=30, timezone=shanghai,
                          id=f'任务{i + 1}', replace_existing=True, misfire_grace_time=10)

    scheduler.start()
    # 使用了BlockingScheduler 则无法执行start后面的代码


"""
- 先启动3个线程跑3个任务,
- 任务还没跑完,到下一轮了,下一轮任务看到上一轮还在跑,且max_instances=1(默认最多同时存在一个任务在跑),认为上一次任务还没结束,还在跑,因此,直接跳过该次所有的任务,不管上一次任务真实跑了没,都直接SKIP掉了
- 对于上一轮任务,也有还没跑的,也直接跳过
- 上一轮任务跑完了,空出了线程,去看上一轮任务还有需要跑的,看到了,且超过了40秒,远大于misfire_grace_time,则任务Miss
- 以后每次都一样,skip一轮,且每次Miss两个任务
"""

if __name__ == '__main__':
    main()

运行

image-20230327193424908

github

标签:执行器,name,python,job,apscheduler,任务,线程,time,import
From: https://www.cnblogs.com/rainbow-tan/p/17264444.html

相关文章

  • Python高级特性-生成器
    前言生成器相比普通迭代器的实现,不会像普通迭代器生成完整集合返回,而是一边循环一边计算的机制,从而节省大量的空间。普通迭代器deftriangles(line):result=[]......
  • MicroPython All In One
    MicroPythonAllInOneMicroPythonisafullimplementationofthePython3programminglanguagethatrunsdirectlyonembeddedhardwarelikeRaspberryPiPico......
  • python apscheduler 定时任务的基本使用-5-添加任务
    pythonapscheduler定时任务的基本使用-5-添加任务1、添加定时任务可以随时随地添加任务,不论调度器是否启动。如果未启动时,添加了定时任务,则会在调度器启动时,正常执行该......
  • python一个带参数脚本调用另一个带参数的脚本 问题
    python  a.pymain:args=argget.parse_args(args=[])b.funa() pythonb.pydeffuna():args=argget.parse_args() 当两个py文件都需要arg参数的......
  • 练习04_Python常用库
    本周我们一起来了解一下:python库的导入数据分析常用python库:Numpy,SciPy,Matplotlib,pandas,StatsModels,scikit-learn,keraspython库的导入前两周我们练习......
  • 在python的setup.py中如何指定多个extra-index-url?
    [本文出自天外归云的博客园]以下内容来自chatGPT的回答(正解,已验证)问题:在pythonsetup.py中如何指定多个extra-index-url?在`setup.py`文件中,我们可以使用`setuptools`......
  • Python毕业设计推荐
    今天给大家推荐4个基于python的毕业设计/课程设计1.网上商城系统这是一个基于python+vue开发的商城网站,平台采用B/S结构,后端采用主流的Python语言进行开发,前端采用主流的......
  • Python八大数据类型内置方法(不是详细讲解)
    目录一、字符串str二、列表list三、字典dict四、元组tuple五、集合set首先Python八大数据类型之间可以转换,但是有时候不是通用即需要特定条件才可以转换,不过日常开发工作......
  • python-01
    一:python2和python3的区别:1.python2中没有默认编码格式,如果遇到中英文的内容需要做声明coding=utf-8,python3已经默认系统中有coding=utf-8的编码格式print语句在python2......
  • python-03
    1、if判断语句1.单分支 :如果 。。。。否则就。。。。 if  +条件判断   如果满足条件就执行语句1     语句1 else:           否则就执行语......