一、基本方法,用python得知 cron表达式
"""计算定时任务下次运行时间
sched str: 定时任务时间表达式
timeFormat str: 格式为"%Y-%m-%d %H:%M"
queryTimes int: 查询下次运行次数
"""
try:
now = datetime.datetime.now()
except ValueError:
raise
else:
# 以当前时间为基准开始计算
cron = croniter.croniter(sched, now)
return [cron.get_next(datetime.datetime).strftime(timeFormat) for i in range(queryTimes)]
import croniter, datetime def CronRunNextTimelist(sched, frequency=1): now = datetime.datetime.now() cron = croniter.croniter(sched, now) print("cron",cron) return [cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M:%S") for i in range(frequency)] sched = "1 2 3 * *" # sched = "0 8 * * *" # sched = "0 0-1 * * *" # sched = "0 1-2 0/1 1/1 * *" # sched = "5 19 * * WED,THU,FRI 44" print(CronRunNextTimelist(sched, frequency=3))
二、python 对于cron表达式支持不是太好,不支持秒级 , ?在croniter上面封装一层支持标准
from croniter import croniter from datetime import datetime, timedelta week_day_dict = { 1: 7, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5, 7: 6 } class MyCroniter(croniter): def __init__(self, expr_format, start_time=None, ret_type=float, day_or=True): # 秒 分 时 日 月 星期 年 expr_format = expr_format.strip() expr_format = self.get_year(expr_format) expr_format = self.get_second(expr_format) expr_format = self.get_week(expr_format) # 如果以?结尾则设置day_or = false 不匹配星期 if expr_format.endswith('?'): expr_format = expr_format.replace("?", "*") day_or = False else: # 如果?不在结尾则说明是日期为? if "?" in expr_format: expr_format = expr_format.replace("?", "*") self.next_date = None self.second_index = 0 super(MyCroniter, self).__init__(" ".join(expr_format.split(" ")[1:]), datetime.now(), ret_type, day_or) def get_next_second(self, ret_type=None): # 如果秒数为*,则每次next加一秒,直到59秒调用self._get_next # 否则直接在秒位加上 应该的秒数 try: if len(self.second) == 1 and self.second[0].isdigit(): self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False) self.next_date += + timedelta(seconds=int(self.second[0])) else: if not self.next_date: self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False) if "all" in self.second: if self.next_date.strftime("%S") == '59': self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False) else: self.next_date = self.next_date + timedelta(seconds=1) else: if int(self.next_date.strftime("%S")) >= int(self.second[-1]): self.next_date = self._get_next(ret_type or self._ret_type, is_prev=False) self.second_index = 0 # self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index])) # else: if self.second_index == 0: self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index])) else: self.next_date = self.next_date - timedelta(seconds=int(self.second[self.second_index - 1])) self.next_date = self.next_date + timedelta(seconds=int(self.second[self.second_index])) self.second_index += 1 except Exception as e: print("格式错误 : {}".format(e)) def get_year(self, expr_format): if len(expr_format.split(" ")) == 6: # 如果缺少 年 就相当于年为* self.year = ["all"] elif expr_format.endswith('*'): # 任意年都可以 expr_format = " ".join(expr_format.split(" ")[:-1]) self.year = ["all"] else: self.year = expr_format.split(" ")[-1] expr_format = " ".join(expr_format.split(" ")[:-1]) if "-" in self.year: temp_year = self.year.split("-") # 例如 "2019-2021" 》》 [2019 ,2020, 2021] self.year = list() for year in range(int(temp_year[0]), int(temp_year[1]) + 1): self.year.append(str(year)) elif "," in self.year: # 例如 "2019,2020 " 》》 [2019 ,2029] self.year = self.year.split(",") elif "/" in self.year: temp_year = self.year.split("/") # 例如 "2019/3 " 》》 [2019 ,2021,2024,...] self.year = list() for year in range(int(temp_year[0]), 2100, int(temp_year[1])): self.year.append(str(year)) else: self.year = [self.year] return expr_format def get_second(self, expr_format): if expr_format.startswith('*'): self.second = ["all"] else: self.second = expr_format.split(" ")[0] if "-" in self.second: temp_second = self.second.split("-") self.second = list() for second in range(int(temp_second[0]), int(temp_second[1]) + 1): self.second.append(str(second)) elif "," in self.second: self.second = self.second.split(",") elif "/" in self.second: temp_second = self.second.split("/") self.second = list() for second in range(int(temp_second[0]), 59, int(temp_second[1])): self.second.append(str(second)) else: self.second = [self.second] return expr_format def get_week(self, expr_format): global week_day_dict week = expr_format.split(" ")[-1] result = "" for item in week: if item.isdigit(): result += str(week_day_dict[int(item)]) else: result += item temp_list = expr_format.split(" ") temp_list[-1] = result return " ".join(temp_list) def get_next(self, ret_type=None): self.get_next_second(ret_type) if "all" in self.year: return self.next_date else: while True: if self.next_date.strftime("%Y") in self.year: return self.next_date else: if self.next_date.strftime("%Y") > self.year[-1]: return None else: self.get_next_second(ret_type) iter = None CRON = "" def cron_to_date(cron): # 0 0 12 ? * 3 : 秒 分 时 日 月 星期 # tmp = cron.replace('?', '*') global iter global CRON if not iter or CRON != cron: iter = MyCroniter(cron) CRON = cron next_time = iter.get_next(datetime) if next_time: return next_time else: return '' if __name__ == '__main__': # for i in range(100): # # if i >50: # print(cron_to_date("0/6 * * * * ?")) # else: # print(cron_to_date("0/5 * * * * ?")) # print(cron_to_date("0 59 17 6 * ?")) for i in range(100): # print(cron_to_date("56 0/2 20 6 * ?")) print(cron_to_date("0 0 0-1 * * ?")) # print(cron_to_date("0/5 * * * * ?"))
标签:format,python,expr,self,next,cron,second,秒级,year From: https://www.cnblogs.com/zhaoruixiao/p/17028087.html