起源于一个比较奇葩的需求,默认的celery无法实现:
需要用户输入一个开始时间,结束时间,以及时间间隔,需要在该时间段内指定间隔执行
import datetime def task(start: datetime.datetime, end: datetime.datetime, interval: datetime.timedelta): """ * * * * * """ time_list = [] result_dict = {} today = datetime.datetime.now() current = start while current < end: current_time = current.time() time_list.append(current_time) current += interval for i in time_list: if i.hour not in result_dict: result_dict[i.hour] = [] result_dict[i.hour].append(i) for k, v in result_dict.items(): m_list = [] for j in v: m_list.append(str(j.minute)) print(f"{','.join(m_list)} {k} {today.day} {today.month} *") if __name__ == '__main__': time2 = "15:16:08" time4 = "18:16:08" start_time = datetime.datetime.strptime(time2, "%H:%M:%S") end_time = datetime.datetime.strptime(time4, "%H:%M:%S") task(start_time, end_time, datetime.timedelta(seconds=420))
标签:python,list,crontab,datetime,current,dict,result,time,表达式 From: https://www.cnblogs.com/52-qq/p/17337844.html