一、周期性任务
示例代码
from django.core.mail import send_mail from celery.task.base import periodic_task from celery.schedules import crontab from celery.exceptions import SoftTimeLimitExceeded # @periodic_task(run_every=crontab(minute=1, hour='0,7')) @periodic_task(run_every=crontab(minute='*/5')) # 设置任务的定时执行时间间隔 def celery_is_run(): try: # 在这里编写你的任务逻辑 pass except SoftTimeLimitExceeded as e: # 处理任务超时的异常 send_email_on_failure(e) except Exception as e: # 处理其他异常 send_email_on_failure(e) def send_email_on_failure(exception): # 构造邮件内容 subject = 'Celery任务失败' message = f'Celery任务执行失败:{str(exception)}' from_email = '[email protected]' recipient_list = ['[email protected]', '[email protected]'] # 发送邮件 send_mail(subject, message, from_email, recipient_list)View Code
二、
标签:task,send,celery,任务,import,email From: https://www.cnblogs.com/tslam/p/17958566