Django+nginx+uwsgi +apscheduler任务重复执行问题
1、问题描述
通过Django+nginx+uwsgi 部署项目,前端页面通过API接口动态添加、管理apscheduler定时任务。
由于uwsgi启动多个worker,导致每个worker执行一次定时任务,导致任务的重复执行。
如下图,因为uwsgi.ini 文件中processes=4,所以对应的启动了4个worker。
同时需配置enable-threads=true来启动多线程)
2、apscheduler部署
本次举例的django项目名称为:filemanage;app名称为:schedulerApp
此部分先描述apscheduler任务的添加方式,便于后续的解决方案描述。
2.1 API接口添加定时任务
其中的s_func_name为指定的定时任务待执行函数,位于schedulerApp.schefunc.py文件中
### schedulerApp.view.py
from filemanage.apscheduler_start import scheduler #注意此处scheduler的来源
def add_job(request):
ctx = {}
job_id = ''
s_params = {}
try:
# 获取参数
s_name = request.GET.get('sname') # 任务名称
s_trigger = request.GET.get('s_trigger') # 执行方式:date,interval,cron'
s_func_name = request.GET.get('s_func_name') # 任务执行的函数名称
desc = request.GET.get('desc') # 任务描述
job_id = s_func_name + '_' + str(toolbox.get_timestamp())
# 判断任务是否存在
count = sche_manage.objects.filter(s_func_name=s_func_name, status__in=['Executed', 'Pause']).count()
if count > 0:
ctx['code'] = 'no'
ctx['msg'] = '该函数已经被使用,请确认'
return HttpResponse(json.dumps(ctx), content_type='application/json')
else:
# aFunc = getattr(func, "GetFuncByStr") # 获取对象
# funD = getattr(aFunc, s_func_name) # 获取对象的属性值,以字符串调用对应的同名函数
# print(funD)
funD = ''
# 创建任务-根据不同执行方式添加不同任务
if s_trigger == 'date': # 某个日期执行
date_param = request.GET.get('date_param')
s_params = {'date': date_param}
scheduler.add_job(eval(s_func_name), 'date', run_date=date_param, id=job_id)
elif s_trigger == 'interval': # 指定间隔执行
dd = request.GET.get('interval_param_day')
if not dd:
dd = 0
hh = request.GET.get('interval_param_hour')
if not hh:
hh = 0
mm = request.GET.get('interval_param_minute')
if not mm:
mm = 0
s_params = {'day': dd, 'hour': hh, 'minute': mm}
scheduler.add_job(eval(s_func_name), 'interval', days=int(dd), hours=int(hh), minutes=int(mm),
id=job_id)
elif s_trigger == 'cron': # 指定时间循环执行
mm = request.GET.get('cron_param_minute')
if not mm:
mm = None
hh = request.GET.get('cron_param_hour')
if not hh:
hh = None
dd = request.GET.get('cron_param_day')
if not dd:
dd = None
mon = request.GET.get('cron_param_month')
if not mon:
mon = None
week = request.GET.get('cron_param_week')
if not week:
week = None
start_date = request.GET.get('cron_param_start_date')
if not start_date:
start_date = None
s_params = {'minute': mm, 'hour': hh, 'day': dd, 'month': mon, 'week': week, 'start_date': start_date}
scheduler.add_job(eval(s_func_name), 'cron', minute=mm, hour=hh, day=dd, month=mon,
day_of_week=week,
start_date=start_date, replace_existing=True, id=job_id)
# 保存任务信息到 sche_manage 表
sche_manage.objects.create(sid=job_id, sname=s_name, desc=desc, s_func_name=s_func_name,
s_trigger=s_trigger, s_params=s_params, status='Executed')
ctx['code'] = 'ok'
ctx['msg'] = '创建成功'
except Exception as e:
log.info('添加任务异常,jobid:' + job_id + ',异常:' + str(e))
ctx['code'] = 'no'
ctx['msg'] = '创建失败'
# 有错误就停止定时器
scheduler.shutdown()
return HttpResponse(json.dumps(ctx, cls=toolbox.DateEncoder), content_type='application/json')
### schedulerApp.schefunc.py
from alarmApp.models import alarm_obj, alarm_param_record, alarm_param
from publictool import toolbox, logtool, dbtool
from publictool.dataEnum import DataEnum
from publictool.msgtool import MsgApi
from alarmApp import getFuncByStr as func
import redis_lock
logger = logging.getLogger()
log = logtool.CommonLog(logger=logger, logname="scheduler")
def test_task():
# conn redis链接对象; lock-key:写入redis的key
log.info('task1-开始执行')
conn = get_redis_connection('myredis')
lock = redis_lock.Lock(conn, "lock-key")
if lock.acquire(blocking=False):
log.info("Got the lock1.")
# 此处为上述动态添加接口中:定时执行func的逻辑
log.info('task1-' + toolbox.get_now('%Y-%m-%d %H:%M:%S'))
# 最后需要手动释放掉锁
lock.release()
else:
log.info("Someone else has the lock1.")
def test_task2():
# conn redis链接对象; lock-key:写入redis的key
log.info('task2-开始执行')
conn = get_redis_connection('myredis')
lock = redis_lock.Lock(conn, "lock-key")
if lock.acquire(blocking=False):
log.info("Got the lock2.")
# 此处为上述动态添加接口中:定时执行func的逻辑
log.info('task2-' + toolbox.get_now('%Y-%m-%d %H:%M:%S'))
# 最后需要手动释放掉锁
lock.release()
else:
log.info("Someone else has the lock2.")
这里贴上部分前端页面(任务管理页面),便于理解。
3、问题解决
通过redis_lock锁,解决多worker的重复执行问题
3.1 apscheduler_start .py
# -*-coding:utf-8-*-
import logging
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from publictool import logtool
logger = logging.getLogger()
log = logtool.CommonLog(logger=logger, logname="scheduler")
jobstores = {
'default': DjangoJobStore()
}
job_defaults = {
'coalesce': True,
'max_instances': 1,
'misfire_grace_time': 60,
'replace_existing': True
}
scheduler = BackgroundScheduler(jobstores=jobstores, job_defaults=job_defaults)
# pool_pre_ping为True,即每次从连接池中取连接的时候,都会验证一下与数据库是否连接正常,如果没有连接,那么该连接会被回收。
# engine_options={'pool_pre_ping': True, 'pool_recycle': 100}
scheduler.start()
def listener_event(event):
job_id = event.job_id
scheduled_run_time = event.scheduled_run_time.strftime("%Y-%m-%d %H:%M:%S")
if event.exception:
logger.error('作业ID:{} 在 {} 执行失败,错误原因:{}'.format(job_id, scheduled_run_time, event.exception.args[0]))
scheduler._logger = logger
# 当任务执行完或任务出错时,listener_event
scheduler.add_listener(listener_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 使用main_loop阻塞进程,防止进程挂起
# import time
# while True:
# time.sleep(60)
# sig = uwsgi.signal_wait()
3.2 apscheduler跟随uwsgi的重启而自动重启
通过命令```uwsgi --reload /usr/local/uwsgi/buwsgi.pid```重启项目时,一方面容易造成uwsgi.ini的重复启动,导致项目报错;另一方面导致定时任务执行的函数可能仍旧为更新前的旧代码。这是因为 apscheduler没有跟随uwsgi的重启而重启。
解决方案:
在apscheduler_start.py文件同级目录中__init__.py文件中添加:
import django
django.setup()
from ops.apscheduler_start import scheduler
__all__ = ('scheduler',)
此时重启uwsgi服务,apscheduler会自动重启。
3.1 待执行函数核心逻辑,需要结合redis_lock
# pip install python-redis-lock
import redis_lock
# 使用锁的案列
# conn redis链接对象; lock-key:写入redis的key
lock = redis_lock.Lock(conn, "lock-key")
if lock.acquire(blocking=False):
print("Got the lock.")
# 此处为上述动态添加接口中:定时执行s_func_name的逻辑
...业务逻辑代码...
# 最后需要手动释放掉锁
lock.release()
else:
print("Someone else has the lock.")
如上面提到的test_task和test_task2函数,均先获取redis_lock,当某一个先获取了redis_lock后,其他worker将获取不到,从而无法重复执行核心逻辑代码。
标签:nginx,lock,get,redis,Django,apscheduler,job,func,date From: https://www.cnblogs.com/HeroZhang/p/18108400