import argparse
import functools
import json
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from multiprocessing import cpu_count
from pathlib import Path
from time import time

import cv2
import numpy as np
from decord import VideoReader, gpu, cpu
from lzstring import LZString
from PIL import Image
from pycocotools import mask as mask_utils


def save_frames_batch(frames, frame_idxes, output_folder, is_aria=False,
                      quality=95, min_height=1408):
    """Downscale a batch of frames and write them as JPEG files.

    Frames are written to ``<output_folder>/<frame index>.jpg``. The output
    folder is always created, even when nothing ends up being written.

    Args:
        frames: Sequence of H x W x C images in the channel order expected by
            ``cv2.imwrite``. All frames are assumed to share the shape of the
            first one.
        frame_idxes: Frame indices, parallel to ``frames``; used as file names.
        output_folder: Destination directory.
        is_aria: When True frames are downscaled by 2, otherwise by 4.
        quality: JPEG quality passed to OpenCV.
        min_height: Batches whose frame height is below this value are skipped
            entirely (presumably to ignore already-downscaled videos - confirm).

    Returns:
        The number of frames actually written to disk.
    """
    scale = 2 if is_aria else 4
    os.makedirs(output_folder, exist_ok=True)

    if len(frames) == 0:
        return 0

    # Dimensions are checked once; the whole batch comes from one video.
    height, width = frames[0].shape[:2]
    if height < min_height:
        return 0

    target_size = (width // scale, height // scale)
    written = 0
    for img, fidx in zip(frames, frame_idxes):
        # INTER_AREA gives the best quality when shrinking an image.
        img_resized = cv2.resize(img, target_size, interpolation=cv2.INTER_AREA)
        output_path = os.path.join(output_folder, f'{fidx}.jpg')
        # cv2.imwrite signals failure through its return value, not an exception.
        if cv2.imwrite(output_path, img_resized, [cv2.IMWRITE_JPEG_QUALITY, quality]):
            written += 1
    return written


def process_single_camera(args_tuple):
    """Extract and save the selected frames of one camera video.

    Designed to be mapped over by an executor, hence the single tuple argument
    ``(takepath, take_name, cam_id, subsample_idx, outputpath, take_id, is_ego)``.

    Returns:
        A human-readable status string prefixed with the camera id. Errors are
        reported in the string rather than raised.
    """
    takepath, take_name, cam_id, subsample_idx, outputpath, take_id, is_ego = args_tuple

    video_path = f"{takepath}/{take_name}/frame_aligned_videos/{cam_id}.mp4"
    output_dir = f"{outputpath}/{take_id}/{cam_id}"

    # A non-empty output folder is treated as a finished camera.
    if os.path.exists(output_dir) and len(os.listdir(output_dir)) > 0:
        return f"{cam_id}: Already processed"

    if not os.path.exists(video_path):
        return f"{cam_id}: Video file not found"

    try:
        # Prefer GPU decoding; fall back to CPU when the GPU context fails.
        try:
            vr = VideoReader(video_path, ctx=gpu(0))
        except Exception:
            vr = VideoReader(video_path, ctx=cpu(0))

        # One batched read is cheaper than per-frame reads.
        frames = vr.get_batch(subsample_idx).asnumpy()

        # decord decodes to RGB while cv2.imwrite expects BGR, so flip the
        # channel axis before saving.
        frames = frames[..., ::-1]

        saved = save_frames_batch(
            frames=frames,
            frame_idxes=subsample_idx,
            output_folder=output_dir,
            is_aria=is_ego,
        )
    except Exception as e:
        return f"{cam_id}: Error - {str(e)}"

    if saved != len(frames):
        return (f"{cam_id}: Saved {saved}/{len(frames)} frames "
                f"(low-resolution or unwritable frames skipped)")
    return f"{cam_id}: Successfully processed {len(frames)} frames"


def processVideo_parallel(takepath, take_name, ego_cam, exo_cams, outputpath, take_id, max_workers=None):
    """Extract frames for the ego camera and all exo cameras of one take.

    The ego video determines how many frames are available; the same frame
    indices are then used for every camera, each handled in its own thread.

    Args:
        takepath: Root directory containing the takes.
        take_name: Name of the take folder under ``takepath``.
        ego_cam: Id of the ego camera.
        exo_cams: Iterable of exo camera ids.
        outputpath: Output root directory.
        take_id: Id used for the per-take output folder.
        max_workers: Thread count; defaults to min(#cameras, #CPUs).

    Returns:
        ``-1`` when the ego video is missing or unreadable, otherwise the list
        of frame indices that were requested (empty when the selected range
        lies entirely beyond the end of the ego video).
    """
    ego_video_path = f"{takepath}/{take_name}/frame_aligned_videos/{ego_cam}.mp4"

    if not os.path.exists(ego_video_path):
        return -1

    try:
        len_video = len(VideoReader(ego_video_path, ctx=cpu(0)))
    except Exception as e:
        print(f"Error reading ego video: {e}")
        return -1

    # NOTE: debug frame range. A full run would use np.arange(0, len_video, 1).
    subsample_idx = np.arange(3510, 4111, 1)
    # Never request frames past the end of the video.
    subsample_idx = subsample_idx[subsample_idx < len_video]
    if subsample_idx.size == 0:
        print(f" No frames selected: video has only {len_video} frames")
        return []

    # Ego camera first, then every exo camera, all sharing the same indices.
    camera_args = [
        (takepath, take_name, ego_cam, subsample_idx, outputpath, take_id, True)
    ]
    camera_args.extend(
        (takepath, take_name, exo_cam, subsample_idx, outputpath, take_id, False)
        for exo_cam in exo_cams
    )

    if max_workers is None:
        max_workers = min(len(camera_args), cpu_count())

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_single_camera, camera_args))

    for result in results:
        print(f" {result}")

    return subsample_idx.tolist()


def process_single_take(args_tuple):
    """Process every camera of a single take.

    Designed to be mapped over by an executor. ``args_tuple`` is
    ``(take_id, annos, takepath, outputpath)`` with an optional fifth element
    giving the number of camera worker threads (``None`` selects the default).

    Returns:
        A human-readable status string prefixed with the take id. Errors are
        reported in the string rather than raised.
    """
    take_id, annos, takepath, outputpath, *extra = args_tuple
    camera_workers = extra[0] if extra else None

    take_dir = f"{outputpath}/{take_id}"
    if os.path.exists(take_dir):
        return f"{take_id}: Already done!"

    try:
        anno = annos[take_id]
        take_name = anno["take_name"]

        # Collect every camera that has at least one annotated object.
        valid_cams = set()
        for cams in anno['object_masks'].values():
            valid_cams.update(cams.keys())

        ego_cams = [vc for vc in valid_cams if 'aria' in vc]
        # NOTE: debug override. A full run would use
        # [vc for vc in valid_cams if 'aria' not in vc].
        exo_cams = ["cam03"]

        if len(ego_cams) > 1:
            return f"{take_id}: ERROR - Multiple ego cameras found"
        if len(ego_cams) == 0:
            return f"{take_id}: ERROR - No ego camera found"

        print(f"Processing take {take_id} {take_name}")
        print(f" Ego cameras: {ego_cams}")
        print(f" Exo cameras: {exo_cams[:5]}...")  # Show first 5 only

        subsample_idx = processVideo_parallel(
            takepath, take_name,
            ego_cam=ego_cams[0],
            exo_cams=exo_cams,
            outputpath=outputpath,
            take_id=take_id,
            max_workers=camera_workers,
        )

        # The take folder doubles as the "already done" marker, so it must not
        # be created for takes that could not be processed at all.
        if subsample_idx == -1:
            return f"{take_id}: ERROR - Ego video missing or unreadable"
        if not subsample_idx:
            return f"{take_id}: ERROR - No frames selected"

        os.makedirs(take_dir, exist_ok=True)
        return (f"{take_id}: Successfully processed {len(subsample_idx)} frames "
                f"across {len(ego_cams) + len(exo_cams)} cameras")

    except Exception as e:
        return f"{take_id}: ERROR - {str(e)}"


def decode_mask(width, height, encoded_mask):
    """Decompress an LZString-encoded mask into an RLE-style dict.

    Args:
        width: Mask width.
        height: Mask height.
        encoded_mask: Mask string compressed with LZString's
            "EncodedURIComponent" variant.

    Returns:
        ``{"size": [height, width], "counts": <ascii str>}`` (presumably the
        COCO compressed-RLE layout used by pycocotools - confirm), or ``None``
        when the input cannot be decompressed or is not pure ASCII.
    """
    try:
        decomp_string = LZString.decompressFromEncodedURIComponent(encoded_mask)
    except Exception:
        return None
    if decomp_string is None:
        return None

    try:
        # Round-trip through bytes to reject non-ASCII payloads.
        counts = decomp_string.encode().decode('ascii')
    except (UnicodeError, AttributeError):
        return None

    return {
        "size": [height, width],
        "counts": counts,
    }


def processMask(anno, new_anno):
    """Decode every mask in ``anno`` into ``new_anno`` (modified in place).

    ``anno`` maps object id -> camera id -> ``{"annotation": {frame id: {...}}}``
    where each frame entry carries ``width``, ``height`` and ``encodedMask``.
    ``new_anno`` receives object id -> camera id -> frame id -> decoded mask
    (``None`` for masks that fail to decode).
    """
    for object_id, cams in anno.items():
        new_anno[object_id] = {}
        for cam_id, cam_data in cams.items():
            decoded = {}
            new_anno[object_id][cam_id] = decoded
            for frame_id, frame in cam_data["annotation"].items():
                decoded[frame_id] = decode_mask(
                    frame["width"], frame["height"], frame["encodedMask"]
                )


def main():
    """Parse the command line and extract frames for the selected takes."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--takepath",
        help="EgoExo take data root",
        required=True
    )
    parser.add_argument(
        "--annotationpath",
        help="Annotations json file path",
        required=True
    )
    parser.add_argument(
        "--split_path",
        help="path to split.json",
        required=True
    )
    parser.add_argument(
        "--split",
        help="train/val/test split to process",
        required=True
    )
    parser.add_argument(
        "--outputpath",
        help="Output data root",
        required=True
    )
    parser.add_argument(
        "--max_workers",
        help="Maximum number of parallel workers for take processing",
        type=int,
        default=None
    )
    parser.add_argument(
        "--camera_workers",
        help="Maximum number of parallel workers for camera processing within each take",
        type=int,
        default=None
    )
    args = parser.parse_args()

    # Load data splits
    with open(args.split_path, "r") as fp:
        data_split = json.load(fp)
    # NOTE: debug override. A full run would use data_split[args.split].
    take_list = ['3c744ca5-c64a-4de3-8235-c2f542ac5056']

    os.makedirs(args.outputpath, exist_ok=True)

    # Load annotations
    print("Loading annotations...")
    with open(args.annotationpath, "r") as f:
        annos = json.load(f)
    annos = annos['annotations']

    start = time()

    # Default: half the cores, never more workers than takes, at least one.
    if args.max_workers is None:
        max_workers = max(1, min(len(take_list), max(1, cpu_count() // 2)))
    else:
        max_workers = args.max_workers

    print(f"Processing {len(take_list)} takes with {max_workers} workers")

    # Hand each worker only its own annotation entry so the full annotation
    # dict is not pickled once per take. A missing take id still surfaces as a
    # per-take error inside process_single_take.
    take_args = [
        (
            take_id,
            {take_id: annos[take_id]} if take_id in annos else {},
            args.takepath,
            args.outputpath,
            args.camera_workers,
        )
        for take_id in take_list
    ]

    if len(take_list) == 1:
        # For a single take, skip process parallelization to avoid overhead.
        results = [process_single_take(take_args[0])]
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(process_single_take, take_args))

    print("\n=== Processing Results ===")
    for result in results:
        print(result)

    end = time()
    print(f"\nTotal time: {end-start:.2f} seconds")
    if take_list:
        print(f"Average time per take: {(end-start)/len(take_list):.2f} seconds")


if __name__ == "__main__":
    main()