|
|
|
|
|
import abc
|
|
|
import argparse
|
|
|
import os.path as osp
|
|
|
from collections import defaultdict
|
|
|
from tempfile import TemporaryDirectory
|
|
|
|
|
|
import mmengine
|
|
|
import numpy as np
|
|
|
|
|
|
from mmaction.apis import detection_inference, pose_inference
|
|
|
from mmaction.utils import frame_extract
|
|
|
|
|
|
args = abc.abstractproperty()
|
|
|
args.det_config = 'demo/demo_configs/faster-rcnn_r50-caffe_fpn_ms-1x_coco-person.py'
|
|
|
args.det_checkpoint = 'https://download.openmmlab.com/mmdetection/v2.0/faster_rcnn/faster_rcnn_r50_fpn_1x_coco-person/faster_rcnn_r50_fpn_1x_coco-person_20201216_175929-d022e227.pth'
|
|
|
args.det_score_thr = 0.5
|
|
|
args.pose_config = 'demo/demo_configs/td-hm_hrnet-w32_8xb64-210e_coco-256x192_infer.py'
|
|
|
args.pose_checkpoint = 'https://download.openmmlab.com/mmpose/top_down/hrnet/hrnet_w32_coco_256x192-c78dce93_20200708.pth'
|
|
|
|
|
|
|
|
|
def intersection(b0, b1):
|
|
|
l, r = max(b0[0], b1[0]), min(b0[2], b1[2])
|
|
|
u, d = max(b0[1], b1[1]), min(b0[3], b1[3])
|
|
|
return max(0, r - l) * max(0, d - u)
|
|
|
|
|
|
|
|
|
def iou(b0, b1):
|
|
|
i = intersection(b0, b1)
|
|
|
u = area(b0) + area(b1) - i
|
|
|
return i / u
|
|
|
|
|
|
|
|
|
def area(b):
|
|
|
return (b[2] - b[0]) * (b[3] - b[1])
|
|
|
|
|
|
|
|
|
def removedup(bbox):
|
|
|
|
|
|
def inside(box0, box1, threshold=0.8):
|
|
|
return intersection(box0, box1) / area(box0) > threshold
|
|
|
|
|
|
num_bboxes = bbox.shape[0]
|
|
|
if num_bboxes == 1 or num_bboxes == 0:
|
|
|
return bbox
|
|
|
valid = []
|
|
|
for i in range(num_bboxes):
|
|
|
flag = True
|
|
|
for j in range(num_bboxes):
|
|
|
if i != j and inside(bbox[i],
|
|
|
bbox[j]) and bbox[i][4] <= bbox[j][4]:
|
|
|
flag = False
|
|
|
break
|
|
|
if flag:
|
|
|
valid.append(i)
|
|
|
return bbox[valid]
|
|
|
|
|
|
|
|
|
def is_easy_example(det_results, num_person):
|
|
|
threshold = 0.95
|
|
|
|
|
|
def thre_bbox(bboxes, threshold=threshold):
|
|
|
shape = [sum(bbox[:, -1] > threshold) for bbox in bboxes]
|
|
|
ret = np.all(np.array(shape) == shape[0])
|
|
|
return shape[0] if ret else -1
|
|
|
|
|
|
if thre_bbox(det_results) == num_person:
|
|
|
det_results = [x[x[..., -1] > 0.95] for x in det_results]
|
|
|
return True, np.stack(det_results)
|
|
|
return False, thre_bbox(det_results)
|
|
|
|
|
|
|
|
|
def bbox2tracklet(bbox):
|
|
|
iou_thre = 0.6
|
|
|
tracklet_id = -1
|
|
|
tracklet_st_frame = {}
|
|
|
tracklets = defaultdict(list)
|
|
|
for t, box in enumerate(bbox):
|
|
|
for idx in range(box.shape[0]):
|
|
|
matched = False
|
|
|
for tlet_id in range(tracklet_id, -1, -1):
|
|
|
cond1 = iou(tracklets[tlet_id][-1][-1], box[idx]) >= iou_thre
|
|
|
cond2 = (
|
|
|
t - tracklet_st_frame[tlet_id] - len(tracklets[tlet_id]) <
|
|
|
10)
|
|
|
cond3 = tracklets[tlet_id][-1][0] != t
|
|
|
if cond1 and cond2 and cond3:
|
|
|
matched = True
|
|
|
tracklets[tlet_id].append((t, box[idx]))
|
|
|
break
|
|
|
if not matched:
|
|
|
tracklet_id += 1
|
|
|
tracklet_st_frame[tracklet_id] = t
|
|
|
tracklets[tracklet_id].append((t, box[idx]))
|
|
|
return tracklets
|
|
|
|
|
|
|
|
|
def drop_tracklet(tracklet):
|
|
|
tracklet = {k: v for k, v in tracklet.items() if len(v) > 5}
|
|
|
|
|
|
def meanarea(track):
|
|
|
boxes = np.stack([x[1] for x in track]).astype(np.float32)
|
|
|
areas = (boxes[..., 2] - boxes[..., 0]) * (
|
|
|
boxes[..., 3] - boxes[..., 1])
|
|
|
return np.mean(areas)
|
|
|
|
|
|
tracklet = {k: v for k, v in tracklet.items() if meanarea(v) > 5000}
|
|
|
return tracklet
|
|
|
|
|
|
|
|
|
def distance_tracklet(tracklet):
|
|
|
dists = {}
|
|
|
for k, v in tracklet.items():
|
|
|
bboxes = np.stack([x[1] for x in v])
|
|
|
c_x = (bboxes[..., 2] + bboxes[..., 0]) / 2.
|
|
|
c_y = (bboxes[..., 3] + bboxes[..., 1]) / 2.
|
|
|
c_x -= 480
|
|
|
c_y -= 270
|
|
|
c = np.concatenate([c_x[..., None], c_y[..., None]], axis=1)
|
|
|
dist = np.linalg.norm(c, axis=1)
|
|
|
dists[k] = np.mean(dist)
|
|
|
return dists
|
|
|
|
|
|
|
|
|
def tracklet2bbox(track, num_frame):
|
|
|
|
|
|
bbox = np.zeros((num_frame, 5))
|
|
|
trackd = {}
|
|
|
for k, v in track:
|
|
|
bbox[k] = v
|
|
|
trackd[k] = v
|
|
|
for i in range(num_frame):
|
|
|
if bbox[i][-1] <= 0.5:
|
|
|
mind = np.Inf
|
|
|
for k in trackd:
|
|
|
if np.abs(k - i) < mind:
|
|
|
mind = np.abs(k - i)
|
|
|
bbox[i] = bbox[k]
|
|
|
return bbox
|
|
|
|
|
|
|
|
|
def tracklets2bbox(tracklet, num_frame):
|
|
|
dists = distance_tracklet(tracklet)
|
|
|
sorted_inds = sorted(dists, key=lambda x: dists[x])
|
|
|
dist_thre = np.Inf
|
|
|
for i in sorted_inds:
|
|
|
if len(tracklet[i]) >= num_frame / 2:
|
|
|
dist_thre = 2 * dists[i]
|
|
|
break
|
|
|
|
|
|
dist_thre = max(50, dist_thre)
|
|
|
|
|
|
bbox = np.zeros((num_frame, 5))
|
|
|
bboxd = {}
|
|
|
for idx in sorted_inds:
|
|
|
if dists[idx] < dist_thre:
|
|
|
for k, v in tracklet[idx]:
|
|
|
if bbox[k][-1] < 0.01:
|
|
|
bbox[k] = v
|
|
|
bboxd[k] = v
|
|
|
bad = 0
|
|
|
for idx in range(num_frame):
|
|
|
if bbox[idx][-1] < 0.01:
|
|
|
bad += 1
|
|
|
mind = np.Inf
|
|
|
mink = None
|
|
|
for k in bboxd:
|
|
|
if np.abs(k - idx) < mind:
|
|
|
mind = np.abs(k - idx)
|
|
|
mink = k
|
|
|
bbox[idx] = bboxd[mink]
|
|
|
return bad, bbox[:, None, :]
|
|
|
|
|
|
|
|
|
def bboxes2bbox(bbox, num_frame):
|
|
|
ret = np.zeros((num_frame, 2, 5))
|
|
|
for t, item in enumerate(bbox):
|
|
|
if item.shape[0] <= 2:
|
|
|
ret[t, :item.shape[0]] = item
|
|
|
else:
|
|
|
inds = sorted(
|
|
|
list(range(item.shape[0])), key=lambda x: -item[x, -1])
|
|
|
ret[t] = item[inds[:2]]
|
|
|
for t in range(num_frame):
|
|
|
if ret[t, 0, -1] <= 0.01:
|
|
|
ret[t] = ret[t - 1]
|
|
|
elif ret[t, 1, -1] <= 0.01:
|
|
|
if t:
|
|
|
if ret[t - 1, 0, -1] > 0.01 and ret[t - 1, 1, -1] > 0.01:
|
|
|
if iou(ret[t, 0], ret[t - 1, 0]) > iou(
|
|
|
ret[t, 0], ret[t - 1, 1]):
|
|
|
ret[t, 1] = ret[t - 1, 1]
|
|
|
else:
|
|
|
ret[t, 1] = ret[t - 1, 0]
|
|
|
return ret
|
|
|
|
|
|
|
|
|
def ntu_det_postproc(vid, det_results):
|
|
|
det_results = [removedup(x) for x in det_results]
|
|
|
label = int(vid.split('/')[-1].split('A')[1][:3])
|
|
|
mpaction = list(range(50, 61)) + list(range(106, 121))
|
|
|
n_person = 2 if label in mpaction else 1
|
|
|
is_easy, bboxes = is_easy_example(det_results, n_person)
|
|
|
if is_easy:
|
|
|
print('\nEasy Example')
|
|
|
return bboxes
|
|
|
|
|
|
tracklets = bbox2tracklet(det_results)
|
|
|
tracklets = drop_tracklet(tracklets)
|
|
|
|
|
|
print(f'\nHard {n_person}-person Example, found {len(tracklets)} tracklet')
|
|
|
if n_person == 1:
|
|
|
if len(tracklets) == 1:
|
|
|
tracklet = list(tracklets.values())[0]
|
|
|
det_results = tracklet2bbox(tracklet, len(det_results))
|
|
|
return np.stack(det_results)
|
|
|
else:
|
|
|
bad, det_results = tracklets2bbox(tracklets, len(det_results))
|
|
|
return det_results
|
|
|
|
|
|
if len(tracklets) <= 2:
|
|
|
tracklets = list(tracklets.values())
|
|
|
bboxes = []
|
|
|
for tracklet in tracklets:
|
|
|
bboxes.append(tracklet2bbox(tracklet, len(det_results))[:, None])
|
|
|
bbox = np.concatenate(bboxes, axis=1)
|
|
|
return bbox
|
|
|
else:
|
|
|
return bboxes2bbox(det_results, len(det_results))
|
|
|
|
|
|
|
|
|
def pose_inference_with_align(args, frame_paths, det_results):
|
|
|
|
|
|
det_results = [
|
|
|
frm_dets for frm_dets in det_results if frm_dets.shape[0] > 0
|
|
|
]
|
|
|
|
|
|
pose_results, _ = pose_inference(args.pose_config, args.pose_checkpoint,
|
|
|
frame_paths, det_results, args.device)
|
|
|
|
|
|
num_persons = max([pose['keypoints'].shape[0] for pose in pose_results])
|
|
|
num_points = pose_results[0]['keypoints'].shape[1]
|
|
|
num_frames = len(pose_results)
|
|
|
keypoints = np.zeros((num_persons, num_frames, num_points, 2),
|
|
|
dtype=np.float32)
|
|
|
scores = np.zeros((num_persons, num_frames, num_points), dtype=np.float32)
|
|
|
|
|
|
for f_idx, frm_pose in enumerate(pose_results):
|
|
|
frm_num_persons = frm_pose['keypoints'].shape[0]
|
|
|
for p_idx in range(frm_num_persons):
|
|
|
keypoints[p_idx, f_idx] = frm_pose['keypoints'][p_idx]
|
|
|
scores[p_idx, f_idx] = frm_pose['keypoint_scores'][p_idx]
|
|
|
|
|
|
return keypoints, scores
|
|
|
|
|
|
|
|
|
def ntu_pose_extraction(vid, skip_postproc=False):
|
|
|
tmp_dir = TemporaryDirectory()
|
|
|
frame_paths, _ = frame_extract(vid, out_dir=tmp_dir.name)
|
|
|
det_results, _ = detection_inference(
|
|
|
args.det_config,
|
|
|
args.det_checkpoint,
|
|
|
frame_paths,
|
|
|
args.det_score_thr,
|
|
|
device=args.device,
|
|
|
with_score=True)
|
|
|
|
|
|
if not skip_postproc:
|
|
|
det_results = ntu_det_postproc(vid, det_results)
|
|
|
|
|
|
anno = dict()
|
|
|
|
|
|
keypoints, scores = pose_inference_with_align(args, frame_paths,
|
|
|
det_results)
|
|
|
anno['keypoint'] = keypoints
|
|
|
anno['keypoint_score'] = scores
|
|
|
anno['frame_dir'] = osp.splitext(osp.basename(vid))[0]
|
|
|
anno['img_shape'] = (1080, 1920)
|
|
|
anno['original_shape'] = (1080, 1920)
|
|
|
anno['total_frames'] = keypoints.shape[1]
|
|
|
anno['label'] = int(osp.basename(vid).split('A')[1][:3]) - 1
|
|
|
tmp_dir.cleanup()
|
|
|
|
|
|
return anno
|
|
|
|
|
|
|
|
|
def parse_args():
|
|
|
parser = argparse.ArgumentParser(
|
|
|
description='Generate Pose Annotation for a single NTURGB-D video')
|
|
|
parser.add_argument('video', type=str, help='source video')
|
|
|
parser.add_argument('output', type=str, help='output pickle name')
|
|
|
parser.add_argument('--device', type=str, default='cuda:0')
|
|
|
parser.add_argument('--skip-postproc', action='store_true')
|
|
|
args = parser.parse_args()
|
|
|
return args
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
global_args = parse_args()
|
|
|
args.device = global_args.device
|
|
|
args.video = global_args.video
|
|
|
args.output = global_args.output
|
|
|
args.skip_postproc = global_args.skip_postproc
|
|
|
anno = ntu_pose_extraction(args.video, args.skip_postproc)
|
|
|
mmengine.dump(anno, args.output)
|
|
|
|