大致代码如下:
import asyncio

import cv2
import numpy as np
from fastapi import HTTPException
# A pair of 3-int channel triples; PictureHouse.check_rgb unpacks it as the
# two bound arguments of cv2.inRange.
RgbRangeType = tuple[tuple[int, int, int], tuple[int, int, int]]
class ValidationError(HTTPException):
    """HTTPException whose status code defaults to 400.

    Args:
        detail: Message forwarded to HTTPException as ``detail``.
        status_code: HTTP status code to report; 400 unless overridden.
    """

    def __init__(self, detail: str, status_code=400) -> None:
        super().__init__(status_code=status_code, detail=detail)
class PictureHouse:
    """Stateful frame filter used to reject garbled or blank pictures.

    Each call to ``is_avaliable`` scores the frame with the variance of its
    Laplacian and compares it with the score of the previously seen frame,
    which is kept in ``last_score``.
    """

    # Minimum Laplacian-variance score a frame needs to be considered.
    min_score = 250
    # Largest allowed score difference between consecutive frames.
    delta_score = 50

    def __init__(self, last_score: int = 0) -> None:
        # Score of the most recently examined frame; a falsy value (the
        # default 0) means "no previous frame to compare with".
        self.last_score = last_score

    @staticmethod
    def check_rgb(
        frame: np.ndarray, rgb_range: RgbRangeType | None = None, thresthold=1000
    ) -> bool:
        """Tell whether the count of pixels inside ``rgb_range`` is below the threshold.

        Args:
            frame: Image passed to ``cv2.inRange``.
            rgb_range: Pair of bounds for ``cv2.inRange``; when omitted, a
                narrow green range is used (the "green dots" case).
            thresthold: Exclusive upper limit for the number of matching pixels.

        Returns:
            True when fewer than ``thresthold`` pixels fall inside the range.
        """
        bounds = rgb_range if rgb_range is not None else ((0, 130, 0), (5, 140, 5))
        matched = cv2.countNonZero(cv2.inRange(frame, *bounds))  # type:ignore
        return matched < thresthold

    @staticmethod
    def lap_score(frame: np.ndarray) -> int:
        """Score a picture with the Laplacian method.

        The frame is converted to a single-channel grayscale image and the
        variance of its Laplacian is returned, truncated to an int.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        laplacian = cv2.Laplacian(gray, cv2.CV_64F)
        return int(laplacian.var())  # type:ignore[attr-defined]

    def is_avaliable(self, frame: np.ndarray) -> bool:
        """Filter out garbled-screen and no-picture frames.

        A frame is accepted only if a previous (non-zero) score exists, its
        own score reaches ``min_score``, the score differs from the previous
        one by at most ``delta_score``, and ``check_rgb`` passes. The frame's
        score is stored in ``last_score`` after the checks have run.
        """
        current = self.lap_score(frame)
        previous = self.last_score
        usable = False
        if previous and current >= self.min_score:
            # The colour check is only evaluated when the score is stable.
            if abs(current - previous) <= self.delta_score:
                usable = self.check_rgb(frame)
        self.last_score = current
        return usable
class RtspCapture(cv2.VideoCapture):
    """cv2.VideoCapture opened on a URL through the FFmpeg backend.

    Adds helpers to grab one acceptable frame (as judged by PictureHouse)
    and to release the capture afterwards.
    """

    def __init__(self, url: str, timeout=10) -> None:
        """Open ``url`` with hardware acceleration requested.

        Args:
            url: Stream address handed to cv2.VideoCapture.
            timeout: Seconds allowed for opening the stream and for each
                read, when the installed OpenCV exposes the timeout
                properties.
        """
        # GPU acceleration, see https://www.jianshu.com/p/733d7311c509
        params = [cv2.CAP_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY]
        # CAP_PROP_POS_MSEC is the playback position, not a timeout. The
        # open/read timeout properties are open-only parameters, so they are
        # passed to the constructor; getattr keeps older OpenCV builds that
        # lack them working (those simply fall back to backend defaults).
        timeout_ms = int(timeout * 1000)
        for prop_name in ("CAP_PROP_OPEN_TIMEOUT_MSEC", "CAP_PROP_READ_TIMEOUT_MSEC"):
            prop = getattr(cv2, prop_name, None)
            if prop is not None:
                params += [prop, timeout_ms]
        super().__init__(url, cv2.CAP_FFMPEG, params)
        self._url = url

    def close(self) -> None:
        """Release the underlying capture."""
        self.release()

    def pick_out(self, total=30) -> np.ndarray:
        """Return the first frame accepted by a fresh PictureHouse filter.

        Args:
            total: Maximum number of frames to read.

        Raises:
            ValidationError: If none of the ``total`` reads yields a frame
                that is both read successfully and accepted by the filter.
        """
        image_filter = PictureHouse()
        for _ in range(total):
            success, frame = self.read()
            if success and image_filter.is_avaliable(frame):
                break
        else:
            raise ValidationError(f"Invalid {total} frames ({self._url})")
        return frame

    def screenshot(self) -> np.ndarray:
        """Grab one acceptable frame and always release the capture.

        Raises:
            ValidationError: If the stream is not opened, or no acceptable
                frame is found by ``pick_out``.
        """
        # try/finally instead of contextlib.closing: same cleanup, and it
        # does not depend on a name that this file never imports.
        try:
            if not self.isOpened():
                raise ValidationError(f"Failed to open stream: {self._url}")
            return self.pick_out()
        finally:
            self.close()
async def capture_one(redis, data, frame_index, timeout=10) -> bytes:
    """Capture one frame from the stream described by ``data`` as JPEG bytes.

    Args:
        redis: Passed through to ``ask_rtsp_url``.
        data: Passed through to ``ask_rtsp_url``.
        frame_index: Not used by this function; kept for interface
            compatibility.
        timeout: Forwarded to ``RtspCapture``.

    Returns:
        The JPEG-encoded frame.

    Raises:
        ValidationError: If the stream cannot be opened, no acceptable frame
            is found, or JPEG encoding reports failure.
    """
    rtsp_url = await ask_rtsp_url(redis, data)

    def _grab_jpeg() -> bytes:
        # Opening the stream, reading frames and encoding are all blocking
        # calls, so they run together in a worker thread.
        frame = RtspCapture(rtsp_url, timeout).screenshot()
        encoded, buffer = cv2.imencode(".jpg", frame)
        if not encoded:
            raise ValidationError(f"Failed to encode frame as JPEG ({rtsp_url})")
        return buffer.tobytes()

    # Keep the event loop responsive while the capture is in progress.
    return await asyncio.to_thread(_grab_jpeg)
标签:截图,int,self,RTSP,cv2,score,def,frame,花屏
From: https://www.cnblogs.com/waketzheng/p/17972870