首页 > 其他分享 >django-apschedule定时任务异常停止

django-apschedule定时任务异常停止

时间:2023-12-03 11:08:07浏览次数:42  
标签:self py apschedule django job ._ 定时 local


背景

在django项目中使用django-apschedule来实现定时任务,使用的是BackgroundScheduler调度类,该调度的实现是通过后台线程的方式执行定时任务。其中任务都是持久化到数据库中的。

在项目的运行过程中,因为数据库的异常,导致定时任务线程异常终止,即使数据库后续恢复正常,但也不再继续执行。我多次尝试复现未果,在开启定时任务期间,手动将数据库连接断开,定时任务执行失败,然后再将数据库建立连接,定时任务竟然重新恢复了,这让我一时摸不着头脑。

具体的错误日志如下,通过分析,是update_job连接数据库异常,没有任何捕获机制,然后层层网上抛,最终导致线程停止,可以很肯定的是,绝对是因为数据库连接失败导致的定时任务失败,那为什么无法复现呢?

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loop
    wait_seconds = self._process_jobs()
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobs
    jobstore.update_job(job)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapper
    result = func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_job
    with transaction.atomic():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__
    if not connection.get_autocommit():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommit
    self.ensure_connection()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
     return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connection
    connection = Database.connect(**conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

源码分析原因

可以先看下BackgroundScheduler的实现方式,在start方法中创建了个子线程。

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def start(self, *args, **kwargs):
        if self._event is None or self._event.is_set():
            self._event = Event()

        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.daemon = self._daemon
        self._thread.start()

    def shutdown(self, *args, **kwargs):
        super(BackgroundScheduler, self).shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

其中_main_loopBlockingScheduler中实现,是一个死循环,执行_process_jobs方法

class BlockingScheduler(BaseScheduler):
    
    ...

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()
    
    ...

再看_process_jobs中的内容,在BaseScheduler实现的,主要流程如下,先找到所有要执行的job,然后进行遍历运行并更新Job的状态。之前的错误日志,也就是这里的update_job抛出异常,而这里并没有捕获异常,最终层层往上抛,update_job -> _process_jobs -> _main_loop,最终线程异常终止。

def _process_jobs(self):
    for jobstore_alias, jobstore in six.iteritems(self._jobstores):
        try:
            due_jobs = jobstore.get_due_jobs(now)
        except Exception as e:
            ...
            continue

        ...
                
        for job in due_jobs:
      
            ...
            
            try:
                executor.submit_job(job, run_times)
            except BaseException:
                ...

            ...
            jobstore.update_job(job)

那为什么复现不了呢?这个是因为,关闭数据库连接时,程序不一定可以正好运行在update_job,可以看到前面的get_due_jobs进行了异常捕获,如果这里抛出数据库连接异常是可以捕获到的,然后跳过后面的操作,等待下一次定时任务的执行,如果还是失败,则再次等待,所以这里的异常不会抛到最上层导致线程停止。

但如果某个时机,上面连接数据库都成功了,到update_job这里异常抛出,则会导致整个线程停止,定时任务不再执行。

那如何解决该问题呢?

搭建demo

首先我们搭建一个demo出来,模拟复现该问题。

  1. 创建django项目
django-admin startproject apschedule_demo

python manage.py startapp demo

python manage.py makemigrations

python manage.py migrate
  1. 在settings.py中配置到好数据库信息
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "apschedule_demo",
        "HOST": "xxxx",
        "PORT": 5432,
        "USER": "xxx",
        "PASSWORD": "xxx"
    }
}
  1. 根据django-apschedule官方提供的文档搭建demo

在settings.py中添加该APP

INSTALLED_APPS = (
    # ...
    "django_apscheduler",
)

创建目录demo/management/commands,并在其下面创建runapscheduler.py文件,代码内容如下:

import logging

from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore

logger = logging.getLogger(__name__)


def my_job():
  # Your job processing logic here...
  print("job..")


class Command(BaseCommand):
  help = "Runs APScheduler."

  def handle(self, *args, **options):
    scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
      my_job,
      trigger=CronTrigger(second="*/3"),  # Every 3 seconds
      id="my_job",  # The `id` assigned to each job MUST be unique
      max_instances=1,
      replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    try:
      logger.info("Starting scheduler...")
      scheduler.start()

    # 因为上面是非阻塞开启定时任务,所以这里需要阻塞,不让主线程结束。
    while True:
            time.sleep(10)
    except KeyboardInterrupt:
      logger.info("Stopping scheduler...")
      scheduler.shutdown()
      logger.info("Scheduler shut down successfully!")

可以通过python manage.py runapscheduler执行上面的命令运行定时任务,该脚本创建了一个每3秒执行一次的任务。

  1. 复现

我们将断点打在jobstore.update_job(job)上,然后使用debug模式进行调试,当程序运行到断点上时,将数据库关闭,然后程序继续运行,则会报错,并抛出异常,线程停止了运行。至此,我们复现了该问题。

线程重启

我一开始想,我可以判断该线程是否异常,如果异常则将线程重启就好了

while True:
        if not scheduler._thread.is_alive():
            scheduler._thread.start()

        time.sleep(10)

但事与愿违,抛出了异常,异常信息如下:

RuntimeError: threads can only be started once

通过查看官方文档可以知道,线程的start方法只能调用一次。

listener

apschedule中提供了监听器机制,也就是在定时任务的成功、失败等状态都可以通过提前注册的listener方法来进行回调。但通过分析源码,其并不能捕获到定时任务线程的异常。

下面是简化了代码的listeners的原理流程:

  1. 外部通过add_listener方法注册回调方法
  2. 在定时任务线程主流程_process_jobs中发生的各个事件添加到events中
  3. 遍历events事件,然后通过与注册的回调方法mask进行匹配,匹配上则调用回调方法
class BaseScheduler:
    def __init__(...):
        self._listeners = []

    def add_listener(self, callback, mask=EVENT_ALL):
        self._listeners.append((callback, mask))

    def _process_jobs(self):

        events = []
        
        ...

        events.append(event)
 
        ...


        for event in events:
            self._dispatch_event(event)


    def _dispatch_event(self, event):
        for cb, mask in listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    self._logger.exception('Error notifying listener')

如果线程本身挂了,回调方法是不可执行的。

捕获线程中函数的异常

如果update_job抛出异常导致线程停止,那我捕获它的异常,然后再continue,等待下次定时任务运行再重试不就好了,但是这就需要改动源码,能不能改源码就尽量不改。所以这边我采用了继承BackgroundScheduler类,然后再重写_process_jobs方法来解决。

在重写的_process_jobs方法中,对父类的_process_jobs()进行异常的捕获,然后再不断的进行重试,这样即使update_job抛出异常了,也可以不断的进行尝试恢复,直至成功。

class DemoBackgroundScheduler(BackgroundScheduler):
    def _process_jobs(self):
        while True:
            try:
                return super()._process_jobs()
            except BaseException:
                time.sleep(5)

class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)
        ...

然后再次尝试复现该问题,可以发现在断开数据库后,它能够一直进行重试,线程没有停止,当数据库恢复运行后,job执行成功,不再抛出异常。

相关链接

欢迎关注,互相学习,共同进步~

我的个人博客 公众号:编程黑洞


标签:self,py,apschedule,django,job,._,定时,local
From: https://blog.51cto.com/zhengwenfeng/8665652

相关文章

  • Django补4
    过滤器写一个过滤器---》一堆内容---》经过过滤器后---》把关键词屏蔽#自定义过滤器{{变量|过滤器名字}}编写步骤1注册app2在某个app下:创建templatetags模块(模块名只能是templatetags)3在包下写一个py文件,随便命名4在py文件中:写入fromdjangoimporttemplateregister=t......
  • windows定时自动关机
    最近在测试pos机,要求下班前要自动关闭pos机,否则扣绩效,因此研究了一下windows的自动关机脚本。编写此文作为记录 步骤:1、在Windows10桌面,右键点击此电脑图标,在弹出菜单中选择“管理”菜单项。2、然后在打开的计算机管理窗口中,找到“任务计划程序”菜单项。3、接下来依次点击......
  • Golang学习笔记-定时任务
    指定具体时间执行packagemainimport( "fmt" "time")funcmain(){ //指定执行时间为2023-11-2900:00:00 executionTime:=time.Date(2023,time.November,29,0,0,0,0,time.UTC) //当前时间 now:=time.Now().UTC() //计算距离执行时间的持续时间 d......
  • 江科大STM32(3):定时器(4)定时器的编码器接口
    1.编码器接口简介EncoderInterface编码器接口编码器接口可接收增量(正交)编码器的信号,根据编码器旋转产生的正交信号脉冲,自动控制CNT自增或自减,从而指示编码器的位置、旋转方向和旋转速度每个高级定时器和通用定时器都拥有1个编码器接口两个输入引脚借用了输入捕获的通道1和......
  • 江科大STM32(3):定时器(3)定时器的输入捕获
    1.TIM输入捕获1.1基本概念IC(InputCapture)输入捕获输入捕获模式下,当通道输入引脚出现指定电平跳变时,当前CNT的值将被锁存到CCR中,可用于测量PWM波形的频率、占空比、脉冲间隔、电平持续时间等参数每个高级定时器和通用定时器都拥有4个输入捕获通道可配置为PWMI模式,同时测量频......
  • 微服务定时任务解决方案-分布式定时任务解决方案(讨论)
     市面上的要么集成项目里,要么单独一个项目。我说的虽然也要单独集成,但可以通过python,shell多种方式去做更轻量、灵活,而且难度为0,不需要任何文档。外部单机的定时任务,每个任务执行通过http去调用网关。失败了再调个短信通知的接口,最差就是单独加一个定时任务专用的网关 ......
  • SpringBoot集成hutool配置定时任务,支持crontab和quartz表达式
    1、pom.xml引入hutool<dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>修改version</version></dependency>2、Java文件packagecom.xxx.schedule;importcn.hutool.cron.CronUtil......
  • Django四件套在响应头中放入数据的方法
    HttpResponse方法一:returnHttpResponse('ok',headers={'xxx':'xxx'})方法二:obj=HttpResponse('ok')obj['yyy']='yyy'#像字典一样放入,最终会放在http的响应头中returnobj redirect无法直接传headers=传入数据方法:obj=......
  • Django学习(一) 之 环境搭建
    写在前面最近比较迷AI绘图,那就上个图吧,我感觉还挺好看的。可能会有人说,之前不一致分享的是flask吗,怎么突然改到django了?这个问题问得好,开发环境遇到了一些小困难!不过django,真的是很流行,一点都不过时,这您放心好了!不多说,直接看效果吧!环境搭建1、当前环境版本python==3.9.1......
  • django中实现事务的几种方式
    django中实现事务的几种方式https://zhuanlan.zhihu.com/p/622987268具体表现形式为:每次数据库操作(比如调用save()方法)会立即被提交到数据库中。但是如果你希望把连续的SQL操作包裹在一个事务里,就需要手动开启事务根据粒度不同,三种全局:全局,每次请求在一个事务中,粒度太大,事......