前言
在上一篇的中,我们写到可以根据APScheduler第三方库,创建定时任务,但是主程序直接创建完后,定时任务只是存在内存中,如果重启启动主程序,那么我们创建的任务就会消失,需要重新创建,这显然是不行的。我们需要的是不管程序是否启动,我们创建的任务都存在,而不会被删除。于是我们就引入了持久化APScheduler。
1. 持久化APScheduler
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。
1.1 job stores 存储
job stores 支持四种任务存储方式
- memory:默认配置任务存在内存中
- mongdb:支持文档数据库存储
- sqlalchemy:支持关系数据库存储
- redis:支持键值对数据库存储
默认是存储在内存中,也就是重启服务后,就无法查看到之前添加的任务了。我们希望任务能保存到数据库,让任务一直都在,可以使用sqlalchemy保存到mysql数据库。
mysql 数据库持久化配置
1.1.1 配置调度器
scheduler = BackgroundScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'mysql+pymysql://username:password@127.0.0.1:3306/dbname?charset=utf8',
'tablename': 'task_job'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '10'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '10',
'apscheduler.timezone': 'UTC',
})
其中,apscheduler.jobstores.default
字段表示存储器的配置,字段说明如下:
-
type
表示数据库的类型,MySQL
属于SQLAlchemy
,因此这里值填写sqlalchemy
-
url 表示存储器访问呢的地址,其中的这些字段需要配置:
username
为你的数据库的用户名password
为对应的账号的密码dbname
为数据库名称
-
tablename
表示将定时任务数据存储的表名
然后就可以使用配置好的scheduler
添加任务了 。
scheduler.add_job(task, 'interval', seconds=3, id=job_id, replace_existing=True)
2. 使用示例
- 1.创建一个task.py文件, 实例化scheduler。
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import time
scheduler = AsyncIOScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'mysql+pymysql://username:password@127.0.0.1:3306/dbname?charset=utf8', # 数据库的基本连接配置信息
'tablename': 'task_job' # 数据库中创建的表明
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '10'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '10',
'apscheduler.timezone': 'UTC',
})
- 2.view_task.py 写创建任务接口
import time
from fastapi import APIRouter, Depends
from xxx.task_w import scheduler
router = APIRouter()
async def task3(x):
print(f'task 3 executed {x}--------', time.time())
async def task4(y):
print(f'task 4 executed {y}--------', time.time())
@router.post("/start_task3")
async def start_task3():
scheduler.add_job(task3, 'interval', args=["xxxx"], seconds=10)
return {"msg": "任务启动成功"}
@router.post("/start_task4")
async def start_task4():
scheduler.add_job(task4, 'interval', args=["yyy"], seconds=3)
return {"msg": "任务启动成功"}
@router.get("/list")
async def jobs():
jobs = await scheduler.get_jobs()
return [job.id for job in jobs]
-
- main.py主程序入口
from fastapi import FastAPI
from .task import scheduler
from .view_task import router
app = FastAPI()
@app.on_event('startup')
async def init_scheduler():
"""初始化"""
scheduler.start()
app.include_router(router, tags=['我的任务'])
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app='main:app',
host="127.0.0.1",
port=8020, reload=True
)
于是就可以查看到,在数据库中,生成了task_job表
3. 结合fastapi项目
结合到fastapi项目中,我们就需要将对应的模块写在不同的文件中。创建apps/aps_scheduler.py写定时任务的配置文件。
# apps/aps_scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apps.setting import Settings
# 读取配置
db_info = Settings.db_setting()
# 为SQLAlchemy 定义数据库url地址
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{db_info.data_user}:{db_info.password}@{db_info.host}:{db_info.port}/{db_info.database}'
scheduler = AsyncIOScheduler({
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': SQLALCHEMY_DATABASE_URI,
'tablename': 'task_job'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '10'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '10',
'apscheduler.timezone': 'Asia/Shanghai', # 设置时区
})
还需要将任务和定时任务接口分离,写在不同的文件,方便后期的维护和管理。同时我们将数据库等一些配置信息单独放一个地方,方便后面的修改这些配置信息,存放在apps/setting.py中。
# apps/setting.py
import os
import uvicorn
from fastapi import FastAPI
from pydantic import BaseSettings
import datetime
import pathlib
ROOT_DIR = pathlib.Path(__file__).parent.parent
# 配置allure报告服务器地址
ALLURE_REPORT = 'http://127.0.0.1/api'
class AuthSettings(BaseSettings):
""" 配置 jwt token"""
authjwt_secret_key: str = "secret"
authjwt_token_location: set = {"headers", "cookies"}
# 设置cookie有效期30天,如果不设置默认关闭浏览器cookie失效
authjwt_cookie_max_age: int = 60*60*24*30
# 使用Cookie时启用 / 禁用CSRF保护。默认为True
authjwt_cookie_csrf_protect = False
class DBSettings(BaseSettings):
"""数据库配置"""
host: str = '127.0.0.1'
port: int = 3306
data_user: str = 'root' # mac系统的话,这里不能使用user,否则会报错,bug
password: str = '12345678'
database: str = 'fastapi_v1'
class Settings(BaseSettings):
# app_name: str = "Awesome API"
# admin_email: str
# items_per_user: int = 50
# 配置 jwt token
auth_setting = AuthSettings
# token 设置30天过期
auth_expires = datetime.timedelta(days=30)
# 数据库配置
db_setting = DBSettings
# apps/utils/tasks.py
"""存放定时任务"""
import time
async def task3(x):
print(f'task 3 executed {x}--------', time.time())
async def task4(y):
print(f'task 4 executed {y}--------', time.time())
# apps/routers/view_tasks.py
from fastapi import APIRouter
from apps.aps_scheduler import scheduler
from apps.utils.tasks import task3, task4
router = APIRouter()
@router.get("/start_task3")
async def start_task3():
scheduler.add_job(task3, id='task3', trigger='date', run_date='2024-01-14 20:55:00', args=["aaa"])
return {"msg": "任务启动成功"}
@router.get("/start_task4")
async def start_task4():
scheduler.add_job(task4, id='task4',trigger='date', run_date='2024-01-14 20:55:00', args=["bbb"])
return {"msg": "任务启动成功"}
@router.get("/task_list")
async def task_list():
jobs = scheduler.get_jobs() # 获取全部的jobs
jobs_info = []
for job in jobs:
info = {}
info['id'] = job.id
info['next_run_time'] = job.next_run_time
jobs_info.append(info)
return jobs_info
@router.get("/task_pause")
async def task_pause(task_id: str):
job = scheduler.get_job(task_id)
if job:
job.pause()
return {"msg": "task id 已暂停"}
else:
return {"msg": "task id 不存在"}
@router.get("/task_delete")
async def task_del(task_id: str):
job = scheduler.get_job(task_id)
if job:
job.remove()
return {"msg": "task id 已删除"}
else:
return {"msg": "task id 不存在"}
@router.get("/task_resume")
async def task_resume(task_id: str):
job = scheduler.get_job(task_id)
if job:
job.resume()
return {"msg": "task id 已恢复"}
else:
return {"msg": "task id 不存在"}
编写完上面的模块后,首先需要去app中去将view_tasks.py路由进行注册,并且APScheduler定时任务需要在启动服务前进行调用。
def create_app():
"""工厂函数"""
......
@app.on_event("startup")
async def init_scheduler():
"""初始化, 在任务启动之前执行"""
scheduler.start()
app.include_router(view_tasks.router, prefix="/api/v1", tags=["定时任务"])
return app
运行主函数,并进行注册,调用任务注册接口,查看数据库中任务是否写入成功。下次重启程序的时候,依然可以查看到上次定时的任务。
增加任务3和任务4。