#!/usr/bin/env python3 from __future__ import annotations import torch import io import json import os from pathlib import Path from typing import Any, Dict, List, Union, Optional, Tuple from datetime import datetime import aiohttp import numpy as np import requests import torch from dataset_types import PreferenceSample, ProgressSample, Trajectory def pad_trajectory_to_max_frames_np( frames: np.ndarray, progress: List[float], max_frames: int, pad_from: str = "right" ) -> Tuple[np.ndarray, List[float]]: """Pad trajectory frames and progress to max_frames by repeating the first frame/progress if needed. Args: frames: Trajectory frames (numpy array) progress: Progress values (list of floats) max_frames: Target number of frames Returns: Tuple[np.ndarray, List[float]: (padded_frames, padded_progress) """ current_frames = frames.shape[0] if current_frames >= max_frames: # No padding needed return frames, progress if pad_from == "left": pad_frame = frames[0:1] # Keep the batch dimension pad_progress = progress[0] else: pad_frame = frames[-1:] pad_progress = progress[-1] # Calculate how many frames to pad frames_to_pad = max_frames - current_frames # Pad frames by repeating the first frame if pad_from == "left": padded_frames = np.concatenate([np.repeat(pad_frame, frames_to_pad, axis=0), frames], axis=0) padded_progress = [pad_progress] * frames_to_pad + progress else: padded_frames = np.concatenate([frames, np.repeat(pad_frame, frames_to_pad, axis=0)], axis=0) padded_progress = progress + [pad_progress] * frames_to_pad return padded_frames, padded_progress def linspace_subsample_frames( frames: np.ndarray, num_frames: int = 8, end_idx: Optional[int] = None ) -> Tuple[np.ndarray, List[int]]: """Uniformly subsample frames from a trajectory and return the indices. This method takes the full trajectory (e.g., 64 frames) and uniformly subsamples num_frames from it. The first and last frames are always included. Args: frames: Full trajectory frames (N frames) num_frames: Number of frames to subsample (default: 8) end_idx: Optional end index to subsample up to (if None, uses total_frames - 1) Returns: Tuple[np.ndarray, List[int]: (subsampled_frames, subsampled_indices) """ if hasattr(frames, "shape"): total_frames = frames.shape[0] else: total_frames = len(frames) if total_frames <= 0: return frames, [] # Use end_idx if provided, otherwise use full trajectory if end_idx is not None: end_idx = min(end_idx, total_frames - 1) frames_to_subsample = frames[: end_idx + 1] effective_total = end_idx + 1 else: frames_to_subsample = frames effective_total = total_frames if effective_total <= num_frames: # If we have fewer (or equal) frames than requested, return all frames indices = list(range(effective_total)) return frames_to_subsample, indices # Special case: if num_frames == 1, always take the last frame if num_frames == 1: indices = [effective_total - 1] subsampled_frames = frames_to_subsample[indices] return subsampled_frames, indices # Evenly spaced indices from 0 to effective_total-1, inclusive indices_np = np.linspace(0, effective_total - 1, num_frames) indices = np.rint(indices_np).astype(int).tolist() # Enforce first and last explicitly indices[0] = 0 indices[-1] = effective_total - 1 # Ensure indices are strictly non-decreasing and within bounds for k in range(1, len(indices)): if indices[k] < indices[k - 1]: indices[k] = indices[k - 1] if indices[k] >= effective_total: indices[k] = effective_total - 1 # Subsample frames subsampled_frames = frames_to_subsample[indices] return subsampled_frames, indices def raw_dict_to_sample( raw_data: Union[Tuple[Dict[str, Any], Dict[str, Any]], Dict[str, Any]], max_frames: int = 16, sample_type: str = "progress", ) -> Union[ProgressSample, PreferenceSample]: """ Convert raw data dictionary to a ProgressSample or PreferenceSample. Args: raw_data: Dict with 'frames', 'task', 'id', 'metadata', 'video_embeddings', 'text_embedding' or Tuple of (Dict[str, Any], Dict[str, Any]) max_frames: Maximum number of frames to use (default: 16) sample_type: Either "progress" or "preference" (default: "progress") Returns: ProgressSample or PreferenceSample """ def _build_trajectory(raw_data: Dict[str, Any], num_frames: int) -> Trajectory: processed_item: Dict[str, Any] = {} # Process frames frames_array = raw_data["frames"] # Ensure we have the correct shape: (T, H, W, C) if len(frames_array.shape) != 4: raise ValueError(f"Expected 4D array (T, H, W, C), got shape {frames_array.shape}") # Convert from CxHxW to HxWxC if needed if frames_array.shape[1] == 3: frames_array = np.transpose(frames_array, (0, 2, 3, 1)) frames_array, _ = linspace_subsample_frames(frames_array, num_frames) dummy_progress = [0.0] * len(frames_array) frames_array, _ = pad_trajectory_to_max_frames_np(frames_array, dummy_progress, num_frames, pad_from="right") if frames_array.size == 0: raise ValueError("No frames processed for example") processed_item["frames"] = frames_array processed_item["frames_shape"] = frames_array.shape processed_item["task"] = raw_data["task"] processed_item["lang_vector"] = None processed_item["metadata"] = raw_data.get("metadata", None) # Process video embeddings using same helper functions video_embeddings = raw_data.get("video_embeddings") if video_embeddings is not None: video_embeddings, _ = linspace_subsample_frames(video_embeddings, num_frames) dummy_progress_emb = [0.0] * len(video_embeddings) video_embeddings, _ = pad_trajectory_to_max_frames_np( video_embeddings, dummy_progress_emb, num_frames, pad_from="right" ) text_embedding = raw_data.get("text_embedding") # Convert to tensors if they are numpy arrays if video_embeddings is not None and isinstance(video_embeddings, np.ndarray): video_embeddings = torch.tensor(video_embeddings) if text_embedding is not None and isinstance(text_embedding, np.ndarray): text_embedding = torch.tensor(text_embedding) processed_item["video_embeddings"] = video_embeddings processed_item["text_embedding"] = text_embedding processed_item["video_shape"] = video_embeddings.shape if video_embeddings is not None else None processed_item["text_shape"] = text_embedding.shape if text_embedding is not None else None trajectory = Trajectory(**processed_item) return trajectory if sample_type == "progress": assert isinstance(raw_data, dict), "raw_data must be a dictionary" trajectory = _build_trajectory(raw_data=raw_data, num_frames=max_frames) return ProgressSample(trajectory=trajectory) elif sample_type == "preference": assert isinstance(raw_data, tuple), "raw_data must be a tuple" assert len(raw_data) == 2, "raw_data must be a tuple of two dictionaries" trajectories: List[Trajectory] = [] for trajectory_data in raw_data: trajectory = _build_trajectory(raw_data=trajectory_data, num_frames=max_frames) trajectories.append(trajectory) return PreferenceSample(chosen_trajectory=trajectories[0], rejected_trajectory=trajectories[1]) else: raise ValueError(f"Unsupported sample_type: {sample_type}") def build_payload( samples: list[PreferenceSample | ProgressSample], ) -> tuple[dict[str, Any], list[dict[str, Any]]]: """Build a payload with numpy array handling. Args: samples: List of samples to convert Returns: Tuple of (files, sample_data) where: - files: Dict of numpy arrays converted to .npy format - sample_data: List of sample dictionaries with numpy arrays replaced by file references """ files = {} sample_data = [] for sample_idx, sample in enumerate(samples): # Copy the original sample and handle numpy arrays processed_sample = sample.model_dump().copy() # Handle trajectory objects with numpy arrays for key in [ "chosen_trajectory", "rejected_trajectory", "trajectory", ]: if key in processed_sample and isinstance(processed_sample[key], dict): trajectory = processed_sample[key] # Convert numpy arrays to .npy files numpy_fields = ["frames", "lang_vector", "video_embeddings", "text_embedding"] for field_name in numpy_fields: # if it is a tensor, first convert it to a numpy array if field_name in trajectory and isinstance(trajectory[field_name], torch.Tensor): trajectory[field_name] = trajectory[field_name].numpy() if field_name in trajectory and isinstance(trajectory[field_name], np.ndarray): # Convert numpy array to .npy file buf = io.BytesIO() np.save(buf, trajectory[field_name]) buf.seek(0) file_key = f"sample_{sample_idx}_{key}_{field_name}" files[file_key] = ( f"sample_{sample_idx}_{key}_{field_name}.npy", buf, "application/octet-stream", ) trajectory[field_name] = {"__numpy_file__": file_key} sample_data.append(processed_sample) return files, sample_data def post_batch(url: str, payload: dict[str, Any], timeout_s: float = 120.0) -> dict[str, Any]: """POST a batch payload to the evaluation server and return parsed JSON.""" resp = requests.post(url.rstrip("/") + "/evaluate_batch", json=payload, timeout=timeout_s) resp.raise_for_status() return resp.json() def post_batch_npy( url: str, files: dict[str, Any], sample_data: list[dict[str, Any]], timeout_s: float = 120.0, extra_form_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """POST batch using .npy format for numpy arrays. Args: url: Server URL files: Dict of numpy arrays converted to .npy format sample_data: List of sample dictionaries timeout_s: Request timeout in seconds extra_form_data: Optional extra form data to include (e.g., use_frame_steps) """ # Convert sample_data to form data data = {f"sample_{i}": json.dumps(sample) for i, sample in enumerate(sample_data)} # Add extra form data if provided if extra_form_data: for key, value in extra_form_data.items(): data[key] = json.dumps(value) if not isinstance(value, str) else value # Send as multipart form data resp = requests.post(url.rstrip("/") + "/evaluate_batch_npy", files=files, data=data, timeout=timeout_s) resp.raise_for_status() return resp.json() async def post_batch_npy_async( session: aiohttp.ClientSession, url: str, files: dict[str, Any], sample_data: list[dict[str, Any]], timeout_s: float = 120.0, ) -> dict[str, Any]: """Async version of post_batch_npy using aiohttp.""" # Create FormData for aiohttp form_data = aiohttp.FormData() # Add files for key, (filename, file_obj, content_type) in files.items(): form_data.add_field(key, file_obj, filename=filename, content_type=content_type) # Add sample data for i, sample in enumerate(sample_data): form_data.add_field(f"sample_{i}", json.dumps(sample)) headers = {"Connection": "close"} # Send as multipart form data using aiohttp timeout = aiohttp.ClientTimeout(total=timeout_s) async with session.post( url.rstrip("/") + "/evaluate_batch_npy", data=form_data, timeout=timeout, headers=headers ) as resp: resp.raise_for_status() return await resp.json() async def parse_npy_form_data(form_data: Any) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]: """Parse multipart form data to extract numpy arrays and other data. Args: form_data: FastAPI form data from request.form() Returns: Tuple of (numpy_arrays dict, other_data dict) """ numpy_arrays = {} other_data = {} for key, value in form_data.items(): # Check if this is a file upload (UploadFile object) if hasattr(value, "filename") and value.filename: # This is a file upload if value.filename.endswith(".npy"): # Load .npy file (await async read) content = await value.read() buf = io.BytesIO(content) array = np.load(buf) numpy_arrays[key] = array else: # Non-.npy file, skip for now continue else: # This is a string value (form field) try: # Try to parse as JSON other_data[key] = json.loads(value) except (json.JSONDecodeError, TypeError): # Keep as string if not JSON other_data[key] = value return numpy_arrays, other_data def reconstruct_payload_from_npy( numpy_arrays: Dict[str, np.ndarray], other_data: Dict[str, Any], trajectory_keys: Optional[List[str]] = None, convert_embeddings_to_torch: bool = False, ) -> List[Dict[str, Any]]: """Reconstruct the original payload structure from .npy files and form data. The client sends data in this format: - Files: sample_0_chosen_trajectory_frames.npy, sample_0_trajectory_frames.npy, etc. - Data: sample_0, sample_1, etc. (each containing the full sample JSON with numpy file references) Args: numpy_arrays: Dictionary of numpy arrays loaded from .npy files other_data: Dictionary of other form data trajectory_keys: List of trajectory keys to process (default: common keys) convert_embeddings_to_torch: Whether to convert embeddings to torch tensors Returns: List of reconstructed sample dictionaries """ if trajectory_keys is None: trajectory_keys = [ "chosen_trajectory", "rejected_trajectory", "trajectory", ] samples = [] # Process each sample for i in range(len(other_data)): sample_key = f"sample_{i}" if sample_key in other_data: # Get the sample data - might already be parsed or might be a string sample_data = other_data[sample_key] if isinstance(sample_data, str): # Parse the sample JSON if it's a string sample_data = json.loads(sample_data) # Replace numpy file references with actual arrays for key, value in sample_data.items(): if key in trajectory_keys: if isinstance(value, dict): for traj_key, traj_value in value.items(): if isinstance(traj_value, dict) and traj_value.get("__numpy_file__"): # Replace with actual numpy array file_key = traj_value["__numpy_file__"] if file_key in numpy_arrays: value[traj_key] = numpy_arrays[file_key] # Convert embeddings to torch if requested if convert_embeddings_to_torch and traj_key in ["video_embeddings", "text_embedding"]: if traj_key in value and value[traj_key] is not None: if isinstance(value[traj_key], np.ndarray): value[traj_key] = torch.tensor(value[traj_key]) elif isinstance(value[traj_key], list): value[traj_key] = torch.tensor(value[traj_key]) samples.append(sample_data) return samples def find_video_files(directory: str) -> list[str]: """Find all video files in a directory. Args: directory: Path to directory containing video files Returns: List of paths to video files """ video_extensions = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".m4v"} video_files = [] directory_path = Path(directory) if not directory_path.is_dir(): return [] for file_path in directory_path.iterdir(): if file_path.is_file() and file_path.suffix.lower() in video_extensions: video_files.append(str(file_path)) video_files.sort() return video_files def infer_task_from_video_name(video_path: str) -> str: """Infer task name from video filename. Task is everything before the comma (if comma exists), or everything before success/fail/failure. Args: video_path: Path to video file Returns: Inferred task name """ video_name = Path(video_path).stem # Get filename without extension # If there's a comma, task is everything before the comma if "," in video_name: task_part = video_name.split(",")[0] else: # Otherwise, split by underscore and remove success/fail/failure suffixes parts = video_name.split("_") filtered_parts = [] for part in parts: part_lower = part.lower() if part_lower not in ["success", "fail", "failure"]: filtered_parts.append(part) if not filtered_parts: return "Complete the task" task_part = "_".join(filtered_parts) # Split by underscore and join with spaces task_words = task_part.split("_") task = " ".join(task_words) if task: # Capitalize first letter of first word, keep rest as is task = task[0].upper() + task[1:] if len(task) > 1 else task.upper() else: task = "Complete the task" return task def setup_output_directory(output_dir: Optional[str], video_path: Optional[str] = None) -> str: """Create output directory and return path.""" if output_dir: save_dir = output_dir else: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") save_dir = os.path.join(".", f"eval_outputs/{timestamp}") os.makedirs(save_dir, exist_ok=True) return save_dir