diff --git a/app.py b/app.py index 37a448b5b291118335df3c5e47f17e89e274c90c..ad9197c0705b45c5bbef4b8c5d1a21ace6a88320 100644 --- a/app.py +++ b/app.py @@ -25,9 +25,9 @@ import numpy as np import requests from typing import Any, List, Optional, Tuple -from rfm.data.dataset_types import Trajectory, ProgressSample, PreferenceSample, SimilaritySample -from rfm.evals.eval_utils import build_payload, post_batch_npy -from rfm.evals.eval_viz_utils import create_combined_progress_success_plot, extract_frames +from dataset_types import Trajectory, ProgressSample, PreferenceSample, SimilaritySample +from eval_utils import build_payload, post_batch_npy +from eval_viz_utils import create_combined_progress_success_plot, extract_frames from datasets import load_dataset as load_dataset_hf, get_dataset_config_names logger = logging.getLogger(__name__) @@ -514,7 +514,7 @@ def process_two_videos( elif prediction_type == "progress": # Create ProgressSamples for both videos - from rfm.data.dataset_types import ProgressSample + from dataset_types import ProgressSample progress_sample_a = ProgressSample( trajectory=trajectory_a, diff --git a/dataset_types.py b/dataset_types.py new file mode 100755 index 0000000000000000000000000000000000000000..d78d2d4f0e4bfd88eb611d54b4fd7b2fd38411ee --- /dev/null +++ b/dataset_types.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +""" +Dataclasses for RFM model dataset trajectory structures. +Defines the standard format for HuggingFace dataset trajectories. 
+""" + +from typing import Any, Union, List, Dict, Tuple, Optional + +import numpy as np +from pydantic import BaseModel, ConfigDict +import torch + + +class Trajectory(BaseModel): + """Trajectory structure containing frames, metadata, and progress information.""" + + # Core trajectory fields + frames: Union[List[str], np.ndarray, None] = None + frames_shape: Optional[Tuple] = None + + # If embeddings are precomputed + embeddings_path: Optional[str] = None + video_embeddings: Union[torch.Tensor, np.ndarray, None] = None + text_embedding: Union[torch.Tensor, np.ndarray, None] = None + + id: Optional[str] = None + task: Optional[str] = None + lang_vector: Union[np.ndarray, List[float], None] = None + data_source: Optional[str] = None + quality_label: Optional[str] = None + is_robot: Optional[bool] = None + + # Progress and metadata + # Can be List[float] for continuous progress, np.ndarray, or List[np.ndarray] for C51 discrete distributions + target_progress: Optional[Union[List[float], List[torch.Tensor], torch.Tensor, None]] = None + partial_success: Optional[Union[float, torch.Tensor]] = None # float for continuous, Tensor for C51 discrete + success_label: Optional[List[float]] = None # Success labels for each frame (1.0 for success, 0.0 for failure) + metadata: Optional[Dict[str, Any]] = None + data_gen_strategy: Optional[str] = None + + model_config = ConfigDict(arbitrary_types_allowed=True) + + +class ProgressSample(BaseModel): + """Sample structure for progress evaluation.""" + + trajectory: Trajectory + sample_type: str = "progress" + data_gen_strategy: Optional[str] = None + resample_attempts: int = 1 + + +class PreferenceSample(BaseModel): + """Sample structure for preference prediction: chosen vs rejected where chosen is preferred.""" + + # Trajectories + chosen_trajectory: Trajectory + rejected_trajectory: Trajectory + + sample_type: str = "preference" + data_gen_strategy: Optional[str] = None + resample_attempts: int = 1 + + +class 
SimilaritySample(BaseModel): + """Sample structure for similarity scoring: traj_sim and traj_diff ranked against o^ref.""" + + # Trajectories + ref_trajectory: Trajectory # o^ref + sim_trajectory: Trajectory # Similar trajectory + diff_trajectory: Optional[Trajectory] = None # Different trajectory (optional in inference mode) + + sample_type: str = "similarity" + data_gen_strategy: Optional[str] = None + resample_attempts: int = 1 + + +SampleType = Union[PreferenceSample, SimilaritySample, ProgressSample] diff --git a/eval_utils.py b/eval_utils.py new file mode 100755 index 0000000000000000000000000000000000000000..7682983e5127aa3e83adad1f9ffd3fbe23b89101 --- /dev/null +++ b/eval_utils.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import re +import torch +import io +import json +import os +from pathlib import Path +from typing import Any, Dict, List, Union, Optional, Tuple +from datetime import datetime + +import aiohttp +import numpy as np +import requests +import torch + +from rfm.data.dataset_types import PreferenceSample, SimilaritySample, ProgressSample, Trajectory +from rfm.data.datasets.helpers import linspace_subsample_frames, pad_trajectory_to_max_frames_np + + +def extract_answer_from_text(text: str) -> str: + """Extract answer from text using tags.""" + m = re.search(r"(.*?)", text, re.DOTALL) + ans = m.group(1).strip() if m else "" + return ans + + +def raw_dict_to_sample( + raw_data: Union[Tuple[Dict[str, Any], Dict[str, Any]], Dict[str, Any]], + max_frames: int = 16, + sample_type: str = "progress", +) -> Union[ProgressSample, PreferenceSample]: + """ + Convert raw data dictionary to a ProgressSample or PreferenceSample. 
def raw_dict_to_sample(
    raw_data: Union[Tuple[Dict[str, Any], Dict[str, Any]], Dict[str, Any]],
    max_frames: int = 16,
    sample_type: str = "progress",
) -> Union[ProgressSample, PreferenceSample]:
    """Convert raw data dict(s) to a ProgressSample or PreferenceSample.

    Args:
        raw_data: Dict with 'frames', 'task', 'id', 'metadata',
            'video_embeddings', 'text_embedding' — or a tuple of two such
            dicts when ``sample_type == "preference"``.
        max_frames: Maximum number of frames to keep per trajectory (default 16).
        sample_type: Either "progress" or "preference" (default "progress").

    Returns:
        ProgressSample or PreferenceSample.

    Raises:
        ValueError: on malformed frames or an unsupported ``sample_type``.
    """

    def _build_trajectory(raw_data: Dict[str, Any], num_frames: int) -> Trajectory:
        processed_item: Dict[str, Any] = {}

        frames_array = raw_data["frames"]

        # Expect a 4D video tensor: (T, H, W, C) or (T, C, H, W).
        if len(frames_array.shape) != 4:
            raise ValueError(f"Expected 4D array (T, H, W, C), got shape {frames_array.shape}")

        # Convert (T, C, H, W) -> (T, H, W, C) if needed.  Fixed: the extra
        # ``shape[-1] != 3`` guard avoids transposing a genuine channels-last
        # video whose height happens to be 3.
        if frames_array.shape[1] == 3 and frames_array.shape[-1] != 3:
            frames_array = np.transpose(frames_array, (0, 2, 3, 1))

        frames_array, _ = linspace_subsample_frames(frames_array, num_frames)
        dummy_progress = [0.0] * len(frames_array)
        frames_array, _ = pad_trajectory_to_max_frames_np(frames_array, dummy_progress, num_frames, pad_from="right")

        if frames_array.size == 0:
            raise ValueError("No frames processed for example")

        processed_item["frames"] = frames_array
        processed_item["frames_shape"] = frames_array.shape
        processed_item["task"] = raw_data["task"]
        processed_item["lang_vector"] = None
        processed_item["metadata"] = raw_data.get("metadata", None)

        # Subsample / pad precomputed video embeddings the same way as frames.
        video_embeddings = raw_data.get("video_embeddings")
        if video_embeddings is not None:
            video_embeddings, _ = linspace_subsample_frames(video_embeddings, num_frames)
            dummy_progress_emb = [0.0] * len(video_embeddings)
            video_embeddings, _ = pad_trajectory_to_max_frames_np(
                video_embeddings, dummy_progress_emb, num_frames, pad_from="right"
            )

        text_embedding = raw_data.get("text_embedding")

        # Convert numpy embeddings to torch tensors for the model.
        if video_embeddings is not None and isinstance(video_embeddings, np.ndarray):
            video_embeddings = torch.tensor(video_embeddings)
        if text_embedding is not None and isinstance(text_embedding, np.ndarray):
            text_embedding = torch.tensor(text_embedding)

        processed_item["video_embeddings"] = video_embeddings
        processed_item["text_embedding"] = text_embedding
        # NOTE(review): Trajectory declares no ``video_shape``/``text_shape``
        # fields, so pydantic drops these extras — confirm they are intended.
        processed_item["video_shape"] = video_embeddings.shape if video_embeddings is not None else None
        processed_item["text_shape"] = text_embedding.shape if text_embedding is not None else None

        return Trajectory(**processed_item)

    if sample_type == "progress":
        assert isinstance(raw_data, dict), "raw_data must be a dictionary"
        trajectory = _build_trajectory(raw_data=raw_data, num_frames=max_frames)
        return ProgressSample(trajectory=trajectory)
    elif sample_type == "preference":
        assert isinstance(raw_data, tuple), "raw_data must be a tuple"
        assert len(raw_data) == 2, "raw_data must be a tuple of two dictionaries"
        trajectories: List[Trajectory] = [
            _build_trajectory(raw_data=trajectory_data, num_frames=max_frames) for trajectory_data in raw_data
        ]
        return PreferenceSample(chosen_trajectory=trajectories[0], rejected_trajectory=trajectories[1])
    else:
        raise ValueError(f"Unsupported sample_type: {sample_type}")
def build_payload(
    samples: list[PreferenceSample | SimilaritySample | ProgressSample],
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Build a multipart payload, converting numpy arrays into .npy file parts.

    Args:
        samples: List of samples to convert.

    Returns:
        Tuple of (files, sample_data):
          - files: dict of (filename, buffer, content-type) tuples for each
            numpy array, keyed ``sample_{i}_{trajectory_key}_{field}``.
          - sample_data: sample dicts with every array replaced by a
            ``{"__numpy_file__": key}`` reference.
    """
    files: dict[str, Any] = {}
    sample_data: list[dict[str, Any]] = []

    # Fixed: include the actual SimilaritySample field names (ref/sim/diff_
    # trajectory) alongside the legacy spellings — previously similarity
    # samples kept raw ndarrays and failed JSON serialization downstream.
    trajectory_keys = [
        "chosen_trajectory",
        "rejected_trajectory",
        "reference_trajectory",
        "traj_sim_trajectory",
        "traj_diff_trajectory",
        "ref_trajectory",
        "sim_trajectory",
        "diff_trajectory",
        "trajectory",
    ]
    numpy_fields = ["frames", "lang_vector", "video_embeddings", "text_embedding"]

    for sample_idx, sample in enumerate(samples):
        processed_sample = sample.model_dump().copy()

        for key in trajectory_keys:
            if key in processed_sample and isinstance(processed_sample[key], dict):
                trajectory = processed_sample[key]

                for field_name in numpy_fields:
                    # Tensors are serialized via numpy as well.
                    if field_name in trajectory and isinstance(trajectory[field_name], torch.Tensor):
                        trajectory[field_name] = trajectory[field_name].numpy()

                    if field_name in trajectory and isinstance(trajectory[field_name], np.ndarray):
                        buf = io.BytesIO()
                        np.save(buf, trajectory[field_name])
                        buf.seek(0)
                        file_key = f"sample_{sample_idx}_{key}_{field_name}"
                        files[file_key] = (
                            f"{file_key}.npy",
                            buf,
                            "application/octet-stream",
                        )
                        trajectory[field_name] = {"__numpy_file__": file_key}

        sample_data.append(processed_sample)

    return files, sample_data


def post_batch(url: str, payload: dict[str, Any], timeout_s: float = 120.0) -> dict[str, Any]:
    """POST a batch payload to the evaluation server and return parsed JSON."""
    resp = requests.post(url.rstrip("/") + "/evaluate_batch", json=payload, timeout=timeout_s)
    resp.raise_for_status()
    return resp.json()


def post_batch_npy(
    url: str, files: dict[str, Any], sample_data: list[dict[str, Any]], timeout_s: float = 120.0
) -> dict[str, Any]:
    """POST a batch using the multipart .npy format produced by build_payload."""
    # Sample dicts travel as JSON-encoded form fields, one per sample.
    data = {f"sample_{i}": json.dumps(sample) for i, sample in enumerate(sample_data)}

    resp = requests.post(url.rstrip("/") + "/evaluate_batch_npy", files=files, data=data, timeout=timeout_s)
    resp.raise_for_status()
    return resp.json()
async def post_batch_npy_async(
    session: aiohttp.ClientSession,
    url: str,
    files: dict[str, Any],
    sample_data: list[dict[str, Any]],
    timeout_s: float = 120.0,
) -> dict[str, Any]:
    """Async variant of post_batch_npy using an aiohttp session."""
    form_data = aiohttp.FormData()

    # Attach every .npy buffer under its file key.
    for field_key, (fname, fobj, ctype) in files.items():
        form_data.add_field(field_key, fobj, filename=fname, content_type=ctype)

    # Attach the JSON-encoded sample dicts, one field per sample.
    for idx, sample in enumerate(sample_data):
        form_data.add_field(f"sample_{idx}", json.dumps(sample))

    # NOTE(review): forces a fresh connection per request — presumably to
    # avoid keep-alive issues with the eval server; confirm before removing.
    headers = {"Connection": "close"}
    timeout = aiohttp.ClientTimeout(total=timeout_s)
    async with session.post(
        url.rstrip("/") + "/evaluate_batch_npy", data=form_data, timeout=timeout, headers=headers
    ) as resp:
        resp.raise_for_status()
        return await resp.json()
async def parse_npy_form_data(form_data: Any) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
    """Split multipart form data into numpy arrays and plain fields.

    Args:
        form_data: FastAPI form mapping from ``request.form()``.

    Returns:
        Tuple of (numpy_arrays dict, other_data dict): ``.npy`` uploads are
        decoded into arrays; every other field is JSON-decoded when possible
        and kept as a raw string otherwise.
    """
    numpy_arrays: Dict[str, np.ndarray] = {}
    other_data: Dict[str, Any] = {}

    for key, value in form_data.items():
        is_upload = hasattr(value, "filename") and value.filename
        if is_upload:
            # Only .npy uploads are understood; other files are ignored.
            if value.filename.endswith(".npy"):
                payload = await value.read()
                numpy_arrays[key] = np.load(io.BytesIO(payload))
        else:
            # Plain form field: prefer JSON, fall back to the raw string.
            try:
                other_data[key] = json.loads(value)
            except (json.JSONDecodeError, TypeError):
                other_data[key] = value

    return numpy_arrays, other_data
def reconstruct_payload_from_npy(
    numpy_arrays: Dict[str, np.ndarray],
    other_data: Dict[str, Any],
    trajectory_keys: Optional[List[str]] = None,
    convert_embeddings_to_torch: bool = False,
) -> List[Dict[str, Any]]:
    """Rebuild sample dicts from .npy uploads plus JSON form fields.

    The client sends:
      - files: ``sample_{i}_{trajectory_key}_{field}.npy``
      - fields: ``sample_{i}`` holding the sample JSON, where each array was
        replaced by ``{"__numpy_file__": <file key>}``.

    Args:
        numpy_arrays: Arrays loaded from the uploaded .npy files.
        other_data: Remaining form fields (JSON strings or parsed dicts).
        trajectory_keys: Trajectory attribute names to scan (default: all
            known names, including the similarity-sample field names).
        convert_embeddings_to_torch: Convert embedding fields to torch tensors.

    Returns:
        List of reconstructed sample dictionaries, ordered by sample index.
    """
    if trajectory_keys is None:
        # Fixed: include the actual SimilaritySample field names
        # (ref/sim/diff_trajectory) alongside the legacy spellings so
        # similarity payloads are reconstructed too.
        trajectory_keys = [
            "chosen_trajectory",
            "rejected_trajectory",
            "reference_trajectory",
            "traj_sim_trajectory",
            "traj_diff_trajectory",
            "ref_trajectory",
            "sim_trajectory",
            "diff_trajectory",
            "trajectory",
        ]

    # Fixed: iterate the sample indices actually present instead of
    # ``range(len(other_data))`` — extra form fields or sparse indices no
    # longer cause samples to be skipped.
    indices = sorted(
        int(m.group(1)) for m in (re.fullmatch(r"sample_(\d+)", k) for k in other_data) if m
    )

    samples: List[Dict[str, Any]] = []
    for i in indices:
        sample_data = other_data[f"sample_{i}"]
        if isinstance(sample_data, str):
            # Parse the sample JSON if it is still a string.
            sample_data = json.loads(sample_data)

        for key, value in sample_data.items():
            if key not in trajectory_keys or not isinstance(value, dict):
                continue
            for traj_key, traj_value in value.items():
                # Replace numpy file references with the actual arrays.
                if isinstance(traj_value, dict) and traj_value.get("__numpy_file__"):
                    file_key = traj_value["__numpy_file__"]
                    if file_key in numpy_arrays:
                        value[traj_key] = numpy_arrays[file_key]

                # Optionally promote embeddings to torch tensors.
                if convert_embeddings_to_torch and traj_key in ["video_embeddings", "text_embedding"]:
                    if value.get(traj_key) is not None and isinstance(value[traj_key], (np.ndarray, list)):
                        value[traj_key] = torch.tensor(value[traj_key])

        samples.append(sample_data)

    return samples
def find_video_files(directory: str) -> list[str]:
    """Return sorted paths of all video files directly inside *directory*.

    Args:
        directory: Directory to scan (non-recursive).

    Returns:
        Sorted list of video file paths; empty when the directory is missing.
    """
    allowed = {".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".m4v"}

    root = Path(directory)
    if not root.is_dir():
        return []

    found = [str(p) for p in root.iterdir() if p.is_file() and p.suffix.lower() in allowed]
    found.sort()
    return found


def infer_task_from_video_name(video_path: str) -> str:
    """Infer a task name from a video filename.

    The task is everything before the first comma if one exists; otherwise the
    underscore-joined name with success/fail/failure tokens removed.

    Args:
        video_path: Path to the video file.

    Returns:
        Inferred task name ("Complete the task" when nothing usable remains).
    """
    stem = Path(video_path).stem  # filename without extension

    if "," in stem:
        task_part = stem.split(",")[0]
    else:
        kept = [p for p in stem.split("_") if p.lower() not in ("success", "fail", "failure")]
        if not kept:
            return "Complete the task"
        task_part = "_".join(kept)

    # Underscores become spaces for readability.
    task = " ".join(task_part.split("_"))
    if not task:
        return "Complete the task"
    # Capitalize only the first character; leave the rest untouched.
    return task[0].upper() + task[1:]


def setup_output_directory(output_dir: Optional[str], video_path: Optional[str] = None) -> str:
    """Create (if needed) and return the output directory path.

    Falls back to ``./eval_outputs/<timestamp>`` when *output_dir* is falsy.
    (*video_path* is accepted for interface compatibility but unused here.)
    """
    if output_dir:
        save_dir = output_dir
    else:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_dir = os.path.join(".", f"eval_outputs/{stamp}")

    os.makedirs(save_dir, exist_ok=True)
    return save_dir
# ============================ eval_viz_utils.py ============================
"""Utility functions for visualization in RFM evaluations."""

from typing import Optional
import os
import logging
import tempfile
import numpy as np
import matplotlib.pyplot as plt
import decord

logger = logging.getLogger(__name__)


def create_combined_progress_success_plot(
    progress_pred: np.ndarray,
    num_frames: int,
    success_binary: Optional[np.ndarray] = None,
    success_probs: Optional[np.ndarray] = None,
    success_labels: Optional[np.ndarray] = None,
    is_discrete_mode: bool = False,
    title: Optional[str] = None,
    loss: Optional[float] = None,
    pearson: Optional[float] = None,
) -> plt.Figure:
    """Create a combined progress / success-binary / success-probability plot.

    Renders one subplot (progress only) when no usable binary success
    predictions are given, and three subplots otherwise.

    Args:
        progress_pred: Per-frame progress predictions.
        num_frames: Number of frames (kept for interface compatibility).
        success_binary: Optional per-frame binary success predictions.
        success_probs: Optional per-frame success probabilities.
        success_labels: Optional ground-truth success labels.
        is_discrete_mode: Deprecated; kept for compatibility.
        title: Plot title; auto-generated from loss/pearson when None.
        loss: Optional loss value shown in the auto title.
        pearson: Optional Pearson correlation shown in the auto title.

    Returns:
        The matplotlib Figure.
    """
    # Success panels only make sense when the binary predictions line up
    # frame-for-frame with the progress predictions.
    show_success = success_binary is not None and len(success_binary) == len(progress_pred)

    if show_success:
        fig, (prog_ax, bin_ax, prob_ax) = plt.subplots(1, 3, figsize=(15, 3.5))
    else:
        fig, prog_ax = plt.subplots(figsize=(6, 3.5))
        bin_ax = None
        prob_ax = None

    # Progress curve.
    prog_ax.plot(progress_pred, linewidth=2)
    prog_ax.set_ylabel("Progress")

    # Auto-build the title from loss / pearson when none is supplied.
    if title is None:
        pieces = ["Progress"]
        if loss is not None:
            pieces.append(f"Loss: {loss:.3f}")
        if pearson is not None:
            pieces.append(f"Pearson: {pearson:.2f}")
        title = ", ".join(pieces)
    fig.suptitle(title)

    # Continuous [0, 1] axis (discrete mode is converted before this call).
    prog_ax.set_ylim(0, 1)
    prog_ax.spines["right"].set_visible(False)
    prog_ax.spines["top"].set_visible(False)
    prog_ax.set_yticks([0, 0.2, 0.4, 0.6, 0.8, 1.0])

    # Binary success panel.
    if bin_ax is not None:
        bin_ax.step(range(len(success_binary)), success_binary, where="post", linewidth=2, label="Predicted", color="blue")
        if success_labels is not None and len(success_labels) == len(success_binary):
            bin_ax.step(
                range(len(success_labels)),
                success_labels,
                where="post",
                linewidth=2,
                label="Ground Truth",
                color="green",
            )
        bin_ax.set_ylabel("Success (Binary)")
        bin_ax.set_ylim(-0.05, 1.05)
        bin_ax.spines["right"].set_visible(False)
        bin_ax.spines["top"].set_visible(False)
        bin_ax.set_yticks([0, 1])
        bin_ax.legend()

    # Success-probability panel.
    if prob_ax is not None and success_probs is not None:
        prob_ax.plot(range(len(success_probs)), success_probs, linewidth=2, label="Success Prob", color="purple")
        if success_labels is not None and len(success_labels) == len(success_probs):
            prob_ax.step(
                range(len(success_labels)),
                success_labels,
                where="post",
                linewidth=2,
                label="Ground Truth",
                color="green",
                linestyle="--",
            )
        prob_ax.set_ylabel("Success Probability")
        prob_ax.set_ylim(-0.05, 1.05)
        prob_ax.spines["right"].set_visible(False)
        prob_ax.spines["top"].set_visible(False)
        prob_ax.set_yticks([0, 0.2, 0.4, 0.6, 0.8, 1.0])
        prob_ax.legend()

    plt.tight_layout()
    return fig
def extract_frames(video_path: str, fps: float = 1.0, max_frames: int = 64) -> Optional[np.ndarray]:
    """Extract frames from a video file or URL as a (T, H, W, C) array.

    Frames are sampled at roughly ``fps`` frames per second, capped at
    ``max_frames`` total to bound memory use.  Supports local paths and
    http(s) URLs (e.g. HuggingFace Hub).

    Args:
        video_path: Path or URL of the video (a (path, ...) tuple is accepted).
        fps: Target sampling rate; falls back to the native rate when <= 0.
        max_frames: Hard cap on the number of extracted frames (default 64).

    Returns:
        numpy array of shape (T, H, W, C), or None on a missing path or a
        decoding error.  (Return annotation fixed to ``Optional`` — every
        error path returns None.)
    """
    if video_path is None:
        return None

    # Some callers pass (path, metadata) tuples; take the path.
    if isinstance(video_path, tuple):
        video_path = video_path[0]

    is_url = video_path.startswith(("http://", "https://"))
    if not is_url and not os.path.exists(video_path):
        logger.warning(f"Video path does not exist: {video_path}")
        return None

    try:
        # decord handles both local files and URLs.
        vr = decord.VideoReader(video_path, num_threads=1)
        total_frames = len(vr)

        # Native FPS, with a safe fallback when it cannot be determined.
        try:
            native_fps = float(vr.get_avg_fps())
        except Exception:
            native_fps = 1.0

        if fps is None or fps <= 0:
            fps = native_fps

        # desired ≈ duration * fps = total_frames * (fps / native_fps)
        if native_fps > 0:
            desired = int(round(total_frames * (fps / native_fps)))
        else:
            desired = total_frames
        desired = max(1, min(desired, total_frames))

        # Cap at max_frames — critical for long videos / high fps requests.
        if desired > max_frames:
            logger.warning(
                f"Requested {desired} frames but capping at {max_frames} "
                f"to prevent memory issues (video has {total_frames} frames at {native_fps:.2f} fps, "
                f"requested extraction at {fps:.2f} fps)"
            )
            desired = max_frames

        # Evenly spaced indices matching the desired frame count.
        if desired == total_frames:
            frame_indices = list(range(total_frames))
        else:
            frame_indices = np.linspace(0, total_frames - 1, desired, dtype=int).tolist()

        frames_array = vr.get_batch(frame_indices).asnumpy()  # Shape: (T, H, W, C)
        del vr  # release decoder resources promptly
        return frames_array
    except Exception as e:
        logger.error(f"Error extracting frames from {video_path}: {e}")
        return None
del vr + return frames_array + except Exception as e: + logger.error(f"Error extracting frames from {video_path}: {e}") + return None diff --git a/requirements.txt b/requirements.txt index 6754456f407b18079563d340b31584237913d5fc..d00d13d20779b938f306d29ea65a5e0d6452ea3a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,8 +26,7 @@ watchfiles # For file watching during development # RFM package (installed from git repository) # For local development, you can also install with: pip install -e ../ (from parent directory) ---ignore-requires-python -git+https://github.com/aliang8/reward_fm.git@anthony_working +# git+https://github.com/aliang8/reward_fm.git@anthony_working # Make sure a newer version of gradio is installed gradio==4.44.0 \ No newline at end of file diff --git a/samplers/README.md b/samplers/README.md new file mode 100644 index 0000000000000000000000000000000000000000..636a36d5453e4931f5dc2fe8ba86e59ced24ef35 --- /dev/null +++ b/samplers/README.md @@ -0,0 +1,182 @@ +# Sampler Strategies Documentation + +This document summarizes the data generation strategies used by each sampler type in the RFM data pipeline. + +## Overview + +The codebase contains three main sampler types: +- **SimSampler**: Generates similarity scoring samples +- **PrefSampler**: Generates preference prediction samples +- **ProgressSampler**: Generates progress prediction samples + +Each sampler implements multiple strategies for generating training data, with automatic retry logic and strategy rebalancing on failure. + +--- + +## SimSampler (Similarity Scoring) + +The `SimSampler` creates similarity scoring samples where two trajectories (`o^1` and `o^2`) are ranked against a reference trajectory (`o^ref`). The goal is to learn that `o^1` should be ranked higher than `o^2`. + +### Strategies + +#### 1. 
**REWIND**
- **Description**: Creates a similarity sample where `o^1` is an optimal trajectory from the same task, and `o^2` is a rewound (subsampled) version of the reference trajectory.
- **Purpose**: Learn to rank complete optimal trajectories above rewound (truncated) versions of the same task.
- **Implementation**:
  - `traj_sim`: Optimal trajectory from same task (via `_get_same_task_optimal`)
  - `traj_diff`: Rewound trajectory from reference (via `subsample_rewind`)

#### 2. **SUBOPTIMAL**
- **Description**: Creates a similarity sample where `o^1` is an optimal trajectory from the same task, and `o^2` is a suboptimal/failure trajectory from the same task.
- **Purpose**: Learn to distinguish between optimal and suboptimal trajectories from the same task.
- **Conditions**: Only available when:
  - Data source is in the failure category (`is_failure_ds`)
  - Probability is boosted by 2x for failure category data sources
- **Implementation**:
  - `traj_sim`: Optimal trajectory from same task (via `_get_same_task_optimal`)
  - `traj_diff`: Suboptimal trajectory from same task (via `_get_same_task_suboptimal`)

#### 3. **PAIRED_HUMAN_ROBOT**
- **Description**: Creates a similarity sample where `o^1` is a paired human/robot trajectory (opposite type from reference, same task), and `o^2` is from a different task.
- **Purpose**: Learn to distinguish between same-task and different-task trajectories, leveraging paired human/robot demonstrations.
+- **Conditions**: Only available when: + - Data source is in the paired category (`is_paired_ds`) + - Paired human/robot data exists for the task + - Probability is boosted by 2x for paired category data sources +- **Implementation**: + - `traj_sim`: Paired human/robot trajectory (via `_get_paired_human_robot_traj`) + - `traj_diff`: Trajectory from different task (via `_get_different_video_traj`) + +### Strategy Selection +- Strategies are selected probabilistically based on `similarity_strategy_ratio` configuration +- Probabilities are rebalanced when strategies fail +- Strategies are removed after 4 consecutive failures +- Maximum 10 total attempts per sample generation + +### Reference Trajectory Requirements +- For non-RoboArena: Must have `quality_label == "successful"` +- For RoboArena: Must have `partial_success` field present + +--- + +## PrefSampler (Preference Prediction) + +The `PrefSampler` creates preference prediction samples with a chosen (preferred) trajectory and a rejected (suboptimal) trajectory. + +### Strategies + +#### 1. **REWIND** +- **Description**: Uses the same optimal trajectory for both chosen and rejected, but applies rewind subsampling to the rejected trajectory. +- **Purpose**: Learn that full trajectories are preferred over truncated/rewound versions. +- **Implementation**: + - `chosen_trajectory`: Original optimal trajectory (forward subsampling) + - `rejected_trajectory`: Same trajectory with `subsample_rewind` strategy + +#### 2. **SUBOPTIMAL** +- **Description**: Uses an optimal trajectory as chosen and a suboptimal/failure trajectory from the same task as rejected. +- **Purpose**: Learn to prefer optimal trajectories over suboptimal ones from the same task. +- **Conditions**: Only available when suboptimal trajectories exist for the task +- **Implementation**: + - `chosen_trajectory`: Optimal trajectory + - `rejected_trajectory`: Suboptimal trajectory from same task (via `_get_same_task_suboptimal`) + +#### 3. 
**DIFFERENT_TASK** +- **Description**: Uses an optimal trajectory as chosen and a trajectory from a completely different task as rejected. +- **Purpose**: Learn that trajectories from the same task are preferred over trajectories from different tasks. +- **Implementation**: + - `chosen_trajectory`: Optimal trajectory + - `rejected_trajectory`: Trajectory from different task (via `_get_different_video_traj`) + - **Note**: Rejected trajectory's `target_progress` is set to `[0.0]` for all timesteps + +#### 4. **REVERSE_PROGRESS** +- **Description**: Uses the same optimal trajectory for both chosen and rejected, but applies reverse uniform sampling to the rejected trajectory. +- **Purpose**: Learn that forward progress is preferred over reverse progress. +- **Implementation**: + - `chosen_trajectory`: Original optimal trajectory (forward subsampling) + - `rejected_trajectory`: Same trajectory with `subsample_reverse` strategy + +#### 5. **ROBOARENA_PARTIAL_SUCCESS** +- **Description**: Uses two trajectories from the same task with different `partial_success` values. The trajectory with higher `partial_success` becomes chosen, and the one with lower `partial_success` becomes rejected. +- **Purpose**: Learn to prefer trajectories with higher partial success scores (RoboArena-specific). 
+- **Conditions**: Only available for RoboArena trajectories (has `partial_success` field and data_source contains "roboarena") +- **Implementation**: + - Finds a different trajectory from same task (via `_get_different_partial_success_traj`) + - Swaps trajectories if found trajectory has higher `partial_success` + - `chosen_trajectory`: Trajectory with higher `partial_success` + - `rejected_trajectory`: Trajectory with lower `partial_success` + +### Special Handling +- **Non-successful trajectories**: If a trajectory has `quality_label != "successful"` (and is not RoboArena), it is automatically used as the rejected trajectory, with an optimal trajectory from the same task as the chosen trajectory. + +### Strategy Selection +- Strategies are selected probabilistically based on `preference_strategy_ratio` configuration +- Probabilities are rebalanced when strategies fail +- Strategies are removed after 3 consecutive failures +- Maximum 10 total attempts per sample generation + +--- + +## ProgressSampler (Progress Prediction) + +The `ProgressSampler` creates progress prediction samples from a single trajectory, applying different subsampling strategies to create training data. + +### Strategies + +#### 1. **DIFFERENT_TASK_INSTRUCTION** +- **Description**: Uses a trajectory from a different task, but keeps the original task's embeddings and instruction. +- **Purpose**: Learn that progress should be 0.0 when the trajectory doesn't match the task instruction. +- **Implementation**: + - Gets trajectory from different task (via `_get_different_task_instruction`) + - Replaces embeddings with original task's embeddings + - Sets `target_progress = [0.0]` for all timesteps + - Uses forward subsampling + +#### 2. **FORWARD_PROGRESS** +- **Description**: Samples the same trajectory with forward direction (start < middle < end). +- **Purpose**: Learn normal forward progress patterns. 
+- **Implementation**: + - Uses same trajectory with `subsample_forward` strategy + - Progress increases from start to end + +#### 3. **REVERSE_PROGRESS** +- **Description**: Samples the same trajectory with reverse direction (end < middle < start). +- **Purpose**: Learn to handle reverse progress scenarios. +- **Implementation**: + - Uses same trajectory with `subsample_reverse` strategy + - Progress decreases from start to end + +#### 4. **REWIND** +- **Description**: Samples the same trajectory with rewind direction (start < end < middle). +- **Purpose**: Learn to handle non-monotonic progress patterns. +- **Implementation**: + - Uses same trajectory with `subsample_rewind` strategy + - Progress pattern: increases, then decreases + +### Strategy Selection +- Strategies are selected probabilistically based on `progress_strategy_ratio` configuration +- Probabilities are rebalanced when strategies fail +- Failed strategies are immediately removed (no retry count threshold) +- Maximum 10 total attempts per sample generation + +--- + +## Common Features + +### Retry Logic +All samplers implement retry logic with: +- Maximum attempt limits (typically 10 attempts) +- Strategy-specific retry counts (3-4 attempts per strategy) +- Automatic strategy removal after consecutive failures +- Probability rebalancing when strategies are removed + +### Subsample Strategies +Common subsampling strategies used across samplers: +- `subsample_forward`: Normal forward sampling (start → end) +- `subsample_reverse`: Reverse sampling (end → start) +- `subsample_rewind`: Rewind sampling (start → end → start) + +### Data Source Filtering +- Strategies may be filtered or boosted based on data source categories: + - **Failure category**: Boosts SUBOPTIMAL strategy probability by 2x + - **Paired category**: Boosts PAIRED_HUMAN_ROBOT strategy probability by 2x + - **RoboArena**: Special handling for `partial_success` field diff --git a/samplers/__init__.py b/samplers/__init__.py new file mode 
100644 index 0000000000000000000000000000000000000000..8da6208f0734c7b462488c28e332b3ef26233b05 --- /dev/null +++ b/samplers/__init__.py @@ -0,0 +1,23 @@ +from rfm.data.samplers.base import RFMBaseSampler +from rfm.data.samplers.pref import PrefSampler +from rfm.data.samplers.sim import SimSampler +from rfm.data.samplers.progress import ProgressSampler +from rfm.data.samplers.eval.confusion_matrix import ConfusionMatrixSampler +from rfm.data.samplers.eval.progress_policy_ranking import ProgressPolicyRankingSampler +from rfm.data.samplers.eval.reward_alignment import RewardAlignmentSampler +from rfm.data.samplers.eval.quality_preference import QualityPreferenceSampler +from rfm.data.samplers.eval.roboarena_quality_preference import RoboArenaQualityPreferenceSampler +from rfm.data.samplers.eval.similarity_score import SimilarityScoreSampler + +__all__ = [ + "RFMBaseSampler", + "PrefSampler", + "SimSampler", + "ProgressSampler", + "ConfusionMatrixSampler", + "ProgressPolicyRankingSampler", + "RewardAlignmentSampler", + "QualityPreferenceSampler", + "RoboArenaQualityPreferenceSampler", + "SimilarityScoreSampler", +] diff --git a/samplers/__pycache__/__init__.cpython-310.pyc b/samplers/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..b434a3cc556e0d57589d5498652b2c3719d99752 Binary files /dev/null and b/samplers/__pycache__/__init__.cpython-310.pyc differ diff --git a/samplers/__pycache__/__init__.cpython-311.pyc b/samplers/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..5aefe273918e9d3f0190672fc9cc0a13814cab59 Binary files /dev/null and b/samplers/__pycache__/__init__.cpython-311.pyc differ diff --git a/samplers/__pycache__/base.cpython-310.pyc b/samplers/__pycache__/base.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a5b0aabe0ac6719fb196924c060e31920048279c Binary files /dev/null and 
b/samplers/__pycache__/base.cpython-310.pyc differ diff --git a/samplers/__pycache__/base.cpython-311.pyc b/samplers/__pycache__/base.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..4525f72475f8334dc502a7fcb494e30221bb060c Binary files /dev/null and b/samplers/__pycache__/base.cpython-311.pyc differ diff --git a/samplers/__pycache__/confusion_matrix.cpython-310.pyc b/samplers/__pycache__/confusion_matrix.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8b144a2e2e2e1a9520658457c264c18bd2119e21 Binary files /dev/null and b/samplers/__pycache__/confusion_matrix.cpython-310.pyc differ diff --git a/samplers/__pycache__/confusion_matrix.cpython-311.pyc b/samplers/__pycache__/confusion_matrix.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..35f23c434e27f9f560b2af4670aee7f5a0aa39ab Binary files /dev/null and b/samplers/__pycache__/confusion_matrix.cpython-311.pyc differ diff --git a/samplers/__pycache__/pref.cpython-310.pyc b/samplers/__pycache__/pref.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bced69f2d0b0d849196b83fb3cbad090504aa5a8 Binary files /dev/null and b/samplers/__pycache__/pref.cpython-310.pyc differ diff --git a/samplers/__pycache__/pref.cpython-311.pyc b/samplers/__pycache__/pref.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..cf55de22381cdc22d4ab54faf589a1bca59e5baa Binary files /dev/null and b/samplers/__pycache__/pref.cpython-311.pyc differ diff --git a/samplers/__pycache__/progress.cpython-310.pyc b/samplers/__pycache__/progress.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2f6705ae18f19bd1bc870c5fe68f31bb7ac28d34 Binary files /dev/null and b/samplers/__pycache__/progress.cpython-310.pyc differ diff --git a/samplers/__pycache__/progress.cpython-311.pyc b/samplers/__pycache__/progress.cpython-311.pyc new file mode 100644 index 
0000000000000000000000000000000000000000..8444bb4285695b53fa228dcb1ee4615186f3ba1e Binary files /dev/null and b/samplers/__pycache__/progress.cpython-311.pyc differ diff --git a/samplers/__pycache__/progress_default.cpython-310.pyc b/samplers/__pycache__/progress_default.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..dd1291987e33efae20ed475d85f32d9aa349d0f6 Binary files /dev/null and b/samplers/__pycache__/progress_default.cpython-310.pyc differ diff --git a/samplers/__pycache__/progress_default.cpython-311.pyc b/samplers/__pycache__/progress_default.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6b10f15ca0878230b05e9bcfe5fa3b864b418c11 Binary files /dev/null and b/samplers/__pycache__/progress_default.cpython-311.pyc differ diff --git a/samplers/__pycache__/quality_preference.cpython-310.pyc b/samplers/__pycache__/quality_preference.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f193f378e6f957d9e72aae7fc2a45fd484cff6cc Binary files /dev/null and b/samplers/__pycache__/quality_preference.cpython-310.pyc differ diff --git a/samplers/__pycache__/quality_preference.cpython-311.pyc b/samplers/__pycache__/quality_preference.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e62539be7a4fcfe815430d24ca1b4d7e9e52674d Binary files /dev/null and b/samplers/__pycache__/quality_preference.cpython-311.pyc differ diff --git a/samplers/__pycache__/reward_alignment.cpython-310.pyc b/samplers/__pycache__/reward_alignment.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..46d980f7025796955fe016e58cd970a028025fbd Binary files /dev/null and b/samplers/__pycache__/reward_alignment.cpython-310.pyc differ diff --git a/samplers/__pycache__/reward_alignment.cpython-311.pyc b/samplers/__pycache__/reward_alignment.cpython-311.pyc new file mode 100644 index 
0000000000000000000000000000000000000000..1dbb293daf9cf5df3a30c264feb5017e7983ca25 Binary files /dev/null and b/samplers/__pycache__/reward_alignment.cpython-311.pyc differ diff --git a/samplers/__pycache__/roboarena.cpython-310.pyc b/samplers/__pycache__/roboarena.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..dc6d35928eaa5ae38bca825e77f6370269cfeebf Binary files /dev/null and b/samplers/__pycache__/roboarena.cpython-310.pyc differ diff --git a/samplers/__pycache__/sim.cpython-310.pyc b/samplers/__pycache__/sim.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..64004f3ddc875c073a827307414e036b1dc651cf Binary files /dev/null and b/samplers/__pycache__/sim.cpython-310.pyc differ diff --git a/samplers/__pycache__/sim.cpython-311.pyc b/samplers/__pycache__/sim.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..44b4a71f95fb7480eecdfafea47a76eb4d0068aa Binary files /dev/null and b/samplers/__pycache__/sim.cpython-311.pyc differ diff --git a/samplers/__pycache__/success_failure.cpython-310.pyc b/samplers/__pycache__/success_failure.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..07ce55a9b09d81e8b93a35d191ea4f077887a293 Binary files /dev/null and b/samplers/__pycache__/success_failure.cpython-310.pyc differ diff --git a/samplers/__pycache__/success_failure.cpython-311.pyc b/samplers/__pycache__/success_failure.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f407d069125a5c607f07366f7ba737a1c8d629e9 Binary files /dev/null and b/samplers/__pycache__/success_failure.cpython-311.pyc differ diff --git a/samplers/base.py b/samplers/base.py new file mode 100644 index 0000000000000000000000000000000000000000..42a4532eb6e191df21d96226a0f119a0649a2eb6 --- /dev/null +++ b/samplers/base.py @@ -0,0 +1,753 @@ +#!/usr/bin/env python3 +from typing import Optional, Dict, Any, List, Set, Tuple, Union + +import 
numpy as np +import random +import torch +from random import Random +from datasets import Dataset + +from rfm.configs.experiment_configs import DataConfig +from rfm.data.datasets.helpers import ( + load_frames_from_npz, + get_segment_indices_with_middle, + compute_progress_from_segment, + pad_trajectory_to_max_frames_torch, + pad_trajectory_to_max_frames_np, + compute_success_labels, + create_trajectory_from_dict, + load_embeddings_from_path, + linspace_subsample_frames, + convert_continuous_to_discrete_bins, +) +from rfm.data.dataset_types import Trajectory +from rfm.utils.logger import get_logger + +logger = get_logger() + + +class RFMBaseSampler: + """Base sampler class that provides trajectory retrieval functions for generating samples.""" + + def __init__( + self, + config: DataConfig, + dataset: Dataset, + combined_indices: Dict[str, Any], + dataset_success_cutoff_map: Optional[Dict[str, float]] = None, + verbose: bool = True, + random_seed: int = 42, + ): + """Initialize sampler with dataset and indices. + + Args: + config: Configuration object + dataset: The loaded dataset + combined_indices: Dictionary of combined indices from dataset loading + dataset_success_cutoff_map: Dictionary mapping dataset names to success cutoff percentages + verbose: Verbose flag + random_seed: Random seed for deterministic sampling. Creates a local Random instance to avoid affecting global random state. + """ + self.config = config + self.dataset = dataset + self.verbose = verbose + self.dataset_success_cutoff_map = dataset_success_cutoff_map or {} + self._local_random = Random(random_seed) + + self._cached_ids = self.dataset["id"] + self._cached_is_robot = self.dataset["is_robot"] + + # Build indices from combined_indices + self._build_indices(combined_indices) + + def _build_indices(self, combined_indices): + """Build all index mappings from combined_indices. 
+ + Args: + combined_indices: Dictionary of combined indices from dataset loading + """ + # Initialize index mappings from the loaded indices + self.robot_trajectories = combined_indices["robot_trajectories"] + self.human_trajectories = combined_indices["human_trajectories"] + self.optimal_by_task = combined_indices["optimal_by_task"] + self.suboptimal_by_task = combined_indices["suboptimal_by_task"] + self.quality_indices = combined_indices["quality_indices"] + self.task_indices = combined_indices["task_indices"] + self.source_indices = combined_indices["source_indices"] + self.partial_success_indices = combined_indices["partial_success_indices"] + self.paired_human_robot_by_task = combined_indices["paired_human_robot_by_task"] + self.tasks_with_multiple_quality_labels = combined_indices["tasks_with_multiple_quality_labels"] + + # Build mapping from data source -> available task instructions + self._build_tasks_by_data_source() + + def _build_tasks_by_data_source(self): + """Cache mapping from data_source to available task instructions.""" + self.tasks_by_data_source: Dict[str, List[str]] = {} + + all_tasks = self.dataset["task"] + all_sources = self.dataset["data_source"] + + source_to_tasks: Dict[str, Set[str]] = {} + for task, source in zip(all_tasks, all_sources): + if task is None or source is None: + continue + if source not in source_to_tasks: + source_to_tasks[source] = set() + source_to_tasks[source].add(task) + + self.tasks_by_data_source = {source: list(tasks) for source, tasks in source_to_tasks.items()} + + def _generate_sample(self, item): + """Generate a sample from an item. + + This method should be overridden by subclasses to implement their specific + sample generation logic. 
+ + Args: + item: An item from the dataset (typically a trajectory dict) + + Returns: + A sample object (e.g., PreferenceSample, SimilaritySample, ProgressSample) + """ + raise NotImplementedError("Subclasses must implement _generate_sample") + + def _get_same_task_optimal(self, ref_traj: dict) -> dict | None: + """Get optimal trajectory from same task (different from ref). + + Args: + ref_traj: Reference trajectory + + Returns: + Same task optimal trajectory dict or None if not available + """ + task_name = ref_traj["task"] + same_task_optimal_indices = self.optimal_by_task.get(task_name, []) + if not same_task_optimal_indices: + logger.trace(f"[BASE SAMPLER] _get_same_task_optimal: No optimal indices for task '{task_name}'") + return None + + # Use cached IDs to check without loading full trajectories + chosen_id = ref_traj["id"] + random_idx = random.choice(same_task_optimal_indices) + + # Retry if the selected trajectory has the same ID as ref + max_retries = min(10, len(same_task_optimal_indices)) + retries = 0 + while self._cached_ids[random_idx] == chosen_id and retries < max_retries: + random_idx = random.choice(same_task_optimal_indices) + retries += 1 + + # If still matches after retries, fall back to filtering + if self._cached_ids[random_idx] == chosen_id: + filtered_indices = [idx for idx in same_task_optimal_indices if self._cached_ids[idx] != chosen_id] + if filtered_indices: + random_idx = random.choice(filtered_indices) + else: + # No other trajectories available + logger.trace( + f"[BASE SAMPLER] _get_same_task_optimal: All trajectories have same ID '{chosen_id}' for task '{task_name}'" + ) + return None + + result = self.dataset[random_idx] + logger.trace( + f"[BASE SAMPLER] _get_same_task_optimal: Found trajectory {result.get('id', 'unknown')} for task '{task_name}'" + ) + return result + + def _get_same_task_suboptimal(self, ref_traj: dict) -> dict | None: + """Get suboptimal trajectory from same task. 
+ + For trajectories with partial_success, uses partial_success logic instead of quality_label logic. + + Args: + ref_traj: Reference trajectory + + Returns: + Suboptimal trajectory dict or None if not available + """ + # Check if this trajectory uses partial_success + use_partial_success = ref_traj.get("partial_success") is not None + + if use_partial_success: + # For trajectories with partial_success, use partial_success logic + return self._get_different_partial_success_traj(ref_traj) + + # For trajectories without partial_success, use the standard suboptimal logic + task_name = ref_traj["task"] + same_task_suboptimal_indices = self.suboptimal_by_task.get(task_name, []) + if not same_task_suboptimal_indices: + logger.trace(f"[BASE SAMPLER] _get_same_task_suboptimal: No suboptimal indices for task '{task_name}'") + return None + + # Use cached IDs to check without loading full trajectories + chosen_id = ref_traj["id"] + random_idx = random.choice(same_task_suboptimal_indices) + + # Retry if the selected trajectory has the same ID as ref + max_retries = min(10, len(same_task_suboptimal_indices)) + retries = 0 + while self._cached_ids[random_idx] == chosen_id and retries < max_retries: + random_idx = random.choice(same_task_suboptimal_indices) + retries += 1 + + # If still matches after retries, fall back to filtering + if self._cached_ids[random_idx] == chosen_id: + filtered_indices = [idx for idx in same_task_suboptimal_indices if self._cached_ids[idx] != chosen_id] + if filtered_indices: + random_idx = random.choice(filtered_indices) + else: + # No other trajectories available + logger.trace( + f"[BASE SAMPLER] _get_same_task_suboptimal: All trajectories have same ID '{chosen_id}' for task '{task_name}'" + ) + return None + + result = self.dataset[random_idx] + logger.trace( + f"[BASE SAMPLER] _get_same_task_suboptimal: Found trajectory {result.get('id', 'unknown')} for task '{task_name}'" + ) + return result + + def _get_different_video_traj(self, ref_traj: 
dict) -> dict | None: + """Get trajectory from different task. + + Args: + ref_traj: Reference trajectory + + Returns: + Different task trajectory dict or None if not available + """ + same_source_prob = self.config.traj_same_source_prob + data_source = ref_traj.get("data_source") + other_tasks = [] + + if data_source and data_source in self.tasks_by_data_source and random.random() < same_source_prob: + other_tasks = [task for task in self.tasks_by_data_source[data_source] if task != ref_traj["task"]] + + if not other_tasks: + other_tasks = [task for task in self.optimal_by_task.keys() if task != ref_traj["task"]] + + if not other_tasks: + logger.trace( + f"[BASE SAMPLER] _get_different_video_traj: No other tasks available (ref task: '{ref_traj['task']}')" + ) + return None + + # Try up to 2 times to find a valid task + max_retries = 2 + other_task_indices = None + other_task = None + + for attempt in range(max_retries): + other_task = random.choice(other_tasks) + if other_task not in self.optimal_by_task: + logger.trace( + f"[BASE SAMPLER] _get_different_video_traj: Attempt {attempt + 1}/{max_retries}: Task '{other_task}' not found in optimal_by_task" + ) + continue + + other_task_indices = self.optimal_by_task[other_task] + if not other_task_indices: + logger.trace( + f"[BASE SAMPLER] _get_different_video_traj: Attempt {attempt + 1}/{max_retries}: Task '{other_task}' has no optimal indices" + ) + continue + + # Found a valid task with indices + break + + if other_task_indices is None or not other_task_indices: + logger.trace( + f"[BASE SAMPLER] _get_different_video_traj: Failed to find valid task after {max_retries} attempts" + ) + return None + + other_idx = random.choice(other_task_indices) + result = self.dataset[other_idx] + logger.trace( + f"[BASE SAMPLER] _get_different_video_traj: Found trajectory {result.get('id', 'unknown')} from task '{other_task}'" + ) + return result + + def _get_different_task_instruction(self, ref_traj: dict) -> dict | None: + 
"""Get the same trajectory but with a different task instruction. + + Args: + ref_traj: Reference trajectory + + Returns: + Trajectory dict with different task instruction or None if not available + """ + same_source_prob = self.config.traj_same_source_prob + data_source = ref_traj.get("data_source") + candidate_tasks = [] + + if data_source and data_source in self.tasks_by_data_source and random.random() < same_source_prob: + candidate_tasks = [task for task in self.tasks_by_data_source[data_source] if task != ref_traj["task"]] + + if not candidate_tasks: + candidate_tasks = [task for task in self.optimal_by_task.keys() if task != ref_traj["task"]] + + if not candidate_tasks: + logger.trace( + f"[BASE SAMPLER] _get_different_task_instruction: No candidate tasks available (ref task: '{ref_traj['task']}')" + ) + return None + + other_task = random.choice(candidate_tasks) + + # Get embeddings_path and lang_vector from a random trajectory with the other_task + other_task_indices = self.optimal_by_task.get(other_task, []) + if not other_task_indices: + logger.trace(f"[BASE SAMPLER] _get_different_task_instruction: Task '{other_task}' has no optimal indices") + return None + + other_task_idx = random.choice(other_task_indices) + other_task_traj = self.dataset[other_task_idx] + + # Create a copy of the trajectory with the task changed + # Use embeddings_path and lang_vector from the other_task trajectory + new_traj = ref_traj.copy() + new_traj["task"] = other_task + # Get embeddings_path and lang_vector from a random trajectory with the other_task + if "embeddings_path" in other_task_traj: + new_traj["embeddings_path"] = other_task_traj["embeddings_path"] + if "lang_vector" in other_task_traj: + new_traj["lang_vector"] = other_task_traj["lang_vector"] + return new_traj + + def _get_paired_human_robot_traj(self, ref_traj: dict) -> dict | None: + """Get paired human/robot trajectory for the same task. 
+ + Given a reference trajectory, if it's a robot trajectory, returns a human trajectory + from the same task. If it's a human trajectory, returns a robot trajectory from the + same task. + + Args: + ref_traj: Reference trajectory (can be robot or human) + + Returns: + Paired trajectory dict (opposite type) or None if not available + """ + task = ref_traj["task"] + is_robot = ref_traj.get("is_robot", True) + + if task not in self.paired_human_robot_by_task: + logger.trace( + f"[BASE SAMPLER] _get_paired_human_robot_traj: Task '{task}' not in paired_human_robot_by_task" + ) + return None + + task_pairs = self.paired_human_robot_by_task[task] + + # Get opposite type + opposite_key = "human" if is_robot else "robot" + opposite_indices = task_pairs.get(opposite_key, []) + + if not opposite_indices: + logger.trace(f"[BASE SAMPLER] _get_paired_human_robot_traj: No {opposite_key} indices for task '{task}'") + return None + + # Sample a paired trajectory and verify it's different from reference + chosen_id = ref_traj["id"] + available_indices = opposite_indices.copy() + paired_traj = None + + # Add retry limit to prevent infinite loops + max_retries = min(len(available_indices), 10) + retries = 0 + + logger.trace( + f"[BASE SAMPLER] _get_paired_human_robot_traj: Looking for {opposite_key} trajectory (chosen_id: {chosen_id}, available: {len(available_indices)})" + ) + + while (paired_traj is None or paired_traj.get("id") == chosen_id) and retries < max_retries: + retries += 1 + + if not available_indices: + logger.trace( + f"[BASE SAMPLER] _get_paired_human_robot_traj: No more available indices after {retries} retries" + ) + return None + + paired_idx = random.choice(available_indices) + paired_traj = self.dataset[paired_idx] + + # If it matches, remove this index and try again + if paired_traj.get("id") == chosen_id: + available_indices = [idx for idx in available_indices if idx != paired_idx] + paired_traj = None + continue + + # If we exhausted retries without finding a 
valid trajectory, return None + if paired_traj is None or paired_traj.get("id") == chosen_id: + logger.trace( + f"[BASE SAMPLER] _get_paired_human_robot_traj: Failed to find valid paired trajectory after {max_retries} retries" + ) + return None + + logger.trace( + f"[BASE SAMPLER] _get_paired_human_robot_traj: Found paired trajectory {paired_traj.get('id', 'unknown')} on retry {retries}" + ) + return paired_traj + + def _get_different_partial_success_traj(self, ref_traj: dict) -> dict | None: + """Get trajectory from same task with different partial_success. + + Finds trajectories with either higher or lower partial_success than the reference, + using absolute difference for threshold checking. + + Args: + ref_traj: Reference trajectory + + Returns: + Trajectory dict with different partial_success from same task or None if not available + """ + task_name = ref_traj["task"] + ref_partial_success = ref_traj.get("partial_success") + + # Check if partial_success is available + if ref_partial_success is None: + logger.trace( + f"[BASE SAMPLER] _get_different_partial_success_traj: No partial_success for trajectory {ref_traj.get('id', 'unknown')}" + ) + return None + + # Get minimum threshold from config + min_threshold = getattr(self.config, "partial_success_threshold", 0.2) + + # Get all trajectories from the same task + same_task_indices = self.task_indices.get(task_name, []) + if not same_task_indices: + logger.trace( + f"[BASE SAMPLER] _get_different_partial_success_traj: No trajectories found for task '{task_name}'" + ) + return None + + # Filter to trajectories with different partial_success that meet the threshold requirement + # Uses absolute difference to allow both higher and lower partial_success + chosen_id = ref_traj["id"] + candidate_indices = [] + + for idx in same_task_indices: + # Skip if same trajectory + if self._cached_ids[idx] == chosen_id: + continue + + # Get partial_success for this trajectory + traj_dict = self.dataset[idx] + traj_partial_success 
= traj_dict.get("partial_success", None) + + if traj_partial_success is None: + logger.trace( + f"[BASE SAMPLER] _get_different_partial_success_traj: No partial_success for trajectory {traj_dict.get('id', 'unknown')}, task '{task_name}'" + ) + continue + + # Include if partial_success differs from reference by at least the threshold (using abs) + partial_success_diff = abs(ref_partial_success - traj_partial_success) + if partial_success_diff >= min_threshold: + candidate_indices.append(idx) + + if not candidate_indices: + logger.trace( + f"[BASE SAMPLER] _get_different_partial_success_traj: No trajectories with different partial_success (threshold: {min_threshold}) for task '{task_name}' (ref: {ref_partial_success})" + ) + return None + + # Randomly select from candidates + selected_idx = random.choice(candidate_indices) + result = self.dataset[selected_idx] + result_partial_success = result.get("partial_success") + # If ref_partial_success is 1.0, direction is always "lower" since 1.0 is the maximum + if ref_partial_success == 1.0: + direction = "lower" + else: + direction = "higher" if result_partial_success > ref_partial_success else "lower" + logger.trace( + f"[BASE SAMPLER] _get_different_partial_success_traj: Found trajectory {result.get('id', 'unknown')} with partial_success {result_partial_success} ({direction} than {ref_partial_success}, abs diff: {abs(ref_partial_success - result_partial_success):.3f}, threshold: {min_threshold})" + ) + return result + + def _get_subsample_indices( + self, data, direction: str = "bidirectional", max_frames: int = None + ) -> Optional[Tuple[int, int, int]]: + """Get start, middle, and end indices for subsample strategy. + + Samples three random frames from the trajectory. The relationship between indices + follows three main scenarios: + 1. start < middle < end: forward progress - normal forward progression through trajectory + 2. 
start < end < middle: rewind progress - forward from start to end, then continues to middle (simulating rewind/backtrack) + 3. end < middle < start: reverse progress - backward from start through middle to end (full backward traversal) + + Args: + data: Trajectory data (frames or embeddings) to sample from + direction: Sampling direction - "forward" (start < middle < end), + "reverse" (end < middle < start), + "rewind" (start < end < middle), + or "bidirectional" (any of the 3 orderings) + max_frames: Maximum number of frames to subsample. If 1, returns only start. If 2, returns start and end. + + Returns: + Tuple of (start_idx, middle_idx, end_idx), or None if insufficient frames + For max_frames == 1: returns (start_idx, None, None) + For max_frames == 2: returns (start_idx, None, end_idx) + """ + num_frames_total = len(data) if hasattr(data, "__len__") else data.shape[0] + + # Handle edge cases for max_frames == 1 or 2 + if max_frames == 1: + # Randomly sample 1 frame + random_idx = random.randint(0, num_frames_total - 1) + logger.trace(f"[BASE SAMPLER] _get_subsample_indices: max_frames=1, randomly sampled idx={random_idx}") + return (random_idx, None, None) + + if max_frames == 2: + # Sample 2 frames: either forward (start < end) or reverse (end < start) + # No rewind possible with only 2 frames + if direction == "reverse": + # Reverse: sample end first, then start (end < start) + end_idx = random.randint(0, num_frames_total - 2) + start_idx = random.randint(end_idx + 1, num_frames_total - 1) + else: + # Forward: sample start first, then end (start < end) + start_idx = random.randint(0, num_frames_total - 2) + end_idx = random.randint(start_idx + 1, num_frames_total - 1) + logger.trace( + f"[BASE SAMPLER] _get_subsample_indices: max_frames=2, start_idx={start_idx}, end_idx={end_idx}, direction={direction}" + ) + return (start_idx, None, end_idx) + + if num_frames_total < 3: + logger.trace(f"[BASE SAMPLER] _get_subsample_indices: Not enough frames 
({num_frames_total})") + return None + + # Sample three random distinct frames + frame_indices = sorted(random.sample(range(num_frames_total), 3)) + frame1_idx, frame2_idx, frame3_idx = frame_indices + + # Determine start, middle, and end based on direction + # We only care about 3 cases: + # 1. start < middle < end: forward progress + # 2. start < end < middle: rewind progress + # 3. end < middle < start: reverse progress + + if direction == "forward": + # Case 1: start < middle < end + start_idx = frame1_idx + middle_idx = frame2_idx + end_idx = frame3_idx + elif direction == "reverse": + # Case 3: end < middle < start + end_idx = frame1_idx + middle_idx = frame2_idx + start_idx = frame3_idx + elif direction == "rewind": + # Case 2: start < end < middle + start_idx = frame1_idx + end_idx = frame2_idx + middle_idx = frame3_idx + else: # bidirectional (default) + # Randomly choose from the 3 cases + pattern = random.choice([1, 2, 3]) + if pattern == 1: # start < middle < end: forward progress + start_idx = frame1_idx + middle_idx = frame2_idx + end_idx = frame3_idx + elif pattern == 2: # start < end < middle: rewind progress + start_idx = frame1_idx + end_idx = frame2_idx + middle_idx = frame3_idx + else: # pattern == 3: end < middle < start: reverse progress + end_idx = frame1_idx + middle_idx = frame2_idx + start_idx = frame3_idx + + logger.trace( + f"[BASE SAMPLER] _get_subsample_indices: Selected indices start={start_idx}, middle={middle_idx}, end={end_idx} " + f"from {num_frames_total} total frames (direction: {direction})" + ) + return start_idx, middle_idx, end_idx + + def _get_traj_from_data( + self, + traj: dict | Trajectory, + subsample_strategy: str | None = None, + frame_indices: List[int] | None = None, + metadata: Dict[str, Any] | None = None, + ) -> Trajectory: + """Load, subsample, and optionally pad trajectory data and create a Trajectory object. 
+ + Args: + traj: Trajectory dict or Trajectory object + subsample_strategy: Optional strategy for subsampling ("subsample_forward", "subsample_reverse", "subsample_rewind", or None for default/bidirectional). Ignored if frame_indices is provided. + frame_indices: Optional list of specific frame indices to use. If provided, subsample_strategy is ignored. + metadata: Optional metadata dict to merge into trajectory metadata. + + Returns: + Trajectory object with loaded and subsampled data (padded) + """ + # Initialize variables + frames = None + video_embeddings = None + text_embedding = None + data = None + + if isinstance(traj, Trajectory): + # If already a Trajectory, just return it + return traj + + # Load from dict + # Check if text_embedding is already provided in the dict (for samplers that need to override it) + if "text_embedding" in traj and traj["text_embedding"] is not None: + text_embedding = traj["text_embedding"] + + if self.config.load_embeddings and traj.get("embeddings_path"): + embeddings = load_embeddings_from_path(traj["embeddings_path"]) + video_embeddings = embeddings["video_embeddings"] + # Only use loaded text_embedding if not already provided in dict + if text_embedding is None: + text_embedding = embeddings["text_embedding"] + data = video_embeddings + else: + if isinstance(traj["frames"], str): + frames = load_frames_from_npz(traj["frames"]) + else: + frames = traj["frames"] + data = frames + + # Get total frames for progress computation + if hasattr(data, "shape"): + num_frames_total = data.shape[0] + else: + num_frames_total = len(data) + + ds_key = traj["data_source"] + success_cutoff = self.dataset_success_cutoff_map.get(ds_key, self.config.max_success) + + # Determine which indices to use (construct indices first, then subsample uniformly) + if frame_indices is not None: + # Use provided frame indices directly + indices = frame_indices + elif subsample_strategy is not None: + # Use subsampling strategy + # Get subsample indices 
(handles edge cases for max_frames == 1 or 2) + if subsample_strategy == "subsample_forward": + strategy_indices = self._get_subsample_indices( + data, direction="forward", max_frames=self.config.max_frames + ) + elif subsample_strategy == "subsample_reverse": + strategy_indices = self._get_subsample_indices( + data, direction="reverse", max_frames=self.config.max_frames + ) + elif subsample_strategy == "subsample_rewind": + strategy_indices = self._get_subsample_indices( + data, direction="rewind", max_frames=self.config.max_frames + ) + else: + strategy_indices = self._get_subsample_indices( + data, direction="bidirectional", max_frames=self.config.max_frames + ) + + if strategy_indices is None: + logger.trace("[BASE SAMPLER] _get_traj_from_data: Failed to get uniform sample indices") + return None + + start_idx, middle_idx, end_idx = strategy_indices + + logger.trace( + f"[BASE SAMPLER] _get_traj_from_data: Subsampling trajectory with strategy: {subsample_strategy}, start_idx: {start_idx}, middle_idx: {middle_idx}, end_idx: {end_idx}" + ) + + # Use middle_idx only for rewind strategy (requires at least 3 frames) + use_middle = subsample_strategy == "subsample_rewind" and middle_idx is not None and num_frames_total >= 3 + + # Use get_segment_indices_with_middle to construct indices + indices = get_segment_indices_with_middle( + num_frames_total=num_frames_total, + start_idx=start_idx, + end_idx=end_idx, + middle_idx=middle_idx if use_middle else None, + max_frames=self.config.max_frames, + ) + else: + # No subsampling strategy or indices provided - use all frames + indices = list(range(num_frames_total)) + + # Extract data using indices + subsampled = data[indices] + + # Get partial_success early to pass to compute_progress_from_segment + partial_success = traj.get("partial_success") + + # Compute progress + target_progress = compute_progress_from_segment( + num_frames_total=num_frames_total, + frame_indices=indices, + 
progress_pred_type=self.config.progress_pred_type, + success_cutoff=success_cutoff, + partial_success=partial_success, + ) + + # Subsample uniformly if needed (if we have more frames than max_frames) + current_frame_count = len(subsampled) if hasattr(subsampled, "__len__") else subsampled.shape[0] + if current_frame_count > self.config.max_frames: + subsampled, frame_indices_subsample = linspace_subsample_frames(subsampled, self.config.max_frames) + # Update indices and target_progress + if target_progress and len(target_progress) == current_frame_count: + target_progress = [target_progress[idx] for idx in frame_indices_subsample] + indices = [indices[idx] for idx in frame_indices_subsample] if isinstance(indices, list) else indices + + # Pad if needed + if target_progress: + if self.config.load_embeddings: + subsampled, target_progress = pad_trajectory_to_max_frames_torch( + subsampled, target_progress, self.config.max_frames + ) + else: + subsampled, target_progress = pad_trajectory_to_max_frames_np( + subsampled, target_progress, self.config.max_frames + ) + + # Update frames_shape + frames_shape = subsampled.shape if hasattr(subsampled, "shape") else tuple() + + # Set frames or video_embeddings + if self.config.load_embeddings: + video_embeddings = subsampled + else: + frames = subsampled + + # Compute success labels + success_label = compute_success_labels( + target_progress=target_progress, + data_source=traj["data_source"], + dataset_success_percent=self.dataset_success_cutoff_map, + max_success=self.config.max_success, + quality_label=traj.get("quality_label"), + ) + + # Convert partial_success and target_progress to discrete bins if in discrete mode + if self.config.progress_loss_type.lower() == "discrete": + if partial_success is not None: + partial_success = convert_continuous_to_discrete_bins( + [partial_success], self.config.progress_discrete_bins + )[0] + target_progress = convert_continuous_to_discrete_bins(target_progress, 
self.config.progress_discrete_bins) + + trajectory = create_trajectory_from_dict( + traj, + overrides={ + "frames": frames, + "frames_shape": frames_shape, + "video_embeddings": video_embeddings, + "text_embedding": text_embedding, + "target_progress": target_progress, + "success_label": success_label, + "partial_success": partial_success, + "metadata": metadata, + }, + ) + return trajectory diff --git a/samplers/eval/__pycache__/base_pref.cpython-310.pyc b/samplers/eval/__pycache__/base_pref.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1478a56ab453775c3035e4cb158af630e3b83e1e Binary files /dev/null and b/samplers/eval/__pycache__/base_pref.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/base_pref.cpython-311.pyc b/samplers/eval/__pycache__/base_pref.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d8a58caa58f6bd69515fedc82dcd6e2660935b45 Binary files /dev/null and b/samplers/eval/__pycache__/base_pref.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/confusion_matrix.cpython-310.pyc b/samplers/eval/__pycache__/confusion_matrix.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6d5ff81ad72f07318d9bb9a8fd1fe75150ff8951 Binary files /dev/null and b/samplers/eval/__pycache__/confusion_matrix.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/confusion_matrix.cpython-311.pyc b/samplers/eval/__pycache__/confusion_matrix.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..949f812617419fcac8664b59f5192d433cf8338a Binary files /dev/null and b/samplers/eval/__pycache__/confusion_matrix.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/progress_default.cpython-310.pyc b/samplers/eval/__pycache__/progress_default.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..385a0dbeb3f8cc9945b8e00626510339d2444e12 Binary files /dev/null and 
b/samplers/eval/__pycache__/progress_default.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/progress_default.cpython-311.pyc b/samplers/eval/__pycache__/progress_default.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..799358935895930a331d29b9085f361a380f1964 Binary files /dev/null and b/samplers/eval/__pycache__/progress_default.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/progress_policy_ranking.cpython-310.pyc b/samplers/eval/__pycache__/progress_policy_ranking.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d767bac83cea4126832c0cdd124517aab407248b Binary files /dev/null and b/samplers/eval/__pycache__/progress_policy_ranking.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/progress_policy_ranking.cpython-311.pyc b/samplers/eval/__pycache__/progress_policy_ranking.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ae08db9ee4272c2ee0806d71fc187c7d55687eb8 Binary files /dev/null and b/samplers/eval/__pycache__/progress_policy_ranking.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/quality_preference.cpython-310.pyc b/samplers/eval/__pycache__/quality_preference.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e40700db9da0c89642115e452448f4b253621e3c Binary files /dev/null and b/samplers/eval/__pycache__/quality_preference.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/quality_preference.cpython-311.pyc b/samplers/eval/__pycache__/quality_preference.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..17dd93dfc73ec9ac2dea241c876dfe98acff1f98 Binary files /dev/null and b/samplers/eval/__pycache__/quality_preference.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/reward_alignment.cpython-310.pyc b/samplers/eval/__pycache__/reward_alignment.cpython-310.pyc new file mode 100644 index 
0000000000000000000000000000000000000000..dd09e1f46c2bdb9942c7104ccadace0b2d010ec7 Binary files /dev/null and b/samplers/eval/__pycache__/reward_alignment.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/reward_alignment.cpython-311.pyc b/samplers/eval/__pycache__/reward_alignment.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..af8015ca9db3d371cbdfcf7effed7395837930f4 Binary files /dev/null and b/samplers/eval/__pycache__/reward_alignment.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/roboarena_quality_preference.cpython-310.pyc b/samplers/eval/__pycache__/roboarena_quality_preference.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ef41f154d7d6ed6ffdb62ae3597c31d6b0cfa506 Binary files /dev/null and b/samplers/eval/__pycache__/roboarena_quality_preference.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/roboarena_quality_preference.cpython-311.pyc b/samplers/eval/__pycache__/roboarena_quality_preference.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..6b1941746027c59f00b1b76789a6d9b832c73a3d Binary files /dev/null and b/samplers/eval/__pycache__/roboarena_quality_preference.cpython-311.pyc differ diff --git a/samplers/eval/__pycache__/similarity_score.cpython-310.pyc b/samplers/eval/__pycache__/similarity_score.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bc45ba95e51ec5e27946d5bde885dda7b0753c73 Binary files /dev/null and b/samplers/eval/__pycache__/similarity_score.cpython-310.pyc differ diff --git a/samplers/eval/__pycache__/similarity_score.cpython-311.pyc b/samplers/eval/__pycache__/similarity_score.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..70d8136aec52b3ff0aa1fe6ad00eca7d143bb43e Binary files /dev/null and b/samplers/eval/__pycache__/similarity_score.cpython-311.pyc differ diff --git a/samplers/eval/base_pref.py 
b/samplers/eval/base_pref.py new file mode 100644 index 0000000000000000000000000000000000000000..39b5eef2c9c57629813c4bfa03f6b19d46a86dc1 --- /dev/null +++ b/samplers/eval/base_pref.py @@ -0,0 +1,73 @@ +from typing import Dict, Any + +import numpy as np + +from rfm.data.dataset_types import PreferenceSample, Trajectory +from rfm.data.samplers.base import RFMBaseSampler + + +class BaseQualityPreferenceSampler(RFMBaseSampler): + """Base class for quality preference samplers. + + Subclasses should implement `_generate_all_sample_indices` to define how + trajectories are paired. This base class provides the common `_generate_sample_from_indices` + method that loads and processes the trajectories. + """ + + def _generate_sample_from_indices(self, sample_idx_info: Dict[str, Any]) -> PreferenceSample: + """Generate a single sample from stored indices.""" + chosen_idx = sample_idx_info["chosen_traj_idx"] + rejected_idx = sample_idx_info["rejected_traj_idx"] + + # Get the trajectories + chosen_traj = self.dataset[chosen_idx] + rejected_traj = self.dataset[rejected_idx] + + chosen_metadata = { + "quality_label": chosen_traj["quality_label"], + "data_source": chosen_traj["data_source"], + "task": chosen_traj["task"], + "id": chosen_traj["id"], + "video_path": chosen_traj["frames"], + } + # Add partial_success if available + if chosen_traj.get("partial_success") is not None: + chosen_metadata["partial_success"] = chosen_traj.get("partial_success") + + chosen_trajectory = self._get_traj_from_data( + traj=chosen_traj, + metadata=chosen_metadata, + ) + + rejected_metadata = { + "quality_label": rejected_traj["quality_label"], + "data_source": rejected_traj["data_source"], + "task": rejected_traj["task"], + "id": rejected_traj["id"], + "video_path": rejected_traj["frames"], + } + # Add partial_success if available + if rejected_traj.get("partial_success") is not None: + rejected_metadata["partial_success"] = rejected_traj.get("partial_success") + + rejected_trajectory = 
self._get_traj_from_data( + traj=rejected_traj, + metadata=rejected_metadata, + ) + + data_gen_strategy = getattr(self, "data_gen_strategy", "quality_preference") + + # Create preference sample + sample = PreferenceSample( + chosen_trajectory=chosen_trajectory, + rejected_trajectory=rejected_trajectory, + data_gen_strategy=data_gen_strategy, + ) + + return sample + + def __len__(self): + return len(self.sample_indices) + + def __getitem__(self, idx): + return self._generate_sample_from_indices(self.sample_indices[idx]) diff --git a/samplers/eval/confusion_matrix.py b/samplers/eval/confusion_matrix.py new file mode 100755 index 0000000000000000000000000000000000000000..f0d2febddaaa68467217a3a1b763267d54ad9f43 --- /dev/null +++ b/samplers/eval/confusion_matrix.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +""" +Data generator for confusion matrix analysis. +""" + +import random +import torch +from collections import Counter, defaultdict +from typing import Tuple + +from rfm.data.dataset_types import PreferenceSample, ProgressSample +from rfm.data.samplers.base import RFMBaseSampler +from rfm.utils.distributed import rank_0_print +from sentence_transformers import SentenceTransformer + + +class ConfusionMatrixSampler(RFMBaseSampler): + """ + Data generator that creates task-trajectory pairs for confusion matrix analysis. + + For each unique task, creates samples with each trajectory to analyze + how well the model can distinguish between different tasks. + + If multiple data sources are present, samples N random trajectories from each data source + and prioritizes different language instructions by randomizing the pairing order. + """ + + def __init__(self, n_trajectories_per_source: int = 5, **kwargs): + """Initialize confusion matrix sampler. + + Args: + n_trajectories_per_source: Number of trajectories to sample from each data source. + If None, uses all available trajectories. + **kwargs: Additional arguments passed to parent class. 
+ """ + super().__init__(**kwargs) + self.n_trajectories_per_source = n_trajectories_per_source + + # Load sentence transformer model and precompute embeddings for all unique tasks + self.sentence_model = SentenceTransformer("sentence-transformers/all-MiniLM-L12-v2") + self.sentence_model.eval() + + # Precompute language embeddings for all unique tasks + unique_tasks = list(self.task_indices.keys()) + rank_0_print(f"Precomputing language embeddings for {len(unique_tasks)} unique tasks", verbose=self.verbose) + self.task_embeddings = {} + for task in unique_tasks: + embedding = self.sentence_model.encode(task) + self.task_embeddings[task] = torch.tensor(embedding) + rank_0_print(f"Precomputed {len(self.task_embeddings)} language embeddings", verbose=self.verbose) + + # Free up the model after precomputation (no longer needed) + del self.sentence_model + + self.sample_indices = self._generate_all_sample_indices() + + rank_0_print( + f"Generated {len(self.sample_indices)} confusion matrix sample indices from {len(self.robot_trajectories)} trajectories and {len(self.task_indices)} tasks" + ) + + def _generate_all_sample_indices(self) -> list[dict]: + """Generate all possible task-trajectory pair sample indices. + + If multiple data sources exist, samples N random trajectories from each data source. + Prioritizes different video tasks first, then prioritizes different language instructions + when creating pairs. 
+ """ + sample_indices = [] + + # Get unique tasks (these will be the language instructions) + unique_lang_tasks = list(self.task_indices.keys()) + rank_0_print(f"Found {len(unique_lang_tasks)} unique language tasks: {unique_lang_tasks}", verbose=self.verbose) + + # Sample trajectories per data source (prioritizing different video tasks) + sampled_trajectories, stats = self._sample_trajectories_by_data_source() + + rank_0_print( + f"Processing {len(sampled_trajectories)} trajectories for confusion matrix analysis", + verbose=self.verbose, + ) + + # Print statistics about sampled trajectories + self._print_sampling_stats(stats) + + # Shuffle language tasks once for round-robin pairing + shuffled_lang_tasks = unique_lang_tasks.copy() + self._local_random.shuffle(shuffled_lang_tasks) + + # Create task-trajectory pairs with prioritized language instruction pairing + video_task_count = Counter() + + for traj_idx in sampled_trajectories: + traj = self.dataset[traj_idx] + video_task = traj["task"] + + # # Limit the number of video trajectories for each task to 5 + # if video_task_count[video_task] >= 5: + # continue + + video_task_count[video_task] += 1 + + # Pair this trajectory with all language tasks (shuffled for variety) + traj_id = traj.get("id", str(traj_idx)) + for lang_task in shuffled_lang_tasks: + sample_indices.append({ + "traj_idx": traj_idx, + "lang_task": lang_task, + "video_task": video_task, + "video_path": traj["frames"], + "id": traj_id, + }) + + # Shuffle final sample indices to further randomize the order + self._local_random.shuffle(sample_indices) + + # Print statistics about pairs created + rank_0_print(f"Generated {len(sample_indices)} task-trajectory pairs", verbose=self.verbose) + rank_0_print(f" Video tasks sampled: {dict(video_task_count)}", verbose=self.verbose) + rank_0_print(f" Trajectories per video task: {dict(sorted(video_task_count.items()))}", verbose=self.verbose) + + return sample_indices + + def 
_sample_trajectories_by_data_source(self) -> Tuple[list[int], dict]: + """Sample N random trajectories from each data source, prioritizing different video tasks. + + When sampling N trajectories, first selects one trajectory from each unique video task, + then repeats in round-robin fashion until N trajectories are sampled. + + Returns: + Tuple of (list of sampled trajectory indices, stats dictionary) + """ + sampled_indices = [] + stats = { + "by_source": {}, + "by_task": Counter(), + "traj_to_task": {}, + } + + # Group robot trajectories by data source, then by video task + trajectories_by_source_and_task = defaultdict(lambda: defaultdict(list)) + for traj_idx in self.robot_trajectories: + traj = self.dataset[traj_idx] + data_source = traj.get("data_source", "unknown") + video_task = traj.get("task", "unknown") + trajectories_by_source_and_task[data_source][video_task].append(traj_idx) + + rank_0_print( + f"Found {len(trajectories_by_source_and_task)} data sources: {list(trajectories_by_source_and_task.keys())}", + verbose=self.verbose, + ) + + # Sample N trajectories from each data source, prioritizing different tasks + for data_source, tasks_to_indices in trajectories_by_source_and_task.items(): + # Shuffle trajectories within each task for randomization + for task in tasks_to_indices: + self._local_random.shuffle(tasks_to_indices[task]) + + # Get all unique tasks for this data source + all_tasks = list(tasks_to_indices.keys()) + self._local_random.shuffle(all_tasks) # Randomize task order too + + source_stats = { + "total_available": sum(len(indices) for indices in tasks_to_indices.values()), + "tasks_available": {task: len(indices) for task, indices in tasks_to_indices.items()}, + "tasks_sampled": Counter(), + } + + if self.n_trajectories_per_source is None: + # Use all available trajectories + sampled_from_source = [] + for task, indices in tasks_to_indices.items(): + sampled_from_source.extend(indices) + source_stats["tasks_sampled"][task] = len(indices) + 
stats["by_task"][task] += len(indices) + + rank_0_print( + f" Data source '{data_source}': Using all {len(sampled_from_source)} trajectories", + verbose=self.verbose, + ) + else: + # Sample N trajectories using round-robin to prioritize different tasks + n_to_sample = min(self.n_trajectories_per_source, source_stats["total_available"]) + sampled_from_source = [] + + # Round-robin sampling: first get one from each task, then repeat + task_iterators = {task: iter(indices) for task, indices in tasks_to_indices.items()} + task_list = all_tasks.copy() + round_idx = 0 + + while len(sampled_from_source) < n_to_sample: + # If we've gone through all tasks once, reshuffle for next round + if round_idx >= len(task_list): + round_idx = 0 + self._local_random.shuffle(task_list) + + # Try to get one trajectory from current task + task = task_list[round_idx] + try: + traj_idx = next(task_iterators[task]) + sampled_from_source.append(traj_idx) + source_stats["tasks_sampled"][task] += 1 + stats["by_task"][task] += 1 + except StopIteration: + # This task is exhausted, remove it from rotation + task_list.pop(round_idx) + if not task_list: + break # All tasks exhausted + continue + + round_idx += 1 + + rank_0_print( + f" Data source '{data_source}': Sampled {len(sampled_from_source)} out of {source_stats['total_available']} trajectories", + verbose=self.verbose, + ) + rank_0_print( + f" Tasks sampled: {dict(sorted(source_stats['tasks_sampled'].items()))}", + verbose=self.verbose, + ) + + # Track trajectory to task mapping for stats + for traj_idx in sampled_from_source: + traj = self.dataset[traj_idx] + traj_id = traj.get("id", str(traj_idx)) + stats["traj_to_task"][traj_id] = traj.get("task", "unknown") + + sampled_indices.extend(sampled_from_source) + stats["by_source"][data_source] = source_stats + + return sampled_indices, stats + + def _print_sampling_stats(self, stats: dict): + """Print detailed statistics about sampled trajectories. 
+ + Args: + stats: Statistics dictionary from _sample_trajectories_by_data_source + """ + if not self.verbose: + return + + rank_0_print("\n=== Confusion Matrix Sampling Statistics ===", verbose=self.verbose) + + # Overall task statistics + rank_0_print(f"\nOverall trajectories per video task:", verbose=self.verbose) + for task, count in sorted(stats["by_task"].items()): + rank_0_print(f" {task}: {count} trajectories", verbose=self.verbose) + + # Per data source statistics + rank_0_print(f"\nPer data source breakdown:", verbose=self.verbose) + for data_source, source_stats in stats["by_source"].items(): + rank_0_print(f" Data source: {data_source}", verbose=self.verbose) + rank_0_print(f" Total available: {source_stats['total_available']}", verbose=self.verbose) + rank_0_print(f" Tasks available: {len(source_stats['tasks_available'])}", verbose=self.verbose) + for task, count in sorted(source_stats['tasks_available'].items()): + sampled_count = source_stats['tasks_sampled'].get(task, 0) + rank_0_print( + f" {task}: {sampled_count}/{count} trajectories sampled", + verbose=self.verbose, + ) + + rank_0_print("=" * 50, verbose=self.verbose) + + def _generate_sample_from_indices(self, sample_idx_info: dict) -> PreferenceSample: + """Generate a single task-trajectory sample from stored indices.""" + traj_idx = sample_idx_info["traj_idx"] + lang_task = sample_idx_info["lang_task"] + video_task = sample_idx_info["video_task"] + video_path = sample_idx_info["video_path"] + + video_traj = self.dataset[traj_idx] + + # Look up precomputed embedding instead of encoding + text_embedding = self.task_embeddings[lang_task] + + metadata = { + "id": video_traj["id"], + "lang_task": lang_task, + "video_task": video_task, + "video_path": video_path, + } + + # Override task and text_embedding in the trajectory dict + video_traj_with_task = video_traj.copy() + video_traj_with_task["task"] = lang_task + video_traj_with_task["text_embedding"] = text_embedding + + sample_trajectory = 
self._get_traj_from_data( + traj=video_traj_with_task, + metadata=metadata, + ) + + sample = ProgressSample(trajectory=sample_trajectory) + return sample + + def __len__(self): + return len(self.sample_indices) + + def __getitem__(self, idx): + return self._generate_sample_from_indices(self.sample_indices[idx]) diff --git a/samplers/eval/progress_policy_ranking.py b/samplers/eval/progress_policy_ranking.py new file mode 100644 index 0000000000000000000000000000000000000000..4f7bbca8c9d10bf33f7692675e04e00df107848c --- /dev/null +++ b/samplers/eval/progress_policy_ranking.py @@ -0,0 +1,231 @@ +from typing import Dict, List, Any, Optional +from itertools import cycle + +import numpy as np +from collections import defaultdict +from rfm.data.dataset_types import ProgressSample +from rfm.data.samplers.base import RFMBaseSampler +from rfm.utils.logger import get_logger + +logger = get_logger() + + +class ProgressPolicyRankingSampler(RFMBaseSampler): + """Dataset that generates progress samples for policy ranking by selecting N trajectories per quality label for tasks with multiple quality labels.""" + + def __init__( + self, + num_examples_per_quality_pr: int = 5, + num_partial_successes: Optional[int] = None, + frame_step: int = 1, + use_frame_steps: bool = True, + max_tasks: Optional[int] = None, + **kwargs, + ): + super().__init__(**kwargs) + + self.num_examples_per_quality_pr = num_examples_per_quality_pr + self.num_partial_successes = num_partial_successes + self.frame_step = frame_step + self.use_frame_steps = use_frame_steps + self.max_tasks = max_tasks + logger.info(f"ProgressPolicyRankingSampler initialized with {len(self.robot_trajectories)} trajectories") + + self.sample_indices = self._generate_all_sample_indices() + + logger.info(f"Generated {len(self.sample_indices)} sample indices") + + def _generate_all_sample_indices(self) -> List[Dict[str, Any]]: + """Generate sample indices by selecting tasks with multiple quality labels/partial_success values and 
sampling N trajectories per group. + + For non-RoboArena: Groups by task and quality_label. + For RoboArena: Groups by task and partial_success values. + + If use_frame_steps=True, generates subsequence samples like reward_alignment (0:frame_step, 0:2*frame_step, etc.). + If use_frame_steps=False, generates one sample per trajectory (whole trajectory). + """ + + # Check if this is RoboArena (has partial_success) + is_roboarena = False + if self.robot_trajectories: + first_traj = self.dataset[self.robot_trajectories[0]] + is_roboarena = first_traj.get("partial_success") is not None + + # Group trajectories by task and grouping key (quality_label or partial_success) + task_to_key_to_trajs = defaultdict(lambda: defaultdict(list)) + + for traj_idx in self.robot_trajectories: + traj = self.dataset[traj_idx] + task = traj["task"] + + if is_roboarena: + # RoboArena: Use rounded partial_success as key to group similar values + partial_success_val = traj.get("partial_success") + if partial_success_val is not None: + partial_success = round(float(partial_success_val), 2) + task_to_key_to_trajs[task][partial_success].append(traj_idx) + else: + # Non-RoboArena: Use quality_label + quality = traj["quality_label"] + task_to_key_to_trajs[task][quality].append(traj_idx) + + # Filter to tasks that have multiple grouping values + tasks_with_multiple_values = { + task: key_to_trajs for task, key_to_trajs in task_to_key_to_trajs.items() if len(key_to_trajs) > 1 + } + + dataset_type_str = "partial_success values" if is_roboarena else "quality labels" + logger.info(f"Found {len(tasks_with_multiple_values)} tasks with multiple {dataset_type_str}") + + # Limit number of tasks if max_tasks is specified + if self.max_tasks is not None and self.max_tasks > 0: + # Convert to list, shuffle, and take first max_tasks + # Sort tasks first to ensure deterministic ordering before shuffling + tasks_list = sorted(tasks_with_multiple_values.items()) + self._local_random.shuffle(tasks_list) + 
tasks_with_multiple_values = dict(tasks_list[: self.max_tasks]) + logger.info(f"Limited to {len(tasks_with_multiple_values)} tasks (max_tasks={self.max_tasks})") + + # Sample trajectories for each task + sample_indices = [] + all_sampled_traj_indices = [] + # Sort tasks to ensure deterministic processing order + for task, key_to_trajs in sorted(tasks_with_multiple_values.items()): + if is_roboarena: + # RoboArena: Use num_partial_successes for circular sampling + num_to_sample_total = self.num_partial_successes + + # Build lists of available indices per partial_success (sorted for deterministic sampling) + available_lists = [] + for partial_success in sorted(key_to_trajs.keys()): + traj_indices = sorted(key_to_trajs[partial_success]) + if traj_indices: + available_lists.append(traj_indices) + + # Circular sampling: cycle through partial_success groups until we reach max + sampled_traj_indices = [] + for available_indices in cycle(available_lists): + if len(sampled_traj_indices) >= num_to_sample_total: + break + if not available_indices: + # If all lists are empty, stop + if all(not lst for lst in available_lists): + break + continue + + # Sample one trajectory from this group + sampled_idx = self._local_random.choice(available_indices) + sampled_traj_indices.append(sampled_idx) + # Remove the sampled index from this list + available_indices.remove(sampled_idx) + + # Generate samples for all sampled trajectories + for traj_idx in sampled_traj_indices: + traj = self.dataset[traj_idx] + sample_indices.extend(self._generate_indices_for_trajectory(traj_idx, traj)) + all_sampled_traj_indices.append(traj_idx) + else: + # Non-RoboArena: Sample N trajectories per quality label + # Sort quality labels to ensure deterministic order + for quality in sorted(key_to_trajs.keys()): + traj_indices = key_to_trajs[quality] + # Sort trajectory indices to ensure deterministic sampling + traj_indices = sorted(traj_indices) + # Sample up to num_examples_per_quality_pr trajectories for 
this quality label + num_to_sample = min(self.num_examples_per_quality_pr, len(traj_indices)) + sampled_traj_indices = self._local_random.sample(traj_indices, num_to_sample) + for traj_idx in sampled_traj_indices: + traj = self.dataset[traj_idx] + sample_indices.extend(self._generate_indices_for_trajectory(traj_idx, traj)) + all_sampled_traj_indices.append(traj_idx) + + logger.info(f"Sampled {len(sample_indices)} samples across {len(tasks_with_multiple_values)} tasks") + logger.info(f"Sampled trajectory indices: {all_sampled_traj_indices}") + + return sample_indices + + def _generate_indices_for_trajectory(self, traj_idx: int, traj: Dict[str, Any]) -> List[Dict[str, Any]]: + """Generate sample indices for a single trajectory. + + Args: + traj_idx: Index of the trajectory in the dataset + traj: Trajectory dictionary + + Returns: + List of sample index dictionaries + """ + num_frames = traj["num_frames"] + indices = [] + + if self.use_frame_steps: + # Generate subsequence indices like reward_alignment: 0:frame_step, 0:2*frame_step, etc. 
+ for end_idx in range(self.frame_step, num_frames + 1, self.frame_step): + frame_indices = list(range(end_idx)) + indices.append({ + "traj_idx": traj_idx, + "frame_indices": frame_indices, + "num_frames": num_frames, + "video_path": traj["frames"], + "id": traj["id"], + "use_frame_steps": True, + }) + else: + # Generate one sample per trajectory (whole trajectory) + indices.append({ + "traj_idx": traj_idx, + "video_path": traj["frames"], + "id": traj["id"], + "use_frame_steps": False, + }) + + return indices + + def _generate_sample_from_indices(self, sample_idx_info: dict) -> ProgressSample: + """Generate a single progress sample from trajectory index.""" + traj_idx = sample_idx_info["traj_idx"] + use_frame_steps = sample_idx_info.get("use_frame_steps", True) + + traj = self.dataset[traj_idx] + + if use_frame_steps: + # Frame steps mode: create subsequence like reward_alignment + frame_indices = sample_idx_info["frame_indices"] + num_frames = sample_idx_info["num_frames"] + + metadata = { + "quality_label": traj["quality_label"], + "data_source": traj["data_source"], + "task": traj["task"], + "id": traj["id"], + "video_path": sample_idx_info["video_path"], + "frame_step": frame_indices[-1] if frame_indices else 0, + } + + trajectory = self._get_traj_from_data( + traj=traj, + frame_indices=frame_indices, + metadata=metadata, + ) + else: + # Whole trajectory mode + metadata = { + "quality_label": traj["quality_label"], + "data_source": traj["data_source"], + "task": traj["task"], + "id": traj["id"], + "video_path": sample_idx_info["video_path"], + } + + trajectory = self._get_traj_from_data( + traj=traj, + metadata=metadata, + ) + + sample = ProgressSample(trajectory=trajectory) + return sample + + def __len__(self): + return len(self.sample_indices) + + def __getitem__(self, idx): + return self._generate_sample_from_indices(self.sample_indices[idx]) diff --git a/samplers/eval/quality_preference.py b/samplers/eval/quality_preference.py new file mode 100644 index 
# --- samplers/eval/quality_preference.py ---
from typing import Dict, List, Any

from itertools import combinations
from tqdm import tqdm

from rfm.data.samplers.eval.base_pref import BaseQualityPreferenceSampler
from rfm.utils.distributed import rank_0_print


class QualityPreferenceSampler(BaseQualityPreferenceSampler):
    """Generates preference samples by pairing same-task trajectories of differing quality.

    Non-RoboArena data: trajectories are grouped by ``quality_label``
    (failure < suboptimal < successful) and paired across labels; the higher
    label is the chosen trajectory.

    RoboArena data (detected by ``partial_success`` on the first robot
    trajectory): trajectories are grouped by ``partial_success`` rounded to two
    decimals and paired across values; the higher value is chosen.
    """

    # Ranking used to decide which quality label is preferred.
    # Labels missing from this map rank 0 and never win a comparison.
    _QUALITY_ORDER = {"failure": 1, "suboptimal": 2, "successful": 3}

    def __init__(
        self,
        comparisons_per_task=None,
        max_comparisons=None,
        **kwargs,
    ):
        """Initialize the sampler and precompute all pair indices.

        Args:
            comparisons_per_task: Optional cap on the number of pairs kept per
                task (uniform subsample when exceeded).
            max_comparisons: Optional cap on the total number of pairs overall
                (uniform subsample when exceeded).
            **kwargs: Forwarded to BaseQualityPreferenceSampler.
        """
        super().__init__(**kwargs)

        # Strategy tag recorded on generated samples.
        self.data_gen_strategy = "quality_preference"
        self.comparisons_per_task = comparisons_per_task
        self.max_comparisons = max_comparisons

        # Generate all possible sample indices upfront (not the actual samples).
        self.sample_indices = self._generate_all_sample_indices()
        rank_0_print(f"Generated {len(self.sample_indices)} quality preference sample indices", verbose=self.verbose)

    def _generate_all_sample_indices(self) -> List[Dict[str, Any]]:
        """Build every candidate preference pair, honoring per-task and global caps.

        Returns:
            List of pair index dicts (chosen/rejected trajectory indices plus
            the metadata that distinguishes them).
        """
        # RoboArena datasets are identified by the presence of partial_success
        # on the first robot trajectory.
        is_roboarena = False
        if self.robot_trajectories:
            first_traj = self.dataset[self.robot_trajectories[0]]
            is_roboarena = first_traj.get("partial_success") is not None

        rank_0_print(
            f"Generating quality preference samples for {len(self.robot_trajectories)} trajectories "
            f"({'RoboArena (partial_success)' if is_roboarena else 'non-RoboArena (quality_label)'})",
            verbose=self.verbose,
        )

        if is_roboarena:
            sample_indices = self._generate_partial_success_pairs()
        else:
            sample_indices = self._generate_quality_label_pairs()

        # Apply max_comparisons limit if set (uniform sample across all pairs).
        original_count = len(sample_indices)
        if self.max_comparisons is not None and original_count > self.max_comparisons:
            sample_indices = self._local_random.sample(sample_indices, self.max_comparisons)
            rank_0_print(
                f"Limited total comparisons to {self.max_comparisons} (from {original_count} total comparisons)",
                verbose=self.verbose,
            )

        return sample_indices

    def _cap_task_pairs(self, task_pairs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Uniformly subsample one task's pairs down to ``comparisons_per_task`` when set."""
        if self.comparisons_per_task is not None and len(task_pairs) > self.comparisons_per_task:
            return self._local_random.sample(task_pairs, self.comparisons_per_task)
        return task_pairs

    def _generate_partial_success_pairs(self) -> List[Dict[str, Any]]:
        """RoboArena: pair same-task trajectories with different partial_success values."""
        sample_indices: List[Dict[str, Any]] = []

        # task -> {rounded partial_success -> [traj_idx, ...]}
        task_to_partial_trajs: Dict[str, Dict[float, List[int]]] = {}
        for traj_idx in self.robot_trajectories:
            traj = self.dataset[traj_idx]
            task = traj["task"]
            partial_success_val = traj.get("partial_success")

            if partial_success_val is None:
                rank_0_print(
                    f"Warning: Trajectory {traj_idx} (task: {task}) missing partial_success, skipping",
                    verbose=self.verbose,
                )
                continue

            # Round partial_success to 2 decimals for grouping.
            partial_success = round(float(partial_success_val), 2)
            task_to_partial_trajs.setdefault(task, {}).setdefault(partial_success, []).append(traj_idx)

        for task in tqdm(task_to_partial_trajs, desc="Generating RoboArena quality preference samples"):
            partial_groups = task_to_partial_trajs[task]
            partial_values = list(partial_groups.keys())

            # Need at least two distinct partial_success values to form a pair.
            if len(partial_values) < 2:
                continue

            task_pairs: List[Dict[str, Any]] = []
            for partial1, partial2 in combinations(partial_values, 2):
                trajs1 = partial_groups[partial1]
                trajs2 = partial_groups[partial2]
                if not trajs1 or not trajs2:
                    continue

                # Higher partial_success is the chosen side; distinct dict keys
                # guarantee inequality, the else-branch is defensive only.
                if partial1 > partial2:
                    chosen_partial, rejected_partial = partial1, partial2
                    chosen_indices, rejected_indices = trajs1, trajs2
                elif partial2 > partial1:
                    chosen_partial, rejected_partial = partial2, partial1
                    chosen_indices, rejected_indices = trajs2, trajs1
                else:
                    continue

                # All cross-products for this partial_success combination.
                for chosen_idx in chosen_indices:
                    for rejected_idx in rejected_indices:
                        task_pairs.append({
                            "chosen_traj_idx": chosen_idx,
                            "rejected_traj_idx": rejected_idx,
                            "task": task,
                            "chosen_partial_success": chosen_partial,
                            "rejected_partial_success": rejected_partial,
                        })

            sample_indices.extend(self._cap_task_pairs(task_pairs))

        return sample_indices

    def _generate_quality_label_pairs(self) -> List[Dict[str, Any]]:
        """Non-RoboArena: pair same-task trajectories with strictly ordered quality labels."""
        sample_indices: List[Dict[str, Any]] = []

        # task -> {quality_label -> [traj_idx, ...]}
        task_to_quality_trajs: Dict[str, Dict[str, List[int]]] = {}
        for traj_idx in self.robot_trajectories:
            traj = self.dataset[traj_idx]
            task_to_quality_trajs.setdefault(traj["task"], {}).setdefault(traj["quality_label"], []).append(traj_idx)

        for task in tqdm(task_to_quality_trajs, desc="Generating quality preference samples"):
            quality_groups = task_to_quality_trajs[task]
            quality_labels = list(quality_groups.keys())

            # Only create pairs if we have at least 2 different quality labels.
            if len(quality_labels) < 2:
                continue

            task_pairs: List[Dict[str, Any]] = []
            for quality1, quality2 in combinations(quality_labels, 2):
                trajs1 = quality_groups[quality1]
                trajs2 = quality_groups[quality2]
                if not trajs1 or not trajs2:
                    continue

                # Pairs are only made when one label strictly outranks the other.
                order1 = self._QUALITY_ORDER.get(quality1, 0)
                order2 = self._QUALITY_ORDER.get(quality2, 0)
                if order1 > order2:
                    chosen_quality, rejected_quality = quality1, quality2
                    chosen_indices, rejected_indices = trajs1, trajs2
                elif order2 > order1:
                    chosen_quality, rejected_quality = quality2, quality1
                    chosen_indices, rejected_indices = trajs2, trajs1
                else:
                    # Same rank: no reliable preference signal, skip.
                    continue

                for chosen_idx in chosen_indices:
                    for rejected_idx in rejected_indices:
                        task_pairs.append({
                            "chosen_traj_idx": chosen_idx,
                            "rejected_traj_idx": rejected_idx,
                            "task": task,
                            "chosen_quality": chosen_quality,
                            "rejected_quality": rejected_quality,
                        })

            sample_indices.extend(self._cap_task_pairs(task_pairs))

        return sample_indices
# --- samplers/eval/reward_alignment.py follows ---
# --- samplers/eval/reward_alignment.py ---
"""
Data generator for reward alignment evaluation.

This generator creates subsequence samples from trajectories for progress
prediction evaluation. For each trajectory it creates growing frame prefixes
(0:frame_step, 0:2*frame_step, ...) and formats them as ProgressSample
objects that can be evaluated by the model.
"""

from typing import Dict, List, Any

import torch
from tqdm import tqdm

from rfm.data.dataset_types import ProgressSample, Trajectory
from rfm.data.samplers.base import RFMBaseSampler
from rfm.utils.distributed import rank_0_print


class RewardAlignmentSampler(RFMBaseSampler):
    """
    Data generator that creates subsequence samples for reward alignment evaluation.

    For each trajectory, creates growing frame prefixes (0:frame_step,
    0:2*frame_step, ...) and wraps each as a ProgressSample for evaluation.
    (Note: despite earlier docs mentioning PreferenceSample, this sampler
    produces ProgressSample objects.)
    """

    def __init__(
        self,
        max_trajectories: int | None = None,
        frame_step: int = 1,
        use_frame_steps: bool = True,
        **kwargs,
    ):
        """Initialize the sampler and precompute all sample indices.

        Args:
            max_trajectories: Optional cap on trajectories drawn from the dataset
                (uniform sample when exceeded).
            frame_step: Stride between successive prefix end points.
            use_frame_steps: If False, emit one whole-trajectory sample per
                trajectory instead of frame prefixes.
            **kwargs: Forwarded to RFMBaseSampler.
        """
        super().__init__(**kwargs)

        self.max_trajectories = max_trajectories
        self.frame_step = frame_step
        self.use_frame_steps = use_frame_steps
        self.sample_indices = self._generate_all_sample_indices()

        # Report how many trajectories were actually considered.
        num_used = (
            min(len(self.robot_trajectories), self.max_trajectories)
            if self.max_trajectories
            else len(self.robot_trajectories)
        )
        rank_0_print(
            f"Generated {len(self.sample_indices)} reward alignment sample indices from {num_used} trajectories",
            verbose=self.verbose,
        )

    def _generate_all_sample_indices(self) -> List[Dict[str, Any]]:
        """Generate all possible subsequence sample indices (not the actual samples)."""
        sample_indices = []

        # Limit number of trajectories if specified (uniform sample).
        trajectories_to_process = self.robot_trajectories
        if self.max_trajectories is not None and self.max_trajectories < len(self.robot_trajectories):
            trajectories_to_process = self._local_random.sample(self.robot_trajectories, self.max_trajectories)

        rank_0_print(
            f"Generating subsequence samples for {len(trajectories_to_process)} trajectories", verbose=self.verbose
        )

        for traj_idx in trajectories_to_process:
            traj = self.dataset[traj_idx]
            sample_indices.extend(self._generate_indices_for_trajectory(traj_idx, traj))

        return sample_indices

    def _generate_indices_for_trajectory(self, traj_idx: int, traj: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Generate sample index dicts for a single trajectory.

        Args:
            traj_idx: Index of the trajectory in the dataset.
            traj: Trajectory dictionary.

        Returns:
            One index dict per frame prefix, or a single whole-trajectory
            entry when use_frame_steps is False.
        """
        num_frames = traj["num_frames"]
        indices = []

        if self.use_frame_steps:
            # Growing prefixes: frames [0:frame_step], [0:2*frame_step], ...
            for end_idx in range(self.frame_step, num_frames + 1, self.frame_step):
                indices.append({
                    "traj_idx": traj_idx,
                    "frame_indices": list(range(end_idx)),
                    "num_frames": num_frames,
                    "video_path": traj["frames"],
                    "id": traj["id"],
                    "use_frame_steps": True,
                })
        else:
            # One sample covering the whole trajectory.
            indices.append({
                "traj_idx": traj_idx,
                "video_path": traj["frames"],
                "id": traj["id"],
                "use_frame_steps": False,
            })

        return indices

    def _generate_sample_from_indices(self, sample_idx_info: dict) -> ProgressSample:
        """Build a single ProgressSample from a stored index dict."""
        traj_idx = sample_idx_info["traj_idx"]
        use_frame_steps = sample_idx_info.get("use_frame_steps", True)
        traj = self.dataset[traj_idx]

        # Common metadata for both modes; prefix-specific keys added below.
        metadata = {
            "data_gen_strategy": "reward_alignment",
            "id": traj["id"],
            "video_path": sample_idx_info["video_path"],
        }

        if use_frame_steps:
            frame_indices = sample_idx_info["frame_indices"]
            # frame_step records the last included frame index of this prefix.
            metadata["frame_step"] = frame_indices[-1] if frame_indices else 0
            metadata["num_frames"] = sample_idx_info["num_frames"]
            trajectory = self._get_traj_from_data(traj=traj, frame_indices=frame_indices, metadata=metadata)
        else:
            trajectory = self._get_traj_from_data(traj=traj, metadata=metadata)

        return ProgressSample(trajectory=trajectory, sample_type="progress")

    def __len__(self):
        return len(self.sample_indices)

    def __getitem__(self, idx):
        return self._generate_sample_from_indices(self.sample_indices[idx])
# --- samplers/eval/roboarena_quality_preference.py follows ---
# --- samplers/eval/roboarena_quality_preference.py ---
from typing import Dict, List, Any

from itertools import combinations

from tqdm import tqdm

from rfm.data.samplers.eval.base_pref import BaseQualityPreferenceSampler
from rfm.utils.distributed import rank_0_print


class RoboArenaQualityPreferenceSampler(BaseQualityPreferenceSampler):
    """Dataset that generates preference samples by pairing trajectories with different partial_rewards for the same task.

    For the RoboArena dataset, pairs trajectories from the same task where the
    chosen trajectory has a higher partial_reward (partial_success) than the
    rejected trajectory.
    """

    def __init__(
        self,
        comparisons_per_task=None,
        **kwargs,
    ):
        """Initialize the sampler and precompute all pair indices.

        Args:
            comparisons_per_task: Optional cap on the number of pairs kept per
                task (uniform subsample when exceeded).
            **kwargs: Forwarded to BaseQualityPreferenceSampler.
        """
        super().__init__(**kwargs)

        # Set data_gen_strategy for this sampler.
        self.data_gen_strategy = "quality_preference_roboarena"

        # Cache full columns once for O(1) per-trajectory access.
        self._cached_tasks = self.dataset["task"]
        # May be None when the dataset has no partial_success column;
        # _lookup_partial_success guards against that.
        self._cached_partial_success = self.dataset.get("partial_success")

        self.comparisons_per_task = comparisons_per_task

        # Generate all possible sample indices upfront (not the actual samples).
        self.sample_indices = self._generate_all_sample_indices()
        rank_0_print(
            f"Generated {len(self.sample_indices)} RoboArena quality preference sample indices", verbose=self.verbose
        )

    def _lookup_partial_success(self, traj_idx: int):
        """Return partial_success for traj_idx, or None when unavailable.

        Bug fix: the original indexed self._cached_partial_success (and took
        len() of it) without checking for None, so a dataset without the
        partial_success column raised TypeError instead of skipping with a
        warning as intended.
        """
        column = self._cached_partial_success
        if column is None or traj_idx >= len(column):
            return None
        return column[traj_idx]

    def _generate_all_sample_indices(self) -> List[Dict[str, Any]]:
        """Generate all possible quality preference sample indices based on partial_reward (partial_success)."""
        sample_indices = []

        # Group trajectories by task.
        task_to_trajs: Dict[str, List[Dict[str, Any]]] = {}

        rank_0_print(
            f"Generating RoboArena quality preference samples for {len(self.robot_trajectories)} trajectories",
            verbose=self.verbose,
        )

        for traj_idx in self.robot_trajectories:
            # Use cached columns for efficient access.
            task = self._cached_tasks[traj_idx]
            partial_success = self._lookup_partial_success(traj_idx)

            # Skip trajectories without a usable partial_success value.
            if partial_success is None:
                rank_0_print(
                    f"Warning: Trajectory {traj_idx} (task: {task}) missing partial_success, skipping",
                    verbose=self.verbose,
                )
                continue

            task_to_trajs.setdefault(task, []).append({
                "traj_idx": traj_idx,
                "partial_success": float(partial_success),
            })

        # Generate pairs for each task.
        for task in tqdm(task_to_trajs, desc="Generating RoboArena quality preference samples"):
            trajs = task_to_trajs[task]

            # Need at least 2 trajectories to create pairs.
            if len(trajs) < 2:
                continue

            task_pairs = []
            # combinations() yields each unordered pair exactly once (no self-pairs).
            for traj1, traj2 in combinations(trajs, 2):
                partial1 = traj1["partial_success"]
                partial2 = traj2["partial_success"]

                # Equal scores carry no preference signal.
                if partial1 == partial2:
                    continue

                # Higher partial_success is the chosen trajectory.
                if partial1 > partial2:
                    chosen, rejected = traj1, traj2
                else:
                    chosen, rejected = traj2, traj1

                task_pairs.append({
                    "chosen_traj_idx": chosen["traj_idx"],
                    "rejected_traj_idx": rejected["traj_idx"],
                    "task": task,
                    "chosen_partial_success": chosen["partial_success"],
                    "rejected_partial_success": rejected["partial_success"],
                })

            # Apply comparisons_per_task limit if set (uniform sample for this task).
            if self.comparisons_per_task is not None and len(task_pairs) > self.comparisons_per_task:
                task_pairs = self._local_random.sample(task_pairs, self.comparisons_per_task)

            sample_indices.extend(task_pairs)

        return sample_indices
# --- samplers/eval/similarity_score.py follows ---
# --- samplers/eval/similarity_score.py ---
"""
Data generator for similarity score evaluation.

This generator creates similarity samples for evaluation:
- For each paired human-robot trajectory pair (same task), creates similarity samples
- For each pairing, samples N negative trajectories from other tasks
- Creates similarity samples: ref=human, traj_sim=robot (same task), traj_diff=negative (different task)
"""

from typing import Dict, List, Any

from rfm.data.dataset_types import SimilaritySample, Trajectory
from rfm.data.samplers.base import RFMBaseSampler
from rfm.utils.distributed import rank_0_print


class SimilarityScoreSampler(RFMBaseSampler):
    """
    Data generator that creates similarity samples for evaluation.

    For each paired human-robot trajectory pair (same task):
    - Creates similarity samples with ref=human, traj_sim=robot, traj_diff=negative (from different task)
    - Samples N negative trajectories from other tasks for each pairing
    """

    def __init__(self, num_negatives: int = 2, max_pairs_per_task: int = 2, **kwargs):
        """Initialize the sampler and precompute all sample indices.

        Args:
            num_negatives: Negative (different-task) samples per human-robot pairing.
            max_pairs_per_task: Cap on human and on robot trajectories considered
                per task, to bound combinatorics (generalizes the previously
                hard-coded limit of 2; default preserves old behavior).
            **kwargs: Forwarded to RFMBaseSampler.
        """
        super().__init__(**kwargs)

        self.num_negatives = num_negatives
        self.max_pairs_per_task = max_pairs_per_task
        self.sample_indices = self._generate_all_sample_indices()

        rank_0_print(
            f"Generated {len(self.sample_indices)} similarity score sample indices from {len(self.paired_human_robot_by_task)} tasks",
            verbose=self.verbose,
        )

    def _generate_all_sample_indices(self) -> List[Dict[str, Any]]:
        """Generate all possible similarity score sample indices."""
        sample_indices = []

        # Iterate through all tasks with paired human-robot data.
        for task, paired_info in self.paired_human_robot_by_task.items():
            human_indices = paired_info["human"]
            robot_indices = paired_info["robot"]

            if not human_indices or not robot_indices:
                continue

            # Negative candidates come from every other task.
            other_tasks = [t for t in self.task_indices.keys() if t != task]
            if not other_tasks:
                continue

            # Limit trajectories considered per task to reduce combinatorics.
            selected_humans = (
                self._local_random.sample(human_indices, self.max_pairs_per_task)
                if len(human_indices) > self.max_pairs_per_task
                else human_indices
            )
            selected_robots = (
                self._local_random.sample(robot_indices, self.max_pairs_per_task)
                if len(robot_indices) > self.max_pairs_per_task
                else robot_indices
            )

            # For each selected human-robot pair, create N entries (one per negative task).
            for human_idx in selected_humans:
                for robot_idx in selected_robots:
                    # Sample N negative tasks (with replacement if needed).
                    negative_tasks = self._local_random.choices(other_tasks, k=self.num_negatives)

                    for negative_task in negative_tasks:
                        negative_task_indices = self.task_indices.get(negative_task, [])
                        if not negative_task_indices:
                            continue

                        # Store the negative task; the concrete negative
                        # trajectory is drawn later at generation time.
                        sample_indices.append({
                            "human_idx": human_idx,
                            "robot_idx": robot_idx,
                            "task": task,
                            "negative_task": negative_task,
                            "negative_task_indices": negative_task_indices,
                        })

        return sample_indices

    def _generate_sample_from_indices(self, sample_idx_info: dict) -> SimilaritySample:
        """Generate a single similarity sample from stored indices.

        Returns:
            A SimilaritySample, or None when no negative trajectory is available.
        """
        human_idx = sample_idx_info["human_idx"]
        robot_idx = sample_idx_info["robot_idx"]
        task = sample_idx_info["task"]
        negative_task = sample_idx_info["negative_task"]
        negative_task_indices = sample_idx_info["negative_task_indices"]

        # Get human and robot trajectories.
        human_traj = self.dataset[human_idx]
        robot_traj = self.dataset[robot_idx]

        # Sample a negative trajectory from the specified different task.
        if not negative_task_indices:
            return None

        negative_idx = self._local_random.choice(negative_task_indices)
        negative_traj = self.dataset[negative_idx]

        # Create trajectories for the similarity sample.
        ref_traj = self._create_trajectory_from_data(human_traj)
        sim_traj = self._create_trajectory_from_data(robot_traj)
        diff_traj = self._create_trajectory_from_data(negative_traj)

        # Pairing metadata is attached to the reference trajectory only.
        metadata = {
            "task": task,
            "negative_task": negative_task,
            "human_id": human_traj["id"],
            "robot_id": robot_traj["id"],
            "negative_id": negative_traj["id"],
        }
        if ref_traj.metadata is None:
            ref_traj.metadata = {}
        ref_traj.metadata.update(metadata)

        sample = SimilaritySample(
            ref_trajectory=ref_traj,
            sim_trajectory=sim_traj,
            diff_trajectory=diff_traj,
            data_gen_strategy="similarity_score_eval",
        )

        return sample

    def _create_trajectory_from_data(self, traj_data: dict) -> Trajectory:
        """Create a Trajectory object from a dataset entry."""
        metadata = {
            "data_gen_strategy": "similarity_score_eval",
        }

        trajectory = self._get_traj_from_data(
            traj=traj_data,
            metadata=metadata,
        )

        return trajectory

    def __len__(self):
        return len(self.sample_indices)

    def __getitem__(self, idx):
        return self._generate_sample_from_indices(self.sample_indices[idx])
# --- samplers/pref.py follows (module continues past this excerpt) ---
+""" + +from typing import Dict, List, Optional, Any + +import random + +from rfm.data.dataset_types import PreferenceSample, Trajectory +from rfm.data.samplers.base import RFMBaseSampler +from rfm.data.datasets.helpers import ( + DataGenStrat, + convert_continuous_to_discrete_bins, +) +from rfm.utils.logger import get_logger, rank_0_info, trace +from rfm.utils.timer import timer + +logger = get_logger() + + +class PrefSampler(RFMBaseSampler): + """Data generator for producing batches of preference prediction data.""" + + def __init__(self, is_evaluation=False, **kwargs): + super().__init__(**kwargs) + + self.dataset_preference_ratio = self.config.dataset_preference_ratio + self.preference_strategy_ratio: List[float] = self.config.preference_strategy_ratio + self._has_suboptimal = ( + any(len(indices) > 0 for indices in self.suboptimal_by_task.values()) if self.suboptimal_by_task else False + ) + rank_0_info(f"[PREF SAMPLER] Has suboptimal: {self._has_suboptimal}") + + # Initialize preference dataset + self._load_preference_dataset() + + def _generate_sample(self, item: dict, preferred_strategy: Optional[DataGenStrat] = None): + """Generate a preference sample from an item. + + If the item has a non-successful quality label, it will be used as the rejected + trajectory and an optimal trajectory from the same task will be found as the chosen one. + Otherwise, normal preference sampling logic is used. 
+ + Args: + item: The trajectory item + preferred_strategy: Optional strategy to use (if None, will select strategy based on ratios) + """ + quality_label = item["quality_label"] + use_partial_success = item.get("partial_success") is not None + + # Handle non-successful trajectories: use as rejected, find optimal from same task as chosen + # skip this for trajectories with partial_success which we will handle with partial success logic + if quality_label != "successful" and not use_partial_success: + traj_id = item["id"] + task_name = item["task"] + + logger.trace( + f"[PREF SAMPLER] Non-successful quality detected for ID={traj_id}, using as rejected trajectory, task={task_name}" + ) + + # Find optimal trajectories from the same task + same_task_optimal_indices = self.optimal_by_task.get(task_name, []) + + if not same_task_optimal_indices: + logger.trace( + f"[PREF SAMPLER] No optimal trajectories found for task '{task_name}', falling through to normal sampling" + ) + return self._create_pref_sample(item, preferred_strategy=preferred_strategy) + + # Select a random optimal trajectory from the same task as chosen + chosen_idx = random.choice(same_task_optimal_indices) + chosen_traj_dict = self.dataset[chosen_idx] + + chosen_trajectory = self._get_traj_from_data(chosen_traj_dict) + rejected_trajectory = self._get_traj_from_data(item) + + sample = PreferenceSample( + chosen_trajectory=chosen_trajectory, + rejected_trajectory=rejected_trajectory, + data_gen_strategy=DataGenStrat.SUBOPTIMAL.value, + ) + + logger.trace( + f"[PREF SAMPLER] Created preference sample for non-successful traj ID={traj_id} with optimal traj from same task" + ) + return sample + + return self._create_pref_sample(item, preferred_strategy=preferred_strategy) + + def _execute_strategy( + self, strategy: DataGenStrat, chosen_traj: Dict[str, Any], use_partial_success: bool + ) -> tuple[Dict[str, Any], str, Dict[str, Any]] | None: + """Execute a strategy to get rejected trajectory. 
+ + Args: + strategy: The strategy to execute + chosen_traj: The chosen trajectory + use_partial_success: Whether this trajectory uses partial_success + + Returns: + Tuple of (rejected_traj, rejected_subsample_strategy, chosen_traj) or None if failed + Note: chosen_traj may be swapped with rejected_traj for partial_success trajectories + """ + max_retries = 3 + rejected_subsample_strategy = None + rejected_traj = None + + if strategy == DataGenStrat.REWIND: + rejected_traj = chosen_traj.copy() + rejected_subsample_strategy = "subsample_rewind" + elif strategy == DataGenStrat.SUBOPTIMAL: + for _ in range(max_retries): + rejected_traj = self._get_same_task_suboptimal(chosen_traj) + if rejected_traj is not None: + # For trajectories with partial_success, if the returned trajectory has higher partial_success, swap them + if use_partial_success: + chosen_partial_success = chosen_traj.get("partial_success") + rejected_partial_success = rejected_traj.get("partial_success") + if rejected_partial_success is not None and chosen_partial_success is not None: + if rejected_partial_success > chosen_partial_success: + logger.trace( + f"[PREF SAMPLER] Swapping trajectories: found higher partial_success " + f"({rejected_partial_success} > {chosen_partial_success})" + ) + rejected_traj, chosen_traj = chosen_traj, rejected_traj + break + rejected_subsample_strategy = "subsample_forward" + elif strategy == DataGenStrat.DIFFERENT_TASK: + for _ in range(max_retries): + rejected_traj = self._get_different_video_traj(chosen_traj) + if rejected_traj is not None: + break + rejected_subsample_strategy = "subsample_forward" + elif strategy == DataGenStrat.REVERSE_PROGRESS: + rejected_traj = chosen_traj.copy() + rejected_subsample_strategy = "subsample_reverse" + else: + return None + + if rejected_traj is None: + return None + + return (rejected_traj, rejected_subsample_strategy, chosen_traj) + + def _create_pref_sample_from_dataset(self) -> PreferenceSample: + """Create a preference sample 
from the loaded preference dataset.""" + if not self.preferences: + return None + + # For now, return a simple preference sample + # This can be enhanced later when we have actual preference data + random.choice(self.preferences) + + # This is a placeholder - would need to be implemented based on actual preference data structure + return None + + def _load_preference_dataset(self): + """Load the preference dataset from disk or hub if provided.""" + self.preferences = [] + + # For now, we'll use empty preferences since the config structure has changed + # This can be updated later if needed + rank_0_info("[PREF SAMPLER] No preference dataset provided, will use random sampling for preferences") + return + + def _create_preference_sample(self) -> PreferenceSample: + """Create a preference prediction sample: chosen vs rejected where chosen is preferred. + Either from dataset or from generated trajectories. + + Returns: + PreferenceSample: A preference sample with chosen (preferred) vs rejected + (suboptimal) trajectories and associated metadata + """ + + with timer("create_preference_sample", verbose=False): + if random.random() < self.dataset_preference_ratio and self.preferences: + # Use preference trajectories from dataset + return self._create_pref_sample_from_dataset() + else: + return self._create_pref_sample() + + def _create_pref_sample( + self, chosen_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None + ) -> PreferenceSample: + """Create a preference prediction sample using various rejected trajectory generation strategies. 
    def _create_pref_sample(
        self, chosen_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None
    ) -> PreferenceSample:
        """Create a preference prediction sample using various rejected trajectory generation strategies.

        Rewind Same Task
        - Creates a suboptimal trajectory by rewinding the chosen trajectory

        Suboptimal/Failure Same Task
        - Uses existing suboptimal/failure trajectories from the same task

        Different Task
        - Uses trajectories from completely different tasks

        Args:
            chosen_traj: Optional pre-selected chosen trajectory dict. When
                None, a random optimal trajectory is sampled from
                ``self.optimal_by_task``.
            preferred_strategy: When given, that strategy is tried exactly once
                instead of the ratio-weighted selection loop.

        Returns:
            PreferenceSample: A preference sample with chosen (preferred) vs
            rejected (suboptimal) trajectories and associated metadata, or
            None when no sample could be generated.
        """
        # Log when preference sampler is called
        traj_id = chosen_traj["id"] if chosen_traj is not None else "sampling_new"
        logger.trace(f"[PREF SAMPLER] Creating preference sample for trajectory ID: {traj_id}")

        # Use provided chosen trajectory if given; otherwise sample one
        if chosen_traj is None:
            # Use preprocessed chosen trajectories from index maps
            if not self.optimal_by_task:
                return None

            # Filter out tasks with empty optimal_indices to avoid infinite loop
            valid_tasks = {
                task: indices
                for task, indices in self.optimal_by_task.items()
                if indices  # Only include tasks with non-empty indices
            }

            if not valid_tasks:
                # No valid tasks with optimal trajectories available
                return None

            # Get a random task and chosen trajectory from it
            task_name = random.choice(list(valid_tasks.keys()))
            optimal_indices = valid_tasks[task_name]

            # Double-check that we have valid indices (should always be true now)
            if not optimal_indices:
                return None

            chosen_idx = random.choice(optimal_indices)
            chosen_traj = self.dataset[chosen_idx]

        # Initialize variables for strategy selection
        rejected_traj = None
        strategy_used = None
        rejected_subsample_strategy = None

        # Check if this trajectory uses partial_success
        use_partial_success = chosen_traj.get("partial_success") is not None
        if use_partial_success:
            partial_success = chosen_traj.get("partial_success")
            logger.trace(
                f"[PREF SAMPLER] Trajectory with partial_success detected (ID: {chosen_traj.get('id', 'unknown')}, partial_success: {partial_success})"
            )

        # Strategy selection: use preferred_strategy if provided, otherwise select based on ratios
        if preferred_strategy is not None:
            # Use the preferred strategy directly (single attempt, no retries)
            logger.trace(f"[PREF SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, chosen_traj, use_partial_success)
            if result is None:
                logger.trace(f"[PREF SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            # chosen_traj may come back swapped for partial_success trajectories
            rejected_traj, rejected_subsample_strategy, chosen_traj = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for preferred strategy path
        else:
            # Strategy selection with rebalancing on failure.
            # NOTE(review): indexes [0..3] assume preference_strategy_ratio has
            # at least 4 entries — unlike the progress sampler there is no
            # len() guard here; confirm against config validation.
            strategies = []
            if self.preference_strategy_ratio[0] > 0:
                strategies.append((DataGenStrat.REWIND, self.preference_strategy_ratio[0]))
            if self._has_suboptimal and self.preference_strategy_ratio[1] > 0:
                strategies.append((DataGenStrat.SUBOPTIMAL, self.preference_strategy_ratio[1]))
            if self.preference_strategy_ratio[2] > 0:
                strategies.append((DataGenStrat.DIFFERENT_TASK, self.preference_strategy_ratio[2]))
            if self.preference_strategy_ratio[3] > 0:
                strategies.append((DataGenStrat.REVERSE_PROGRESS, self.preference_strategy_ratio[3]))

            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            max_strategy_attempts = 3  # Maximum attempts per strategy before removing it
            attempt = 0

            # Track attempts per strategy (cumulative failures, not consecutive)
            strategy_attempt_counts = {strat: 0 for strat, _ in strategies}

            while rejected_traj is None and attempt < max_attempts:
                attempt += 1

                # Check if we have any strategies left
                if not strategies:
                    return None

                # Rebalance probabilities based on remaining strategies
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None

                # Normalize probabilities
                normalized_strategies = [(strat, prob / total_prob) for strat, prob in strategies]

                # Select strategy via inverse-CDF sampling on the rebalanced probabilities
                prob = random.random()
                cumulative_prob = 0.0
                selected_strategy = None

                for strat, normalized_prob in normalized_strategies:
                    cumulative_prob += normalized_prob
                    if prob <= cumulative_prob:
                        selected_strategy = strat
                        break

                # Log strategy attempt
                logger.trace(
                    f"[PREF SAMPLER] Attempt {attempt}/{max_attempts}: Trying strategy {selected_strategy.value if selected_strategy else 'None'}"
                )

                # Execute selected strategy
                result = self._execute_strategy(selected_strategy, chosen_traj, use_partial_success)
                if result is not None:
                    rejected_traj, rejected_subsample_strategy, chosen_traj = result
                    strategy_used = selected_strategy
                    logger.trace(f"[PREF SAMPLER] Strategy {selected_strategy.value} succeeded on attempt {attempt}")
                else:
                    # Strategy failed - increment attempt count
                    strategy_attempt_counts[selected_strategy] = strategy_attempt_counts.get(selected_strategy, 0) + 1
                    failed_count = strategy_attempt_counts[selected_strategy]

                    logger.trace(
                        f"[PREF SAMPLER] Strategy {selected_strategy.value} failed (failure count: {failed_count}/{max_strategy_attempts})"
                    )

                    # Only remove strategy if it has failed max_strategy_attempts times
                    if strategy_attempt_counts[selected_strategy] >= max_strategy_attempts:
                        logger.trace(
                            f"[PREF SAMPLER] Removing strategy {selected_strategy.value} after {max_strategy_attempts} consecutive failures"
                        )
                        strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]
                    continue

        # If we still don't have a sample after all attempts, return None
        # (only reachable from the ratio-selection path, where max_attempts is bound)
        if rejected_traj is None:
            logger.trace(
                f"[PREF SAMPLER] Failed to generate preference sample after {max_attempts} attempts - all strategies exhausted"
            )
            return None

        chosen_subsample_strategy = "subsample_forward"
        chosen_trajectory = self._get_traj_from_data(chosen_traj, subsample_strategy=chosen_subsample_strategy)

        rejected_trajectory = self._get_traj_from_data(rejected_traj, subsample_strategy=rejected_subsample_strategy)

        # If our strategy is different task, make sure the rejected trajectory has 0 progress and 0 success labels
        # (DIFFERENT_TASK_INSTRUCTION is listed for parity with the progress sampler,
        # though this sampler never selects it)
        if strategy_used in [
            DataGenStrat.DIFFERENT_TASK,
            DataGenStrat.DIFFERENT_TASK_INSTRUCTION,
        ]:
            rejected_trajectory.target_progress = [0.0] * len(rejected_trajectory.target_progress)
            if self.config.progress_loss_type.lower() == "discrete":
                rejected_trajectory.target_progress = convert_continuous_to_discrete_bins(
                    rejected_trajectory.target_progress, self.config.progress_discrete_bins
                )
            # Also set success labels to 0.0 (predict 0 success for different task trajectories)
            if rejected_trajectory.success_label is not None:
                rejected_trajectory.success_label = [0.0] * len(rejected_trajectory.success_label)

        # Create preference sample structure
        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=strategy_used.value,
        )
        sample.resample_attempts = attempt
        return sample
        super().__init__(**kwargs)

    def _generate_sample(self, item: Dict[str, Any], preferred_strategy: Optional[DataGenStrat] = None):
        """Entry point used by the base sampler: delegate to progress-sample creation."""
        return self._create_progress_sample(item, preferred_strategy=preferred_strategy)

    def _execute_strategy(self, strategy: DataGenStrat, traj: Dict[str, Any]) -> tuple[Dict[str, Any], str] | None:
        """Execute a strategy to get processed trajectory.

        Args:
            strategy: The strategy to execute
            traj: The trajectory to process

        Returns:
            Tuple of (processed_traj, subsample_strategy) or None if failed
        """
        # The first three strategies keep the trajectory and only change how it
        # will be subsampled later; only DIFFERENT_TASK_INSTRUCTION can fail.
        if strategy == DataGenStrat.FORWARD_PROGRESS:
            return (traj, "subsample_forward")
        elif strategy == DataGenStrat.REVERSE_PROGRESS:
            return (traj, "subsample_reverse")
        elif strategy == DataGenStrat.REWIND:
            return (traj, "subsample_rewind")
        elif strategy == DataGenStrat.DIFFERENT_TASK_INSTRUCTION:
            processed_traj = self._get_different_task_instruction(traj)
            if processed_traj is None:
                return None
            return (processed_traj, "subsample_forward")
        else:
            return None

    def _create_progress_sample(self, traj: Dict[str, Any], preferred_strategy: Optional[DataGenStrat] = None):
        """Create a progress sample using normalized and rebalanced strategy selection.

        Implements four strategies:
        1. Different Task: Use trajectory from different task (progress set to 0.0)
        2. Forward Progress: Sample with forward direction (start < middle < end)
        3. Reverse Progress: Sample with reverse direction (end < middle < start)
        4. Rewind: Sample with rewind direction (start < end < middle)
        """
        # Initialize variables for strategy selection
        processed_traj = None
        strategy_used = None
        subsample_strategy = None

        # Strategy selection: use preferred_strategy if provided, otherwise select based on ratios
        if preferred_strategy is not None:
            # Use the preferred strategy directly (single attempt)
            logger.trace(f"[PROGRESS SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, traj)
            if result is None:
                logger.trace(f"[PROGRESS SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            processed_traj, subsample_strategy = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for preferred strategy path
        else:
            # Strategy setup with rebalancing on failure
            # [different_task_instruction, forward_progress, reverse_progress, rewind]
            # len() guards make shorter ratio lists default missing entries to 0.0
            strategies = [
                (
                    DataGenStrat.DIFFERENT_TASK_INSTRUCTION,
                    self.config.progress_strategy_ratio[0] if len(self.config.progress_strategy_ratio) > 0 else 0.0,
                ),
                (
                    DataGenStrat.FORWARD_PROGRESS,
                    self.config.progress_strategy_ratio[1] if len(self.config.progress_strategy_ratio) > 1 else 0.0,
                ),
                (
                    DataGenStrat.REVERSE_PROGRESS,
                    self.config.progress_strategy_ratio[2] if len(self.config.progress_strategy_ratio) > 2 else 0.0,
                ),
                (
                    DataGenStrat.REWIND,
                    self.config.progress_strategy_ratio[3] if len(self.config.progress_strategy_ratio) > 3 else 0.0,
                ),
            ]

            # Remove strategies with zero probability
            strategies = [(strat, prob) for strat, prob in strategies if prob > 0]

            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            attempt = 0

            while processed_traj is None and attempt < max_attempts:
                attempt += 1

                # Check if we have any strategies left
                if not strategies:
                    return None

                # Rebalance probabilities based on remaining strategies
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None

                # Normalize probabilities
                normalized_strategies = [(strat, prob / total_prob) for strat, prob in strategies]

                # Select strategy via inverse-CDF sampling on the rebalanced probabilities
                prob = random.random()
                cumulative_prob = 0.0
                selected_strategy = None

                for strat, normalized_prob in normalized_strategies:
                    cumulative_prob += normalized_prob
                    if prob <= cumulative_prob:
                        selected_strategy = strat
                        break

                # Execute selected strategy
                result = self._execute_strategy(selected_strategy, traj)
                if result is not None:
                    processed_traj, subsample_strategy = result
                    strategy_used = selected_strategy
                else:
                    # Remove failed strategy immediately and try again (unlike the
                    # preference/similarity samplers there is no per-strategy retry budget)
                    strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]
                    continue

        # If we still don't have a sample after all attempts, return None
        if processed_traj is None:
            logger.trace(
                f"[PROGRESS SAMPLER] Failed to generate progress sample after {max_attempts} attempts - all strategies exhausted"
            )
            return None

        progress_traj = self._get_traj_from_data(processed_traj, subsample_strategy=subsample_strategy)

        # Handle special cases
        if strategy_used in [DataGenStrat.DIFFERENT_TASK, DataGenStrat.DIFFERENT_TASK_INSTRUCTION]:
            # We need to use the original task embeddings instead of the different task embeddings
            if self.config.load_embeddings and traj.get("embeddings_path"):
                progress_traj.text_embedding = load_embeddings_from_path(traj["embeddings_path"])["text_embedding"]
            # NOTE(review): original diff is ambiguous on indentation here — the
            # lang_vector/task restore is assumed to apply regardless of
            # load_embeddings; confirm against upstream source.
            progress_traj.lang_vector = traj["lang_vector"]
            progress_traj.task = traj["task"]
            progress_traj.target_progress = [0.0] * len(progress_traj.target_progress)
            if self.config.progress_loss_type.lower() == "discrete":
                progress_traj.target_progress = convert_continuous_to_discrete_bins(
                    progress_traj.target_progress, self.config.progress_discrete_bins
                )
            # Also set success labels to 0.0 (predict 0 success for different task trajectories)
            if progress_traj.success_label is not None:
                progress_traj.success_label = [0.0] * len(progress_traj.success_label)

        # Defensive: strategy_used should always be a DataGenStrat by this point
        strategy_value = strategy_used.value if isinstance(strategy_used, DataGenStrat) else strategy_used
        sample = ProgressSample(
            trajectory=progress_traj,
            sample_type="progress",
            data_gen_strategy=strategy_value,
        )
        sample.resample_attempts = attempt
        return sample
    def _execute_strategy(
        self, strategy: DataGenStrat, ref_traj: Dict[str, Any]
    ) -> tuple[Dict[str, Any], Dict[str, Any]] | None:
        """Execute a strategy to get trajectory pairs.

        Args:
            strategy: The strategy to execute
            ref_traj: The reference trajectory

        Returns:
            Tuple of (traj_sim, traj_diff) or None if failed
        """
        if strategy == DataGenStrat.REWIND:
            return self._get_traj_dicts_for_rewind(ref_traj)
        elif strategy == DataGenStrat.SUBOPTIMAL:
            return self._get_traj_dicts_for_suboptimal(ref_traj)
        elif strategy == DataGenStrat.PAIRED_HUMAN_ROBOT:
            return self._get_traj_dicts_for_paired_human_robot(ref_traj)
        else:
            return None

    def _create_similarity_sample(
        self, ref_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None
    ) -> SimilaritySample:
        """Create a similarity scoring sample: o^1 and o^2 ranked against o^ref.

        Two modes:
        1. Rewind mode: o^1 is rewound from same task, o^2 is from different task
           - here o^1 is preferred and should be ranked higher than o^2
        2. Optimal/Suboptimal mode: o^1 is optimal/suboptimal from same task, o^2 varies
           - here o^1 is preferred and should be ranked higher than o^2

        Args:
            ref_traj: Optional reference trajectory. If None, samples from optimal trajectories.
            preferred_strategy: When given, tried exactly once instead of the
                ratio-weighted selection loop.

        Returns:
            SimilaritySample, or None when no sample could be generated.
        """
        # Log when similarity sampler is called
        traj_id = ref_traj.get("id", "unknown") if ref_traj is not None else "sampling_new"
        logger.trace(f"[SIM SAMPLER] Creating similarity sample for trajectory ID: {traj_id}")

        # Use provided reference trajectory if given; otherwise sample one
        if ref_traj is None:
            # Use preprocessed optimal trajectories from index maps
            if not self.optimal_by_task:
                return None

            # Filter out tasks with empty optimal_indices to avoid infinite loop
            valid_tasks = {
                task: indices
                for task, indices in self.optimal_by_task.items()
                if indices  # Only include tasks with non-empty indices
            }

            if not valid_tasks:
                # No valid tasks with optimal trajectories available
                return None

            # Get a random task and optimal trajectory from it
            task_name = self._local_random.choice(list(valid_tasks.keys()))
            optimal_indices = valid_tasks[task_name]

            # Double-check that we have valid indices (should always be true now)
            if not optimal_indices:
                return None

            optimal_idx = self._local_random.choice(optimal_indices)
            ref_traj = self.dataset[optimal_idx]

        # Check if ref_traj is successful - if not, return None to try a different trajectory
        quality_label = ref_traj.get("quality_label")
        partial_success = ref_traj.get("partial_success")
        use_partial_success = partial_success is not None

        if use_partial_success:
            # NOTE(review): this inner check is unreachable — use_partial_success
            # already implies partial_success is not None. Kept as-is.
            if partial_success is None:
                logger.trace(
                    f"[SIM SAMPLER] Ref trajectory {ref_traj.get('id', 'unknown')} missing partial_success, skipping"
                )
                return None
        else:
            # For trajectories without partial_success, require quality_label to be "successful"
            if quality_label != "successful":
                logger.trace(
                    f"[SIM SAMPLER] Ref trajectory {ref_traj.get('id', 'unknown')} is not successful (quality_label: {quality_label}), skipping"
                )
                return None

        traj_sim, traj_diff = None, None
        strategy_used = None
        data_source = ref_traj.get("data_source")
        is_failure_source = is_failure_ds(data_source) if data_source else False
        is_paired_source = is_paired_ds(data_source) if data_source else False

        # Strategy selection: use preferred_strategy if provided, otherwise select based on ratios
        if preferred_strategy is not None:
            # Use the preferred strategy directly (single attempt)
            logger.trace(f"[SIM SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, ref_traj)
            if result is None:
                logger.trace(f"[SIM SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            traj_sim, traj_diff = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for preferred strategy path
        else:
            # Strategy selection with data_source-based filtering and boosting
            strategies = []

            # Always include REWIND if ratio > 0
            if self.similarity_strategy_ratio[0] > 0:
                strategies.append((DataGenStrat.REWIND, self.similarity_strategy_ratio[0]))

            # SUBOPTIMAL: include if data_source is in failure category
            if len(self.similarity_strategy_ratio) > 1 and self.similarity_strategy_ratio[1] > 0 and is_failure_source:
                # Boost probability by 2x if data_source is in failure category
                boosted_prob = self.similarity_strategy_ratio[1] * 2.0
                strategies.append((DataGenStrat.SUBOPTIMAL, boosted_prob))

            # PAIRED_HUMAN_ROBOT: only include if data_source is in paired category
            if (
                self._has_paired_human_robot
                and len(self.similarity_strategy_ratio) > 2
                and self.similarity_strategy_ratio[2] > 0
                and is_paired_source
            ):
                # Boost probability by 2x if data_source is in paired category
                boosted_prob = self.similarity_strategy_ratio[2] * 2.0
                strategies.append((DataGenStrat.PAIRED_HUMAN_ROBOT, boosted_prob))

            # Remove strategies with zero probability
            strategies = [(strat, prob) for strat, prob in strategies if prob > 0]

            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            max_strategy_attempts = 4  # Maximum attempts per strategy before removing it
            attempt = 0

            # NOTE(review): strategies_tried is collected but never read afterwards
            strategies_tried = []
            # Track attempts per strategy (cumulative failures, not consecutive)
            strategy_attempt_counts = {strat: 0 for strat, _ in strategies}

            while traj_sim is None and attempt < max_attempts:
                attempt += 1

                # Check if we have any strategies left
                if not strategies:
                    return None

                # Rebalance probabilities based on remaining strategies
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None

                # Normalize probabilities
                normalized_strategies = [(strat, prob / total_prob) for strat, prob in strategies]

                # Select strategy via inverse-CDF sampling on the rebalanced probabilities
                prob = self._local_random.random()
                cumulative_prob = 0.0
                selected_strategy = None

                for strat, normalized_prob in normalized_strategies:
                    cumulative_prob += normalized_prob
                    if prob <= cumulative_prob:
                        selected_strategy = strat
                        strategies_tried.append(selected_strategy)
                        break

                # Log strategy attempt
                logger.trace(
                    f"[SIM SAMPLER] Attempt {attempt}/{max_attempts}: Trying strategy {selected_strategy.value if selected_strategy else 'None'}"
                )

                # Execute selected strategy
                result = self._execute_strategy(selected_strategy, ref_traj)
                if result is not None:
                    traj_sim, traj_diff = result
                    strategy_used = selected_strategy
                    logger.trace(f"[SIM SAMPLER] Strategy {selected_strategy.value} succeeded on attempt {attempt}")
                else:
                    # Strategy failed - increment attempt count
                    strategy_attempt_counts[selected_strategy] = strategy_attempt_counts.get(selected_strategy, 0) + 1
                    failed_count = strategy_attempt_counts[selected_strategy]

                    logger.trace(
                        f"[SIM SAMPLER] Strategy {selected_strategy.value} failed (failure count: {failed_count}/{max_strategy_attempts})"
                    )

                    # Only remove strategy if it has failed max_strategy_attempts times
                    if strategy_attempt_counts[selected_strategy] >= max_strategy_attempts:
                        logger.trace(
                            f"[SIM SAMPLER] Removing strategy {selected_strategy.value} after {max_strategy_attempts} consecutive failures"
                        )
                        strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]
                    continue

        # If we still don't have a sample after all attempts, return None
        if traj_sim is None or traj_diff is None:
            logger.trace(
                f"[SIM SAMPLER] Failed to generate similarity sample after {max_attempts} attempts - all strategies exhausted"
            )
            return None

        # Create trajectories
        ref_trajectory = self._get_traj_from_data(ref_traj)
        sim_trajectory = self._get_traj_from_data(traj_sim)
        diff_trajectory = self._get_traj_from_data(traj_diff)

        # Handle different task trajectories: set progress=0 and success=0
        # For SUBOPTIMAL strategy, diff is suboptimal same task (progress masked, success=0 handled automatically)
        # For PAIRED_HUMAN_ROBOT strategy, diff could be suboptimal same task or different task
        if strategy_used == DataGenStrat.PAIRED_HUMAN_ROBOT:
            # Check if diff is from different task (compare task names)
            if diff_trajectory.task != ref_traj["task"]:
                # Different task: set progress=0 and success=0
                diff_trajectory.target_progress = [0.0] * len(diff_trajectory.target_progress)
                if self.config.progress_loss_type.lower() == "discrete":
                    diff_trajectory.target_progress = convert_continuous_to_discrete_bins(
                        diff_trajectory.target_progress, self.config.progress_discrete_bins
                    )
                if diff_trajectory.success_label is not None:
                    diff_trajectory.success_label = [0.0] * len(diff_trajectory.success_label)
            # If same task, it's suboptimal - progress will be masked by should_compute_progress, success=0 already set

        sample = SimilaritySample(
            ref_trajectory=ref_trajectory,
            sim_trajectory=sim_trajectory,
            diff_trajectory=diff_trajectory,
            data_gen_strategy=strategy_used.value,
        )
        sample.resample_attempts = attempt
        return sample
traj_diff for rewind strategy. + + Returns: + Tuple of (traj_sim, traj_diff) where: + - traj_sim = optimal trajectory from same task + - traj_diff = rewound trajectory + Returns None if either cannot be generated after retries. + The main strategy loop will handle retries with different strategies. + """ + max_retries = 3 # Number of retry attempts for sampling + + # Try to get optimal trajectory from same task for sim + traj_sim = None + for _ in range(max_retries): + traj_sim = self._get_same_task_optimal(ref_traj) + if traj_sim is not None: + break + + # Try to get rewound trajectory for diff + traj_diff = None + for _ in range(max_retries): + traj_diff = self._get_traj_from_data(ref_traj, subsample_strategy="subsample_rewind") + if traj_diff is not None: + break + + # Return both if successful, otherwise return None (main loop will handle retries) + if traj_sim is not None and traj_diff is not None: + return traj_sim, traj_diff + + return None + + def _get_robot_suboptimal_same_task(self, ref_traj: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Get robot suboptimal trajectory from same task. 
+ + Args: + ref_traj: Reference trajectory (should be human) + + Returns: + Robot suboptimal trajectory dict from same task or None if not available + """ + task_name = ref_traj["task"] + same_task_suboptimal_indices = self.suboptimal_by_task.get(task_name, []) + + if not same_task_suboptimal_indices: + logger.trace(f"[SIM SAMPLER] _get_robot_suboptimal_same_task: No suboptimal indices for task '{task_name}'") + return None + + # Filter to only robot trajectories + robot_suboptimal_indices = [idx for idx in same_task_suboptimal_indices if self._cached_is_robot[idx]] + + if not robot_suboptimal_indices: + logger.trace( + f"[SIM SAMPLER] _get_robot_suboptimal_same_task: No robot suboptimal indices for task '{task_name}'" + ) + return None + + chosen_id = ref_traj["id"] + # Filter out the reference trajectory if it somehow appears + filtered_indices = [idx for idx in robot_suboptimal_indices if self._cached_ids[idx] != chosen_id] + + if not filtered_indices: + logger.trace( + f"[SIM SAMPLER] _get_robot_suboptimal_same_task: All robot suboptimal trajectories have same ID '{chosen_id}' for task '{task_name}'" + ) + return None + + selected_idx = self._local_random.choice(filtered_indices) + result = self.dataset[selected_idx] + logger.trace( + f"[SIM SAMPLER] _get_robot_suboptimal_same_task: Found robot suboptimal trajectory {result.get('id', 'unknown')} for task '{task_name}'" + ) + return result + + def _get_traj_dicts_for_paired_human_robot( + self, ref_traj: Dict[str, Any] + ) -> Optional[Tuple[Dict[str, Any], Union[Dict[str, Any], Trajectory]]]: + """Get traj_sim and traj_diff for paired human/robot strategy. + + Args: + ref_traj: Reference trajectory (should be human successful) + + Returns: + Tuple of (traj_sim, traj_diff) or None if not available. Both can be dict or Trajectory objects. 
+ traj_sim is robot successful same task (progress: yes, success: yes) + traj_diff is robot suboptimal same task OR different task + - If suboptimal same task: progress masked, success=0 + - If different task: progress=0, success=0 + """ + max_retries = 3 # Number of retry attempts for sampling + + # Get robot successful trajectory from same task for sim + traj_sim = None + for _ in range(max_retries): + traj_sim = self._get_paired_human_robot_traj(ref_traj) + if traj_sim is not None: + break + + # 50/50 random choice between robot suboptimal same task and different task + traj_diff = None + for _ in range(max_retries): + # Randomly choose between robot suboptimal same task and different task + if self._local_random.random() < 0.5: + # Try robot suboptimal same task + traj_diff = self._get_robot_suboptimal_same_task(ref_traj) + else: + # Try different task + traj_diff = self._get_different_video_traj(ref_traj) + + if traj_diff is not None: + break + + if traj_sim is not None and traj_diff is not None: + return traj_sim, traj_diff + + return None + + def _get_traj_dicts_for_suboptimal( + self, ref_traj: Dict[str, Any] + ) -> Optional[Tuple[Dict[str, Any], Union[Dict[str, Any], Trajectory]]]: + """Get traj_sim and traj_diff for suboptimal strategy. + + Args: + ref_traj: Reference trajectory (must be successful) + + Returns: + Tuple of (traj_sim, traj_diff) or None if not available. Both can be dict or Trajectory objects. 
+ traj_sim is an optimal trajectory from same task (progress: yes, success: yes) + traj_diff is a suboptimal trajectory from same task (progress masked, success=0) + """ + max_retries = 3 # Number of retry attempts for sampling + + # Get optimal trajectory from same task for sim + traj_sim = None + for _ in range(max_retries): + traj_sim = self._get_same_task_optimal(ref_traj) + if traj_sim is not None: + break + + # Get suboptimal trajectory from same task for diff + traj_diff = None + for _ in range(max_retries): + traj_diff = self._get_same_task_suboptimal(ref_traj) + if traj_diff is not None: + break + + if traj_sim is not None and traj_diff is not None: + return traj_sim, traj_diff + + return None