| | import os |
| | import json |
| | from lzstring import LZString |
| | from pycocotools import mask as mask_utils |
| | import numpy as np |
| | from PIL import Image |
| | from decord import VideoReader, gpu, cpu |
| | import argparse |
| | import cv2 |
| | from time import time |
| | from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor |
| | from multiprocessing import cpu_count |
| | import functools |
| | from pathlib import Path |
| |
|
def save_frames_batch(frames, frame_idxes, output_folder, is_aria=False, quality=95):
    """Downscale a batch of frames and write them out as JPEGs.

    Each frame is resized by a fixed factor (2 for Aria/ego video, 4 for exo
    video) and saved to ``output_folder`` as ``<frame_index>.jpg``.

    Args:
        frames: Sequence of HxWxC uint8 images (BGR, as cv2.imwrite expects).
        frame_idxes: Frame indices used to name the output files (paired
            positionally with ``frames``).
        output_folder: Destination directory; created if missing.
        is_aria: True for ego (Aria) video -> milder downscale factor of 2.
        quality: JPEG quality passed to cv2.imwrite (default 95).
    """
    scale = 2 if is_aria else 4
    os.makedirs(output_folder, exist_ok=True)

    if len(frames) == 0:
        return

    H, W = frames[0].shape[:2]
    # Silently skip low-resolution videos; full-res takes are >= 1408 px tall.
    if H < 1408:
        return

    # Target size is the same for every frame in the batch; compute it once.
    target_size = (W // scale, H // scale)
    for img, fidx in zip(frames, frame_idxes):
        # OpenCV requires a contiguous buffer; channel-reversed views
        # (BGR<->RGB slices) carry negative strides, so normalize first.
        # (No-op copy-wise if the frame is already contiguous.)
        img = np.ascontiguousarray(img)
        img_resized = cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)
        output_path = os.path.join(output_folder, f'{fidx}.jpg')
        cv2.imwrite(output_path, img_resized, [cv2.IMWRITE_JPEG_QUALITY, quality])
| |
|
def process_single_camera(args_tuple):
    """Decode and dump the subsampled frames for one camera of a take.

    Designed for ``executor.map``, so all inputs arrive as a single tuple:
    ``(takepath, take_name, cam_id, subsample_idx, outputpath, take_id,
    is_ego)``.

    Returns:
        A short status string (this function never raises) so the parallel
        driver can report per-camera outcomes.
    """
    takepath, take_name, cam_id, subsample_idx, outputpath, take_id, is_ego = args_tuple

    video_path = f"{takepath}/{take_name}/frame_aligned_videos/{cam_id}.mp4"
    output_dir = f"{outputpath}/{take_id}/{cam_id}"

    # Skip cameras whose frames were already extracted on a previous run.
    if os.path.exists(output_dir) and len(os.listdir(output_dir)) > 0:
        return f"{cam_id}: Already processed"

    if not os.path.exists(video_path):
        return f"{cam_id}: Video file not found"

    try:
        # Prefer GPU decoding; fall back to CPU when no GPU context is
        # available. Narrowed from a bare except so Ctrl-C still works.
        try:
            vr = VideoReader(video_path, ctx=gpu(0))
        except Exception:
            vr = VideoReader(video_path, ctx=cpu(0))

        frames = vr.get_batch(subsample_idx).asnumpy()

        # decord yields RGB; cv2.imwrite expects BGR. The reversed-channel
        # view has negative strides, so make it contiguous for OpenCV.
        frames = np.ascontiguousarray(frames[..., ::-1])

        save_frames_batch(
            frames=frames,
            frame_idxes=subsample_idx,
            output_folder=output_dir,
            is_aria=is_ego
        )

        return f"{cam_id}: Successfully processed {len(frames)} frames"

    except Exception as e:
        # Report, don't propagate: one bad camera must not kill the pool.
        return f"{cam_id}: Error - {str(e)}"
| |
|
def processVideo_parallel(takepath, take_name, ego_cam, exo_cams, outputpath, take_id,
                          max_workers=None, frame_range=(3510, 4111)):
    """Extract frames for the ego camera and all exo cameras of one take.

    Cameras are processed concurrently with threads: video decoding and
    JPEG writing are I/O / native-code heavy, so threads overlap well here.

    Args:
        takepath: Root directory containing take folders.
        take_name: Folder name of the take under ``takepath``.
        ego_cam: Ego (Aria) camera id.
        exo_cams: Iterable of exo camera ids.
        outputpath: Root output directory.
        take_id: Take identifier used for the output subfolder.
        max_workers: Thread count; defaults to min(#cameras, cpu_count()).
        frame_range: ``(start, stop)`` half-open range of frame indices to
            extract (default matches the original hard-coded 3510..4110).

    Returns:
        The list of extracted frame indices on success, or ``-1`` when the
        ego video is missing or unreadable (callers test ``result == -1``).
    """
    ego_video_path = f"{takepath}/{take_name}/frame_aligned_videos/{ego_cam}.mp4"
    if not os.path.exists(ego_video_path):
        return -1

    # Open the ego video once up front so unreadable takes fail fast,
    # before any worker threads are spawned.
    try:
        VideoReader(ego_video_path, ctx=cpu(0))
        subsample_idx = np.arange(frame_range[0], frame_range[1], 1)
    except Exception as e:
        print(f"Error reading ego video: {e}")
        return -1

    # One work item per camera; the ego camera is flagged True so it gets
    # the Aria-specific downscale factor in save_frames_batch.
    camera_args = [(takepath, take_name, ego_cam, subsample_idx, outputpath, take_id, True)]
    camera_args.extend(
        (takepath, take_name, exo_cam, subsample_idx, outputpath, take_id, False)
        for exo_cam in exo_cams
    )

    if max_workers is None:
        max_workers = min(len(camera_args), cpu_count())

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_single_camera, camera_args))

    for result in results:
        print(f" {result}")

    return subsample_idx.tolist()
| |
|
def process_single_take(args_tuple):
    """Extract frames for one take; returns a status string, never raises.

    Expects ``(take_id, annos, takepath, outputpath)`` packed into a single
    tuple so it can be dispatched via ``executor.map``.
    """
    take_id, annos, takepath, outputpath = args_tuple

    take_out = f"{outputpath}/{take_id}"
    if os.path.exists(take_out):
        return f"{take_id}: Already done!"

    try:
        os.makedirs(take_out, exist_ok=True)

        take_anno = annos[take_id]
        take_name = take_anno["take_name"]

        # Collect every camera id that has at least one object mask.
        annotated_cams = set()
        for obj_masks in take_anno['object_masks'].values():
            annotated_cams.update(obj_masks.keys())

        ego_cams = [cam for cam in annotated_cams if 'aria' in cam]

        # NOTE(review): exo cameras are hard-coded to cam03 rather than
        # derived from the annotated cameras — presumably deliberate; verify.
        exo_cams = ["cam03"]

        if len(ego_cams) > 1:
            return f"{take_id}: ERROR - Multiple ego cameras found"
        if not ego_cams:
            return f"{take_id}: ERROR - No ego camera found"

        print(f"Processing take {take_id} {take_name}")
        print(f" Ego cameras: {ego_cams}")
        print(f" Exo cameras: {exo_cams[:5]}...")

        subsample_idx = processVideo_parallel(
            takepath, take_name, ego_cam=ego_cams[0],
            exo_cams=exo_cams, outputpath=outputpath, take_id=take_id
        )

        # processVideo_parallel signals a missing ego video with -1.
        if subsample_idx == -1:
            return f"{take_id}: ERROR - Ego video not found"

        n_cams = len(ego_cams) + len(exo_cams)
        return f"{take_id}: Successfully processed {len(subsample_idx)} frames across {n_cams} cameras"

    except Exception as e:
        return f"{take_id}: ERROR - {str(e)}"
| |
|
def decode_mask(width, height, encoded_mask):
    """Decode an LZString-compressed mask into a COCO RLE dict.

    Args:
        width: Mask width in pixels.
        height: Mask height in pixels.
        encoded_mask: LZString URI-component-encoded RLE counts string.

    Returns:
        A COCO-style RLE dict ``{"size": [height, width], "counts": str}``,
        or None when the payload cannot be decompressed or is not ASCII.
    """
    # Narrowed from bare `except:` so KeyboardInterrupt/SystemExit propagate.
    try:
        decomp_string = LZString.decompressFromEncodedURIComponent(encoded_mask)
        if decomp_string is None:
            return None
    except Exception:
        # Malformed payloads are expected in the data; treat as missing.
        return None

    try:
        # Round-trip through bytes to reject non-ASCII counts strings,
        # matching the original behavior of returning None for them.
        counts = decomp_string.encode().decode('ascii')
    except (UnicodeError, AttributeError):
        return None

    return {
        "size": [height, width],
        "counts": counts,
    }
| |
|
def processMask(anno, new_anno):
    """Decode every ``encodedMask`` in ``anno`` into COCO RLE form.

    Populates ``new_anno`` in place as
    ``new_anno[object_id][cam_id][frame_id] -> RLE dict`` (or None when a
    mask fails to decode). Signature kept for compatibility with callers.
    """
    for object_id, cams in anno.items():
        per_object = {}
        for cam_id, cam_data in cams.items():
            per_cam = {}
            for frame_id, frame_anno in cam_data["annotation"].items():
                per_cam[frame_id] = decode_mask(
                    frame_anno["width"],
                    frame_anno["height"],
                    frame_anno["encodedMask"],
                )
            per_object[cam_id] = per_cam
        new_anno[object_id] = per_object
| |
|
| | if __name__ == "__main__": |
| | parser = argparse.ArgumentParser() |
| | parser.add_argument( |
| | "--takepath", |
| | help="EgoExo take data root", |
| | required=True |
| | ) |
| | parser.add_argument( |
| | "--annotationpath", |
| | help="Annotations json file path", |
| | required=True |
| | ) |
| | parser.add_argument( |
| | "--split_path", |
| | help="path to split.json", |
| | required=True |
| | ) |
| | parser.add_argument( |
| | "--split", |
| | help="train/val/test split to process", |
| | required=True |
| | ) |
| | parser.add_argument( |
| | "--outputpath", |
| | help="Output data root", |
| | required=True |
| | ) |
| | parser.add_argument( |
| | "--max_workers", |
| | help="Maximum number of parallel workers for take processing", |
| | type=int, |
| | default=None |
| | ) |
| | parser.add_argument( |
| | "--camera_workers", |
| | help="Maximum number of parallel workers for camera processing within each take", |
| | type=int, |
| | default=None |
| | ) |
| | args = parser.parse_args() |
| |
|
| | |
| | with open(args.split_path, "r") as fp: |
| | data_split = json.load(fp) |
| | |
| | |
| | take_list = ['3c744ca5-c64a-4de3-8235-c2f542ac5056'] |
| | |
| | os.makedirs(args.outputpath, exist_ok=True) |
| | |
| | |
| | print("Loading annotations...") |
| | with open(args.annotationpath, "r") as f: |
| | annos = json.load(f) |
| | annos = annos['annotations'] |
| |
|
| | start = time() |
| | |
| | |
| | if args.max_workers is None: |
| | max_workers = min(len(take_list), max(1, cpu_count() // 2)) |
| | else: |
| | max_workers = args.max_workers |
| | |
| | print(f"Processing {len(take_list)} takes with {max_workers} workers") |
| | |
| | |
| | take_args = [(take_id, annos, args.takepath, args.outputpath) for take_id in take_list] |
| | |
| | if len(take_list) == 1: |
| | |
| | results = [process_single_take(take_args[0])] |
| | else: |
| | |
| | with ProcessPoolExecutor(max_workers=max_workers) as executor: |
| | results = list(executor.map(process_single_take, take_args)) |
| | |
| | |
| | print("\n=== Processing Results ===") |
| | for result in results: |
| | print(result) |
| | |
| | end = time() |
| | print(f"\nTotal time: {end-start:.2f} seconds") |
| | print(f"Average time per take: {(end-start)/len(take_list):.2f} seconds") |