首页 > 编程语言 >python cron croniter优化封装标准-支持秒级 , ?

python cron croniter优化封装标准-支持秒级 , ?

时间:2023-01-05 17:00:37浏览次数:64  
标签:format python expr self next cron second 秒级 year

一、基本方法,用python得知 cron表达式
"""计算定时任务下次运行时间
sched str: 定时任务时间表达式
timeFormat str: 格式为"%Y-%m-%d %H:%M"
queryTimes int: 查询下次运行次数
"""
try:
now = datetime.datetime.now()
except ValueError:
raise
else:
# 以当前时间为基准开始计算
cron = croniter.croniter(sched, now)
return [cron.get_next(datetime.datetime).strftime(timeFormat) for i in range(queryTimes)]


import croniter, datetime

def CronRunNextTimelist(sched, frequency=1):
    now = datetime.datetime.now()
    cron = croniter.croniter(sched, now)
    print("cron",cron)
    return [cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M:%S") for i in range(frequency)]


sched = "1 2 3 * *"
# sched = "0 8 * * *"
# sched = "0 0-1 * * *"
# sched = "0 1-2 0/1 1/1 * *"
# sched = "5 19 * * WED,THU,FRI 44"
print(CronRunNextTimelist(sched, frequency=3))

二、python 对于cron表达式支持不是太好,不支持秒级 , ?在croniter上面封装一层支持标准

 

from croniter import croniter
from datetime import datetime, timedelta

week_day_dict = {
    1: 7,
    2: 1,
    3: 2,
    4: 3,
    5: 4,
    6: 5,
    7: 6
}


class MyCroniter(croniter):
    def __init__(self, expr_format, start_time=None, ret_type=float,
                 day_or=True):
        # 秒 分 时 日 月 星期 年
        expr_format = expr_format.strip()
        expr_format = self.get_year(expr_format)
        expr_format = self.get_second(expr_format)
        expr_format = self.get_week(expr_format)
        # 如果以?结尾则设置day_or = false 不匹配星期
        if expr_format.endswith('?'):
            expr_format = expr_format.replace("?", "*")
            day_or = False
        else:
            # 如果?不在结尾则说明是日期为?
            if "?" in expr_format:
                expr_format = expr_format.replace("?", "*")

        self.next_date = None
        self.second_index = 0
        super(MyCroniter, self).__init__(" ".join(expr_format.split(" ")[1:]), datetime.now(), ret_type, day_or)

    def get_next_second(self, ret_type=None):
        # 如果秒数为*,则每次next加一秒,直到59秒调用self._get_next
        # 否则直接在秒位加上 应该的秒数
        try:
            if len(self.second) == 1 and self.second[0].isdigit():
                self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                self.next_date += + timedelta(seconds=int(self.second[0]))
            else:
                if not self.next_date:
                    self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                if "all" in self.second:
                    if self.next_date.strftime("%S") == '59':
                        self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                    else:
                        self.next_date = self.next_date + timedelta(seconds=1)
                else:
                    if int(self.next_date.strftime("%S")) >= int(self.second[-1]):
                        self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False)
                        self.second_index = 0
                        # self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index]))
                    # else:
                    if self.second_index == 0:
                        self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index]))
                    else:
                        self.next_date = self.next_date - timedelta(seconds=int(self.second[self.second_index - 1]))
                        self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index]))
                    self.second_index += 1

        except Exception as e:
            print("格式错误 : {}".format(e))

    def get_year(self, expr_format):
        if len(expr_format.split(" ")) == 6:  # 如果缺少 年  就相当于年为*
            self.year = ["all"]
        elif expr_format.endswith('*'):  # 任意年都可以
            expr_format = " ".join(expr_format.split(" ")[:-1])
            self.year = ["all"]
        else:
            self.year = expr_format.split(" ")[-1]
            expr_format = " ".join(expr_format.split(" ")[:-1])
            if "-" in self.year:
                temp_year = self.year.split("-")  # 例如 "2019-2021" 》》 [2019 ,2020, 2021]
                self.year = list()
                for year in range(int(temp_year[0]), int(temp_year[1]) + 1):
                    self.year.append(str(year))
            elif "," in self.year:  # 例如 "2019,2020 " 》》 [2019 ,2029]
                self.year = self.year.split(",")
            elif "/" in self.year:
                temp_year = self.year.split("/")  # 例如 "2019/3 " 》》 [2019 ,2021,2024,...]
                self.year = list()
                for year in range(int(temp_year[0]), 2100, int(temp_year[1])):
                    self.year.append(str(year))
            else:
                self.year = [self.year]

        return expr_format

    def get_second(self, expr_format):
        if expr_format.startswith('*'):
            self.second = ["all"]
        else:
            self.second = expr_format.split(" ")[0]
            if "-" in self.second:
                temp_second = self.second.split("-")
                self.second = list()
                for second in range(int(temp_second[0]), int(temp_second[1]) + 1):
                    self.second.append(str(second))
            elif "," in self.second:
                self.second = self.second.split(",")
            elif "/" in self.second:
                temp_second = self.second.split("/")
                self.second = list()
                for second in range(int(temp_second[0]), 59, int(temp_second[1])):
                    self.second.append(str(second))
            else:
                self.second = [self.second]

        return expr_format

    def get_week(self, expr_format):
        global week_day_dict
        week = expr_format.split(" ")[-1]
        result = ""
        for item in week:
            if item.isdigit():
                result += str(week_day_dict[int(item)])
            else:
                result += item
        temp_list = expr_format.split(" ")
        temp_list[-1] = result
        return " ".join(temp_list)

    def get_next(self, ret_type=None):
        self.get_next_second(ret_type)
        if "all" in self.year:
            return self.next_date
        else:
            while True:
                if self.next_date.strftime("%Y") in self.year:
                    return self.next_date
                else:
                    if self.next_date.strftime("%Y") > self.year[-1]:
                        return None
                    else:
                        self.get_next_second(ret_type)


iter = None
CRON = ""


def cron_to_date(cron):
    # 0 0 12 ? * 3 : 秒 分 时 日 月 星期
    # tmp = cron.replace('?', '*')

    global iter
    global CRON
    if not iter or CRON != cron:
        iter = MyCroniter(cron)
        CRON = cron
    next_time = iter.get_next(datetime)
    if next_time:
        return next_time
    else:
        return ''


if __name__ == '__main__':

    # for i in range(100):
    #
    #     if i >50:
    #         print(cron_to_date("0/6 * * * * ?"))
    #     else:
    #         print(cron_to_date("0/5 * * * * ?"))

    # print(cron_to_date("0 59 17 6 * ?"))
    for i in range(100):
        # print(cron_to_date("56 0/2 20 6 * ?"))
        print(cron_to_date("0 0 0-1 * * ?"))
    # print(cron_to_date("0/5 * * * * ?"))

 

 

标签:format,python,expr,self,next,cron,second,秒级,year
From: https://www.cnblogs.com/zhaoruixiao/p/17028087.html

相关文章

  • 用Python实现Socket编程
    SocketServer端和SocketClient端传输解析服务端和客户端的关系如下所示:1.SocketServer绑定IP地址和端口,并开始监听端口Server=socket.socket()Server.bind(("127.......
  • 为什么Python中的re.compile()输入要用raw string(r'')?
    在re这一模块的官方文档的解释如下Regularexpressionsusethebackslashcharacter('')toindicatespecialformsortoallowspecialcharacterstobeusedwitho......
  • Python 函数
    一.函数的初识函数:以功能(完成一件事)为导向,登录,注册,len一个函数就是一个功能。随调随用函数的优势1.减少了代码的重复性。2.增强了代码的可读性 二.函数的结构和调用......
  • 用Python来做一个简单的学生管理系统(附源码)
    小学妹说要毕业了,学了一学期Python等于没学,现在要做毕设做不出来,让我帮帮她,晚上去她家吃夜宵。当时我心想,这不是分分钟的事情,还要去她家,男孩子晚上不要随便出门,要学会......
  • 基于OpenCV DNN模块给黑白老照片上色(附Python/C++源码)
    导读本文给大家分享一个用OpenCVDNN模块给黑白老照片上色的实例,并给出Python和C++版本源码。 背景介绍    这个项目是基于在加利福尼亚大学,伯克利,RichardZhang,Phil......
  • crontab 命令详情
    安装crontabyuminstallcrontabs查看定时任务crontab-l设置定时任务crontab-e查看crontab状态systemctlstatuscrond.service启动crontabsystemctl......
  • IPython快捷键操作和魔术命令
    快捷键操作tab补全ctrl-c中断运行的代码ctrl-l清空屏幕ctrl-u删除当前行所有文本ctrl-p用当前输入的文本搜索之前的命令ctrl-n用当前输入的文本搜索之后的命......
  • linux crontab 定时任务详解
    前言正如闹钟对于日常生活的重要性一样,linuxcrontab定时任务在开发中是必不可少的工具,诸如:每六个月清理一次日志,每天凌晨12.00重启服务等多种场景,都可以用crontab......
  • 用python得知 cron表达式-执行时间
    #!/user/bin/python3#-*-coding:utf-8-*-#@Author:zhaoruixiao#@Datetime:2023/1/514:30#@Software:PyCharm#@File:cron.py"""workfor$用python得......
  • 为什么 Python 中没有 main() 函数?
    在本文中,我们将学习为什么Python中没有main()函数。毫无疑问,Python没有所谓的main函数,但是,关于互联网经常引用“Python的主要功能”和“建议编写主要功能。"它们的目......