目录
本文通过获取停车场的一段视频,实时检测出整个停车场中,当前一共有多少辆车,一共有多少个空余的车位,并标识空余的停车位。运用到了:二值化,灰度化,Canny边缘检测,霍夫变换,Keras。
整体步骤:
首先以视频中某一帧的图像为单位,进行处理,通过二值化,灰度化,边缘检测,特定点标定连线等,把图片中多余的部分去掉,只保留停车场。其次,通过霍夫变换的直线检测,找到图片中的直线,根据直线坐标,先按列为单位,把车位按列框起来。然后,在每一列中,锁定每个停车位的位置,并对每个停车位进行标号。最后,通过每个停车位的位置,提取出对应图片,作为后面模型的训练以及验证的数据集,用训练的模型去预测这个停车位上有没有车,把没有车的标识出来,并统计个数。
1.数据预处理
(1)背景过滤
读入原始图像如下图所示:
二值化过滤背景:
def select_rgb_white_yellow(self,image):
#过滤掉背景
lower = np.uint8([120, 120, 120])
upper = np.uint8([255, 255, 255])
# lower_red和高于upper_red的部分分别变成0,lower_red~upper_red之间的值变成255,相当于过滤背景
white_mask = cv2.inRange(image, lower, upper)#inRange可对单通道图像或三通道图像进行二值化,threshold()需先转换为单通道图像(灰度图)
self.cv_show('white_mask',white_mask)
masked = cv2.bitwise_and(image, image, mask = white_mask)#与操作:只保留原图与二值化图相同的部分
self.cv_show('masked',masked)
return masked
(2)边缘检测
- 后续检测停车场车位直线时使用的霍夫变换要求传入的图像是边缘检测图像,此处使用的是Canny
def detect_edges(self,image, low_threshold=50, high_threshold=200):
return cv2.Canny(image, low_threshold, high_threshold)
(3)停车场区域提取
- 根据停车场的位置,手动地设置6个点,如下第一张图所示。根据这6个点使用fillPoly填充函数制作一个罩,如下面第二张图所示。用这个罩对边缘检测的图像处理,便可得截取出停车场的位置,如下第三张图所示。
def filter_region(self,image, vertices):
"""
剔除掉不需要的地方
"""
mask = np.zeros_like(image)#全黑的罩
if len(mask.shape)==2:
cv2.fillPoly(mask, vertices, 255)#vertices表示传入的6个点所裁区域,在mask罩的基础上填充成白色
self.cv_show('mask', mask)
return cv2.bitwise_and(image, mask)#与操作
def select_region(self,image):
"""
手动选择区域
"""
# first, define the polygon by vertices
rows, cols = image.shape[:2]
pt_1 = [cols*0.05, rows*0.90]#由这6个点裁出停车场位置
pt_2 = [cols*0.05, rows*0.70]
pt_3 = [cols*0.30, rows*0.55]
pt_4 = [cols*0.6, rows*0.15]
pt_5 = [cols*0.90, rows*0.15]
pt_6 = [cols*0.90, rows*0.90]
vertices = np.array([[pt_1, pt_2, pt_3, pt_4, pt_5, pt_6]], dtype=np.int32)
point_img = image.copy()
point_img = cv2.cvtColor(point_img, cv2.COLOR_GRAY2RGB)
for point in vertices[0]:
cv2.circle(point_img, (point[0],point[1]), 10, (0,0,255), 4)
self.cv_show('point_img',point_img)
return self.filter_region(image, vertices)
2.确定停车位
(1)霍夫变换检测直线
用霍夫变换检测所划的停车位,输入的图像需要是边缘检测后的结果。
def hough_lines(self,image):
#minLineLengh(线的最短长度,比这个短的都被忽略)和MaxLineCap(两条直线之间的最大间隔,小于此值,认为是一条直线)
#rho距离精度,theta角度精度,threshold超过设定阈值才被检测出线段
return cv2.HoughLinesP(image, rho=0.1, theta=np.pi/10, threshold=15, minLineLength=9, maxLineGap=4)
(2)过滤霍夫变换检测到的直线
筛选霍夫变换检测的直线,去除倾斜的直线,水平方向不能太短或太长。
def draw_lines(self,image, lines, color=[255, 0, 0], thickness=2, make_copy=True):
# 过滤霍夫变换检测到直线
if make_copy:
image = np.copy(image)
cleaned = []
for line in lines:
for x1,y1,x2,y2 in line:
if abs(y2-y1) <=1 and abs(x2-x1) >=25 and abs(x2-x1) <= 55:#通过y2-y1判断线的倾斜程度,x2-x1判断水平方向长度
cleaned.append((x1,y1,x2,y2))
cv2.line(image, (x1, y1), (x2, y2), color, thickness)
print(" No lines detected: ", len(cleaned))
return image
- 项目刚开始传入的是两帧图像,所以有两组数据,第一帧图像经筛选后剩有效直线576。
(3)以列为单位,划分停车位
def identify_blocks(self,image, lines, make_copy=True):
if make_copy:
new_image = np.copy(image)
#Step 1: 过滤部分直线
cleaned = []
for line in lines:
for x1,y1,x2,y2 in line:
if abs(y2-y1) <=1 and abs(x2-x1) >=25 and abs(x2-x1) <= 55:
cleaned.append((x1,y1,x2,y2))
#Step 2: 对直线按照x1进行排序,从上往下,从左(第一列)往右
import operator
list1 = sorted(cleaned, key=operator.itemgetter(0, 1))
#Step 3: 找到多个列,相当于每列是一排车
clusters = {}
dIndex = 0
clus_dist = 10 # 每一列之间的那个距离
for i in range(len(list1) - 1):
# 相邻两条线之间的距离,如果是一列的,那么x1这个距离应该很近,毕竟是同一列上的
# 如果这个值大于10了,说明是下一列的了,此时需要移动dIndex, 这个表示的是第几列
distance = abs(list1[i+1][0] - list1[i][0])
if distance <= clus_dist:
if not dIndex in clusters.keys(): clusters[dIndex] = []
clusters[dIndex].append(list1[i])
clusters[dIndex].append(list1[i + 1])
else:
dIndex += 1
#Step 4: 得到坐标
#有了每一列里面的直线,下面就是就是遍历每一列,先拿到所有直线,然后找到纵坐标的最大值和最小值,以及横坐标的最大和最小值,但由于横坐标这里,首尾列都一排车位,中间排都是两列,不好直接取到最大最小坐标,所以这里采用了求平均的方式。 这样遍历完,针对每一列,就能得到左上角点和右下角点,这是一个矩形框。
rects = {}
i = 0
for key in clusters:
all_list = clusters[key]
cleaned = list(set(all_list))
if len(cleaned) > 5: # 至少有5个停车位
cleaned = sorted(cleaned, key=lambda tup: tup[1])
avg_y1 = cleaned[0][1]
avg_y2 = cleaned[-1][1]
avg_x1 = 0
avg_x2 = 0
for tup in cleaned:
avg_x1 += tup[0]
avg_x2 += tup[2]
avg_x1 = avg_x1/len(cleaned)
avg_x2 = avg_x2/len(cleaned)
rects[i] = (avg_x1, avg_y1, avg_x2, avg_y2)
i += 1
print("Num Parking Lanes: ", len(rects))
#Step 5: 把列矩形画出来
buff = 7
for key in rects:
tup_topLeft = (int(rects[key][0] - buff), int(rects[key][1]))
tup_botRight = (int(rects[key][2] + buff), int(rects[key][3]))
cv2.rectangle(new_image, tup_topLeft,tup_botRight,(0,255,0),3)
return new_image, rects
(4)锁定每个车位
def draw_parking(self,image, rects, make_copy = True, color=[255, 0, 0], thickness=2, save = True):
if make_copy:
new_image = np.copy(image)
gap = 15.5
spot_dict = {} # 字典:一个车位对应一个位置
tot_spots = 0
#微调
adj_y1 = {0: 20, 1:-10, 2:0, 3:-11, 4:28, 5:5, 6:-15, 7:-15, 8:-10, 9:-30, 10:9, 11:-32}
adj_y2 = {0: 30, 1: 50, 2:15, 3:10, 4:-15, 5:15, 6:15, 7:-20, 8:15, 9:15, 10:0, 11:30}
adj_x1 = {0: -8, 1:-15, 2:-15, 3:-15, 4:-15, 5:-15, 6:-15, 7:-15, 8:-10, 9:-10, 10:-10, 11:0}
adj_x2 = {0: 0, 1: 15, 2:15, 3:15, 4:15, 5:15, 6:15, 7:15, 8:10, 9:10, 10:10, 11:0}
for key in rects:
tup = rects[key]
x1 = int(tup[0]+ adj_x1[key])
x2 = int(tup[2]+ adj_x2[key])
y1 = int(tup[1] + adj_y1[key])
y2 = int(tup[3] + adj_y2[key])
cv2.rectangle(new_image, (x1, y1),(x2,y2),(0,255,0),2)
num_splits = int(abs(y2-y1)//gap)
for i in range(0, num_splits+1):
y = int(y1 + i*gap)
cv2.line(new_image, (x1, y), (x2, y), color, thickness)
if key > 0 and key < len(rects) -1 :
#竖直线
x = int((x1 + x2)/2)
cv2.line(new_image, (x, y1), (x, y2), color, thickness)
# 计算数量
if key == 0 or key == (len(rects) -1):
tot_spots += num_splits +1
else:
tot_spots += 2*(num_splits +1)
# 字典对应好
if key == 0 or key == (len(rects) -1):
for i in range(0, num_splits+1):
cur_len = len(spot_dict)
y = int(y1 + i*gap)
spot_dict[(x1, y, x2, y+gap)] = cur_len +1
else:
for i in range(0, num_splits+1):
cur_len = len(spot_dict)
y = int(y1 + i*gap)
x = int((x1 + x2)/2)
spot_dict[(x1, y, x, y+gap)] = cur_len +1
spot_dict[(x, y, x2, y+gap)] = cur_len +2
print("total parking spaces: ", tot_spots, cur_len)
if save:
filename = 'with_parking.jpg'
cv2.imwrite(filename, new_image)
return new_image, spot_dict
(5)截取出每个停车位的图像,作为训练模型的数据。
def save_images_for_cnn(self,image, spot_dict, folder_name ='cnn_data'):
for spot in spot_dict.keys():
(x1, y1, x2, y2) = spot
(x1, y1, x2, y2) = (int(x1), int(y1), int(x2), int(y2))
#裁剪
spot_img = image[y1:y2, x1:x2]
spot_img = cv2.resize(spot_img, (0,0), fx=2.0, fy=2.0)
spot_id = spot_dict[spot]
filename = 'spot' + str(spot_id) +'.jpg'
print(spot_img.shape, filename, (x1,x2,y1,y2))
cv2.imwrite(os.path.join(folder_name, filename), spot_img)
3.模型训练及预测
(1)模型训练
import numpy
import os
from keras import applications
from keras.preprocessing.image import ImageDataGenerator
from keras import optimizers
from keras.models import Sequential, Model
from keras.layers import Dropout, Flatten, Dense, GlobalAveragePooling2D
from keras import backend as k
from keras.callbacks import ModelCheckpoint, LearningRateScheduler, TensorBoard, EarlyStopping
from keras.models import Sequential
from keras.layers.normalization import BatchNormalization
from keras.layers.convolutional import Conv2D
from keras.layers.convolutional import MaxPooling2D
from keras.initializers import TruncatedNormal
from keras.layers.core import Activation
from keras.layers.core import Flatten
from keras.layers.core import Dropout
from keras.layers.core import Dense
files_train = 0
files_validation = 0
#读数据
cwd = os.getcwd()
folder = 'train_data/train' #训练集
for sub_folder in os.listdir(folder):
path, dirs, files = next(os.walk(os.path.join(folder,sub_folder)))
files_train += len(files)
folder = 'train_data/test' #测试集
for sub_folder in os.listdir(folder):
path, dirs, files = next(os.walk(os.path.join(folder,sub_folder)))
files_validation += len(files)
print(files_train,files_validation)
#指定参数
img_width, img_height = 48, 48
train_data_dir = "train_data/train"
validation_data_dir = "train_data/test"
nb_train_samples = files_train
nb_validation_samples = files_validation
batch_size = 32
epochs = 15
num_classes = 2 #二分类,空车位或已占的车位
model = applications.VGG16(weights='imagenet', include_top=False, input_shape = (img_width, img_height, 3))
#前10层冻起来,提升速度
for layer in model.layers[:10]:
layer.trainable = False
x = model.output
x = Flatten()(x)
predictions = Dense(num_classes, activation="softmax")(x)
model_final = Model(input = model.input, output = predictions)
model_final.compile(loss = "categorical_crossentropy",
optimizer = optimizers.SGD(lr=0.0001, momentum=0.9),
metrics=["accuracy"])
train_datagen = ImageDataGenerator(
rescale = 1./255,
horizontal_flip = True,
fill_mode = "nearest",
zoom_range = 0.1,
width_shift_range = 0.1,
height_shift_range=0.1,
rotation_range=5)
test_datagen = ImageDataGenerator(
rescale = 1./255, #所有数据集将乘以该数值
horizontal_flip = True, #是否随机水平翻转
fill_mode = "nearest",
zoom_range = 0.1, #随机缩放的范围
width_shift_range = 0.1, #随即宽度偏移量
height_shift_range=0.1, #随即高度偏移量
rotation_range=5) #随即旋转角度数范围
train_generator = train_datagen.flow_from_directory(
train_data_dir,
target_size = (img_height, img_width),
batch_size = batch_size,
class_mode = "categorical")
validation_generator = test_datagen.flow_from_directory(
validation_data_dir,
target_size = (img_height, img_width),
class_mode = "categorical")
checkpoint = ModelCheckpoint("car1.h5", monitor='val_acc', verbose=1, save_best_only=True, save_weights_only=False, mode='auto', period=1)
early = EarlyStopping(monitor='val_acc', min_delta=0, patience=10, verbose=1, mode='auto')
history_object = model_final.fit_generator(
train_generator,
samples_per_epoch = nb_train_samples,
epochs = epochs,
validation_data = validation_generator,
nb_val_samples = nb_validation_samples,
callbacks = [checkpoint, early])
(2)模型预测
def make_prediction(self,image,model,class_dictionary):
#预处理
img = image/255.
#转换成4D tensor
image = np.expand_dims(img, axis=0)
# 用训练好的模型进行预测
class_predicted = model.predict(image)
inID = np.argmax(class_predicted[0])
label = class_dictionary[inID]
return label
def predict_on_image(self,image, spot_dict , model,class_dictionary,make_copy=True, color = [0, 255, 0], alpha=0.5):
if make_copy:
new_image = np.copy(image)
overlay = np.copy(image)
self.cv_show('new_image',new_image)
cnt_empty = 0
all_spots = 0
for spot in spot_dict.keys():
all_spots += 1
(x1, y1, x2, y2) = spot
(x1, y1, x2, y2) = (int(x1), int(y1), int(x2), int(y2))
spot_img = image[y1:y2, x1:x2]
spot_img = cv2.resize(spot_img, (48, 48))
label = self.make_prediction(spot_img,model,class_dictionary)
if label == 'empty':
cv2.rectangle(overlay, (int(x1),int(y1)), (int(x2),int(y2)), color, -1)
cnt_empty += 1
cv2.addWeighted(overlay, alpha, new_image, 1 - alpha, 0, new_image)
cv2.putText(new_image, "Available: %d spots" %cnt_empty, (30, 95),
cv2.FONT_HERSHEY_SIMPLEX,
0.7, (255, 255, 255), 2)
cv2.putText(new_image, "Total: %d spots" %all_spots, (30, 125),
cv2.FONT_HERSHEY_SIMPLEX,
0.7, (255, 255, 255), 2)
save = False
if save:
filename = 'with_marking.jpg'
cv2.imwrite(filename, new_image)
self.cv_show('new_image',new_image)
return new_image
- 第一张图为一帧图像的预测结果,第二张图为整个视频的预测结果,整个停车场有500多个车位,对视频的处理是截成一帧一帧的图像,若每秒有60帧图像,每张图都要对500多个车位进行预测,预测一秒的视频就要预测30000次,由于电脑性能的问题只截取了刚开始的结果。