fastapi是一个高性能的异步框架,实现 定时任务 需要 task.py 提供一个装饰器
# task.py # 网上百度的,地址:https://blog.csdn.net/hekaiyou/article/details/125072249 import asyncio from functools import wraps from asyncio import ensure_future from starlette.concurrency import run_in_threadpool from typing import Any, Callable, Coroutine, Optional, Union from public.log import logger NoArgsNoReturnFuncT = Callable[[], None] NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]] NoArgsNoReturnDecorator = Callable[ [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT ] def repeat_task( *, seconds: float, wait_first: bool = False, raise_exceptions: bool = False, max_repetitions: Optional[int] = None, ) -> NoArgsNoReturnDecorator: """ 返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行. 其装饰的函数不能接受任何参数并且不返回任何内容. 参数: seconds: float 等待重复执行的秒数 wait_first: bool (默认 False) 如果为 True, 该函数将在第一次调用前先等待一个周期. raise_exceptions: bool (默认 False) 如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序. max_repetitions: Optional[int] (默认 None) 该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复. """ def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT: """ 将修饰函数转换为自身重复且定期调用的版本. """ is_coroutine = asyncio.iscoroutinefunction(func) had_run = False @wraps(func) async def wrapped() -> None: nonlocal had_run if had_run: return had_run = True repetitions = 0 async def loop() -> None: nonlocal repetitions if wait_first: await asyncio.sleep(seconds) while max_repetitions is None or repetitions < max_repetitions: try: if is_coroutine: # 以协程方式执行 await func() # type: ignore else: # 以线程方式执行 await run_in_threadpool(func) repetitions += 1 except Exception as exc: logger.error(f'执行重复任务异常: {exc}') if raise_exceptions: raise exc await asyncio.sleep(seconds) ensure_future(loop()) return wrapped return decorator
# task.py # 网上百度的,地址:https://blog.csdn.net/hekaiyou/article/details/125072249 import asyncio from functools import wraps from asyncio import ensure_future from starlette.concurrency import run_in_threadpool from typing import Any, Callable, Coroutine, Optional, Union from public.log import logger NoArgsNoReturnFuncT = Callable[[], None] NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]] NoArgsNoReturnDecorator = Callable[ [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT ] def repeat_task( *, seconds: float, wait_first: bool = False, raise_exceptions: bool = False, max_repetitions: Optional[int] = None, ) -> NoArgsNoReturnDecorator: """ 返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行. 其装饰的函数不能接受任何参数并且不返回任何内容. 参数: seconds: float 等待重复执行的秒数 wait_first: bool (默认 False) 如果为 True, 该函数将在第一次调用前先等待一个周期. raise_exceptions: bool (默认 False) 如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序. max_repetitions: Optional[int] (默认 None) 该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复. """ def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT: """ 将修饰函数转换为自身重复且定期调用的版本. """ is_coroutine = asyncio.iscoroutinefunction(func) had_run = False @wraps(func) async def wrapped() -> None: nonlocal had_run if had_run: return had_run = True repetitions = 0 async def loop() -> None: nonlocal repetitions if wait_first: await asyncio.sleep(seconds) while max_repetitions is None or repetitions < max_repetitions: try: if is_coroutine: # 以协程方式执行 await func() # type: ignore else: # 以线程方式执行 await run_in_threadpool(func) repetitions += 1 except Exception as exc: logger.error(f'执行重复任务异常: {exc}') if raise_exceptions: raise exc await asyncio.sleep(seconds) ensure_future(loop()) return wrapped return decorator
main.py
from fastapi import FastAPI from tasks import repeat_task from public.shares import shares app = FastAPI() @app.on_event('startup') @repeat_task(seconds=60 * 5, wait_first=False) def repeat_task_aggregate_request_records() -> None: # 发送钉钉消息 shares()
shares.py
from datetime import datetime, date import efinance as ef from chinese_calendar import is_workday import matplotlib.pyplot as plt import time from public.send_ding import send_ding from public.log import logger, BASE_PATH, os def shares(): stock_code = "601069" year = date.today().year month = date.today().month day = date.today().day now_time = datetime.now() now_img = int(round(time.time() * 1000)) weekday = date(year, month, day).strftime("%A") # 非法定节假日,周六、周日也需要去掉 if (not is_workday(date(year, month, day)) or weekday in ["Saturday", "Sunday"]) and not make: logger.info(f"当前时间 {now_time} 休市日!!!") return start_time = datetime(year, month, day, 9, 15, 0) end_time = datetime(year, month, day, 15, 5, 0) am_time = datetime(year, month, day, 11, 35, 0) pm_time = datetime(year, month, day, 13, 00, 0) if (now_time < start_time or now_time > end_time or am_time < now_time < pm_time) and not make: logger.info(f"当前时间 {now_time} 未开盘!!!") return # 数据间隔时间为 1 分钟 freq = 1 # 获取最新一个交易日的分钟级别股票行情数据 df = ef.stock.get_quote_history(stock_code, klt=freq) if df.empty: logger.info(f"当前时间 {now_time} 未获取到股票数据!!!") return # 绘制折线图 logger.info(f"当前时间戳: {now_img}") plt.plot(df["开盘"].values, linewidth=1, color="red") plt.savefig(os.path.join(BASE_PATH, "media", f"Chart-{now_img}.jpg"), bbox_inches='tight') # 保存后需要清除图形,不然会重叠 plt.clf() share_name = df["股票名称"].values[0] open_price = df["开盘"].values[0] new_price = df["收盘"].values[-1] new_time = df["日期"].values[-1] top_price = df["最高"].max() down_price = df["最低"].min() turnover = df["成交量"].sum() average = round(df["开盘"].mean(), 2) rise_and_fall = round(df["涨跌幅"].sum(), 2) rise_and_price = round(df["涨跌额"].sum(), 2) turnover_rate = round(df["换手率"].sum(), 2) # markdown 发送字体颜色 rise_and_fall_color = "#FF0000" if rise_and_fall > 0 else "#00FF00" rise_and_price_color = "#FF0000" if rise_and_price > 0 else "#00FF00" new_price_color = "#FF0000" if new_price > open_price else "#00FF00" top_price_color = "#FF0000" if top_price > open_price else "#00FF00" down_price_color = "#FF0000" if down_price > open_price else "#00FF00" body = { "msgtype": "markdown", "markdown": { "title": share_name, "text": f"### {share_name}\n\n" f"> **开盘价** <font>{open_price}</font> 元/股\n\n" f"> **最高价** <font color={top_price_color}>{top_price}</font> 元/股\n\n" f"> **最低价** <font color={down_price_color}>{down_price}</font> 元/股\n\n" f"> **平均价** <font color=''>{average}</font> 元/股\n\n" f"> **涨跌幅** <font color={rise_and_fall_color}>{rise_and_fall}</font> %\n\n" f"> **涨跌额** <font color={rise_and_price_color}>{rise_and_price}</font> 元\n\n" f"> **成交量** <font>{turnover}</font> 手\n\n" f"> **换手率** <font>{turnover_rate}</font> %\n\n" f"> **时间** <font>{new_time}</font>\n\n" f"> **最新价** <font color={new_price_color}>{new_price}</font> 元/股\n\n" f"> **状态** <font>开盘中</font> \n\n" f"> **折线图:** ![screenshot](http://121.41.54.234/Chart-{now_img}.jpg) @15235514553\n\n" }, "at": { "atMobiles": ["15235514553"], "isAtAll": False, }} send_ding(body)
send_ding.py
import hmac import urllib.parse import hashlib import base64 import requests import urllib3 import time from public.log import logger urllib3.disable_warnings() def ding_sign(): """ 发送钉钉消息加密 :return: """ timestamp = str(round(time.time() * 1000)) secret = "xxx" secret_enc = secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) return timestamp, sign def send_ding(body: dict): """ 发送钉钉消息 :param body :return: """ headers = {"Content-Type": "application/json"} access_token = "xxx" timestamp, sign = ding_sign() res = requests.post( "https://oapi.dingtalk.com/robot/send?access_token={}×tamp={}&sign={}".format( access_token, timestamp, sign), headers=headers, json=body, verify=False).json() if res["errcode"] == 0 and res["errmsg"] == "ok": logger.info("钉钉通知发送成功!info:{}".format(body["text"]["content"])) else: logger.error("钉钉通知发送失败!返回值:{}".format(res))
钉钉消息
标签:None,df,FastApi,price,repetitions,发送,time,import,定时 From: https://www.cnblogs.com/changqing8023/p/16817217.html