import cv2 import dlib import threading import queue import math import time import tkinter as tk from PIL import Image, ImageTk import os import random import numpy as np from glob import glob from PIL import Image, ImageOps import matplotlib.pyplot as plt from skimage.metrics import structural_similarity as compare_ssim from sklearn.linear_model import SGDClassifier#分类器 import tensorflow as tf tf.enable_eager_execution() from tensorflow import keras from tensorflow.keras import layers from mtcnn import MTCNN # 创建MTCNN检测器 mtcnn_detector = MTCNN() start_time=time.time() #初始化计数器 minute_ant=0 minute_cnt=0 show_fatigue_message = False global_photo = None # 用于跟踪增强度量值的全局变量 enhancement_metrics = [] # 初始化SGD分类器 clf = SGDClassifier(loss='log') # 使用逻辑回归 clf.partial_fit([[0.2]], [0], classes=[0, 1]) # 初始拟合 # 初始化训练数据和标签的列表 X_train = [] y_train = [] frames_queue = queue.Queue() capture = cv2.VideoCapture(1) detector = dlib.get_frontal_face_detector() predictor = dlib.shape_predictor("shape_predictor_68_face_landmarks.dat") def enhance_low_light(model, image): print("调用低光图像增强Zero-DCE") # 将OpenCV图像转换为PIL图像 pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) # 将PIL图像转换为TensorFlow期望的格式 tf_image = tf.convert_to_tensor(np.array(pil_image), dtype=tf.float32) tf_image = tf_image / 255.0 tf_image = tf.expand_dims(tf_image, axis=0) # 使用 zero_dce_model 增强图像 enhanced_image = model(tf_image)[0].numpy() return enhanced_image def detect_faces(model): global start_time, minute_ant, minute_cnt ,show_fatigue_message # 引入全局变量 ant = 0 # 初始化闭眼次数 cnt=0 #嘴巴张开次数 while True: ret, frame = capture.read() gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = mtcnn_detector.detect_faces(frame) avg_brightness = int(gray.mean()) # 检查光线亮度,显示光线暗的提醒 if avg_brightness < 55: cv2.putText(frame, "Light is too dim open Zero-dce", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) enhanced_image = enhance_low_light(model,frame) # 调用增强光线函数 metrics = calculate_enhancement_metrics(frame, enhanced_image) enhancement_metrics.append(metrics) display_metrics(frame, metrics) for face in faces: x, y, width, height = face["box"] cv2.rectangle(frame, (x, y), (x + width, y + height), (0, 255, 0), 2) face_rect = dlib.rectangle(x, y, x + width, y + height) shape = predictor(gray, face_rect) # 左眼关键点坐标提取 left_eye_coords = [(shape.part(i).x, shape.part(i).y) for i in range(36, 42)] # 右眼关键点坐标提取 right_eye_coords = [(shape.part(i).x, shape.part(i).y) for i in range(42, 48)] # 嘴巴关键坐标提取 mouth1x=shape.part(49).x mouth1y = shape.part(49).y mouth2x = shape.part(51).x mouth2y = shape.part(51).y mouth3x = shape.part(53).x mouth3y= shape.part(53).y mouth4x = shape.part(55).x mouth4y = shape.part(55).y mouth5x = shape.part(57).x mouth5y = shape.part(57).y mouth6x = shape.part(59).x mouth6y = shape.part(59).y mdata1 = math.sqrt((mouth2x - mouth6x) * (mouth2x - mouth6x) + (mouth2y - mouth6y) * (mouth2y - mouth6y)) + math.sqrt((mouth3x - mouth5x) * (mouth3x - mouth5x) + (mouth3y - mouth5y) * (mouth3y - mouth5y)) mdata2 = 2 * math.sqrt((mouth1x - mouth4x) * (mouth1x - mouth4x) + (mouth1y - mouth4y) * (mouth1y - mouth4y)); mouth = mdata1 / mdata2 # 计算左眼 EAR dataLE1 = euclidean_distance(left_eye_coords[1], left_eye_coords[5]) + euclidean_distance(left_eye_coords[2], left_eye_coords[4]) dataLE2 = 2 * euclidean_distance(left_eye_coords[0], left_eye_coords[3]) Leye = dataLE1 / dataLE2 # 计算右眼 EAR dataRE1 = euclidean_distance(right_eye_coords[1], right_eye_coords[5]) + euclidean_distance(right_eye_coords[2], right_eye_coords[4]) dataRE2 = 2 * euclidean_distance(right_eye_coords[0], right_eye_coords[3]) Reye = dataRE1 / dataRE2 avg = (Reye + Leye) * 0.5 #计算平均值 # 更新训练数据和标签 # 在这里,我们使用一个简单的启发式方法来标记数据:如果EAR小于0.2,我们假设眼睛是闭的(标签0);否则,我们假设眼睛是开的(标签1)。 label = 0 if avg < 0.2 else 1 X_train.append([avg]) y_train.append(label) # 在线更新模型 clf.partial_fit(X_train, y_train) # 清空训练数据和标签以进行下一次迭代 X_train.clear() y_train.clear() # 使用模型进行预测 prediction = clf.predict_proba([[avg]])[:, 1] # 获取标签为1(眼睛打开)的概率 # 根据概率设置阈值,这里假设如果概率小于0.5,则认为眼睛是闭合的 if prediction < 0.5: ant += 1 # 闭眼次数累加 minute_ant += 1 if mouth>=0.75: cnt+=0.1 minute_cnt+=0.1 cv2.putText(frame, f"Close your eyes number of times: {ant}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) cv2.putText(frame, f"Number of mouth openings : {cnt:.2f}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) elapsed_time = time.time() - start_time if elapsed_time >= 60: if(minute_cnt>=3 and minute_ant>=20): show_fatigue_message=True print("Fatigue detected!") # 添加打印语句 else: show_fatigue_message=False print(f"时间 {int(elapsed_time / 60)} - 眨眼次数: {minute_ant}, 嘴巴张开次数: {minute_cnt}") start_time = time.time() # 重置开始时间 minute_ant = 0 # 重置闭眼次数 minute_cnt = 0 # 重置嘴巴张开次数 frames_queue.put(frame) IMAGE_SIZE = 256 BATCH_SIZE = 16 MAX_TRAIN_IMAGES = 400 def calculate_enhancement_metrics(original_image, enhanced_image): # 使用compare_ssim函数计算SSIM ssim = compare_ssim(original_image, enhanced_image, multichannel=True) # 其他度量值的计算代码(例如PSNR) # 以字典形式返回度量值 metrics = { "SSIM": ssim, # 如果需要,可以添加其他度量值 } return metrics def display_metrics(frame, metrics): # Display enhancement metrics on the frame y_offset = 140 # Adjust the vertical position as needed for key, value in metrics.items(): cv2.putText(frame, f"{key}: {value:.2f}", (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) y_offset += 30 # Adjust the vertical spacing as needed def build_dce_net(): input_img = keras.Input(shape=[256, 256, 3])#输入256*256*3的图像 conv1 = layers.Conv2D( 32, (3, 3), strides=(1, 1), activation="relu", padding="same"#relu作为激活函数same作为填充 )(input_img) conv2 = layers.Conv2D( 32, (3, 3), strides=(1, 1), activation="relu", padding="same" )(conv1) conv3 = layers.Conv2D( 32, (3, 3), strides=(1, 1), activation="relu", padding="same" )(conv2) conv4 = layers.Conv2D( 32, (3, 3), strides=(1, 1), activation="relu", padding="same" )(conv3) int_con1 = layers.Concatenate(axis=-1)([conv4, conv3])#进行连接 conv5 = layers.Conv2D( 32, (3, 3), strides=(1, 1), activation="relu", padding="same" )(int_con1) int_con2 = layers.Concatenate(axis=-1)([conv5, conv2]) conv6 = layers.Conv2D( 32, (3, 3), strides=(1, 1), activation="relu", padding="same" )(int_con2) int_con3 = layers.Concatenate(axis=-1)([conv6, conv1]) x_r = layers.Conv2D(24, (3, 3), strides=(1, 1), activation="tanh", padding="same")(#虑波器选择24 3*3作为卷积核tanh作为激活函数 int_con3 ) return keras.Model(inputs=input_img, outputs=x_r) def color_constancy_loss(x):#计算x在维度上RGB的平均值,计算平均值与实现的图像在现实中的图像相似 mean_rgb = tf.reduce_mean(x, axis=(1, 2), keepdims=True) mr, mg, mb = mean_rgb[:, :, :, 0], mean_rgb[:, :, :, 1], mean_rgb[:, :, :, 2] d_rg = tf.square(mr - mg) d_rb = tf.square(mr - mb) d_gb = tf.square(mb - mg) return tf.sqrt(tf.square(d_rg) + tf.square(d_rb) + tf.square(d_gb)) def exposure_loss(x, mean_val=0.6): x = tf.reduce_mean(x, axis=3, keepdims=True)#每个像素做平均化得到一个单通道的图像 mean = tf.nn.avg_pool2d(x, ksize=16, strides=16, padding="VALID")#计算图像的大致亮度 return tf.reduce_mean(tf.square(mean - mean_val))#调整图像的曝光度 def illumination_smoothness_loss(x):#获取图片信息做归一化处理,计算照明平滑损失确保增强后的图像没有剧烈的变化 batch_size = tf.shape(x)[0] h_x = tf.shape(x)[1] w_x = tf.shape(x)[2] count_h = (tf.shape(x)[2] - 1) * tf.shape(x)[3] count_w = tf.shape(x)[2] * (tf.shape(x)[3] - 1) h_tv = tf.reduce_sum(tf.square((x[:, 1:, :, :] - x[:, : h_x - 1, :, :]))) w_tv = tf.reduce_sum(tf.square((x[:, :, 1:, :] - x[:, :, : w_x - 1, :]))) batch_size = tf.cast(batch_size, dtype=tf.float32) count_h = tf.cast(count_h, dtype=tf.float32) count_w = tf.cast(count_w, dtype=tf.float32) return 2 * (h_tv / count_h + w_tv / count_w) / batch_size class SpatialConsistencyLoss(keras.losses.Loss): def __init__(self, **kwargs): super().__init__(reduction="none")#调用了父类TensorFlow初始化 self.left_kernel = tf.constant( [[[[0, 0, 0]], [[-1, 1, 0]], [[0, 0, 0]]]], dtype=tf.float32#定义4个方向的卷积核 ) self.right_kernel = tf.constant( [[[[0, 0, 0]], [[0, 1, -1]], [[0, 0, 0]]]], dtype=tf.float32 ) self.up_kernel = tf.constant( [[[[0, -1, 0]], [[0, 1, 0]], [[0, 0, 0]]]], dtype=tf.float32 ) self.down_kernel = tf.constant( [[[[0, 0, 0]], [[0, 1, 0]], [[0, -1, 0]]]], dtype=tf.float32 ) def call(self, y_true, y_pred): original_mean = tf.reduce_mean(y_true, 3, keepdims=True)#计算张量以三个维度的均值keepdims确保结果保持原始维度 enhanced_mean = tf.reduce_mean(y_pred, 3, keepdims=True) original_pool = tf.nn.avg_pool2d( original_mean, ksize=4, strides=4, padding="VALID"#进行平均池化操作尺寸为4*4 步长为4 ) enhanced_pool = tf.nn.avg_pool2d( enhanced_mean, ksize=4, strides=4, padding="VALID" ) d_original_left = tf.nn.conv2d( #在池化后的张量上进行卷积操作,卷积核self.left_kernel......卷积操作比较了原始图像(y_true)和增强图像(y_pred)在不同方向上的池化特征,以捕捉空间信息。 original_pool, self.left_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_original_right = tf.nn.conv2d( original_pool, self.right_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_original_up = tf.nn.conv2d( original_pool, self.up_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_original_down = tf.nn.conv2d( original_pool, self.down_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_enhanced_left = tf.nn.conv2d( enhanced_pool, self.left_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_enhanced_right = tf.nn.conv2d( enhanced_pool, self.right_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_enhanced_up = tf.nn.conv2d( enhanced_pool, self.up_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_enhanced_down = tf.nn.conv2d( enhanced_pool, self.down_kernel, strides=[1, 1, 1, 1], padding="SAME" ) d_left = tf.square(d_original_left - d_enhanced_left)# 这些代码计算了原始图像和增强图像在不同方向上卷积结果的差的平方。平方差可以强调较大的差异并更强烈地惩罚较小的差异。 d_right = tf.square(d_original_right - d_enhanced_right) d_up = tf.square(d_original_up - d_enhanced_up) d_down = tf.square(d_original_down - d_enhanced_down) return d_left + d_right + d_up + d_down class ZeroDCE(keras.Model): def __init__(self, **kwargs): super().__init__(**kwargs) self.dce_model = build_dce_net() def custom_compile(self, learning_rate, **kwargs): super().compile(**kwargs) self.optimizer = keras.optimizers.Adam(learning_rate=learning_rate) self.spatial_constancy_loss = SpatialConsistencyLoss(reduction="none") def get_enhanced_image(self, data, output):#提取8张子张量按照3个通道 r1 = output[:, :, :, :3] r2 = output[:, :, :, 3:6] r3 = output[:, :, :, 6:9] r4 = output[:, :, :, 9:12] r5 = output[:, :, :, 12:15] r6 = output[:, :, :, 15:18] r7 = output[:, :, :, 18:21] r8 = output[:, :, :, 21:24] x = data + r1 * (tf.square(data) - data)#与输入的data进行运算建立非线性函数运算 x = x + r2 * (tf.square(x) - x) x = x + r3 * (tf.square(x) - x) enhanced_image = x + r4 * (tf.square(x) - x) x = enhanced_image + r5 * (tf.square(enhanced_image) - enhanced_image) x = x + r6 * (tf.square(x) - x) x = x + r7 * (tf.square(x) - x) enhanced_image = x + r8 * (tf.square(x) - x) return enhanced_image def call(self, data): dce_net_output = self.dce_model(data) return self.get_enhanced_image(data, dce_net_output) def compute_losses(self, data, output): enhanced_image = self.get_enhanced_image(data, output) loss_illumination = 200 * illumination_smoothness_loss(output) loss_spatial_constancy = tf.reduce_mean( self.spatial_constancy_loss(enhanced_image, data) ) loss_color_constancy = 5 * tf.reduce_mean(color_constancy_loss(enhanced_image)) loss_exposure = 10 * tf.reduce_mean(exposure_loss(enhanced_image)) total_loss = ( loss_illumination + loss_spatial_constancy + loss_color_constancy + loss_exposure ) return { "total_loss": total_loss, "illumination_smoothness_loss": loss_illumination, "spatial_constancy_loss": loss_spatial_constancy, "color_constancy_loss": loss_color_constancy, "exposure_loss": loss_exposure, } def train_step(self, data): with tf.GradientTape() as tape: output = self.dce_model(data) losses = self.compute_losses(data, output) gradients = tape.gradient( losses["total_loss"], self.dce_model.trainable_weights ) self.optimizer.apply_gradients(zip(gradients, self.dce_model.trainable_weights)) return losses def test_step(self, data): output = self.dce_model(data) return self.compute_losses(data, output) def save_weights(self, filepath, overwrite=True, save_format=None, options=None): self.dce_model.save_weights( filepath, overwrite=overwrite, save_format=save_format, options=options ) def load_weights(self, filepath, by_name=False, skip_mismatch=False, options=None): self.dce_model.load_weights( filepath=filepath, by_name=by_name, skip_mismatch=skip_mismatch, options=options, ) zero_dce_model = ZeroDCE()#定义模型 zero_dce_model.custom_compile(learning_rate=1e-4) # 创建并启动人脸检测线程 face_detection_thread = threading.Thread(target=detect_faces,args=(zero_dce_model,))#添加模型 face_detection_thread.start() def euclidean_distance(p1, p2): return math.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2) def snapshot(): image = cv2.imread("A.png") cv2.namedWindow("My Image", cv2.WINDOW_NORMAL) cv2.resizeWindow("My Image", 600, 500) cv2.imshow("My Image", image) cv2.waitKey(0) cv2.destroyAllWindows() root = tk.Tk() root.title("基于计算机视觉Opencv与dlib微表情疲劳驾驶检测技术") canvas = tk.Canvas(root, width=capture.get(cv2.CAP_PROP_FRAME_WIDTH), height=capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) canvas.pack() background_img=Image.open("C:/Users/陈志文/OneDrive/图片/本机照片/45.png") background_photo=ImageTk.PhotoImage(image=background_img) btn_show = tk.Button(root, text="开始", width=10, command=snapshot) btn_show.pack() #btn_snapshot = tk.Button(root, text="点击拍照", width=15, command=snapshot) #btn_snapshot.pack() def update_ui(): if not frames_queue.empty(): frame = frames_queue.get() if(show_fatigue_message): cv2.putText(frame, "You seem fatigued need rest with", (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) photo = ImageTk.PhotoImage(image=pil_image) # 将PIL图像转换为PhotoImage对象 canvas.create_image(0, 0, image=photo, anchor=tk.NW) canvas.image = photo root.after(10, update_ui) # 循环调用更新UI函数 else: root.after(10, update_ui) # 循环调用更新UI函数 update_ui() # 启动UI更新循环 root.mainloop() capture.release() cv2.destroyAllWindows()
标签:loss,备份,cv2,self,enhanced,tf,image,代码 From: https://www.cnblogs.com/liliczw2209/p/17690839.html