首页 > 系统相关 >Django+nginx+uwsgi +apscheduler任务重复执行问题

Django+nginx+uwsgi +apscheduler任务重复执行问题

时间:2024-04-01 15:01:16浏览次数:21  
标签:nginx lock get redis Django apscheduler job func date

Django+nginx+uwsgi +apscheduler任务重复执行问题

1、问题描述

    通过Django+nginx+uwsgi 部署项目,前端页面通过API接口动态添加、管理apscheduler定时任务。

    由于uwsgi启动多个worker,导致每个worker执行一次定时任务,导致任务的重复执行。

    如下图,因为uwsgi.ini 文件中processes=4,所以对应的启动了4个worker。

    同时需配置enable-threads=true来启动多线程)


2、apscheduler部署

    本次举例的django项目名称为:filemanage;app名称为:schedulerApp        

    此部分先描述apscheduler任务的添加方式,便于后续的解决方案描述。

2.1 API接口添加定时任务

其中的s_func_name为指定的定时任务待执行函数,位于schedulerApp.schefunc.py文件中

### schedulerApp.view.py
 
from filemanage.apscheduler_start import scheduler  #注意此处scheduler的来源
 
 
def add_job(request):
    ctx = {}
    job_id = ''
    s_params = {}
    try:
        # 获取参数
        s_name = request.GET.get('sname')  # 任务名称
        s_trigger = request.GET.get('s_trigger')  # 执行方式:date,interval,cron'
        s_func_name = request.GET.get('s_func_name')  # 任务执行的函数名称
        desc = request.GET.get('desc')  # 任务描述
 
        job_id = s_func_name + '_' + str(toolbox.get_timestamp())
 
        # 判断任务是否存在
        count = sche_manage.objects.filter(s_func_name=s_func_name, status__in=['Executed', 'Pause']).count()
        if count > 0:
            ctx['code'] = 'no'
            ctx['msg'] = '该函数已经被使用,请确认'
            return HttpResponse(json.dumps(ctx), content_type='application/json')
        else:
            # aFunc = getattr(func, "GetFuncByStr")  # 获取对象
            # funD = getattr(aFunc, s_func_name)  # 获取对象的属性值,以字符串调用对应的同名函数
            # print(funD)
            funD = ''
            # 创建任务-根据不同执行方式添加不同任务
            if s_trigger == 'date':  # 某个日期执行
                date_param = request.GET.get('date_param')
                s_params = {'date': date_param}
                scheduler.add_job(eval(s_func_name), 'date', run_date=date_param, id=job_id)
            elif s_trigger == 'interval':  # 指定间隔执行
                dd = request.GET.get('interval_param_day')
                if not dd:
                    dd = 0
                hh = request.GET.get('interval_param_hour')
                if not hh:
                    hh = 0
                mm = request.GET.get('interval_param_minute')
                if not mm:
                    mm = 0
                s_params = {'day': dd, 'hour': hh, 'minute': mm}
                scheduler.add_job(eval(s_func_name), 'interval', days=int(dd), hours=int(hh), minutes=int(mm),
                                      id=job_id)
            elif s_trigger == 'cron':  # 指定时间循环执行
                mm = request.GET.get('cron_param_minute')
                if not mm:
                    mm = None
                hh = request.GET.get('cron_param_hour')
                if not hh:
                    hh = None
                dd = request.GET.get('cron_param_day')
                if not dd:
                    dd = None
                mon = request.GET.get('cron_param_month')
                if not mon:
                    mon = None
                week = request.GET.get('cron_param_week')
                if not week:
                    week = None
                start_date = request.GET.get('cron_param_start_date')
                if not start_date:
                    start_date = None
                s_params = {'minute': mm, 'hour': hh, 'day': dd, 'month': mon, 'week': week, 'start_date': start_date}
                scheduler.add_job(eval(s_func_name), 'cron', minute=mm, hour=hh, day=dd, month=mon,
                                      day_of_week=week,
                                      start_date=start_date, replace_existing=True, id=job_id)
 
            # 保存任务信息到 sche_manage 表
            sche_manage.objects.create(sid=job_id, sname=s_name, desc=desc, s_func_name=s_func_name,
                                                         s_trigger=s_trigger, s_params=s_params, status='Executed')
 
            ctx['code'] = 'ok'
            ctx['msg'] = '创建成功'
 
    except Exception as e:
        log.info('添加任务异常,jobid:' + job_id + ',异常:' + str(e))
        ctx['code'] = 'no'
        ctx['msg'] = '创建失败'
        # 有错误就停止定时器
        scheduler.shutdown()
 
    return HttpResponse(json.dumps(ctx, cls=toolbox.DateEncoder), content_type='application/json')
### schedulerApp.schefunc.py
 
from alarmApp.models import alarm_obj, alarm_param_record, alarm_param
from publictool import toolbox, logtool, dbtool
from publictool.dataEnum import DataEnum
from publictool.msgtool import MsgApi
from alarmApp import getFuncByStr as func
import redis_lock
 
logger = logging.getLogger()
log = logtool.CommonLog(logger=logger, logname="scheduler")
 
 
def test_task():
    # conn redis链接对象; lock-key:写入redis的key
    log.info('task1-开始执行')
    conn = get_redis_connection('myredis')
    lock = redis_lock.Lock(conn, "lock-key")
    if lock.acquire(blocking=False):
        log.info("Got the lock1.")
        # 此处为上述动态添加接口中:定时执行func的逻辑
        log.info('task1-' + toolbox.get_now('%Y-%m-%d %H:%M:%S'))
        # 最后需要手动释放掉锁
        lock.release()
    else:
        log.info("Someone else has the lock1.")
 
 
 
def test_task2():
    # conn redis链接对象; lock-key:写入redis的key
    log.info('task2-开始执行')
    conn = get_redis_connection('myredis')
    lock = redis_lock.Lock(conn, "lock-key")
    if lock.acquire(blocking=False):
        log.info("Got the lock2.")
        # 此处为上述动态添加接口中:定时执行func的逻辑
        log.info('task2-' + toolbox.get_now('%Y-%m-%d %H:%M:%S'))
        # 最后需要手动释放掉锁
        lock.release()
    else:
        log.info("Someone else has the lock2.")

这里贴上部分前端页面(任务管理页面),便于理解。

3、问题解决

通过redis_lock锁,解决多worker的重复执行问题

3.1 apscheduler_start .py

# -*-coding:utf-8-*-
import logging
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
 
from publictool import logtool
 
logger = logging.getLogger()
log = logtool.CommonLog(logger=logger, logname="scheduler")
 
jobstores = {
    'default': DjangoJobStore()
}
job_defaults = {
    'coalesce': True,
    'max_instances': 1,
    'misfire_grace_time': 60,
    'replace_existing': True
}
 
scheduler = BackgroundScheduler(jobstores=jobstores, job_defaults=job_defaults)
# pool_pre_ping为True,即每次从连接池中取连接的时候,都会验证一下与数据库是否连接正常,如果没有连接,那么该连接会被回收。
# engine_options={'pool_pre_ping': True, 'pool_recycle': 100}
scheduler.start()
 
 
def listener_event(event):
    job_id = event.job_id
    scheduled_run_time = event.scheduled_run_time.strftime("%Y-%m-%d %H:%M:%S")
    if event.exception:
        logger.error('作业ID:{} 在 {} 执行失败,错误原因:{}'.format(job_id, scheduled_run_time, event.exception.args[0]))
 
 
scheduler._logger = logger
# 当任务执行完或任务出错时,listener_event
scheduler.add_listener(listener_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
 
 
# 使用main_loop阻塞进程,防止进程挂起
# import time
# while True:
#     time.sleep(60)
#     sig = uwsgi.signal_wait()

3.2 apscheduler跟随uwsgi的重启而自动重启

通过命令```uwsgi --reload  /usr/local/uwsgi/buwsgi.pid```重启项目时,一方面容易造成uwsgi.ini的重复启动,导致项目报错;另一方面导致定时任务执行的函数可能仍旧为更新前的旧代码。这是因为 apscheduler没有跟随uwsgi的重启而重启。

解决方案:
在apscheduler_start.py文件同级目录中__init__.py文件中添加:

import django
django.setup()
from ops.apscheduler_start import scheduler
 
__all__ = ('scheduler',)


此时重启uwsgi服务,apscheduler会自动重启。

3.1 待执行函数核心逻辑,需要结合redis_lock

# pip install python-redis-lock
import redis_lock
 
# 使用锁的案列
# conn redis链接对象; lock-key:写入redis的key
lock = redis_lock.Lock(conn, "lock-key")
if lock.acquire(blocking=False):
    print("Got the lock.")
    # 此处为上述动态添加接口中:定时执行s_func_name的逻辑
    ...业务逻辑代码...
    
    # 最后需要手动释放掉锁
    lock.release()
else:
    print("Someone else has the lock.")

如上面提到的test_task和test_task2函数,均先获取redis_lock,当某一个先获取了redis_lock后,其他worker将获取不到,从而无法重复执行核心逻辑代码。

标签:nginx,lock,get,redis,Django,apscheduler,job,func,date
From: https://www.cnblogs.com/HeroZhang/p/18108400

相关文章

  • Django项目windows部署教程,详细踩坑总结
    Django项目windows部署教程,详细踩坑总结本篇文章主要关于Django在Windows上利用Apache部署,如果你想通过IIS部署,推荐这篇文章在IIS服务器上部署django_djangoiis-CSDN博客,我之前尝试部署在IIS上,发现Django的定时任务失效了,上网查找了一番,普遍说法是被服务器回收了,经过了一番操作,......
  • Django——初探路由
    第3章初探路由3.1路由定义规则​ 路由称为URL,也可以称为URLconf,是对可以从互联网上得到的资源位置和访问方法的一种简洁的表示,是互联网上标准资源的地址。互联网上的每一个文件都有一个唯一的路由,用于指出网站文件的路径位置。简单地说,路由可视为我们常说的网址,每个网址代表......
  • C#中的负载均衡(Nginx )
    负载均衡在C#和WPF日常开发中可能不太常见,因为负载均衡通常与网络服务器、Web应用程序等领域更相关。但是,如果你的WPF应用程序涉及到与远程服务器进行通信或者使用了分布式架构,那么负载均衡可能会成为一个重要的话题。以下是关于负载均衡的知识点,以及可能会在面试中被问到......
  • Django-xadmin+rule对象级权限的实现方式
    Django-xadmin+rule对象级权限的实现方式1.需求vs现状1.1需求要求做一个ERP后台辅助管理的程序,有以下几项基本要求:基本的增删改查功能基于对象的权限控制(如:系统用户分为平台运营人员和商家用户,商家用户小A只能查看编辑所属商家记录,而管理员可以纵览全局)数据库记录导......
  • django安装xadmin及问题解决
    django安装xadmin及问题解决环境:Windows10专业版pycharmpro2020.3django3.2.1xadmin选django2的版本一,安装这里我选择从GitHub安装:pipinstallgit+https://github.com/sshwsfc/xadmin.git结果如下:Successfullyinstalleddefusedxml-0.7.1diff-match-patch......
  • Django xadmin安装及使用详解
    Djangoxadmin安装及使用详解一.简介xadmin是一个开源项目。针对于djangoadmin,页面美化程度,功能不完善等问题,额外开发的一个模块。目前xadmin的最新版本已经是xadmin3.0,但是xadmin3.0已经变成了一个纯前端项目,有兴趣的同学可以自己研究下。目前作者已经不对x......
  • Ubuntu上nginx常用命令
     错误截图 启用nginx systemctlstartnginx停止systemctlstopnginx重启systemctlrestartnginx 检查nginx配置文件是否正确nginx-t-c/etc/nginx/nginx.conf修改配置文件后,重新加载配置文件命令nginx-sreload 查看nginx服务状态,根据服务状态......
  • 20 Nginx报403 forbidden
    引起nginx403forbidden通常是三种情况:一是缺少索引文件,二是权限问题,三是selinux状态缺少index.html或者index.php文件,就是配置文件中indexindex.htmlindex.htm这行中的指定的文件如果在/www下面没有index.php,index.html的时候,直接访问域名,找不到文件,会报403forbiddense......
  • Nginx 代理访问minio存储桶图片
    设置存储桶Anonymous配置nginx并reload location/images/{ proxy_passhttp://127.0.0.1:9000/; } 测试https://paylove.online/images/存储桶/xxx.pngIAM参考https://docs.aws.amazon.com/zh_cn/IAM/latest/UserGuide/reference_policies_elements.html......
  • Nginx静态压缩和代码压缩,提高访问速度!
    https://mp.weixin.qq.com/s/0yfUWWfM2RcQBgCiAKcZLAnginx静态资源动态压缩nginx静态资源静态压缩基于目前大部分的应用,都使用了前后端分离的框架,vue的前端应用,也是十分的流行。不知道大家有无遇到这样的问题:随着前端框架的页面,功能开发不断的迭代;安装的依赖,不断的增多;这......