Spaces:
Running
Running
| import os | |
| import time | |
| from common.arguments import parse_args | |
| from common.camera import * | |
| from common.generators import * | |
| from common.loss import * | |
| from common.model import * | |
| from common.utils import Timer, evaluate, add_path | |
| from common.inference_3d import * | |
| from model.block.refine import refine | |
| from model.stmo import Model | |
| import HPE2keyframes as Hk | |
| from datetime import datetime | |
| import pytz | |
| # from joints_detectors.openpose.main import generate_kpts as open_pose | |
| os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" # see issue #152 | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "0" | |
| metadata = {'layout_name': 'coco', 'num_joints': 17, 'keypoints_symmetry': [[1, 3, 5, 7, 9, 11, 13, 15], [2, 4, 6, 8, 10, 12, 14, 16]]} | |
| add_path() | |
| # record time | |
| def ckpt_time(ckpt=None): | |
| if not ckpt: | |
| return time.time() | |
| else: | |
| return time.time() - float(ckpt), time.time() | |
| time0 = ckpt_time() | |
| def get_detector_2d(detector_name): | |
| def get_alpha_pose(): | |
| from joints_detectors.Alphapose.gene_npz import generate_kpts as alpha_pose | |
| return alpha_pose | |
| detector_map = { | |
| 'alpha_pose': get_alpha_pose, | |
| # 'hr_pose': get_hr_pose, | |
| # 'open_pose': open_pose | |
| } | |
| assert detector_name in detector_map, f'2D detector: {detector_name} not implemented yet!' | |
| return detector_map[detector_name]() | |
| class Skeleton: | |
| def parents(self): | |
| return np.array([-1, 0, 1, 2, 0, 4, 5, 0, 7, 8, 9, 8, 11, 12, 8, 14, 15]) | |
| def joints_right(self): | |
| return [1, 2, 3, 14, 15, 16] | |
| def main(args, progress): | |
| detector_2d = get_detector_2d(args.detector_2d) | |
| assert detector_2d, 'detector_2d should be in ({alpha, hr, open}_pose)' | |
| # 2D kpts loads or generate | |
| #args.input_npz = './outputs/alpha_pose_skiing_cut/skiing_cut.npz' | |
| if not args.input_npz: | |
| video_name = args.viz_video | |
| keypoints = detector_2d(video_name, progress) | |
| else: | |
| npz = np.load(args.input_npz) | |
| keypoints = npz['kpts'] # (N, 17, 2) | |
| keypoints_symmetry = metadata['keypoints_symmetry'] | |
| kps_left, kps_right = list(keypoints_symmetry[0]), list(keypoints_symmetry[1]) | |
| joints_left, joints_right = list([4, 5, 6, 11, 12, 13]), list([1, 2, 3, 14, 15, 16]) | |
| # normlization keypoints Suppose using the camera parameter | |
| keypoints = normalize_screen_coordinates(keypoints[..., :2], w=1000, h=1002) | |
| # model_pos = TemporalModel(17, 2, 17, filter_widths=[3, 3, 3, 3, 3], causal=args.causal, dropout=args.dropout, channels=args.channels, | |
| # dense=args.dense) | |
| model = {} | |
| model['trans'] = Model(args) | |
| # if torch.cuda.is_available(): | |
| # model_pos = model_pos | |
| ckpt, time1 = ckpt_time(time0) | |
| print('-------------- load data spends {:.2f} seconds'.format(ckpt)) | |
| # load trained model | |
| # chk_filename = os.path.join(args.checkpoint, args.resume if args.resume else args.evaluate) | |
| # print('Loading checkpoint', chk_filename) | |
| # checkpoint = torch.load(chk_filename, map_location=lambda storage, loc: storage) # 把loc映射到storage | |
| # model_pos.load_state_dict(checkpoint['model_pos']) | |
| model_dict = model['trans'].state_dict() | |
| no_refine_path = "checkpoint/PSTMOS_no_refine_48_5137_in_the_wild.pth" | |
| pre_dict = torch.load(no_refine_path, map_location=torch.device('cpu')) | |
| for key, value in pre_dict.items(): | |
| name = key[7:] | |
| model_dict[name] = pre_dict[key] | |
| model['trans'].load_state_dict(model_dict) | |
| ckpt, time2 = ckpt_time(time1) | |
| print('-------------- load 3D model spends {:.2f} seconds'.format(ckpt)) | |
| # Receptive field: 243 frames for args.arc [3, 3, 3, 3, 3] | |
| receptive_field = args.frames | |
| pad = (receptive_field - 1) // 2 # Padding on each side | |
| causal_shift = 0 | |
| print('Rendering...') | |
| input_keypoints = keypoints.copy() | |
| print(input_keypoints.shape) | |
| # gen = UnchunkedGenerator(None, None, [input_keypoints], | |
| # pad=pad, causal_shift=causal_shift, augment=args.test_time_augmentation, | |
| # kps_left=kps_left, kps_right=kps_right, joints_left=joints_left, joints_right=joints_right) | |
| # test_data = Fusion(opt=args, train=False, dataset=dataset, root_path=root_path, MAE=opt.MAE) | |
| # test_dataloader = torch.utils.data.DataLoader(test_data, batch_size=1, | |
| # shuffle=False, num_workers=0, pin_memory=True) | |
| #prediction = evaluate(gen, model_pos, return_predictions=True) | |
| gen = Evaluate_Generator(128, None, None, [input_keypoints], args.stride, | |
| pad=pad, causal_shift=causal_shift, augment=args.test_time_augmentation, shuffle=False, | |
| kps_left=kps_left, kps_right=kps_right, joints_left=joints_left, joints_right=joints_right) | |
| prediction = val(args, gen, model, progress) | |
| # save 3D joint points | |
| # np.save(f'outputs/test_3d_{args.video_name}_output.npy', prediction, allow_pickle=True) | |
| rot = np.array([0.14070565, -0.15007018, -0.7552408, 0.62232804], dtype=np.float32) | |
| prediction = camera_to_world(prediction, R=rot, t=0) | |
| # We don't have the trajectory, but at least we can rebase the height | |
| prediction[:, :, 2] -= np.min(prediction[:, :, 2]) | |
| output_dir_dict = {} | |
| npy_filename = f'output_3Dpose_npy/{args.video_name}.npy' | |
| output_dir_dict['npy'] = npy_filename | |
| np.save(npy_filename, prediction, allow_pickle=True) | |
| anim_output = {'Skeleton': prediction} | |
| input_keypoints = image_coordinates(input_keypoints[..., :2], w=1000, h=1002) | |
| ckpt, time3 = ckpt_time(time2) | |
| print('-------------- generate reconstruction 3D data spends {:.2f} seconds'.format(ckpt)) | |
| if not args.viz_output: | |
| args.viz_output = 'outputs/alpha_result.mp4' | |
| from common.visualization import render_animation | |
| render_animation(input_keypoints, anim_output, | |
| Skeleton(), 25, args.viz_bitrate, np.array(70., dtype=np.float32), args.viz_output, progress, | |
| limit=args.viz_limit, downsample=args.viz_downsample, size=args.viz_size, | |
| input_video_path=args.viz_video, viewport=(1000, 1002), | |
| input_video_skip=args.viz_skip) | |
| ckpt, time4 = ckpt_time(time3) | |
| print('total spend {:2f} second'.format(ckpt)) | |
| return output_dir_dict | |
| def inference_video(video_path, detector_2d, progress): | |
| """ | |
| Do image -> 2d points -> 3d points to video. | |
| :param detector_2d: used 2d joints detector. Can be {alpha_pose, hr_pose} | |
| :param video_path: relative to outputs | |
| :return: None | |
| """ | |
| args = parse_args() | |
| args.detector_2d = detector_2d | |
| dir_name = os.path.dirname(video_path) | |
| basename = os.path.basename(video_path) | |
| args.video_name = basename[:basename.rfind('.')] | |
| args.viz_video = video_path | |
| args.viz_output = f'output_videos/{args.video_name}.mp4' | |
| args.evaluate = 'pretrained_h36m_detectron_coco.bin' | |
| with Timer(video_path): | |
| output_dir_dict = main(args, progress) | |
| output_dir_dict["output_videos"] = args.viz_output | |
| output_dir_dict["video_name"] = args.video_name | |
| return output_dir_dict | |
| def gr_video2mc(video_path, progress): | |
| print("\n>>>>> One video uploaded <<<<<\n") | |
| china_tz = pytz.timezone('Asia/Shanghai') | |
| current_time = datetime.now(china_tz) | |
| formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') | |
| print(f"Start Time: {formatted_time}\n") | |
| if not os.path.exists('output_3Dpose_npy'): | |
| os.makedirs('output_3Dpose_npy') | |
| if not os.path.exists('output_alphapose'): | |
| os.makedirs('output_alphapose') | |
| if not os.path.exists('output_miframes'): | |
| os.makedirs('output_miframes') | |
| if not os.path.exists('output_videos'): | |
| os.makedirs('output_videos') | |
| FPS_mine_imator = 30 | |
| output_dir_dict = inference_video(video_path, 'alpha_pose', progress) | |
| Hk.hpe2keyframes(output_dir_dict['npy'], FPS_mine_imator, f"output_miframes/{output_dir_dict['video_name']}.miframes") | |
| path1 = os.path.abspath(f"output_miframes/{output_dir_dict['video_name']}.miframes") | |
| path2 = os.path.abspath(f"output_videos/{output_dir_dict['video_name']}.mp4") | |
| print("\n----- One video processed -----\n") | |
| china_tz = pytz.timezone('Asia/Shanghai') | |
| current_time = datetime.now(china_tz) | |
| formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S') | |
| print(f"Finished Time: {formatted_time}\n") | |
| return path1, path2 | |
| if __name__ == '__main__': | |
| files = os.listdir('./input_videos') | |
| FPS_mine_imator = 30 | |
| for file in files: | |
| output_dir_dict = inference_video(os.path.join('input_videos', file), 'alpha_pose') | |
| Hk.hpe2keyframes(output_dir_dict['npy'], FPS_mine_imator, f"output_miframes/{output_dir_dict['video_name']}.miframes") | |