def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
    """Group keypoints into per-person pose entries using part affinity fields.

    For every limb in ``BODY_PARTS_PAF_IDS`` each pair of candidate endpoints
    is scored by sampling the limb's two PAF channels at 10 evenly spaced
    points along the segment joining them. Connections that survive
    ``connections_nms`` are attached to existing pose entries or start new
    ones.

    Args:
        all_keypoints_by_type: One sequence of keypoints per keypoint type.
            Each keypoint is read as ``[x, y, score, index]``: columns 0 and 1
            are used as coordinates, column 2 is accumulated into the pose
            score (presumably the keypoint confidence), and column 3 is used
            as the row index into the flattened ``all_keypoints`` array.
        pafs: Array indexed as ``pafs[y, x, channel]``.
        pose_entry_size: Length of each pose entry vector. The last slot
            holds the number of keypoints in the pose and the second-to-last
            slot holds the accumulated score; the remaining slots hold
            keypoint indices (``-1`` when absent).
        min_paf_score: A sampled point counts towards a limb only if its
            affinity score exceeds this value.

    Returns:
        A tuple ``(pose_entries, all_keypoints)``. ``pose_entries`` is an
        array built from the entries that have at least 3 keypoints and an
        accumulated score per keypoint of at least 0.2. ``all_keypoints`` is
        the flattened array of all input keypoints.
    """
    pose_entries = []
    all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
    points_per_limb = 10
    grid = np.arange(points_per_limb, dtype=np.float32).reshape(1, -1, 1)
    all_keypoints_by_type = [np.array(keypoints, np.float32) for keypoints in all_keypoints_by_type]

    for part_id in range(len(BODY_PARTS_PAF_IDS)):
        part_pafs = pafs[:, :, BODY_PARTS_PAF_IDS[part_id]]
        kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
        kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
        kpts_a = all_keypoints_by_type[kpt_a_id]
        kpts_b = all_keypoints_by_type[kpt_b_id]
        n = len(kpts_a)
        m = len(kpts_b)
        if n == 0 or m == 0:
            continue

        # Get vectors between all pairs of keypoints, i.e. candidate limb vectors.
        a = kpts_a[:, :2]  # (x, y) of every type-A keypoint
        # Broadcast A to (m, n, 2) so subtracting it from B yields one vector
        # for each of the m * n (B, A) pairs.
        a = np.broadcast_to(a[None], (m, n, 2))
        b = kpts_b[:, :2]  # (x, y) of every type-B keypoint
        vec_raw = (b[:, None, :] - a).reshape(-1, 1, 2)

        # Sample points along every candidate limb vector.
        steps = (1 / (points_per_limb - 1) * vec_raw)
        points = steps * grid + a.reshape(-1, 1, 2)
        points = points.round().astype(dtype=np.int32)
        x = points[..., 0].ravel()
        y = points[..., 1].ravel()

        # Compute affinity score between candidate limb vectors and part affinity field.
        field = part_pafs[y, x].reshape(-1, points_per_limb, 2)
        vec_norm = np.linalg.norm(vec_raw, ord=2, axis=-1, keepdims=True)
        vec = vec_raw / (vec_norm + 1e-6)  # epsilon guards zero-length vectors
        affinity_scores = (field * vec).sum(-1).reshape(-1, points_per_limb)
        valid_affinity_scores = affinity_scores > min_paf_score
        valid_num = valid_affinity_scores.sum(1)
        # Mean score over the valid samples only.
        affinity_scores = (affinity_scores * valid_affinity_scores).sum(1) / (valid_num + 1e-6)
        success_ratio = valid_num / points_per_limb

        # Get a list of limbs according to the obtained affinity score.
        valid_limbs = np.where(np.logical_and(affinity_scores > 0, success_ratio > 0.8))[0]
        if len(valid_limbs) == 0:
            continue

        # Candidates are flattened from an (m, n) grid, so the quotient is the
        # B index and the remainder is the A index.
        b_idx, a_idx = np.divmod(valid_limbs, n)
        affinity_scores = affinity_scores[valid_limbs]

        # Suppress incompatible connections.
        a_idx, b_idx, affinity_scores = connections_nms(a_idx, b_idx, affinity_scores)
        connections = list(zip(kpts_a[a_idx, 3].astype(np.int32),
                               kpts_b[b_idx, 3].astype(np.int32),
                               affinity_scores))
        if not connections:
            continue

        if part_id == 0:
            # The first limb seeds the pose list: one entry per connection.
            pose_entries = []
            for kpt_a_idx, kpt_b_idx, score in connections:
                pose_entry = np.ones(pose_entry_size) * -1
                pose_entry[kpt_a_id] = kpt_a_idx
                pose_entry[kpt_b_id] = kpt_b_idx
                pose_entry[-1] = 2
                pose_entry[-2] = np.sum(all_keypoints[[kpt_a_idx, kpt_b_idx], 2]) + score
                pose_entries.append(pose_entry)
        elif part_id in (17, 18):
            # These limbs only fill a missing endpoint on an existing entry;
            # they never create entries or change the count and score slots.
            # NOTE(review): presumably redundant limbs - confirm against
            # BODY_PARTS_KPT_IDS.
            for kpt_a_idx, kpt_b_idx, _ in connections:
                for pose_entry in pose_entries:
                    if pose_entry[kpt_a_id] == kpt_a_idx and pose_entry[kpt_b_id] == -1:
                        pose_entry[kpt_b_id] = kpt_b_idx
                    elif pose_entry[kpt_b_id] == kpt_b_idx and pose_entry[kpt_a_id] == -1:
                        pose_entry[kpt_a_id] = kpt_a_idx
        else:
            for kpt_a_idx, kpt_b_idx, score in connections:
                matched = False
                for pose_entry in pose_entries:
                    if pose_entry[kpt_a_id] == kpt_a_idx:
                        # Extend every pose that already owns endpoint A.
                        pose_entry[kpt_b_id] = kpt_b_idx
                        matched = True
                        pose_entry[-1] += 1
                        pose_entry[-2] += all_keypoints[kpt_b_idx, 2] + score
                if not matched:
                    # No pose owns endpoint A yet: start a new two-keypoint pose.
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = kpt_a_idx
                    pose_entry[kpt_b_id] = kpt_b_idx
                    pose_entry[-1] = 2
                    pose_entry[-2] = np.sum(all_keypoints[[kpt_a_idx, kpt_b_idx], 2]) + score
                    pose_entries.append(pose_entry)

    # Drop poses with too few keypoints or too low a score per keypoint.
    filtered_entries = [
        pose_entry for pose_entry in pose_entries
        if not (pose_entry[-1] < 3 or (pose_entry[-2] / pose_entry[-1] < 0.2))
    ]
    pose_entries = np.asarray(filtered_entries)
    return pose_entries, all_keypoints
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
    """Group keypoints into per-person pose entries using part affinity fields.

    For every limb in ``BODY_PARTS_PAF_IDS`` each pair of candidate endpoints
    is scored by sampling the limb's two PAF channels at 10 evenly spaced
    points along the segment joining them. Connections that survive
    ``connections_nms`` are attached to existing pose entries or start new
    ones.

    Args:
        all_keypoints_by_type: One sequence of keypoints per keypoint type.
            Each keypoint is read as ``[x, y, score, index]``: columns 0 and 1
            are used as coordinates, column 2 is accumulated into the pose
            score (presumably the keypoint confidence), and column 3 is used
            as the row index into the flattened ``all_keypoints`` array.
        pafs: Array indexed as ``pafs[y, x, channel]``.
        pose_entry_size: Length of each pose entry vector. The last slot
            holds the number of keypoints in the pose and the second-to-last
            slot holds the accumulated score; the remaining slots hold
            keypoint indices (``-1`` when absent).
        min_paf_score: A sampled point counts towards a limb only if its
            affinity score exceeds this value.

    Returns:
        A tuple ``(pose_entries, all_keypoints)``. ``pose_entries`` is an
        array built from the entries that have at least 3 keypoints and an
        accumulated score per keypoint of at least 0.2. ``all_keypoints`` is
        the flattened array of all input keypoints.
    """
    pose_entries = []
    all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
    points_per_limb = 10
    grid = np.arange(points_per_limb, dtype=np.float32).reshape(1, -1, 1)
    all_keypoints_by_type = [np.array(keypoints, np.float32) for keypoints in all_keypoints_by_type]

    for part_id in range(len(BODY_PARTS_PAF_IDS)):
        part_pafs = pafs[:, :, BODY_PARTS_PAF_IDS[part_id]]
        kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
        kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
        kpts_a = all_keypoints_by_type[kpt_a_id]
        kpts_b = all_keypoints_by_type[kpt_b_id]
        n = len(kpts_a)
        m = len(kpts_b)
        if n == 0 or m == 0:
            continue

        # Get vectors between all pairs of keypoints, i.e. candidate limb vectors.
        a = kpts_a[:, :2]
        # Broadcast A to (m, n, 2) so subtracting it from B yields one vector
        # for each of the m * n (B, A) pairs.
        a = np.broadcast_to(a[None], (m, n, 2))
        b = kpts_b[:, :2]
        vec_raw = (b[:, None, :] - a).reshape(-1, 1, 2)

        # Sample points along every candidate limb vector.
        steps = (1 / (points_per_limb - 1) * vec_raw)
        points = steps * grid + a.reshape(-1, 1, 2)
        points = points.round().astype(dtype=np.int32)
        x = points[..., 0].ravel()
        y = points[..., 1].ravel()

        # Compute affinity score between candidate limb vectors and part affinity field.
        field = part_pafs[y, x].reshape(-1, points_per_limb, 2)
        vec_norm = np.linalg.norm(vec_raw, ord=2, axis=-1, keepdims=True)
        vec = vec_raw / (vec_norm + 1e-6)  # epsilon guards zero-length vectors
        affinity_scores = (field * vec).sum(-1).reshape(-1, points_per_limb)
        valid_affinity_scores = affinity_scores > min_paf_score
        valid_num = valid_affinity_scores.sum(1)
        # Mean score over the valid samples only.
        affinity_scores = (affinity_scores * valid_affinity_scores).sum(1) / (valid_num + 1e-6)
        success_ratio = valid_num / points_per_limb

        # Get a list of limbs according to the obtained affinity score.
        valid_limbs = np.where(np.logical_and(affinity_scores > 0, success_ratio > 0.8))[0]
        if len(valid_limbs) == 0:
            continue

        # Candidates are flattened from an (m, n) grid, so the quotient is the
        # B index and the remainder is the A index.
        b_idx, a_idx = np.divmod(valid_limbs, n)
        affinity_scores = affinity_scores[valid_limbs]

        # Suppress incompatible connections.
        a_idx, b_idx, affinity_scores = connections_nms(a_idx, b_idx, affinity_scores)
        connections = list(zip(kpts_a[a_idx, 3].astype(np.int32),
                               kpts_b[b_idx, 3].astype(np.int32),
                               affinity_scores))
        if not connections:
            continue

        if part_id == 0:
            # The first limb seeds the pose list: one entry per connection.
            pose_entries = []
            for kpt_a_idx, kpt_b_idx, score in connections:
                pose_entry = np.ones(pose_entry_size) * -1
                pose_entry[kpt_a_id] = kpt_a_idx
                pose_entry[kpt_b_id] = kpt_b_idx
                pose_entry[-1] = 2
                pose_entry[-2] = np.sum(all_keypoints[[kpt_a_idx, kpt_b_idx], 2]) + score
                pose_entries.append(pose_entry)
        elif part_id in (17, 18):
            # These limbs only fill a missing endpoint on an existing entry;
            # they never create entries or change the count and score slots.
            # NOTE(review): presumably redundant limbs - confirm against
            # BODY_PARTS_KPT_IDS.
            for kpt_a_idx, kpt_b_idx, _ in connections:
                for pose_entry in pose_entries:
                    if pose_entry[kpt_a_id] == kpt_a_idx and pose_entry[kpt_b_id] == -1:
                        pose_entry[kpt_b_id] = kpt_b_idx
                    elif pose_entry[kpt_b_id] == kpt_b_idx and pose_entry[kpt_a_id] == -1:
                        pose_entry[kpt_a_id] = kpt_a_idx
        else:
            for kpt_a_idx, kpt_b_idx, score in connections:
                matched = False
                for pose_entry in pose_entries:
                    if pose_entry[kpt_a_id] == kpt_a_idx:
                        # Extend every pose that already owns endpoint A.
                        pose_entry[kpt_b_id] = kpt_b_idx
                        matched = True
                        pose_entry[-1] += 1
                        pose_entry[-2] += all_keypoints[kpt_b_idx, 2] + score
                if not matched:
                    # No pose owns endpoint A yet: start a new two-keypoint pose.
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = kpt_a_idx
                    pose_entry[kpt_b_id] = kpt_b_idx
                    pose_entry[-1] = 2
                    pose_entry[-2] = np.sum(all_keypoints[[kpt_a_idx, kpt_b_idx], 2]) + score
                    pose_entries.append(pose_entry)

    # Drop poses with too few keypoints or too low a score per keypoint.
    filtered_entries = [
        pose_entry for pose_entry in pose_entries
        if not (pose_entry[-1] < 3 or (pose_entry[-2] / pose_entry[-1] < 0.2))
    ]
    pose_entries = np.asarray(filtered_entries)
    return pose_entries, all_keypoints