使用Python的apscheduler库中的BackgroundScheduler实现投递多线程任务的示例代码。这个示例将展示如何根据任务ID投递和停止任务,设置任务同时执行的上限,以及删除全部任务。
首先,确保你已经安装了apscheduler库:
``
pip install apscheduler
``
代码示例:
``
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
import time
# 定义任务函数
def my_task(task_id):
print(f"任务 {task_id} 正在执行")
time.sleep(5) # 模拟任务执行需要一些时间
print(f"任务 {task_id} 完成")
# 初始化调度器
jobstores = {
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(2) # 设置任务同时执行上限为2
}
job_defaults = {
'coalesce': False, # 设置任务上限时不合并
'max_instances': 2 # 每个任务的最大实例数为2
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.start()
# 根据任务ID添加任务
def add_task(task_id):
scheduler.add_job(my_task, 'interval', seconds=10, id=task_id, args=[task_id])
print(f"任务 {task_id} 已添加")
# 根据任务ID停止任务
def stop_task(task_id):
scheduler.remove_job(task_id)
print(f"任务 {task_id} 已停止")
# 删除全部任务
def remove_all_tasks():
scheduler.remove_all_jobs()
print("所有任务已删除")
# 示例使用
if __name__ == "__main__":
# 添加任务
add_task('task1')
add_task('task2')
# 等待一段时间,让任务执行几次
time.sleep(30)
# 停止任务
stop_task('task1')
# 等待一段时间,观察任务2继续执行
time.sleep(20)
# 删除所有任务
remove_all_tasks()
# 等待调度器关闭
scheduler.shutdown()
print("调度器已关闭")
``
注意事项
任务上限设置:通过设置executors中的ThreadPoolExecutor来限制同时执行的任务数量。这里设置为2。
任务合并(coalesce):设置job_defaults中的coalesce为False,表示当任务积压时不合并执行。
任务实例限制:设置job_defaults中的max_instances为2,表示每个任务的最大实例数为2。
任务ID:确保任务ID唯一,以便正确管理任务的启动和停止。
通过以上示例代码和解释,你可以了解如何使用BackgroundScheduler实现多线程任务的投递、停止和管理。