# Copyright (c) 2018-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#
import hashlib
import os
import pathlib
import shutil
import sys
import time

import cv2
import numpy as np
import torch


def add_path():
    """Add the 2D joint detector and pose tracker directories to sys.path."""
    Alphapose_path = os.path.abspath('joints_detectors/Alphapose')
    hrnet_path = os.path.abspath('joints_detectors/hrnet')
    trackers_path = os.path.abspath('pose_trackers')

    paths = filter(lambda p: p not in sys.path, [Alphapose_path, hrnet_path, trackers_path])
    sys.path.extend(paths)


def wrap(func, *args, unsqueeze=False):
    """
    Wrap a torch function so it can be called with NumPy arrays.
    Input and return types are seamlessly converted.
    """
    # Convert input types where applicable
    args = list(args)
    for i, arg in enumerate(args):
        if isinstance(arg, np.ndarray):
            args[i] = torch.from_numpy(arg)
            if unsqueeze:
                args[i] = args[i].unsqueeze(0)

    result = func(*args)

    # Convert output types where applicable
    if isinstance(result, tuple):
        result = list(result)
        for i, res in enumerate(result):
            if isinstance(res, torch.Tensor):
                if unsqueeze:
                    res = res.squeeze(0)
                result[i] = res.numpy()
        return tuple(result)
    elif isinstance(result, torch.Tensor):
        if unsqueeze:
            result = result.squeeze(0)
        return result.numpy()
    else:
        return result
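

# Usage sketch (illustrative, not from the original file): `wrap` lets a torch
# function be called directly on NumPy arrays.
#     >>> wrap(torch.sigmoid, np.zeros(3))
#     array([0.5, 0.5, 0.5])
# With unsqueeze=True, a batch dimension is added to each converted input
# before the call and removed from the result afterwards.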


def deterministic_random(min_value, max_value, data):
    """Map the string `data` to a deterministic pseudo-random integer between
    min_value and max_value, derived from the SHA-256 digest of the string."""
    digest = hashlib.sha256(data.encode()).digest()
    raw_value = int.from_bytes(digest[:4], byteorder='little', signed=False)
    return int(raw_value / (2 ** 32 - 1) * (max_value - min_value)) + min_value
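

# Example (illustrative): the same input string always produces the same
# value, which keeps random subsampling reproducible across runs.
#     >>> deterministic_random(0, 100, 'S1/Walking') == deterministic_random(0, 100, 'S1/Walking')
#     True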


def alpha_map(prediction):
    # Affine rescaling: k and b are chosen so that p_max maps to 0.8 and
    # p_min maps to -0.8, i.e. predictions are normalized into [-0.8, 0.8].
    p_min, p_max = prediction.min(), prediction.max()

    k = 1.6 / (p_max - p_min)
    b = 0.8 - k * p_max

    prediction = k * prediction + b
    return prediction
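

# Worked check (illustrative): with p_min = 0 and p_max = 10, k = 0.16 and
# b = -0.8, so the endpoints land on -0.8 and 0.8.
#     >>> alpha_map(np.array([0.0, 5.0, 10.0]))
#     array([-0.8,  0. ,  0.8])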


def change_score(prediction, detectron_detection_path):
    # Replace the confidence scores in `prediction` with the scores from a
    # saved Detectron 2D detection file ('S1'/'Directions 1' is hard-coded).
    detectron_predictions = np.load(detectron_detection_path, allow_pickle=True)['positions_2d'].item()
    pose = detectron_predictions['S1']['Directions 1']
    prediction[..., 2] = pose[..., 2]
    return prediction


class Timer:
    def __init__(self, message, show=True):
        self.message = message
        self.elapsed = 0
        self.show = show

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.start
        if self.show:
            print(f'{self.message} --- elapsed time: {self.elapsed} s')
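

# Usage sketch:
#     with Timer('2D pose estimation'):
#         ...  # prints '2D pose estimation --- elapsed time: ... s' on exit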


def calculate_area(data):
    """
    Compute the area of the bounding rectangle of the keypoints.
    :param data: AlphaPose json keypoint format ([x, y, score, ..., x, y, score]) or AlphaPose result keypoint format ([[x, y], ..., [x, y]])
    :return: area of the keypoints' bounding rectangle
    """
    data = np.array(data)

    if len(data.shape) == 1:
        # Flat [x, y, score, ...] format: reshape into one (x, y, score) row per joint
        data = np.reshape(data, (-1, 3))

    width = max(data[:, 0]) - min(data[:, 0])
    height = max(data[:, 1]) - min(data[:, 1])

    return width * height
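

# Example (illustrative): the flat AlphaPose triple format for two joints at
# (0, 0) and (4, 3) gives a 4 x 3 bounding rectangle.
#     >>> calculate_area([0, 0, 0.9, 4, 3, 0.8])
#     12.0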


def read_video(filename, fps=None, skip=0, limit=-1):
    # Lazily yield RGB frames from the video file. `fps` is accepted for
    # interface compatibility but is currently unused; `skip` drops the first
    # frames, and `limit` stops after that many frames have been read.
    stream = cv2.VideoCapture(filename)

    i = 0
    while True:
        grabbed, frame = stream.read()
        # if the `grabbed` boolean is `False`, then we have
        # reached the end of the video file
        if not grabbed:
            print(f'===========================> This video contains {i} frames in total.')
            sys.stdout.flush()
            break

        i += 1
        if i > skip:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            yield np.array(frame)

        if i == limit:
            break
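

# Usage sketch (using the sample path from __main__): iterate over RGB frames
# lazily, skipping the first 5 and stopping after frame 100.
#     for frame in read_video('outputs/kobe.mp4', skip=5, limit=100):
#         ...  # frame is an RGB ndarray of shape (H, W, 3)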


def split_video(video_path):
    stream = cv2.VideoCapture(video_path)

    output_dir = os.path.dirname(video_path)
    video_name = os.path.basename(video_path)
    video_name = video_name[:video_name.rfind('.')]

    save_folder = pathlib.Path(f'./{output_dir}/alpha_pose_{video_name}/split_image/')
    shutil.rmtree(str(save_folder), ignore_errors=True)
    save_folder.mkdir(parents=True, exist_ok=True)

    # Pad frame indices so filenames sort in frame order
    total_frames = int(stream.get(cv2.CAP_PROP_FRAME_COUNT))
    length = len(str(total_frames)) + 1

    i = 1
    while True:
        grabbed, frame = stream.read()

        if not grabbed:
            # `i` has already been advanced past the last saved frame
            print(f'Split {i - 1} images from the video in total.')
            break

        save_path = f'{save_folder}/output{str(i).zfill(length)}.png'
        cv2.imwrite(save_path, frame)

        i += 1

    saved_path = str(save_folder)
    print(f'Split images saved in {saved_path}')

    return saved_path
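

# Usage sketch: split_video('outputs/kobe.mp4') writes the frames as PNGs to
# outputs/alpha_pose_kobe/split_image/ and returns that directory path.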


def evaluate(test_generator, model_pos, action=None, return_predictions=False):
    """
    Infer 3D joint positions from 2D keypoints.
    :type test_generator: UnchunkedGenerator
    :param test_generator: generator yielding batches of 2D keypoints
    :param model_pos: 3D pose model
    :param action: unused here; kept for interface compatibility
    :param return_predictions: return the predictions if True
    :return: predicted 3D positions as a NumPy array if return_predictions is True
    """
    joints_left, joints_right = [4, 5, 6, 11, 12, 13], [1, 2, 3, 14, 15, 16]
    with torch.no_grad():
        model_pos.eval()
        for _, batch, batch_2d in test_generator.next_epoch():
            inputs_2d = torch.from_numpy(batch_2d.astype('float32'))
            if torch.cuda.is_available():
                inputs_2d = inputs_2d.cuda()

            # Positional model
            predicted_3d_pos = model_pos(inputs_2d)

            if test_generator.augment_enabled():
                # Undo flipping and take average with non-flipped version
                predicted_3d_pos[1, :, :, 0] *= -1
                predicted_3d_pos[1, :, joints_left + joints_right] = predicted_3d_pos[1, :, joints_right + joints_left]
                predicted_3d_pos = torch.mean(predicted_3d_pos, dim=0, keepdim=True)

            if return_predictions:
                return predicted_3d_pos.squeeze(0).cpu().numpy()
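

# Usage sketch (UnchunkedGenerator and its constructor arguments come from the
# VideoPose3D codebase; the exact call shown here is an assumption):
#     gen = UnchunkedGenerator(None, None, [keypoints],
#                              pad=pad, causal_shift=causal_shift, augment=False)
#     prediction = evaluate(gen, model_pos, return_predictions=True)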


if __name__ == '__main__':
    os.chdir('..')

    split_video('outputs/kobe.mp4')