#!/usr/bin/env python3
"""
PrefSampler class for producing batches of preference data.
"""
from typing import Dict, List, Optional, Any
import random

from robometer.data.dataset_types import PreferenceSample, Trajectory
from robometer.data.samplers.base import RBMBaseSampler
from robometer.data.datasets.helpers import (
    DataGenStrat,
    convert_continuous_to_discrete_bins,
)
from robometer.utils.logger import get_logger, rank_0_info, trace
from robometer.utils.timer import timer

logger = get_logger()


class PrefSampler(RBMBaseSampler):
    """Data generator for producing batches of preference prediction data."""

    def __init__(self, is_evaluation=False, **kwargs):
        super().__init__(**kwargs)
        # Probability of drawing a sample from a pre-built preference dataset
        # instead of generating a pair on the fly (see _create_preference_sample).
        self.dataset_preference_ratio = self.config.dataset_preference_ratio
        # Per-strategy sampling weights, indexed as:
        # [REWIND, SUBOPTIMAL, DIFFERENT_TASK, REVERSE_PROGRESS]
        self.preference_strategy_ratio: List[float] = self.config.preference_strategy_ratio
        # True when at least one task has suboptimal trajectories available;
        # gates whether the SUBOPTIMAL strategy can ever be selected.
        self._has_suboptimal = (
            any(self.suboptimal_by_task.values()) if self.suboptimal_by_task else False
        )
        rank_0_info(f"[PREF SAMPLER] Has suboptimal: {self._has_suboptimal}")
        # Initialize preference dataset
        self._load_preference_dataset()

    def _generate_sample(
        self, item: dict, preferred_strategy: Optional[DataGenStrat] = None
    ) -> Optional[PreferenceSample]:
        """Generate a preference sample from an item.

        If the item has a non-successful quality label, it will be used as the
        rejected trajectory and an optimal trajectory from the same task will be
        found as the chosen one. Otherwise, normal preference sampling logic is
        used.

        Args:
            item: The trajectory item
            preferred_strategy: Optional strategy to use (if None, will select
                strategy based on ratios)

        Returns:
            A PreferenceSample, or None when no valid pair could be built.
        """
        quality_label = item["quality_label"]
        use_partial_success = item.get("partial_success") is not None

        # Handle non-successful trajectories: use as rejected, find optimal from
        # the same task as chosen. Skip this for trajectories with
        # partial_success, which are handled by the swap logic in
        # _execute_strategy instead.
        if quality_label != "successful" and not use_partial_success:
            traj_id = item["id"]
            task_name = item["task"]
            logger.trace(
                f"[PREF SAMPLER] Non-successful quality detected for ID={traj_id}, using as rejected trajectory, task={task_name}"
            )

            # Find optimal trajectories from the same task
            same_task_optimal_indices = self.optimal_by_task.get(task_name, [])
            if not same_task_optimal_indices:
                logger.trace(
                    f"[PREF SAMPLER] No optimal trajectories found for task '{task_name}', falling through to normal sampling"
                )
                return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

            # Select a random optimal trajectory from the same task as chosen
            chosen_idx = random.choice(same_task_optimal_indices)
            chosen_traj_dict = self.dataset[chosen_idx]
            chosen_trajectory = self._get_traj_from_data(chosen_traj_dict)
            rejected_trajectory = self._get_traj_from_data(item)

            sample = PreferenceSample(
                chosen_trajectory=chosen_trajectory,
                rejected_trajectory=rejected_trajectory,
                data_gen_strategy=DataGenStrat.SUBOPTIMAL.value,
            )
            logger.trace(
                f"[PREF SAMPLER] Created preference sample for non-successful traj ID={traj_id} with optimal traj from same task"
            )
            return sample

        return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

    def _execute_strategy(
        self, strategy: DataGenStrat, chosen_traj: Dict[str, Any], use_partial_success: bool
    ) -> tuple[Dict[str, Any], str, Dict[str, Any]] | None:
        """Execute a strategy to get rejected trajectory.

        Args:
            strategy: The strategy to execute
            chosen_traj: The chosen trajectory
            use_partial_success: Whether this trajectory uses partial_success

        Returns:
            Tuple of (rejected_traj, rejected_subsample_strategy, chosen_traj)
            or None if failed.

        Note:
            chosen_traj may be swapped with rejected_traj for partial_success
            trajectories.
        """
        max_retries = 3
        rejected_subsample_strategy = None
        rejected_traj = None

        if strategy == DataGenStrat.REWIND:
            # Rewind reuses the chosen trajectory itself; copy so the rejected
            # dict can be subsampled without mutating the chosen one.
            rejected_traj = chosen_traj.copy()
            rejected_subsample_strategy = "subsample_rewind"
        elif strategy == DataGenStrat.SUBOPTIMAL:
            for _ in range(max_retries):
                rejected_traj = self._get_same_task_suboptimal(chosen_traj)
                if rejected_traj is not None:
                    # For trajectories with partial_success, if the returned
                    # trajectory has a higher partial_success, swap them so the
                    # pair's preference direction stays correct.
                    if use_partial_success:
                        chosen_partial_success = chosen_traj.get("partial_success")
                        rejected_partial_success = rejected_traj.get("partial_success")
                        if rejected_partial_success is not None and chosen_partial_success is not None:
                            if rejected_partial_success > chosen_partial_success:
                                logger.trace(
                                    f"[PREF SAMPLER] Swapping trajectories: found higher partial_success "
                                    f"({rejected_partial_success} > {chosen_partial_success})"
                                )
                                rejected_traj, chosen_traj = chosen_traj, rejected_traj
                    break
            rejected_subsample_strategy = "subsample_forward"
        elif strategy == DataGenStrat.DIFFERENT_TASK:
            for _ in range(max_retries):
                rejected_traj = self._get_different_video_traj(chosen_traj)
                if rejected_traj is not None:
                    break
            rejected_subsample_strategy = "subsample_forward"
        elif strategy == DataGenStrat.REVERSE_PROGRESS:
            rejected_traj = chosen_traj.copy()
            rejected_subsample_strategy = "subsample_reverse"
        else:
            # Unknown/unsupported strategy: signal failure to the caller.
            return None

        if rejected_traj is None:
            return None

        return (rejected_traj, rejected_subsample_strategy, chosen_traj)

    def _create_pref_sample_from_dataset(self) -> Optional[PreferenceSample]:
        """Create a preference sample from the loaded preference dataset.

        NOTE(review): still a placeholder. It draws an entry (keeping RNG
        consumption identical to the intended implementation) but always
        returns None until a real preference-data schema is wired in.
        """
        if not self.preferences:
            return None
        # Placeholder draw; result intentionally unused until the actual
        # preference data structure is implemented.
        _ = random.choice(self.preferences)
        return None

    def _load_preference_dataset(self) -> None:
        """Load the preference dataset from disk or hub if provided.

        Currently a no-op: the config structure has changed, so preferences are
        left empty and preference pairs are generated on the fly instead. This
        can be updated later if needed.
        """
        self.preferences = []
        rank_0_info("[PREF SAMPLER] No preference dataset provided, will use random sampling for preferences")

    def _create_preference_sample(self) -> Optional[PreferenceSample]:
        """Create a preference prediction sample: chosen vs rejected where chosen is preferred.

        Either from dataset or from generated trajectories.

        Returns:
            PreferenceSample: A preference sample with chosen (preferred) vs
                rejected (suboptimal) trajectories and associated metadata, or
                None when generation fails.
        """
        with timer("create_preference_sample", verbose=False):
            if random.random() < self.dataset_preference_ratio and self.preferences:
                # Use preference trajectories from dataset
                return self._create_pref_sample_from_dataset()
            return self._create_pref_sample()

    def _create_pref_sample(
        self, chosen_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None
    ) -> Optional[PreferenceSample]:
        """Create a preference prediction sample using various rejected trajectory generation strategies.

        Rewind Same Task - Creates a suboptimal trajectory by rewinding the chosen trajectory
        Suboptimal/Failure Same Task - Uses existing suboptimal/failure trajectories from the same task
        Different Task - Uses trajectories from completely different tasks

        Args:
            chosen_traj: Optional pre-selected chosen trajectory dict; when
                omitted, one is sampled from the optimal-by-task index maps.
            preferred_strategy: Optional strategy to force instead of the
                ratio-weighted selection.

        Returns:
            PreferenceSample: A preference sample with chosen (preferred) vs
                rejected (suboptimal) trajectories and associated metadata, or
                None if no valid pair could be generated.
        """
        # Log when preference sampler is called
        traj_id = chosen_traj["id"] if chosen_traj is not None else "sampling_new"
        logger.trace(f"[PREF SAMPLER] Creating preference sample for trajectory ID: {traj_id}")

        # Use provided chosen trajectory if given; otherwise sample one
        if chosen_traj is None:
            # Use preprocessed chosen trajectories from index maps
            if not self.optimal_by_task:
                return None

            # Filter out tasks with empty optimal_indices to avoid sampling a
            # task we can never draw from
            valid_tasks = {
                task: indices
                for task, indices in self.optimal_by_task.items()
                if indices  # Only include tasks with non-empty indices
            }
            if not valid_tasks:
                # No valid tasks with optimal trajectories available
                return None

            # Get a random task and chosen trajectory from it
            task_name = random.choice(list(valid_tasks.keys()))
            optimal_indices = valid_tasks[task_name]
            chosen_idx = random.choice(optimal_indices)
            chosen_traj = self.dataset[chosen_idx]

        # Initialize variables for strategy selection
        rejected_traj = None
        strategy_used = None
        rejected_subsample_strategy = None

        # Check if this trajectory uses partial_success
        use_partial_success = chosen_traj.get("partial_success") is not None
        if use_partial_success:
            partial_success = chosen_traj.get("partial_success")
            logger.trace(
                f"[PREF SAMPLER] Trajectory with partial_success detected (ID: {chosen_traj.get('id', 'unknown')}, partial_success: {partial_success})"
            )

        # Strategy selection: use preferred_strategy if provided, otherwise select based on ratios
        if preferred_strategy is not None:
            # Use the preferred strategy directly
            logger.trace(f"[PREF SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, chosen_traj, use_partial_success)
            if result is None:
                logger.trace(f"[PREF SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            rejected_traj, rejected_subsample_strategy, chosen_traj = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for preferred strategy path
        else:
            # Strategy selection with rebalancing on failure: start with every
            # strategy whose configured ratio is positive (SUBOPTIMAL also
            # requires suboptimal data to exist).
            strategies = []
            if self.preference_strategy_ratio[0] > 0:
                strategies.append((DataGenStrat.REWIND, self.preference_strategy_ratio[0]))
            if self._has_suboptimal and self.preference_strategy_ratio[1] > 0:
                strategies.append((DataGenStrat.SUBOPTIMAL, self.preference_strategy_ratio[1]))
            if self.preference_strategy_ratio[2] > 0:
                strategies.append((DataGenStrat.DIFFERENT_TASK, self.preference_strategy_ratio[2]))
            if self.preference_strategy_ratio[3] > 0:
                strategies.append((DataGenStrat.REVERSE_PROGRESS, self.preference_strategy_ratio[3]))

            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            max_strategy_attempts = 3  # Maximum attempts per strategy before removing it
            attempt = 0
            # Track failure counts per strategy
            strategy_attempt_counts = {strat: 0 for strat, _ in strategies}

            while rejected_traj is None and attempt < max_attempts:
                attempt += 1

                # Check if we have any strategies left
                if not strategies:
                    return None

                # Guard against an all-zero weight vector (would make the draw
                # below raise ValueError)
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None

                # Draw a strategy with probability proportional to its weight.
                # random.choices normalizes internally, which also avoids the
                # float-rounding edge case of a manual cumulative scan ending
                # with no strategy selected.
                selected_strategy = random.choices(
                    [strat for strat, _ in strategies],
                    weights=[prob for _, prob in strategies],
                    k=1,
                )[0]

                # Log strategy attempt
                logger.trace(
                    f"[PREF SAMPLER] Attempt {attempt}/{max_attempts}: Trying strategy {selected_strategy.value if selected_strategy else 'None'}"
                )

                # Execute selected strategy
                result = self._execute_strategy(selected_strategy, chosen_traj, use_partial_success)
                if result is not None:
                    rejected_traj, rejected_subsample_strategy, chosen_traj = result
                    strategy_used = selected_strategy
                    logger.trace(f"[PREF SAMPLER] Strategy {selected_strategy.value} succeeded on attempt {attempt}")
                else:
                    # Strategy failed - increment attempt count
                    strategy_attempt_counts[selected_strategy] = strategy_attempt_counts.get(selected_strategy, 0) + 1
                    failed_count = strategy_attempt_counts[selected_strategy]
                    logger.trace(
                        f"[PREF SAMPLER] Strategy {selected_strategy.value} failed (failure count: {failed_count}/{max_strategy_attempts})"
                    )
                    # Only remove strategy if it has failed max_strategy_attempts times
                    if strategy_attempt_counts[selected_strategy] >= max_strategy_attempts:
                        logger.trace(
                            f"[PREF SAMPLER] Removing strategy {selected_strategy.value} after {max_strategy_attempts} consecutive failures"
                        )
                        strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]
                    continue

            # If we still don't have a sample after all attempts, return None
            if rejected_traj is None:
                logger.trace(
                    f"[PREF SAMPLER] Failed to generate preference sample after {max_attempts} attempts - all strategies exhausted"
                )
                return None

        chosen_subsample_strategy = "subsample_forward"
        chosen_trajectory = self._get_traj_from_data(chosen_traj, subsample_strategy=chosen_subsample_strategy)
        rejected_trajectory = self._get_traj_from_data(rejected_traj, subsample_strategy=rejected_subsample_strategy)
        if rejected_trajectory is None or chosen_trajectory is None:
            return None

        # If our strategy is different task, make sure the rejected trajectory
        # has 0 progress and 0 success labels
        if strategy_used in [
            DataGenStrat.DIFFERENT_TASK,
            DataGenStrat.DIFFERENT_TASK_INSTRUCTION,
        ]:
            rejected_trajectory.target_progress = [0.0] * len(rejected_trajectory.target_progress)
            if self.config.progress_loss_type.lower() == "discrete":
                rejected_trajectory.target_progress = convert_continuous_to_discrete_bins(
                    rejected_trajectory.target_progress, self.config.progress_discrete_bins
                )
            # Also set success labels to 0.0 (predict 0 success for different task trajectories)
            if rejected_trajectory.success_label is not None:
                rejected_trajectory.success_label = [0.0] * len(rejected_trajectory.success_label)

        # Create preference sample structure
        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=strategy_used.value,
        )
        sample.resample_attempts = attempt
        return sample