Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import torch | |
| import io | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Union, Optional, Tuple | |
| from datetime import datetime | |
| import aiohttp | |
| import numpy as np | |
| import requests | |
| import torch | |
| from dataset_types import PreferenceSample, ProgressSample, Trajectory | |
def pad_trajectory_to_max_frames_np(
    frames: np.ndarray, progress: List[float], max_frames: int, pad_from: str = "right"
) -> Tuple[np.ndarray, List[float]]:
    """Pad a trajectory to ``max_frames`` by repeating its edge frame and progress value.

    Args:
        frames: Trajectory frames, stacked along axis 0.
        progress: Per-frame progress values, parallel to ``frames``.
        max_frames: Target number of frames.
        pad_from: ``"left"`` prepends copies of the first frame/progress value;
            any other value (default ``"right"``) appends copies of the last.

    Returns:
        Tuple of (padded_frames, padded_progress). The inputs are returned
        unchanged when the trajectory already has at least ``max_frames``
        frames, or when it is empty and there is nothing to repeat.
    """
    current_frames = frames.shape[0]
    if current_frames >= max_frames:
        # No padding needed
        return frames, progress
    if current_frames == 0:
        # There is no edge frame to repeat; let the caller decide how to treat
        # an empty trajectory instead of failing on progress[-1].
        return frames, progress

    frames_to_pad = max_frames - current_frames
    from_left = pad_from == "left"

    # Slicing (rather than indexing) keeps the leading axis so np.repeat can tile it.
    pad_frame = frames[:1] if from_left else frames[-1:]
    pad_progress = progress[0] if from_left else progress[-1]

    filler_frames = np.repeat(pad_frame, frames_to_pad, axis=0)
    filler_progress = [pad_progress] * frames_to_pad
    # list(...) so that a non-list sequence is concatenated, not broadcast-added.
    progress_list = list(progress)

    if from_left:
        return np.concatenate([filler_frames, frames], axis=0), filler_progress + progress_list
    return np.concatenate([frames, filler_frames], axis=0), progress_list + filler_progress
def linspace_subsample_frames(
    frames: np.ndarray, num_frames: int = 8, end_idx: Optional[int] = None
) -> Tuple[np.ndarray, List[int]]:
    """Uniformly subsample frames from a trajectory and return the chosen indices.

    When more than ``num_frames`` frames are available, ``num_frames`` evenly
    spaced frames are picked. For ``num_frames >= 2`` the first and last
    available frames are always included; for ``num_frames == 1`` only the
    last available frame is returned.

    Args:
        frames: Full trajectory. Anything with a ``shape`` attribute is indexed
            array-style; other sequences (e.g. lists) are indexed element by
            element and the subsample is returned as a list.
        num_frames: Number of frames to subsample (default: 8).
        end_idx: Optional last index (inclusive) to subsample up to; values past
            the end are clamped. If None, the whole trajectory is used.

    Returns:
        Tuple of (subsampled_frames, subsampled_indices). If no more than
        ``num_frames`` frames are available, all of them are returned.

    Raises:
        ValueError: If subsampling is required but ``num_frames`` is below 1.
    """
    array_like = hasattr(frames, "shape")
    total_frames = frames.shape[0] if array_like else len(frames)
    if total_frames <= 0:
        return frames, []

    # Use end_idx if provided, otherwise use full trajectory
    if end_idx is not None:
        end_idx = min(end_idx, total_frames - 1)
        frames_to_subsample = frames[: end_idx + 1]
        effective_total = end_idx + 1
    else:
        frames_to_subsample = frames
        effective_total = total_frames

    if effective_total <= num_frames:
        # Fewer (or equal) frames than requested: return all of them
        return frames_to_subsample, list(range(effective_total))

    if num_frames < 1:
        raise ValueError(f"num_frames must be at least 1, got {num_frames}")

    if num_frames == 1:
        # Special case: a single requested frame is always the last one
        indices = [effective_total - 1]
    else:
        # Rounding an increasing linspace over [0, effective_total - 1] is
        # already non-decreasing and in bounds, so no further fix-up is needed.
        indices = np.rint(np.linspace(0, effective_total - 1, num_frames)).astype(int).tolist()
        # Pin the endpoints explicitly so they never depend on rounding.
        indices[0] = 0
        indices[-1] = effective_total - 1

    if array_like:
        subsampled_frames = frames_to_subsample[indices]
    else:
        # Plain sequences do not support indexing with a list of positions.
        subsampled_frames = [frames_to_subsample[i] for i in indices]
    return subsampled_frames, indices
def raw_dict_to_sample(
    raw_data: Union[Tuple[Dict[str, Any], Dict[str, Any]], Dict[str, Any]],
    max_frames: int = 16,
    sample_type: str = "progress",
) -> Union[ProgressSample, PreferenceSample]:
    """Convert raw trajectory data into a ProgressSample or PreferenceSample.

    Each raw dictionary must provide ``"frames"`` (a 4D array) and ``"task"``;
    ``"metadata"``, ``"video_embeddings"`` and ``"text_embedding"`` are optional.
    Frames (and video embeddings, when present) are uniformly subsampled to at
    most ``max_frames`` entries and then right-padded up to ``max_frames``.

    Args:
        raw_data: A single dictionary for ``"progress"``, or a tuple of two
            dictionaries (chosen, rejected) for ``"preference"``.
        max_frames: Number of frames each trajectory is fitted to (default: 16).
        sample_type: Either "progress" or "preference" (default: "progress").

    Returns:
        ProgressSample or PreferenceSample.

    Raises:
        ValueError: If ``sample_type`` is unsupported, the frames array is not
            4D, or the frames array is empty.
    """

    def _fit_to_length(array: Any, num_frames: int) -> Any:
        """Subsample ``array`` to at most ``num_frames`` entries, then right-pad it."""
        array, _ = linspace_subsample_frames(array, num_frames)
        # The padding helper wants a parallel progress list; its values are discarded.
        placeholder_progress = [0.0] * len(array)
        array, _ = pad_trajectory_to_max_frames_np(array, placeholder_progress, num_frames, pad_from="right")
        return array

    def _build_trajectory(raw_data: Dict[str, Any], num_frames: int) -> Trajectory:
        """Build one Trajectory from a raw dictionary."""
        frames_array = raw_data["frames"]

        # Ensure we have the correct rank: (T, H, W, C) or (T, C, H, W)
        if len(frames_array.shape) != 4:
            raise ValueError(f"Expected 4D array (T, H, W, C), got shape {frames_array.shape}")

        # Reject empty input up front: checking only after padding is too late,
        # because padding an empty clip has no edge frame to repeat.
        if frames_array.size == 0:
            raise ValueError("No frames processed for example")

        # A second axis of size 3 is treated as channels-first and moved last.
        # NOTE(review): this heuristic is ambiguous for channels-last frames whose
        # height is exactly 3 -- confirm callers never send such input.
        if frames_array.shape[1] == 3:
            frames_array = np.transpose(frames_array, (0, 2, 3, 1))

        frames_array = _fit_to_length(frames_array, num_frames)

        # Video embeddings go through the same subsample-and-pad treatment.
        video_embeddings = raw_data.get("video_embeddings")
        if video_embeddings is not None:
            video_embeddings = _fit_to_length(video_embeddings, num_frames)
        text_embedding = raw_data.get("text_embedding")

        # Convert to tensors if they are numpy arrays
        if isinstance(video_embeddings, np.ndarray):
            video_embeddings = torch.tensor(video_embeddings)
        if isinstance(text_embedding, np.ndarray):
            text_embedding = torch.tensor(text_embedding)

        processed_item: Dict[str, Any] = {
            "frames": frames_array,
            "frames_shape": frames_array.shape,
            "task": raw_data["task"],
            "lang_vector": None,
            "metadata": raw_data.get("metadata", None),
            "video_embeddings": video_embeddings,
            "text_embedding": text_embedding,
            "video_shape": video_embeddings.shape if video_embeddings is not None else None,
            "text_shape": text_embedding.shape if text_embedding is not None else None,
        }
        return Trajectory(**processed_item)

    if sample_type == "progress":
        assert isinstance(raw_data, dict), "raw_data must be a dictionary"
        trajectory = _build_trajectory(raw_data=raw_data, num_frames=max_frames)
        return ProgressSample(trajectory=trajectory)

    if sample_type == "preference":
        assert isinstance(raw_data, tuple), "raw_data must be a tuple"
        assert len(raw_data) == 2, "raw_data must be a tuple of two dictionaries"
        chosen, rejected = (
            _build_trajectory(raw_data=trajectory_data, num_frames=max_frames) for trajectory_data in raw_data
        )
        return PreferenceSample(chosen_trajectory=chosen, rejected_trajectory=rejected)

    raise ValueError(f"Unsupported sample_type: {sample_type}")
def build_payload(
    samples: list[PreferenceSample | ProgressSample],
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Split samples into uploadable ``.npy`` buffers plus JSON-friendly dictionaries.

    Each sample is dumped with ``model_dump()``. Inside its trajectory
    dictionaries, array-valued fields (numpy arrays or torch tensors) are
    serialised with ``np.save`` into in-memory buffers and replaced by a
    ``{"__numpy_file__": <file_key>}`` reference.

    Args:
        samples: Samples to convert; each must expose ``model_dump()``.

    Returns:
        Tuple of (files, sample_data) where:
            - files: Maps file key to ``(filename, buffer, content_type)``, with
              the buffer rewound to the start.
            - sample_data: One dictionary per sample, with arrays replaced by
              file references.
    """
    trajectory_keys = ("chosen_trajectory", "rejected_trajectory", "trajectory")
    array_fields = ("frames", "lang_vector", "video_embeddings", "text_embedding")

    files: dict[str, Any] = {}
    sample_data: list[dict[str, Any]] = []

    for sample_idx, sample in enumerate(samples):
        processed_sample = sample.model_dump()

        for key in trajectory_keys:
            trajectory = processed_sample.get(key)
            if not isinstance(trajectory, dict):
                continue

            for field_name in array_fields:
                value = trajectory.get(field_name)

                if isinstance(value, torch.Tensor):
                    # detach()/cpu() first: Tensor.numpy() raises for tensors that
                    # require grad or live on an accelerator device.
                    value = value.detach().cpu().numpy()
                    trajectory[field_name] = value

                if not isinstance(value, np.ndarray):
                    continue

                buf = io.BytesIO()
                np.save(buf, value)
                buf.seek(0)  # rewind so the HTTP client reads from the start

                file_key = f"sample_{sample_idx}_{key}_{field_name}"
                files[file_key] = (f"{file_key}.npy", buf, "application/octet-stream")
                trajectory[field_name] = {"__numpy_file__": file_key}

        sample_data.append(processed_sample)

    return files, sample_data
def post_batch(url: str, payload: dict[str, Any], timeout_s: float = 120.0) -> dict[str, Any]:
    """Send ``payload`` as JSON to the server's ``/evaluate_batch`` endpoint.

    Args:
        url: Server base URL; trailing slashes are ignored.
        payload: JSON-serialisable request body.
        timeout_s: Request timeout in seconds.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    endpoint = url.rstrip("/") + "/evaluate_batch"
    response = requests.post(endpoint, json=payload, timeout=timeout_s)
    response.raise_for_status()
    return response.json()
def post_batch_npy(
    url: str,
    files: dict[str, Any],
    sample_data: list[dict[str, Any]],
    timeout_s: float = 120.0,
    extra_form_data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Upload a batch as multipart form data to ``/evaluate_batch_npy``.

    Args:
        url: Server base URL; trailing slashes are ignored.
        files: Multipart file entries, passed through to ``requests``.
        sample_data: Sample dictionaries; entry ``i`` is JSON-encoded into the
            form field ``sample_{i}``.
        timeout_s: Request timeout in seconds.
        extra_form_data: Optional additional form fields. String values are sent
            verbatim, everything else is JSON-encoded.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    form_fields = {}
    for index, sample in enumerate(sample_data):
        form_fields[f"sample_{index}"] = json.dumps(sample)

    # Extra fields are applied last, so they win on a name clash.
    for name, value in (extra_form_data or {}).items():
        form_fields[name] = value if isinstance(value, str) else json.dumps(value)

    endpoint = url.rstrip("/") + "/evaluate_batch_npy"
    response = requests.post(endpoint, files=files, data=form_fields, timeout=timeout_s)
    response.raise_for_status()
    return response.json()
async def post_batch_npy_async(
    session: aiohttp.ClientSession,
    url: str,
    files: dict[str, Any],
    sample_data: list[dict[str, Any]],
    timeout_s: float = 120.0,
    extra_form_data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Async version of post_batch_npy using aiohttp.

    Args:
        session: Open aiohttp client session used to send the request.
        url: Server base URL; trailing slashes are ignored.
        files: Maps form field name to ``(filename, file_obj, content_type)``.
        sample_data: Sample dictionaries; entry ``i`` is JSON-encoded into the
            form field ``sample_{i}``.
        timeout_s: Total request timeout in seconds.
        extra_form_data: Optional additional form fields, encoded the same way
            as in ``post_batch_npy``: strings verbatim, everything else as JSON.

    Returns:
        The response body decoded from JSON.

    Raises:
        aiohttp.ClientResponseError: If the server answers with an error status.
    """
    form_data = aiohttp.FormData()

    # Add files
    for key, (filename, file_obj, content_type) in files.items():
        form_data.add_field(key, file_obj, filename=filename, content_type=content_type)

    # Add sample data
    for i, sample in enumerate(sample_data):
        form_data.add_field(f"sample_{i}", json.dumps(sample))

    # Add extra form fields, mirroring the synchronous client
    if extra_form_data:
        for key, value in extra_form_data.items():
            form_data.add_field(key, value if isinstance(value, str) else json.dumps(value))

    # Ask the server to close the connection after this request.
    headers = {"Connection": "close"}
    timeout = aiohttp.ClientTimeout(total=timeout_s)

    async with session.post(
        url.rstrip("/") + "/evaluate_batch_npy", data=form_data, timeout=timeout, headers=headers
    ) as resp:
        resp.raise_for_status()
        return await resp.json()
async def parse_npy_form_data(form_data: Any) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
    """Parse multipart form data into numpy arrays and plain form fields.

    Entries that carry a non-empty ``filename`` are treated as file uploads:
    ``.npy`` uploads are read and decoded with ``np.load``, any other upload is
    ignored. Every remaining entry is parsed as JSON when possible and kept
    unchanged otherwise.

    Args:
        form_data: Mapping of form field name to value, e.g. the result of
            ``await request.form()``. Upload values must expose ``filename``
            and an awaitable ``read()``.

    Returns:
        Tuple of (numpy_arrays dict, other_data dict), both keyed by field name.
    """
    numpy_arrays: Dict[str, np.ndarray] = {}
    other_data: Dict[str, Any] = {}

    for key, value in form_data.items():
        filename = getattr(value, "filename", None)
        if filename:
            # File upload: only .npy payloads are decoded, others are skipped.
            if filename.endswith(".npy"):
                content = await value.read()
                # Uploads are untrusted input: never allow pickled object arrays.
                numpy_arrays[key] = np.load(io.BytesIO(content), allow_pickle=False)
            continue

        # Plain form field: decode JSON when possible, otherwise keep it as-is.
        try:
            other_data[key] = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            other_data[key] = value

    return numpy_arrays, other_data
def reconstruct_payload_from_npy(
    numpy_arrays: Dict[str, np.ndarray],
    other_data: Dict[str, Any],
    trajectory_keys: Optional[List[str]] = None,
    convert_embeddings_to_torch: bool = False,
) -> List[Dict[str, Any]]:
    """Rebuild sample dictionaries from uploaded arrays and form fields.

    Samples are looked up in ``other_data`` under ``sample_0``, ``sample_1``, ...
    (JSON strings are decoded first). Inside each trajectory dictionary, every
    ``{"__numpy_file__": <file_key>}`` reference whose key exists in
    ``numpy_arrays`` is replaced by that array; unknown references are left as
    they are. Sample dictionaries that were already decoded are updated in place.

    Args:
        numpy_arrays: Arrays loaded from the uploaded .npy files, by file key.
        other_data: Remaining form fields, including the ``sample_{i}`` entries.
        trajectory_keys: Sample keys holding trajectory dictionaries (default:
            "chosen_trajectory", "rejected_trajectory", "trajectory").
        convert_embeddings_to_torch: When True, "video_embeddings" and
            "text_embedding" values that are numpy arrays or lists are
            converted to torch tensors.

    Returns:
        List of reconstructed sample dictionaries, ordered by sample index.
    """
    if trajectory_keys is None:
        trajectory_keys = [
            "chosen_trajectory",
            "rejected_trajectory",
            "trajectory",
        ]
    embedding_fields = ("video_embeddings", "text_embedding")

    reconstructed = []
    # other_data may hold non-sample fields too, so its size is only an upper
    # bound on the number of samples.
    for index in range(len(other_data)):
        sample_key = f"sample_{index}"
        if sample_key not in other_data:
            continue

        sample = other_data[sample_key]
        if isinstance(sample, str):
            sample = json.loads(sample)

        for name, trajectory in sample.items():
            if name not in trajectory_keys or not isinstance(trajectory, dict):
                continue

            for field, field_value in trajectory.items():
                # Swap a file reference for the uploaded array, when available.
                if isinstance(field_value, dict) and field_value.get("__numpy_file__"):
                    file_key = field_value["__numpy_file__"]
                    if file_key in numpy_arrays:
                        trajectory[field] = numpy_arrays[file_key]

                if not (convert_embeddings_to_torch and field in embedding_fields):
                    continue

                current = trajectory[field]
                if isinstance(current, (np.ndarray, list)):
                    trajectory[field] = torch.tensor(current)

        reconstructed.append(sample)

    return reconstructed
def find_video_files(directory: str) -> list[str]:
    """List the video files directly inside ``directory``, sorted by path.

    Only regular files whose extension (compared case-insensitively) is a known
    video extension are returned; subdirectories are not searched.

    Args:
        directory: Path to the directory to scan.

    Returns:
        Sorted list of video file paths, or an empty list when ``directory``
        is not an existing directory.
    """
    root = Path(directory)
    if not root.is_dir():
        return []

    recognised_suffixes = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".m4v"}
    return sorted(
        str(entry)
        for entry in root.iterdir()
        if entry.is_file() and entry.suffix.lower() in recognised_suffixes
    )
def infer_task_from_video_name(video_path: str) -> str:
    """Infer a human-readable task description from a video filename.

    If the filename (without extension) contains a comma, the task is the text
    before the first comma. Otherwise the name is split on underscores and the
    status tokens "success", "fail" and "failure" (case-insensitive) are dropped.
    Underscores become spaces, surrounding and repeated whitespace is collapsed,
    and the first character is upper-cased.

    Args:
        video_path: Path to video file.

    Returns:
        Inferred task name, or "Complete the task" when nothing usable remains.
    """
    fallback = "Complete the task"
    status_tokens = {"success", "fail", "failure"}

    video_name = Path(video_path).stem  # filename without extension

    if "," in video_name:
        task_part = video_name.split(",")[0]
    else:
        kept_parts = [part for part in video_name.split("_") if part.lower() not in status_tokens]
        task_part = "_".join(kept_parts)

    # split() with no argument drops empty pieces, so stray or repeated
    # underscores and padding around the comma never leak into the task text.
    task = " ".join(task_part.replace("_", " ").split())
    if not task:
        return fallback

    # Upper-case only the first character; the rest is kept as written.
    return task[0].upper() + task[1:]
def setup_output_directory(output_dir: Optional[str], video_path: Optional[str] = None) -> str:
    """Ensure the output directory exists and return its path.

    Args:
        output_dir: Directory to use. When empty or None, a timestamped
            directory under ``./eval_outputs`` is used instead.
        video_path: Accepted for interface compatibility; not used here.

    Returns:
        Path of the directory, which exists once this function returns.
    """
    target = output_dir
    if not target:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target = os.path.join(".", f"eval_outputs/{timestamp}")

    os.makedirs(target, exist_ok=True)
    return target