首页 > 其他分享 >FastApi定时任务发送钉钉消息

FastApi定时任务发送钉钉消息

时间:2022-10-22 20:36:04浏览次数:66  
标签:None df FastApi price repetitions 发送 time import 定时

fastapi是一个高性能的异步框架,实现 定时任务 需要 task.py 提供一个装饰器

# task.py
# 网上百度的,地址:https://blog.csdn.net/hekaiyou/article/details/125072249
import asyncio
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union

from public.log import logger

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]


def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """
    返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
    其装饰的函数不能接受任何参数并且不返回任何内容.
    参数:
        seconds: float
            等待重复执行的秒数
        wait_first: bool (默认 False)
            如果为 True, 该函数将在第一次调用前先等待一个周期.
        raise_exceptions: bool (默认 False)
            如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
        max_repetitions: Optional[int] (默认 None)
            该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
    """

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        """
        将修饰函数转换为自身重复且定期调用的版本.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
        had_run = False

        @wraps(func)
        async def wrapped() -> None:
            nonlocal had_run
            if had_run:
                return
            had_run = True
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            # 以协程方式执行
                            await func()  # type: ignore
                        else:
                            # 以线程方式执行
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)

            ensure_future(loop())

        return wrapped

    return decorator
# task.py
# 网上百度的,地址:https://blog.csdn.net/hekaiyou/article/details/125072249
import asyncio
from functools import wraps
from asyncio import ensure_future
from starlette.concurrency import run_in_threadpool
from typing import Any, Callable, Coroutine, Optional, Union

from public.log import logger

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[
    [Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]],
    NoArgsNoReturnAsyncFuncT
]


def repeat_task(
        *,
        seconds: float,
        wait_first: bool = False,
        raise_exceptions: bool = False,
        max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """
    返回一个修饰器, 该修饰器修改函数, 使其在首次调用后定期重复执行.
    其装饰的函数不能接受任何参数并且不返回任何内容.
    参数:
        seconds: float
            等待重复执行的秒数
        wait_first: bool (默认 False)
            如果为 True, 该函数将在第一次调用前先等待一个周期.
        raise_exceptions: bool (默认 False)
            如果为 True, 该函数抛出的错误将被再次抛出到事件循环的异常处理程序.
        max_repetitions: Optional[int] (默认 None)
            该函数重复执行的最大次数, 如果为 None, 则该函数将永远重复.
    """

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        """
        将修饰函数转换为自身重复且定期调用的版本.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
        had_run = False

        @wraps(func)
        async def wrapped() -> None:
            nonlocal had_run
            if had_run:
                return
            had_run = True
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            # 以协程方式执行
                            await func()  # type: ignore
                        else:
                            # 以线程方式执行
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        logger.error(f'执行重复任务异常: {exc}')
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(seconds)

            ensure_future(loop())

        return wrapped

    return decorator

main.py 

from fastapi import FastAPI

from tasks import repeat_task
from public.shares import shares

app = FastAPI()

@app.on_event('startup')
@repeat_task(seconds=60 * 5, wait_first=False)
def repeat_task_aggregate_request_records() -> None:
    # 发送钉钉消息
    shares()

shares.py

from datetime import datetime, date
import efinance as ef
from chinese_calendar import is_workday
import matplotlib.pyplot as plt
import time

from public.send_ding import send_ding
from public.log import logger, BASE_PATH, os


def shares():
    stock_code = "601069"
    year = date.today().year
    month = date.today().month
    day = date.today().day
    now_time = datetime.now()
    now_img = int(round(time.time() * 1000))
    weekday = date(year, month, day).strftime("%A")
    # 非法定节假日,周六、周日也需要去掉
    if (not is_workday(date(year, month, day)) or weekday in ["Saturday", "Sunday"]) and not make:
        logger.info(f"当前时间 {now_time} 休市日!!!")
        return
    start_time = datetime(year, month, day, 9, 15, 0)
    end_time = datetime(year, month, day, 15, 5, 0)
    am_time = datetime(year, month, day, 11, 35, 0)
    pm_time = datetime(year, month, day, 13, 00, 0)
    if (now_time < start_time or now_time > end_time or am_time < now_time < pm_time) and not make:
        logger.info(f"当前时间 {now_time} 未开盘!!!")
        return
    # 数据间隔时间为 1 分钟
    freq = 1
    # 获取最新一个交易日的分钟级别股票行情数据
    df = ef.stock.get_quote_history(stock_code, klt=freq)
    if df.empty:
        logger.info(f"当前时间 {now_time} 未获取到股票数据!!!")
        return
    # 绘制折线图
    logger.info(f"当前时间戳: {now_img}")
    plt.plot(df["开盘"].values, linewidth=1, color="red")
    plt.savefig(os.path.join(BASE_PATH, "media", f"Chart-{now_img}.jpg"), bbox_inches='tight')
    # 保存后需要清除图形,不然会重叠
    plt.clf()
    
    share_name = df["股票名称"].values[0]
    open_price = df["开盘"].values[0]
    new_price = df["收盘"].values[-1]
    new_time = df["日期"].values[-1]
    top_price = df["最高"].max()
    down_price = df["最低"].min()
    turnover = df["成交量"].sum()
    average = round(df["开盘"].mean(), 2)
    rise_and_fall = round(df["涨跌幅"].sum(), 2)
    rise_and_price = round(df["涨跌额"].sum(), 2)
    turnover_rate = round(df["换手率"].sum(), 2)
    # markdown 发送字体颜色
    rise_and_fall_color = "#FF0000" if rise_and_fall > 0 else "#00FF00"
    rise_and_price_color = "#FF0000" if rise_and_price > 0 else "#00FF00"
    new_price_color = "#FF0000" if new_price > open_price else "#00FF00"
    top_price_color = "#FF0000" if top_price > open_price else "#00FF00"
    down_price_color = "#FF0000" if down_price > open_price else "#00FF00"

    body = {
        "msgtype": "markdown",
        "markdown": {
            "title": share_name,
            "text": f"### {share_name}\n\n"
                    f"> **开盘价** <font>{open_price}</font> 元/股\n\n"
                    f"> **最高价** <font color={top_price_color}>{top_price}</font> 元/股\n\n"
                    f"> **最低价** <font color={down_price_color}>{down_price}</font> 元/股\n\n"
                    f"> **平均价** <font color=''>{average}</font> 元/股\n\n"
                    f"> **涨跌幅** <font color={rise_and_fall_color}>{rise_and_fall}</font> %\n\n"
                    f"> **涨跌额** <font color={rise_and_price_color}>{rise_and_price}</font> 元\n\n"
                    f"> **成交量** <font>{turnover}</font> 手\n\n"
                    f"> **换手率** <font>{turnover_rate}</font> %\n\n"
                    f"> **时间** <font>{new_time}</font>\n\n"
                    f"> **最新价** <font color={new_price_color}>{new_price}</font> 元/股\n\n"
                    f"> **状态** <font>开盘中</font> \n\n"
                    f"> **折线图:** ![screenshot](http://121.41.54.234/Chart-{now_img}.jpg) @15235514553\n\n"
        },
        "at": {
            "atMobiles": ["15235514553"],
            "isAtAll": False,
        }}
    send_ding(body)

send_ding.py

import hmac
import urllib.parse
import hashlib
import base64
import requests
import urllib3
import time
from public.log import logger

urllib3.disable_warnings()


def ding_sign():
    """
    发送钉钉消息加密
    :return:
    """
    timestamp = str(round(time.time() * 1000))
    secret = "xxx"
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign


def send_ding(body: dict):
    """
    发送钉钉消息
    :param body
    :return:
    """
    headers = {"Content-Type": "application/json"}
    access_token = "xxx"
    timestamp, sign = ding_sign()

    res = requests.post(
        "https://oapi.dingtalk.com/robot/send?access_token={}&timestamp={}&sign={}".format(
            access_token, timestamp, sign), headers=headers, json=body, verify=False).json()
    if res["errcode"] == 0 and res["errmsg"] == "ok":
        logger.info("钉钉通知发送成功!info:{}".format(body["text"]["content"]))
    else:
        logger.error("钉钉通知发送失败!返回值:{}".format(res))

钉钉消息

 

标签:None,df,FastApi,price,repetitions,发送,time,import,定时
From: https://www.cnblogs.com/changqing8023/p/16817217.html

相关文章

  • 基于FastApi的微信公众号开发
    个人申请的订阅号,未认证,可用功能可在 微信公众号平台 - 接口权限 处查看使用代码开发,首先需要在 基础配置-服务器配置 中进行设置 填写服务器地址(URL)、Token和Enc......
  • 内核定时器以及应用
    1.内核定时器的作用当中断触发时,修改定时器时间间隔,进入定时器回调函数,待完成回调则恢复。 2.定时器嵌入其他数据结构   structtimer_listtimer,这个结构......
  • LY3207/LY3208可定时18秒36秒集成充电马达
    概述LY3208 是一款可充电的带使能控制的马达驱动芯片,集成了锂电池充电管理模块、马达驱动控制模块和保护模块,关机待机电流仅 5uA。LY3208 充电电流为 0.3A-1A 可调,最大......
  • 定时器、外部中断0,以及查询和中断的模板
    这里拿一个0-60秒表做案例://sbit定义四个数码管unsignedcharcount,miao;voidmain(){  TMOD=0X01;  //设置T0为工作方式1  TH0=0XEE;    TL0=0X00......
  • 让 Python 程序定时执行的 8 种姿势~
     八种用Python实现定时执行任务的方案,一定有你用得到的!_嗨学编程的博客-CSDN博客让Python程序定时执行的8种姿势~-文章详情(itpub.net)......
  • 关于sleep和定时器
    平时使用sleep多一些,如缓冲满了,等一会再送。while(缓冲满了){sleep(MS)};某个任务,20毫秒执行一次, while(TRUE){ 做任务(用了1毫秒),sleep(18,19毫秒)};几乎很少使用定时器;也感觉不出......
  • Python 发送邮件的几种情况
    本次记录一下Python发送邮件的几种情况:1、正常发送2、正文带图片3、正文带表格4、正文带附件 首先来看一下Python发送邮件使用到的模块##导入模块fromemail.mi......
  • 使用System.Net.Mail中的SMTP发送邮件
    使用简单邮件传输协议SMTP异步发送邮件想要实现SMTP发送邮件,你需要了解这些类SmtpClient:使用配置文件设置来初始化 SmtpClient类的新实例。它包含以下属性:Host:设置用......
  • java发送post请求传json数据
    importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importorg.apache.http.HttpResponse;importorg.apache.http.client.methods.HttpPost;......
  • python 连库定时生成excel文件并转成可执行文件
    importscheduleimportpandasaspdimportpymysql,xlwtfromdatetimeimportdatetimefromtimeimportstrftimedt=datetime.now()#print(dt)defexport_excel(dt):......