首页 > 其他分享 >fastapi项目 08-持久化APScheduler

fastapi项目 08-持久化APScheduler

时间:2024-01-14 18:45:53浏览次数:29  
标签:task apscheduler fastapi 08 APScheduler job scheduler import id

前言

在上一篇的中,我们写到可以根据APScheduler第三方库,创建定时任务,但是主程序直接创建完后,定时任务只是存在内存中,如果重启启动主程序,那么我们创建的任务就会消失,需要重新创建,这显然是不行的。我们需要的是不管程序是否启动,我们创建的任务都存在,而不会被删除。于是我们就引入了持久化APScheduler。

1. 持久化APScheduler

APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

1.1 job stores 存储

job stores 支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

默认是存储在内存中,也就是重启服务后,就无法查看到之前添加的任务了。我们希望任务能保存到数据库,让任务一直都在,可以使用sqlalchemy保存到mysql数据库。
mysql 数据库持久化配置

1.1.1 配置调度器

scheduler = BackgroundScheduler({  
    'apscheduler.jobstores.default': {  
        'type': 'sqlalchemy',  
        'url': 'mysql+pymysql://username:password@127.0.0.1:3306/dbname?charset=utf8',  
        'tablename': 'task_job'  
    },  
    'apscheduler.executors.default': {  
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',  
        'max_workers': '20'  
    },  
    'apscheduler.executors.processpool': {  
        'type': 'processpool',  
        'max_workers': '10'  
    },  
    'apscheduler.job_defaults.coalesce': 'false',  
    'apscheduler.job_defaults.max_instances': '10',  
    'apscheduler.timezone': 'UTC',  
})

其中,apscheduler.jobstores.default字段表示存储器的配置,字段说明如下:

  • type表示数据库的类型,MySQL属于SQLAlchemy,因此这里值填写sqlalchemy

  • url 表示存储器访问呢的地址,其中的这些字段需要配置:

    • username为你的数据库的用户名
    • password为对应的账号的密码
    • dbname为数据库名称
  • tablename表示将定时任务数据存储的表名

然后就可以使用配置好的scheduler添加任务了 。

scheduler.add_job(task, 'interval', seconds=3, id=job_id, replace_existing=True)

2. 使用示例

  • 1.创建一个task.py文件, 实例化scheduler。
from apscheduler.schedulers.asyncio import AsyncIOScheduler  
import time   
  
scheduler = AsyncIOScheduler({  
    'apscheduler.jobstores.default': {  
        'type': 'sqlalchemy',  
        'url': 'mysql+pymysql://username:password@127.0.0.1:3306/dbname?charset=utf8',    # 数据库的基本连接配置信息
        'tablename': 'task_job'    # 数据库中创建的表明
    },  
    'apscheduler.executors.default': {  
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',  
        'max_workers': '20'  
    },  
    'apscheduler.executors.processpool': {  
        'type': 'processpool',  
        'max_workers': '10'  
    },  
    'apscheduler.job_defaults.coalesce': 'false',  
    'apscheduler.job_defaults.max_instances': '10',  
    'apscheduler.timezone': 'UTC',  
})
  • 2.view_task.py 写创建任务接口
import time  
from fastapi import APIRouter, Depends  
from xxx.task_w import scheduler  
  
router = APIRouter()  
  
  
  
async def task3(x):  
    print(f'task 3 executed  {x}--------', time.time())  
  
  
async def task4(y):  
    print(f'task 4 executed  {y}--------', time.time())  
  
  
@router.post("/start_task3")  
async def start_task3():  
    scheduler.add_job(task3, 'interval', args=["xxxx"], seconds=10)  
    return {"msg": "任务启动成功"}  
  
  
@router.post("/start_task4")  
async def start_task4():  
    scheduler.add_job(task4, 'interval', args=["yyy"], seconds=3)  
    return {"msg": "任务启动成功"}  
  
  
@router.get("/list")  
async def jobs():  
    jobs = await scheduler.get_jobs()  
    return [job.id for job in jobs]
    1. main.py主程序入口
from fastapi import FastAPI  
from .task import scheduler  
from .view_task import router  
  
app = FastAPI()  
  
  
@app.on_event('startup')  
async def init_scheduler():  
    """初始化"""  
    scheduler.start()  
  
  
app.include_router(router, tags=['我的任务'])  
  
if __name__ == "__main__":  
    import uvicorn  
    uvicorn.run(  
        app='main:app',  
        host="127.0.0.1",  
        port=8020, reload=True  
    )

于是就可以查看到,在数据库中,生成了task_job表

3. 结合fastapi项目

结合到fastapi项目中,我们就需要将对应的模块写在不同的文件中。创建apps/aps_scheduler.py写定时任务的配置文件。

# apps/aps_scheduler.py

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apps.setting import Settings

# 读取配置
db_info = Settings.db_setting()


# 为SQLAlchemy 定义数据库url地址
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{db_info.data_user}:{db_info.password}@{db_info.host}:{db_info.port}/{db_info.database}'

scheduler = AsyncIOScheduler({
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': SQLALCHEMY_DATABASE_URI,
        'tablename': 'task_job'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '10'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '10',
    'apscheduler.timezone': 'Asia/Shanghai',  # 设置时区
})

还需要将任务和定时任务接口分离,写在不同的文件,方便后期的维护和管理。同时我们将数据库等一些配置信息单独放一个地方,方便后面的修改这些配置信息,存放在apps/setting.py中。

# apps/setting.py

import os
import uvicorn
from fastapi import FastAPI
from pydantic import BaseSettings
import datetime
import pathlib

ROOT_DIR = pathlib.Path(__file__).parent.parent
# 配置allure报告服务器地址
ALLURE_REPORT = 'http://127.0.0.1/api'


class AuthSettings(BaseSettings):
    """ 配置 jwt token"""
    authjwt_secret_key: str = "secret"
    authjwt_token_location: set = {"headers", "cookies"}
    # 设置cookie有效期30天,如果不设置默认关闭浏览器cookie失效
    authjwt_cookie_max_age: int = 60*60*24*30
    # 使用Cookie时启用 / 禁用CSRF保护。默认为True
    authjwt_cookie_csrf_protect = False


class DBSettings(BaseSettings):
    """数据库配置"""
    host: str = '127.0.0.1'
    port: int = 3306 
    data_user: str = 'root'  # mac系统的话,这里不能使用user,否则会报错,bug
    password: str = '12345678'
    database: str = 'fastapi_v1'


class Settings(BaseSettings):
    # app_name: str = "Awesome API"
    # admin_email: str
    # items_per_user: int = 50

    # 配置 jwt token
    auth_setting = AuthSettings
    # token 设置30天过期
    auth_expires = datetime.timedelta(days=30)

    # 数据库配置
    db_setting = DBSettings

# apps/utils/tasks.py

"""存放定时任务"""

import time

async def task3(x):
    print(f'task 3 executed  {x}--------', time.time())

async def task4(y):
    print(f'task 4 executed  {y}--------', time.time())
# apps/routers/view_tasks.py

from fastapi import APIRouter
from apps.aps_scheduler import scheduler
from apps.utils.tasks import task3, task4

router = APIRouter()


@router.get("/start_task3")
async def start_task3():
    scheduler.add_job(task3, id='task3', trigger='date', run_date='2024-01-14 20:55:00', args=["aaa"])
    return {"msg": "任务启动成功"}


@router.get("/start_task4")
async def start_task4():
    scheduler.add_job(task4, id='task4',trigger='date', run_date='2024-01-14 20:55:00', args=["bbb"])
    return {"msg": "任务启动成功"}


@router.get("/task_list")
async def task_list():
    jobs = scheduler.get_jobs()  # 获取全部的jobs
    jobs_info = []
    for job in jobs:
        info = {}
        info['id'] = job.id
        info['next_run_time'] = job.next_run_time
        jobs_info.append(info)
    return jobs_info

@router.get("/task_pause")
async def task_pause(task_id: str):
    job = scheduler.get_job(task_id)
    if job:
        job.pause()
        return {"msg": "task id 已暂停"}
    else:
        return {"msg": "task id 不存在"}


@router.get("/task_delete")
async def task_del(task_id: str):
    job = scheduler.get_job(task_id)
    if job:
        job.remove()
        return {"msg": "task id 已删除"}
    else:
        return {"msg": "task id 不存在"}


@router.get("/task_resume")
async def task_resume(task_id: str):
    job = scheduler.get_job(task_id)
    if job:
        job.resume()
        return {"msg": "task id 已恢复"}
    else:
        return {"msg": "task id 不存在"}

编写完上面的模块后,首先需要去app中去将view_tasks.py路由进行注册,并且APScheduler定时任务需要在启动服务前进行调用。

def create_app():
    """工厂函数"""

......

    @app.on_event("startup")
        async def init_scheduler():
            """初始化, 在任务启动之前执行"""
            scheduler.start()
        app.include_router(view_tasks.router, prefix="/api/v1", tags=["定时任务"])
        return app

运行主函数,并进行注册,调用任务注册接口,查看数据库中任务是否写入成功。下次重启程序的时候,依然可以查看到上次定时的任务。
增加任务3和任务4。

标签:task,apscheduler,fastapi,08,APScheduler,job,scheduler,import,id
From: https://www.cnblogs.com/dack-zt-deng/p/17964019

相关文章

  • Libevent [补档-2023-08-29]
    libevent的使用8-1安装​自己百度一下,安装它不是特别难,加油!!!8-2libevent介绍​它是一个开源库,用于处理网络和定时器等等事件。它提供了跨平台的API,能够在不同的操作系统上实现高性能,可扩展的世界去的编程。​1.事件驱动:libevent使用事件驱动模型,通过监听事件的......
  • 08.抓包工具 Fiddler
    目录 工具介绍界面简介抓取HTTP请求抓取HTTPS请求抓取移动端请求查看接口信息AutoResponder断点弱网工具介绍 官网:https://www.telerik.com/fiddler/fiddler-classicFiddler是位于客户端和服务器端的HTTP代理也是目前最常用的HTTP抓包工具之......
  • [ 20230308 CQYC省选模拟赛 T2 ] 塑料内存条
    题意给定\(n\)个不可重集,初始每个集合\(i\)有元素\(c_i\)。请你以下\(3\)种操作:1xy在集合\(x\)插入\(y\)。2xy将\(y\)集合所有数插入\(x\),并删除\(y\)集合(不影响别的集合的下标)3xy求\(x\)集合与\(y\)集合的交之和。Sol可塑性记忆。注意到前......
  • 08-配置管理:Kubernete 管理业务配置方式有哪些?配置管理:Kubernete 管理业务配置方式有
    通过前面几节课的学习,我们已经对Kubernetes中的Pod以及一些业务负载有所了解。你可以根据课程中提供的示例,自己动手尝试在集群中实践起来。在使用过程中,我们常常需要对Pod进行一些配置管理,比如参数配置文件怎么使用,敏感数据怎么保存传递,等等。有些人可能会觉得,为什么不把这......
  • 考研408之C语言基础学习记录
    考研408之C语言基础学习记录汇总前言这篇文章是我决定考研后写下的C语言基础学习记录,因为在此之前我有过Java项目开发的相关经验,对基础语言的学习也有一些心得,所以学习C语言时也只是快速过一下语法重点,并进行记录总结。这里只是第一次学习C语言基础进行的知识总结,未涉及到刷题......
  • Flask-apscheduler获取或修改上下文中config数据
    __init__.py 1fromflask_apschedulerimportAPScheduler2...34scheduler=APScheduler()567fromappimportapp891011121314defcreate_app(config_name):15app=Flask(__name__)1617CORS(app,resources=r'/*......
  • P1085题解
    思路1.定义校内时间/校外时间/最大值(记录不高兴值)/记录星期intn,m,maxx=-1,tmp;2.使用循环输入并判断for(inti=1;i<=7;i++){//循环一周的日期cin>>n>>m;if(n+m>8&&maxx<n+m){//如果津津不高兴了且它比以往的值都大maxx=n+m;//更改最大值tmp=i;......
  • 《PySpark大数据分析实战》-08.宽窄依赖和阶段划分
    ......
  • class086 动态规划中得到具体决策方案的技巧【算法】
    class086动态规划中得到具体决策方案的技巧【算法】算法讲解086【必备】动态规划中得到具体决策方案的技巧code1最长公共子序列//最长公共子序列其中一个结果//给定两个字符串str1和str2//输出两个字符串的最长公共子序列//如果最长公共子序列为空,则输出-1//测试链接:......
  • 人工智能_机器学习081_聚类评价指标_轮廓系数_公式理解---人工智能工作笔记0121
    然后我们再来看,那么对于数据来说,我们分成几类比较合适呢,怎么衡量呢?我们之前做分类的时候,用的是准确率对吧,然后做回归问题的时候,用均方误差.而我们在Kmeans衡量分成几类比较好用的函数是,轮廓系数对吧,可以看到上面是轮廓系数的公式可以看到,他有两个点 可以看到公式中的a......