
Team Development, Phase 2, Part 3

Posted: 2023-05-17 17:13:25

Today we started optimizing the algorithm; the optimized code is below.

  

import copy
import time
import warnings
from abc import abstractmethod
from queue import Empty

import cv2
import numpy as np
import torch
from PIL import ImageFont, Image, ImageDraw
from PyQt5.QtGui import QPixmap, QImage

from models.concentration_evaluator import ConcentrationEvaluation, ConcentrationEvaluator
from pipeline_module.core.base_module import BaseModule, TASK_DATA_OK, DictData
from utils.vis import draw_keypoints136

warnings.filterwarnings("ignore")

box_color = (0, 255, 0)
cheating_box_color = (0, 0, 255)
draw_keypoints_default = False


def draw_frame(data, draw_keypoints=draw_keypoints_default, fps=-1):
    frame = data.frame.copy()
    pred = data.detections
    preds_kps = data.keypoints
    preds_scores = data.keypoints_scores
    if pred.shape[0] > 0:
        # Draw skeleton keypoints
        if draw_keypoints and preds_kps is not None:
            draw_keypoints136(frame, preds_kps, preds_scores)
        # Draw detection boxes and action classes
        frame_pil = Image.fromarray(frame)
        draw = ImageDraw.Draw(frame_pil)
        for det, class_prob, best_pred in zip(pred, data.classes_probs, data.best_preds):
            det = det.to(torch.int)
            class_name = data.classes_names[best_pred]
            # show_text = f"{class_name}: %.2f" % class_prob[best_pred]
            show_text = f"{class_name}"
            show_color = box_color if best_pred == 0 else cheating_box_color
            draw.rectangle((det[0], det[1], det[2], det[3]), outline=show_color, width=2)
            # Text label
            """
            Changed 2023/4/19
            fontText = ImageFont.truetype("resource/font/NotoSansCJKkr-Black.otf",
                                          int(40 * (min(det[2] - det[0], det[3] - det[1])) / 200),
                                          encoding="utf-8")
            draw.text((det[0], det[1]), show_text, show_color, font=fontText)
            """
            # cv2.putText(frame, show_text,
            #             (det[0], det[1]),
            #             cv2.FONT_HERSHEY_COMPLEX,
            #             float((det[2] - det[0]) / 200),
            #             show_color)
        frame = np.asarray(frame_pil)
    # Head-pose estimation axes
    """
    Changed 2023/4/19
    for (r, t) in data.head_pose:
        data.draw_axis(frame, r, t)
    """
    # Draw FPS
    cv2.putText(frame, "FPS: %.2f" % fps, (0, 52), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255))
    data.frame_anno = frame  # save the annotated frame


class DataDealerModule(BaseModule):
    def __init__(self, push_frame_func, interval=0.06, skippable=False):
        super(DataDealerModule, self).__init__(skippable=skippable)
        self.last_time = time.time()
        self.push_frame_func = push_frame_func
        self.last_data = None
        self.interval = interval
        self.size_waiting = True

        # Start consuming only once the queue holds this many items
        self.queue_threshold = 10

    @abstractmethod
    def deal_skipped_data(self, data: DictData, last_data: DictData) -> DictData:
        pass

    @abstractmethod
    def draw_frame(self, data, fps):
        pass

    def process_data(self, data):
        if hasattr(data, 'skipped') and self.last_data is not None:
            data = self.deal_skipped_data(data, copy.copy(self.last_data))
        else:
            self.last_data = data
        current_time = time.time()
        interval = (current_time - self.last_time)
        fps = 1 / interval
        data.fps = fps
        self.draw_frame(data, fps=fps)
        data.interval = interval
        self.last_time = current_time  # update the timestamp
        self.push_frame_func(data)
        if hasattr(data, 'source_fps'):
            time.sleep(1 / data.source_fps * (1 + self.self_balance_factor()))
        else:
            time.sleep(self.interval)
        return TASK_DATA_OK

    def self_balance_factor(self):
        # Map queue size to a sleep factor in (-0.999, 1]: an empty queue
        # lengthens the sleep, a full queue (>= 20) shrinks it to near zero.
        factor = max(-0.999, (self.queue.qsize() / 20 - 0.5) / -0.5)
        # print(factor)
        return factor

    def product_task_data(self):
        # print(self.queue.qsize(), self.size_waiting)
        if self.queue.qsize() == 0:
            self.size_waiting = True
        if self.queue.qsize() > self.queue_threshold or not self.size_waiting:
            self.size_waiting = False
            try:
                task_data = self.queue.get(block=True, timeout=1)
                return task_data
            except Empty:
                return self.ignore_task_data
        else:
            time.sleep(1)
            return self.ignore_task_data

    def put_task_data(self, task_data):
        self.queue.put(task_data)

    def open(self):
        super(DataDealerModule, self).open()


class CheatingDetectionVisModule(DataDealerModule):

    def __init__(self, push_frame_func, interval=0.06, skippable=False):
        super(CheatingDetectionVisModule, self).__init__(push_frame_func, interval, skippable)

    def deal_skipped_data(self, data: DictData, last_data: DictData) -> DictData:
        frame = data.frame
        data = last_data
        data.skipped = None
        data.frame = frame
        data.detections = data.detections.clone()
        # Add jitter so boxes reused from the last frame do not look frozen
        data.detections[:, :4] += torch.rand_like(data.detections[:, :4]) * 3
        return data

    def draw_frame(self, data, fps):
        draw_frame(data, fps=fps)


class ClassConcentrationVisModule(DataDealerModule):

    def __init__(self, push_frame_func, interval=0.06, skippable=False):
        super(ClassConcentrationVisModule, self).__init__(push_frame_func, interval, skippable)

    def deal_skipped_data(self, data: DictData, last_data: DictData) -> DictData:
        frame = data.frame
        data = last_data
        data.skipped = None
        data.frame = frame
        data.detections = data.detections.clone()
        # Add jitter so boxes reused from the last frame do not look frozen
        data.detections[:, :4] += torch.rand_like(data.detections[:, :4]) * 3
        return data

    def draw_frame(self, data, fps):
        def opt_draw_frame(show_box=True, self_weights=None,
                           draw_keypoints=draw_keypoints_default, show_fps=True,
                           data=data,
                           self=self):
            frame = data.frame.copy()
            pred = data.detections
            preds_kps = data.keypoints
            preds_scores = data.keypoints_scores
            if show_box and pred.shape[0] > 0:
                # Draw skeleton keypoints
                if draw_keypoints and preds_kps is not None:
                    draw_keypoints136(frame, preds_kps, preds_scores)
                ce: ConcentrationEvaluation = data.concentration_evaluation
                # Draw detection boxes and action classes
                frame_pil = Image.fromarray(frame)
                draw = ImageDraw.Draw(frame_pil)
                # Primary levels, optionally re-weighted with caller-supplied weights
                primary_levels = ce.primary_levels
                if self_weights is not None:
                    primary_levels = ce.secondary_levels @ ConcentrationEvaluator.softmax(np.array(self_weights))
                for det, primary_level, secondary_level in zip(pred,
                                                               primary_levels,
                                                               ce.secondary_levels):
                    det = det.to(torch.int)
                    action_color_channel = int(secondary_level[0] * 44)
                    face_color_channel = int(secondary_level[1] * 44)
                    head_pose_color_channel = int(secondary_level[2] * 44)
                    draw.rectangle((det[0], det[1], det[2], det[3]),
                                   outline=(action_color_channel,
                                            face_color_channel,
                                            head_pose_color_channel),
                                   width=2)
                    h = int((det[3] - det[1]) * 0.2)
                    w = int((det[2] - det[0]) / 3)
                    draw.rectangle((det[0], det[1] - h, det[0] + w, det[1]),
                                   fill=(action_color_channel, 0, 0),
                                   width=2)

                    draw.rectangle((det[0] + w, det[1] - h, det[0] + 2 * w, det[1]),
                                   fill=(0, face_color_channel, 0),
                                   width=2)

                    draw.rectangle((det[0] + 2 * w, det[1] - h, det[0] + 3 * w, det[1]),
                                   fill=(0, 0, head_pose_color_channel),
                                   width=2)

                    # Text label
                    fontText = ImageFont.truetype("resource/font/NotoSansCJKkr-Black.otf",
                                                  int(40 * (min(det[2] - det[0], det[3] - det[1])) / 200),
                                                  encoding="utf-8")

                    show_text = f'{primary_level:8.2f}'
                    f_w, f_h = fontText.getsize(show_text)
                    draw.text(((det[2] + det[0] - f_w) // 2, det[1] - f_h),
                              show_text,
                              (255, 255, 255),
                              fontText)

                frame = np.asarray(frame_pil)
            # Head-pose estimation axes
            """
            Changed 2023/4/19
            for (r, t) in data.head_pose:
                data.draw_axis(frame, r, t)
            """

            # Draw FPS
            if show_fps:
                cv2.putText(frame,
                            "FPS: %.2f" % data.fps,
                            (0, 52),
                            cv2.FONT_HERSHEY_COMPLEX,
                            0.5,
                            (0, 0, 255))
            return frame  # the annotated frame
        data.get_draw_frame = lambda show_box=True, self_weights=None: opt_draw_frame(show_box=show_box,
                                                                                      self_weights=self_weights)
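
The sketch below shows one way a module like the one above might be consumed. It is a minimal sketch under assumed wiring: only ClassConcentrationVisModule, put_task_data, and data.get_draw_frame come from the code above; push_frame and the surrounding glue are hypothetical stand-ins for whatever the project's UI layer actually provides.

import cv2

def push_frame(data):
    # Called once per processed frame; get_draw_frame re-renders on demand,
    # e.g. with boxes toggled off or with custom evaluation weights.
    frame = data.get_draw_frame(show_box=True, self_weights=None)
    cv2.imshow("concentration", frame)
    cv2.waitKey(1)

vis = ClassConcentrationVisModule(push_frame_func=push_frame, interval=0.06)
vis.open()
# Upstream detection/classification modules would then feed it:
# vis.put_task_data(task_data) for every frame they finish.

The second listing is a separate module of pose-related transformation functions; its original copyright header is kept below.
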
# -----------------------------------------------------
# Copyright (c) Shanghai Jiao Tong University. All rights reserved.
# Written by Jiefeng Li ([email protected])
# -----------------------------------------------------

"""Pose related transformation functions."""

import random

import cv2
import numpy as np
import torch
from torch.nn import functional as F


def rnd(x):
    return max(-2 * x, min(2 * x, np.random.randn(1)[0] * x))


def box_transform(bbox, sf, imgwidth, imght, train):
    """Random scaling."""
    width = bbox[2] - bbox[0]
    ht = bbox[3] - bbox[1]
    if train:
        scaleRate = 0.25 * np.clip(np.random.randn() * sf, -sf, sf)

        bbox[0] = max(0, bbox[0] - width * scaleRate / 2)
        bbox[1] = max(0, bbox[1] - ht * scaleRate / 2)
        bbox[2] = min(imgwidth, bbox[2] + width * scaleRate / 2)
        bbox[3] = min(imght, bbox[3] + ht * scaleRate / 2)
    else:
        scaleRate = 0.25

        bbox[0] = max(0, bbox[0] - width * scaleRate / 2)
        bbox[1] = max(0, bbox[1] - ht * scaleRate / 2)
        bbox[2] = min(imgwidth, max(bbox[2] + width * scaleRate / 2, bbox[0] + 5))
        bbox[3] = min(imght, max(bbox[3] + ht * scaleRate / 2, bbox[1] + 5))

    return bbox


def addDPG(bbox, imgwidth, imght):
    """Add dpg for data augmentation, including random crop and random sample."""
    PatchScale = random.uniform(0, 1)
    width = bbox[2] - bbox[0]
    ht = bbox[3] - bbox[1]

    if PatchScale > 0.85:
        ratio = ht / width
        if width < ht:
            patchWidth = PatchScale * width
            patchHt = patchWidth * ratio
        else:
            patchHt = PatchScale * ht
            patchWidth = patchHt / ratio

        xmin = bbox[0] + random.uniform(0, 1) * (width - patchWidth)
        ymin = bbox[1] + random.uniform(0, 1) * (ht - patchHt)
        xmax = xmin + patchWidth + 1
        ymax = ymin + patchHt + 1
    else:
        xmin = max(1, min(bbox[0] + np.random.normal(-0.0142, 0.1158) * width, imgwidth - 3))
        ymin = max(1, min(bbox[1] + np.random.normal(0.0043, 0.068) * ht, imght - 3))
        xmax = min(max(xmin + 2, bbox[2] + np.random.normal(0.0154, 0.1337) * width), imgwidth - 3)
        ymax = min(max(ymin + 2, bbox[3] + np.random.normal(-0.0013, 0.0711) * ht), imght - 3)

    bbox[0] = xmin
    bbox[1] = ymin
    bbox[2] = xmax
    bbox[3] = ymax

    return bbox


def im_to_torch(img):
    """Transform ndarray image to torch tensor.
    Parameters
    ----------
    img: numpy.ndarray
        An ndarray with shape: `(H, W, 3)`.
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(3, H, W)`.
    """
    img = np.transpose(img, (2, 0, 1))  # C*H*W
    img = to_torch(img).float()
    if img.max() > 1:
        img /= 255
    return img


def torch_to_im(img):
    """Transform torch tensor to ndarray image.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(3, H, W)`.
    Returns
    -------
    numpy.ndarray
        An ndarray with shape: `(H, W, 3)`.
    """
    img = to_numpy(img)
    img = np.transpose(img, (1, 2, 0))  # H*W*C
    return img


def load_image(img_path):
    # H x W x C => C x H x W
    return im_to_torch(cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB))


def to_numpy(tensor):
    # torch.Tensor => numpy.ndarray
    if torch.is_tensor(tensor):
        return tensor.cpu().numpy()
    elif type(tensor).__module__ != 'numpy':
        raise ValueError("Cannot convert {} to numpy array"
                         .format(type(tensor)))
    return tensor


def to_torch(ndarray):
    # numpy.ndarray => torch.Tensor
    if type(ndarray).__module__ == 'numpy':
        return torch.from_numpy(ndarray)
    elif not torch.is_tensor(ndarray):
        raise ValueError("Cannot convert {} to torch tensor"
                         .format(type(ndarray)))
    return ndarray


def cv_cropBox(img, bbox, input_size):
    """Crop bbox from image by affine transform.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(3, H, W)`.
    bbox: list or tuple
        [xmin, ymin, xmax, ymax].
    input_size: tuple
        Resulting image size, as (height, width).
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(3, height, width)`.
    """
    xmin, ymin, xmax, ymax = bbox
    xmax -= 1
    ymax -= 1
    resH, resW = input_size

    lenH = max((ymax - ymin), (xmax - xmin) * resH / resW)
    lenW = lenH * resW / resH
    if img.dim() == 2:
        img = img[np.newaxis, :, :]

    box_shape = [ymax - ymin, xmax - xmin]
    pad_size = [(lenH - box_shape[0]) // 2, (lenW - box_shape[1]) // 2]
    # Zero out everything outside the box
    img[:, :ymin, :], img[:, :, :xmin] = 0, 0
    img[:, ymax + 1:, :], img[:, :, xmax + 1:] = 0, 0

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)

    src[0, :] = np.array([xmin - pad_size[1], ymin - pad_size[0]], np.float32)
    src[1, :] = np.array([xmax + pad_size[1], ymax + pad_size[0]], np.float32)
    dst[0, :] = 0
    dst[1, :] = np.array([resW - 1, resH - 1], np.float32)

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    dst_img = cv2.warpAffine(torch_to_im(img), trans,
                             (resW, resH), flags=cv2.INTER_LINEAR)
    if dst_img.ndim == 2:
        dst_img = dst_img[:, :, np.newaxis]

    return im_to_torch(torch.Tensor(dst_img))


def cv_cropBox_rot(img, bbox, input_size, rot):
    """Crop bbox from image by affine transform, with rotation.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(3, H, W)`.
    bbox: list or tuple
        [xmin, ymin, xmax, ymax].
    input_size: tuple
        Resulting image size, as (height, width).
    rot: float
        Rotation angle in degrees.
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(3, height, width)`.
    """
    xmin, ymin, xmax, ymax = bbox
    xmax -= 1
    ymax -= 1
    resH, resW = input_size
    rot_rad = np.pi * rot / 180

    if img.dim() == 2:
        img = img[np.newaxis, :, :]

    center = np.array([(xmax + xmin) / 2, (ymax + ymin) / 2])

    src_dir = get_dir([0, (ymax - ymin) * -0.5], rot_rad)
    dst_dir = np.array([0, (resH - 1) * -0.5], np.float32)

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)

    src[0, :] = center
    src[1, :] = center + src_dir
    dst[0, :] = [(resW - 1) * 0.5, (resH - 1) * 0.5]
    dst[1, :] = np.array([(resW - 1) * 0.5, (resH - 1) * 0.5]) + dst_dir

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    dst_img = cv2.warpAffine(torch_to_im(img), trans,
                             (resW, resH), flags=cv2.INTER_LINEAR)
    if dst_img.ndim == 2:
        dst_img = dst_img[:, :, np.newaxis]

    return im_to_torch(torch.Tensor(dst_img))


def fix_cropBox(img, bbox, input_size):
    """Crop bbox from image by affine transform, expanding the box to the target aspect ratio.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(3, H, W)`.
    bbox: list or tuple
        [xmin, ymin, xmax, ymax].
    input_size: tuple
        Resulting image size, as (height, width).
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(3, height, width)`.
    list
        The adjusted bbox [xmin, ymin, xmax, ymax].
    """
    xmin, ymin, xmax, ymax = bbox
    input_ratio = input_size[0] / input_size[1]
    bbox_ratio = (ymax - ymin) / (xmax - xmin)
    if bbox_ratio > input_ratio:
        # expand width
        cx = (xmax + xmin) / 2
        h = ymax - ymin
        w = h / input_ratio
        xmin = cx - w / 2
        xmax = cx + w / 2
    elif bbox_ratio < input_ratio:
        # expand height
        cy = (ymax + ymin) / 2
        w = xmax - xmin
        h = w * input_ratio
        ymin = cy - h / 2
        ymax = cy + h / 2
    bbox = [int(x) for x in [xmin, ymin, xmax, ymax]]

    return cv_cropBox(img, bbox, input_size), bbox


def fix_cropBox_rot(img, bbox, input_size, rot):
    """Crop bbox from image by affine transform with rotation, expanding the box to the target aspect ratio.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(3, H, W)`.
    bbox: list or tuple
        [xmin, ymin, xmax, ymax].
    input_size: tuple
        Resulting image size, as (height, width).
    rot: float
        Rotation angle in degrees.
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(3, height, width)`.
    list
        The adjusted bbox [xmin, ymin, xmax, ymax].
    """
    xmin, ymin, xmax, ymax = bbox
    input_ratio = input_size[0] / input_size[1]
    bbox_ratio = (ymax - ymin) / (xmax - xmin)
    if bbox_ratio > input_ratio:
        # expand width
        cx = (xmax + xmin) / 2
        h = ymax - ymin
        w = h / input_ratio
        xmin = cx - w / 2
        xmax = cx + w / 2
    elif bbox_ratio < input_ratio:
        # expand height
        cy = (ymax + ymin) / 2
        w = xmax - xmin
        h = w * input_ratio
        ymin = cy - h / 2
        ymax = cy + h / 2
    bbox = [int(x) for x in [xmin, ymin, xmax, ymax]]

    return cv_cropBox_rot(img, bbox, input_size, rot), bbox


def get_3rd_point(a, b):
    """Return a point c such that (c - b) is perpendicular to (a - b)."""
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)


def get_dir(src_point, rot_rad):
    """Rotate the point by `rot_rad` radians."""
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)

    src_result = [0, 0]
    src_result[0] = src_point[0] * cs - src_point[1] * sn
    src_result[1] = src_point[0] * sn + src_point[1] * cs

    return src_result


def cv_cropBoxInverse(inp, bbox, img_size, output_size):
    """Paste the cropped bbox back into the original image.
    Parameters
    ----------
    inp: torch.Tensor
        A tensor with shape: `(3, height, width)`.
    bbox: list or tuple
        [xmin, ymin, xmax, ymax].
    img_size: tuple
        Original image size, as (img_H, img_W).
    output_size: tuple
        Cropped input size, as (height, width).
    Returns
    -------
    numpy.ndarray or torch.Tensor
        The pasted image, with shape `(img_H, img_W)` for single-channel input
        or `(3, img_H, img_W)` otherwise.
    """
    xmin, ymin, xmax, ymax = bbox
    xmax -= 1
    ymax -= 1
    resH, resW = output_size
    imgH, imgW = img_size

    lenH = max((ymax - ymin), (xmax - xmin) * resH / resW)
    lenW = lenH * resW / resH
    if inp.dim() == 2:
        inp = inp[np.newaxis, :, :]

    box_shape = [ymax - ymin, xmax - xmin]
    pad_size = [(lenH - box_shape[0]) // 2, (lenW - box_shape[1]) // 2]

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)

    src[0, :] = 0
    src[1, :] = np.array([resW - 1, resH - 1], np.float32)
    dst[0, :] = np.array([xmin - pad_size[1], ymin - pad_size[0]], np.float32)
    dst[1, :] = np.array([xmax + pad_size[1], ymax + pad_size[0]], np.float32)

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    dst_img = cv2.warpAffine(torch_to_im(inp), trans,
                             (imgW, imgH), flags=cv2.INTER_LINEAR)
    if dst_img.ndim == 3 and dst_img.shape[2] == 1:
        return dst_img[:, :, 0]
    elif dst_img.ndim == 2:
        return dst_img
    else:
        return im_to_torch(torch.Tensor(dst_img))


def cv_rotate(img, rot, input_size):
    """Rotate image by affine transform.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(3, H, W)`.
    rot: int
        Rotation angle in degrees.
    input_size: tuple
        Resulting image size, as (height, width).
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(3, height, width)`.
    """
    resH, resW = input_size
    center = np.array((resW - 1, resH - 1)) / 2
    rot_rad = np.pi * rot / 180

    src_dir = get_dir([0, (resH - 1) * -0.5], rot_rad)
    dst_dir = np.array([0, (resH - 1) * -0.5], np.float32)

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)

    src[0, :] = center
    src[1, :] = center + src_dir
    dst[0, :] = [(resW - 1) * 0.5, (resH - 1) * 0.5]
    dst[1, :] = np.array([(resW - 1) * 0.5, (resH - 1) * 0.5]) + dst_dir

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    dst_img = cv2.warpAffine(torch_to_im(img), trans,
                             (resW, resH), flags=cv2.INTER_LINEAR)
    if dst_img.ndim == 2:
        dst_img = dst_img[:, :, np.newaxis]

    return im_to_torch(torch.Tensor(dst_img))


def count_visible(bbox, joints_3d):
    """Count the number of visible joints within the given bounding box."""
    vis = np.logical_and.reduce((
        joints_3d[:, 0, 0] > 0,
        joints_3d[:, 0, 0] > bbox[0],
        joints_3d[:, 0, 0] < bbox[2],
        joints_3d[:, 1, 0] > 0,
        joints_3d[:, 1, 0] > bbox[1],
        joints_3d[:, 1, 0] < bbox[3],
        joints_3d[:, 0, 1] > 0,
        joints_3d[:, 1, 1] > 0
    ))
    return np.sum(vis), vis


def drawGaussian(img, pt, sigma):
    """Draw a 2D gaussian on the input heatmap.
    Parameters
    ----------
    img: torch.Tensor
        A tensor with shape: `(H, W)`.
    pt: list or tuple
        A point: (x, y).
    sigma: int
        Sigma of the gaussian distribution.
    Returns
    -------
    torch.Tensor
        A tensor with shape: `(H, W)`.
    """
    img = to_numpy(img)
    tmpSize = 3 * sigma
    # Check that any part of the gaussian is in-bounds
    ul = [int(pt[0] - tmpSize), int(pt[1] - tmpSize)]
    br = [int(pt[0] + tmpSize + 1), int(pt[1] + tmpSize + 1)]

    if ul[0] >= img.shape[1] or ul[1] >= img.shape[0] or br[0] < 0 or br[1] < 0:
        # If not, just return the image as is
        return to_torch(img)

    # Generate gaussian
    size = 2 * tmpSize + 1
    x = np.arange(0, size, 1, float)
    y = x[:, np.newaxis]
    x0 = y0 = size // 2
    # The gaussian is not normalized; we want the center value to equal 1
    g = np.exp(-((x - x0) ** 2 + (y - y0) ** 2) / (2 * sigma ** 2))

    # Usable gaussian range
    g_x = max(0, -ul[0]), min(br[0], img.shape[1]) - ul[0]
    g_y = max(0, -ul[1]), min(br[1], img.shape[0]) - ul[1]
    # Image range
    img_x = max(0, ul[0]), min(br[0], img.shape[1])
    img_y = max(0, ul[1]), min(br[1], img.shape[0])

    img[img_y[0]:img_y[1], img_x[0]:img_x[1]] = g[g_y[0]:g_y[1], g_x[0]:g_x[1]]
    return to_torch(img)


def flip(x):
    assert x.dim() == 3 or x.dim() == 4
    dim = x.dim() - 1

    return x.flip(dims=(dim,))


def flip_heatmap(heatmap, joint_pairs, shift=False):
    """Flip pose heatmaps according to joint pairs.
    Parameters
    ----------
    heatmap : torch.Tensor
        Heatmaps of joints.
    joint_pairs : list
        List of joint pairs.
    shift : bool
        Whether to shift the output.
    Returns
    -------
    torch.Tensor
        Flipped heatmaps.
    """
    assert heatmap.dim() == 3 or heatmap.dim() == 4
    out = flip(heatmap)

    for pair in joint_pairs:
        dim0, dim1 = pair
        idx = torch.Tensor((dim0, dim1)).long()
        inv_idx = torch.Tensor((dim1, dim0)).long()
        if out.dim() == 4:
            out[:, idx] = out[:, inv_idx]
        else:
            out[idx] = out[inv_idx]

    if shift:
        if out.dim() == 3:
            out[:, :, 1:] = out[:, :, 0:-1]
        else:
            out[:, :, :, 1:] = out[:, :, :, 0:-1]
    return out


def flip_joints_3d(joints_3d, width, joint_pairs):
    """Flip 3d joints.
    Parameters
    ----------
    joints_3d : numpy.ndarray
        Joints in shape (num_joints, 3, 2)
    width : int
        Image width.
    joint_pairs : list
        List of joint pairs.
    Returns
    -------
    numpy.ndarray
        Flipped 3d joints with shape (num_joints, 3, 2)
    """
    joints = joints_3d.copy()
    # flip horizontally
    joints[:, 0, 0] = width - joints[:, 0, 0] - 1
    # change left-right parts
    for pair in joint_pairs:
        joints[pair[0], :, 0], joints[pair[1], :, 0] = \
            joints[pair[1], :, 0], joints[pair[0], :, 0].copy()
        joints[pair[0], :, 1], joints[pair[1], :, 1] = \
            joints[pair[1], :, 1], joints[pair[0], :, 1].copy()

    joints[:, :, 0] *= joints[:, :, 1]
    return joints


def heatmap_to_coord_simple(hms, bbox, hms_flip=None, **kwargs):
    if hms_flip is not None:
        hms = (hms + hms_flip) / 2
    if not isinstance(hms, np.ndarray):
        hms = hms.cpu().data.numpy()
    coords, maxvals = get_max_pred(hms)

    hm_h = hms.shape[1]
    hm_w = hms.shape[2]

    # Post-processing: nudge each peak a quarter pixel towards the gradient
    for p in range(coords.shape[0]):
        hm = hms[p]
        px = int(round(float(coords[p][0])))
        py = int(round(float(coords[p][1])))
        if 1 < px < hm_w - 1 and 1 < py < hm_h - 1:
            diff = np.array((hm[py][px + 1] - hm[py][px - 1],
                             hm[py + 1][px] - hm[py - 1][px]))
            coords[p] += np.sign(diff) * .25

    preds = np.zeros_like(coords)

    # Transform bbox to center/scale
    xmin, ymin, xmax, ymax = bbox
    w = xmax - xmin
    h = ymax - ymin
    center = np.array([xmin + w * 0.5, ymin + h * 0.5])
    scale = np.array([w, h])
    # Transform back to image coordinates
    for i in range(coords.shape[0]):
        preds[i] = transform_preds(coords[i], center, scale,
                                   [hm_w, hm_h])

    return preds, maxvals


def heatmap_to_coord_simple_regress(preds, bbox, hm_shape, norm_type, hms_flip=None):
    def integral_op(hm_1d):
        # Weight each bin by its index so a later sum yields the expectation
        if hm_1d.device.index is not None:
            hm_1d = hm_1d * torch.cuda.comm.broadcast(torch.arange(hm_1d.shape[-1]).type(
                torch.cuda.FloatTensor), devices=[hm_1d.device.index])[0]
        else:
            hm_1d = hm_1d * torch.arange(hm_1d.shape[-1]).type(torch.FloatTensor)
        return hm_1d

    if preds.dim() == 3:
        preds = preds.unsqueeze(0)
    hm_height, hm_width = hm_shape
    num_joints = preds.shape[1]

    pred_jts, pred_scores = _integral_tensor(preds, num_joints, False, hm_width, hm_height, 1, integral_op, norm_type)
    pred_jts = pred_jts.reshape(pred_jts.shape[0], num_joints, 2)

    if hms_flip is not None:
        if hms_flip.dim() == 3:
            hms_flip = hms_flip.unsqueeze(0)
        pred_jts_flip, pred_scores_flip = _integral_tensor(hms_flip, num_joints, False, hm_width, hm_height, 1, integral_op, norm_type)
        pred_jts_flip = pred_jts_flip.reshape(pred_jts_flip.shape[0], num_joints, 2)

        pred_jts = (pred_jts + pred_jts_flip) / 2
        pred_scores = (pred_scores + pred_scores_flip) / 2

    ndims = pred_jts.dim()
    assert ndims in [2, 3], "pred_jts should have 2 or 3 dims"
    if ndims == 2:
        pred_jts = pred_jts.unsqueeze(0)
        pred_scores = pred_scores.unsqueeze(0)

    coords = pred_jts.cpu().numpy().astype(np.float32)
    pred_scores = pred_scores.cpu().numpy().astype(np.float32)

    coords[:, :, 0] = (coords[:, :, 0] + 0.5) * hm_width
    coords[:, :, 1] = (coords[:, :, 1] + 0.5) * hm_height

    preds = np.zeros_like(coords)
    # Transform bbox to center/scale
    xmin, ymin, xmax, ymax = bbox
    w = xmax - xmin
    h = ymax - ymin
    center = np.array([xmin + w * 0.5, ymin + h * 0.5])
    scale = np.array([w, h])
    # Transform back to image coordinates
    for i in range(coords.shape[0]):
        for j in range(coords.shape[1]):
            preds[i, j, 0:2] = transform_preds(coords[i, j, 0:2], center, scale,
                                               [hm_width, hm_height])

    if preds.shape[0] == 1:
        preds = preds[0]
        pred_scores = pred_scores[0]
    return preds, pred_scores


def _integral_tensor(preds, num_joints, output_3d, hm_width, hm_height, hm_depth, integral_operation, norm_type='softmax'):
    # normalization
    preds = preds.reshape((preds.shape[0], num_joints, -1))
    preds = norm_heatmap(norm_type, preds)

    # get heatmap confidence
    if norm_type == 'sigmoid':
        maxvals, _ = torch.max(preds, dim=2, keepdim=True)
    else:
        maxvals = torch.ones(
            (*preds.shape[:2], 1), dtype=torch.float, device=preds.device)

    # normalize to a probability distribution
    heatmaps = preds / preds.sum(dim=2, keepdim=True)
    heatmaps = heatmaps.reshape(
        (heatmaps.shape[0], num_joints, hm_depth, hm_height, hm_width))

    # marginal probabilities along each axis
    hm_x = heatmaps.sum((2, 3))
    hm_y = heatmaps.sum((2, 4))
    hm_z = heatmaps.sum((3, 4))

    hm_x = integral_operation(hm_x)
    hm_y = integral_operation(hm_y)
    hm_z = integral_operation(hm_z)

    coord_x = hm_x.sum(dim=2, keepdim=True)
    coord_y = hm_y.sum(dim=2, keepdim=True)
    coord_z = hm_z.sum(dim=2, keepdim=True)

    coord_x = coord_x / float(hm_width) - 0.5
    coord_y = coord_y / float(hm_height) - 0.5
    if output_3d:
        coord_z = coord_z / float(hm_depth) - 0.5
        pred_jts = torch.cat((coord_x, coord_y, coord_z), dim=2)
        pred_jts = pred_jts.reshape((pred_jts.shape[0], num_joints * 3))
    else:
        pred_jts = torch.cat((coord_x, coord_y), dim=2)
        pred_jts = pred_jts.reshape((pred_jts.shape[0], num_joints * 2))
    return pred_jts, maxvals.float()


def norm_heatmap(norm_type, heatmap):
    # Input tensor shape: [N, C, ...]
    shape = heatmap.shape
    if norm_type == 'softmax':
        heatmap = heatmap.reshape(*shape[:2], -1)
        # global softmax over the spatial dimensions
        heatmap = F.softmax(heatmap, 2)
        return heatmap.reshape(*shape)
    elif norm_type == 'sigmoid':
        return heatmap.sigmoid()
    elif norm_type == 'divide_sum':
        heatmap = heatmap.reshape(*shape[:2], -1)
        heatmap = heatmap / heatmap.sum(dim=2, keepdim=True)
        return heatmap.reshape(*shape)
    else:
        raise NotImplementedError


def transform_preds(coords, center, scale, output_size):
    target_coords = np.zeros(coords.shape)
    trans = get_affine_transform(center, scale, 0, output_size, inv=1)
    target_coords[0:2] = affine_transform(coords[0:2], trans)
    return target_coords


def get_max_pred(heatmaps):
    num_joints = heatmaps.shape[0]
    width = heatmaps.shape[2]
    heatmaps_reshaped = heatmaps.reshape((num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 1)
    maxvals = np.max(heatmaps_reshaped, 1)

    maxvals = maxvals.reshape((num_joints, 1))
    idx = idx.reshape((num_joints, 1))

    preds = np.tile(idx, (1, 2)).astype(np.float32)

    preds[:, 0] = (preds[:, 0]) % width
    preds[:, 1] = np.floor((preds[:, 1]) / width)

    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 2))
    pred_mask = pred_mask.astype(np.float32)

    preds *= pred_mask
    return preds, maxvals


def get_max_pred_batch(batch_heatmaps):
    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    maxvals = np.max(heatmaps_reshaped, 2)

    maxvals = maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))

    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)

    preds[:, :, 0] = (preds[:, :, 0]) % width
    preds[:, :, 1] = np.floor((preds[:, :, 1]) / width)

    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)

    preds *= pred_mask
    return preds, maxvals


def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=np.array([0, 0], dtype=np.float32),
                         inv=0):
    if not isinstance(scale, np.ndarray) and not isinstance(scale, list):
        scale = np.array([scale, scale])

    scale_tmp = scale
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans


def affine_transform(pt, t):
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]


def get_func_heatmap_to_coord(cfg):
    if cfg.DATA_PRESET.TYPE == 'simple':
        if cfg.LOSS.TYPE == 'MSELoss':
            return heatmap_to_coord_simple
        elif cfg.LOSS.TYPE == 'L1JointRegression':
            return heatmap_to_coord_simple_regress
        else:
            raise NotImplementedError
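
As a quick sanity check on the heatmap decoding path, heatmap_to_coord_simple can be exercised with a synthetic heatmap. This is a minimal sketch under assumed sizes (17 joints, a 64x48 heatmap, and an arbitrary bbox with the same 3:4 aspect ratio); it is not part of the project code.

import numpy as np

num_joints, hm_h, hm_w = 17, 64, 48
hms = np.zeros((num_joints, hm_h, hm_w), dtype=np.float32)
hms[:, 32, 24] = 1.0  # place every joint's peak at the heatmap center

bbox = [100, 50, 196, 178]  # xmin, ymin, xmax, ymax in image pixels
preds, maxvals = heatmap_to_coord_simple(hms, bbox)
print(preds[0], maxvals[0])  # the peak maps back to the bbox center, (148., 114.)
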

From: https://www.cnblogs.com/joranger/p/17402402.html
