使用方法
可以写个while True
的循环监控数据,再分别定义每个状态要做什么事情
注意:
- 循环中不要写breake
- 不要return, 换成yield, 把函数作为一个生成器,由装饰器控制循环
代码示例
import functools
import time
from utils.log_setting import logger
from config import settings
def monitorDB(field_name, continue_values=[], stop_values=[]):
'''
监听函数返回值,与stop_flags相等时结束监听
field_name: 字段名,只用于打印
continue_values: 非列表中的返回值跳出循环
stop_value: 与被装饰函数返回值相等时跳出循环结束监听
'''
def decorator(func):
# 使用装饰器语法,将装饰器应用到函数上
@functools.wraps(func)
def wrapper(*args, **kwargs):
previous_value = None
retry_wait_time = settings.MONITORDB.CONFIG.RETRY_WAIT_TIME # 重试等待时间
retry_count = settings.MONITORDB.CONFIG.RETRY_COUNT # 最大重试次数
run_count = 1 # 运行次数
time_sleep_list = list(range(retry_wait_time, retry_wait_time * (retry_count + 1), retry_wait_time))
time_start = time.perf_counter()
try:
time.sleep(0.5)
# 获取最新的字段值
time.sleep(1)
current_value = next(func(*args, **kwargs))
except StopIteration:
logger.warning(f"{func.__name__}, 返回数据为空")
while True:
time.sleep(0.2)
# 判断装饰器参数完整性
if stop_values is [] and continue_values is []:
logger.error("{func.__name__}, 装饰器参数缺失")
break
# 判断是否是列表中的返回值
elif continue_values is not [] and current_value not in continue_values:
# 判断第一次执行
if previous_value is not None:
logger.info(f"{func.__name__}, {field_name} 从 {previous_value} 变为 {current_value}, done")
else:
logger.info(f"{func.__name__}, {field_name}: {current_value}")
break
# 判断停止循环
elif stop_values is not [] and current_value in stop_values:
# 判断第一次执行
if previous_value is not None:
logger.info(f"{func.__name__}, {field_name} 从 {previous_value} 变为 {current_value}, done")
else:
logger.info(f"{func.__name__}, {field_name}: {current_value}")
break
elif current_value != previous_value:
time_start = time.perf_counter()
# 判断第一次执行
if previous_value is not None:
logger.info(f"{func.__name__}, {field_name} 从 {previous_value} 变为 {current_value}")
previous_value = current_value
retry_wait_time = settings.MONITORDB.CONFIG.RETRY_WAIT_TIME # 重试等待时间
run_count = 1 # 运行次数
time_sleep_list = list(range(0, retry_wait_time * (retry_count + 1), retry_wait_time))
elif current_value == previous_value:
time_run = time.perf_counter() - time_start
time_run_fmt = '{:.2f}s'.format(time_run)
print(
'\r***************************************************************************等待中{:.2f}s***************************************************************************'.format(time_run),
end="",
)
if time_run > retry_wait_time:
print('')
try:
logger.info(f"已等待{time_run_fmt}秒, {field_name}: {current_value}, 第{run_count}次重试...")
current_value = next(func(*args, **kwargs))
run_count += 1
retry_wait_time = time_sleep_list.pop(1)
except IndexError:
logger.warning(f"已等待{time_run_fmt}秒, {field_name}: {current_value}, 第{run_count}次重试失败, 不再重试")
break
else:
logger.warning(f"{func.__name__}, 预料之外的参数field_name: {field_name}, continue_values: {continue_values}, stop_values: {stop_values}, current_value: {current_value}, previous_value: {previous_value}")
time.sleep(0.2)
return wrapper
return decorator
if __name__ == '__main__':
@monitorDB('IftttRuleTaskDetail_state', continue_values=[1, 2, 3, 4], stop_values=[4])
def check_field_value(): # 模拟获取字段的当前值,可以替换为实际的代码
while True:
current_value = 0
time.sleep(1)
logger.info("程序运行中...")
current_value = int(input('输入: '))
yield current_value
check_field_value()
标签:name,自定义,python,轮询,value,current,field,values,time
From: https://www.cnblogs.com/ishuangjin/p/17995511