Anthony Liang
updates
88e2e89
#!/usr/bin/env python3
"""
PrefSampler class for producing batches of preference data.
"""
from typing import Dict, List, Optional, Any
import random
from robometer.data.dataset_types import PreferenceSample, Trajectory
from robometer.data.samplers.base import RBMBaseSampler
from robometer.data.datasets.helpers import (
DataGenStrat,
convert_continuous_to_discrete_bins,
)
from robometer.utils.logger import get_logger, rank_0_info, trace
from robometer.utils.timer import timer
logger = get_logger()
class PrefSampler(RBMBaseSampler):
    """Data generator for producing batches of preference prediction data.

    Each sample pairs a "chosen" (preferred) trajectory with a "rejected" one.
    The rejected trajectory is produced by one of several strategies
    (rewind, same-task suboptimal, different task, reverse progress), see
    ``_execute_strategy``.

    Attributes such as ``self.config``, ``self.dataset``, ``self.optimal_by_task``
    and ``self.suboptimal_by_task`` are read but not assigned in this class; they
    are presumably set up by ``RBMBaseSampler.__init__`` (not visible here).
    """

    def __init__(self, is_evaluation=False, **kwargs):
        """Initialise the sampler from the base-class configuration.

        Args:
            is_evaluation: Accepted for interface compatibility. NOTE(review): it is
                neither stored nor forwarded to the base class here - confirm that
                this is intentional.
            **kwargs: Forwarded unchanged to ``RBMBaseSampler.__init__``.
        """
        super().__init__(**kwargs)

        self.dataset_preference_ratio = self.config.dataset_preference_ratio
        # Indexed positionally in _create_pref_sample as
        # [rewind, suboptimal, different_task, reverse_progress].
        self.preference_strategy_ratio: List[float] = self.config.preference_strategy_ratio

        # True when at least one task has a non-empty list of suboptimal indices.
        self._has_suboptimal = (
            any(len(indices) > 0 for indices in self.suboptimal_by_task.values()) if self.suboptimal_by_task else False
        )
        rank_0_info(f"[PREF SAMPLER] Has suboptimal: {self._has_suboptimal}")

        # Initialize preference dataset
        self._load_preference_dataset()

    def _generate_sample(
        self, item: dict, preferred_strategy: Optional[DataGenStrat] = None
    ) -> Optional[PreferenceSample]:
        """Generate a preference sample from an item.

        If the item has a non-successful quality label (and no ``partial_success``
        value), it is used as the rejected trajectory and a random optimal
        trajectory from the same task is used as the chosen one. Otherwise the
        normal strategy-based sampling in ``_create_pref_sample`` is used.

        Args:
            item: The trajectory item. Must contain ``"quality_label"``; the
                non-successful path additionally reads ``"id"`` and ``"task"``.
            preferred_strategy: Optional strategy to use (if None, a strategy is
                selected based on the configured ratios).

        Returns:
            A ``PreferenceSample``, or ``None`` when no sample could be built.
        """
        quality_label = item["quality_label"]
        use_partial_success = item.get("partial_success") is not None

        # Items with partial_success are handled by the partial-success logic in
        # _create_pref_sample / _execute_strategy, so they skip the shortcut below.
        if quality_label == "successful" or use_partial_success:
            return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

        # Non-successful trajectory: use it as rejected, find an optimal one from the same task.
        traj_id = item["id"]
        task_name = item["task"]
        logger.trace(
            f"[PREF SAMPLER] Non-successful quality detected for ID={traj_id}, using as rejected trajectory, task={task_name}"
        )

        same_task_optimal_indices = self.optimal_by_task.get(task_name, [])
        if not same_task_optimal_indices:
            logger.trace(
                f"[PREF SAMPLER] No optimal trajectories found for task '{task_name}', falling through to normal sampling"
            )
            # NOTE(review): this passes the non-successful item on as the *chosen*
            # trajectory - confirm this fallback is the intended behaviour.
            return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

        # Select a random optimal trajectory from the same task as chosen
        chosen_idx = random.choice(same_task_optimal_indices)
        chosen_traj_dict = self.dataset[chosen_idx]

        chosen_trajectory = self._get_traj_from_data(chosen_traj_dict)
        rejected_trajectory = self._get_traj_from_data(item)

        # Same guard as in _create_pref_sample: never build a sample around a
        # trajectory that could not be materialised.
        if chosen_trajectory is None or rejected_trajectory is None:
            logger.trace(
                f"[PREF SAMPLER] Could not build trajectories for non-successful traj ID={traj_id}, returning None"
            )
            return None

        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=DataGenStrat.SUBOPTIMAL.value,
        )
        logger.trace(
            f"[PREF SAMPLER] Created preference sample for non-successful traj ID={traj_id} with optimal traj from same task"
        )
        return sample

    def _execute_strategy(
        self, strategy: DataGenStrat, chosen_traj: Dict[str, Any], use_partial_success: bool
    ) -> tuple[Dict[str, Any], str, Dict[str, Any]] | None:
        """Execute a strategy to get a rejected trajectory.

        Strategies handled:
            * ``REWIND`` - rejected is a shallow copy of the chosen trajectory,
              subsampled with ``"subsample_rewind"``.
            * ``SUBOPTIMAL`` - rejected comes from ``_get_same_task_suboptimal``
              (up to 3 tries), subsampled with ``"subsample_forward"``.
            * ``DIFFERENT_TASK`` - rejected comes from ``_get_different_video_traj``
              (up to 3 tries), subsampled with ``"subsample_forward"``.
            * ``REVERSE_PROGRESS`` - rejected is a shallow copy of the chosen
              trajectory, subsampled with ``"subsample_reverse"``.

        Args:
            strategy: The strategy to execute.
            chosen_traj: The chosen trajectory.
            use_partial_success: Whether this trajectory uses partial_success.

        Returns:
            Tuple of ``(rejected_traj, rejected_subsample_strategy, chosen_traj)``,
            or ``None`` if the strategy is not one of the above or no rejected
            trajectory was found.
            Note: for partial_success trajectories under ``SUBOPTIMAL``, the chosen
            and rejected trajectories are swapped when the candidate has the higher
            ``partial_success`` value, so the returned ``chosen_traj`` may differ
            from the argument.
        """
        max_retries = 3
        rejected_subsample_strategy = None
        rejected_traj = None

        if strategy == DataGenStrat.REWIND:
            rejected_traj = chosen_traj.copy()
            rejected_subsample_strategy = "subsample_rewind"
        elif strategy == DataGenStrat.SUBOPTIMAL:
            for _ in range(max_retries):
                rejected_traj = self._get_same_task_suboptimal(chosen_traj)
                if rejected_traj is None:
                    continue
                if use_partial_success:
                    chosen_partial_success = chosen_traj.get("partial_success")
                    rejected_partial_success = rejected_traj.get("partial_success")
                    if (
                        rejected_partial_success is not None
                        and chosen_partial_success is not None
                        and rejected_partial_success > chosen_partial_success
                    ):
                        # The candidate is actually the better trajectory: swap roles.
                        logger.trace(
                            f"[PREF SAMPLER] Swapping trajectories: found higher partial_success "
                            f"({rejected_partial_success} > {chosen_partial_success})"
                        )
                        rejected_traj, chosen_traj = chosen_traj, rejected_traj
                break
            rejected_subsample_strategy = "subsample_forward"
        elif strategy == DataGenStrat.DIFFERENT_TASK:
            for _ in range(max_retries):
                rejected_traj = self._get_different_video_traj(chosen_traj)
                if rejected_traj is not None:
                    break
            rejected_subsample_strategy = "subsample_forward"
        elif strategy == DataGenStrat.REVERSE_PROGRESS:
            rejected_traj = chosen_traj.copy()
            rejected_subsample_strategy = "subsample_reverse"
        else:
            return None

        if rejected_traj is None:
            return None

        return (rejected_traj, rejected_subsample_strategy, chosen_traj)

    def _create_pref_sample_from_dataset(self) -> Optional[PreferenceSample]:
        """Create a preference sample from the loaded preference dataset.

        Placeholder: this currently always returns ``None``.
        """
        if not self.preferences:
            return None

        # For now, return a simple preference sample
        # This can be enhanced later when we have actual preference data
        # NOTE(review): the drawn element is discarded; the call is kept only so the
        # random stream is unchanged until the real implementation lands.
        random.choice(self.preferences)

        # This is a placeholder - would need to be implemented based on actual preference data structure
        return None

    def _load_preference_dataset(self):
        """Initialise ``self.preferences``.

        No dataset is loaded at the moment: ``self.preferences`` is set to an empty
        list and an informational message is logged.
        """
        self.preferences = []

        # For now, we'll use empty preferences since the config structure has changed
        # This can be updated later if needed
        rank_0_info("[PREF SAMPLER] No preference dataset provided, will use random sampling for preferences")
        return

    def _create_preference_sample(self) -> Optional[PreferenceSample]:
        """Create a preference prediction sample: chosen vs rejected where chosen is preferred.

        With probability ``self.dataset_preference_ratio`` (and only if
        ``self.preferences`` is non-empty) the sample is taken from the preference
        dataset; otherwise it is generated via ``_create_pref_sample``.

        Returns:
            A ``PreferenceSample`` with chosen (preferred) vs rejected (suboptimal)
            trajectories, or ``None`` if no sample could be created.
        """
        with timer("create_preference_sample", verbose=False):
            if random.random() < self.dataset_preference_ratio and self.preferences:
                # Use preference trajectories from dataset
                return self._create_pref_sample_from_dataset()
            return self._create_pref_sample()

    def _create_pref_sample(
        self, chosen_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None
    ) -> Optional[PreferenceSample]:
        """Create a preference prediction sample using a rejected-trajectory generation strategy.

        Strategies (weighted by ``self.preference_strategy_ratio`` in this order):
            Rewind Same Task
                - Creates a suboptimal trajectory by rewinding the chosen trajectory
            Suboptimal/Failure Same Task
                - Uses existing suboptimal/failure trajectories from the same task
                  (only considered when the sampler has any suboptimal data)
            Different Task
                - Uses trajectories from completely different tasks
            Reverse Progress
                - Uses the chosen trajectory subsampled in reverse

        Args:
            chosen_traj: Trajectory to use as chosen. If ``None``, a random optimal
                trajectory is drawn from ``self.optimal_by_task``.
            preferred_strategy: If given, only this strategy is tried (once) and no
                fallback happens. Otherwise strategies are drawn by weight, with up
                to 10 draws overall; a strategy is dropped after 3 failures.

        Returns:
            A ``PreferenceSample`` with ``resample_attempts`` set to the number of
            draws used, or ``None`` when no chosen trajectory is available, every
            strategy fails, or a trajectory cannot be materialised. This method
            does not raise on failure.
        """
        # Log when preference sampler is called
        traj_id = chosen_traj["id"] if chosen_traj is not None else "sampling_new"
        logger.trace(f"[PREF SAMPLER] Creating preference sample for trajectory ID: {traj_id}")

        # Use provided chosen trajectory if given; otherwise sample one
        if chosen_traj is None:
            if not self.optimal_by_task:
                return None

            # Only tasks with at least one optimal index can supply a chosen trajectory.
            valid_tasks = {task: indices for task, indices in self.optimal_by_task.items() if indices}
            if not valid_tasks:
                return None

            task_name = random.choice(list(valid_tasks.keys()))
            chosen_idx = random.choice(valid_tasks[task_name])
            chosen_traj = self.dataset[chosen_idx]

        rejected_traj = None
        strategy_used = None
        rejected_subsample_strategy = None

        # Check if this trajectory uses partial_success
        use_partial_success = chosen_traj.get("partial_success") is not None
        if use_partial_success:
            partial_success = chosen_traj.get("partial_success")
            logger.trace(
                f"[PREF SAMPLER] Trajectory with partial_success detected (ID: {chosen_traj.get('id', 'unknown')}, partial_success: {partial_success})"
            )

        if preferred_strategy is not None:
            # Use the preferred strategy directly; no fallback to other strategies.
            logger.trace(f"[PREF SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, chosen_traj, use_partial_success)
            if result is None:
                logger.trace(f"[PREF SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            rejected_traj, rejected_subsample_strategy, chosen_traj = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for preferred strategy path
        else:
            # Strategy selection with rebalancing on failure
            ratios = self.preference_strategy_ratio
            strategies = []
            if ratios[0] > 0:
                strategies.append((DataGenStrat.REWIND, ratios[0]))
            if self._has_suboptimal and ratios[1] > 0:
                strategies.append((DataGenStrat.SUBOPTIMAL, ratios[1]))
            if ratios[2] > 0:
                strategies.append((DataGenStrat.DIFFERENT_TASK, ratios[2]))
            if ratios[3] > 0:
                strategies.append((DataGenStrat.REVERSE_PROGRESS, ratios[3]))

            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            max_strategy_attempts = 3  # Maximum failures per strategy before removing it
            attempt = 0
            strategy_attempt_counts = {strat: 0 for strat, _ in strategies}

            while rejected_traj is None and attempt < max_attempts:
                attempt += 1

                if not strategies:
                    return None

                # Rebalance probabilities over the strategies that are still in play.
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None

                draw = random.random()
                cumulative_prob = 0.0
                # Default to the last strategy: floating-point rounding can leave the
                # cumulative sum marginally below 1.0, in which case a draw very close
                # to 1.0 would otherwise select nothing (and crash further down).
                selected_strategy = strategies[-1][0]
                for strat, prob in strategies:
                    cumulative_prob += prob / total_prob
                    if draw <= cumulative_prob:
                        selected_strategy = strat
                        break

                logger.trace(
                    f"[PREF SAMPLER] Attempt {attempt}/{max_attempts}: Trying strategy {selected_strategy.value}"
                )

                result = self._execute_strategy(selected_strategy, chosen_traj, use_partial_success)
                if result is not None:
                    rejected_traj, rejected_subsample_strategy, chosen_traj = result
                    strategy_used = selected_strategy
                    logger.trace(f"[PREF SAMPLER] Strategy {selected_strategy.value} succeeded on attempt {attempt}")
                    continue

                # Strategy failed - increment its failure count
                strategy_attempt_counts[selected_strategy] = strategy_attempt_counts.get(selected_strategy, 0) + 1
                failed_count = strategy_attempt_counts[selected_strategy]
                logger.trace(
                    f"[PREF SAMPLER] Strategy {selected_strategy.value} failed (failure count: {failed_count}/{max_strategy_attempts})"
                )

                # Only remove the strategy once it has failed max_strategy_attempts times.
                # (The count is cumulative, not strictly consecutive.)
                if failed_count >= max_strategy_attempts:
                    logger.trace(
                        f"[PREF SAMPLER] Removing strategy {selected_strategy.value} after {max_strategy_attempts} consecutive failures"
                    )
                    strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]

            # If we still don't have a sample after all attempts, return None
            if rejected_traj is None:
                logger.trace(
                    f"[PREF SAMPLER] Failed to generate preference sample after {max_attempts} attempts - all strategies exhausted"
                )
                return None

        chosen_subsample_strategy = "subsample_forward"
        chosen_trajectory = self._get_traj_from_data(chosen_traj, subsample_strategy=chosen_subsample_strategy)
        rejected_trajectory = self._get_traj_from_data(rejected_traj, subsample_strategy=rejected_subsample_strategy)

        if rejected_trajectory is None or chosen_trajectory is None:
            return None

        # If our strategy is different task, make sure the rejected trajectory has 0 progress and 0 success labels
        if strategy_used in (DataGenStrat.DIFFERENT_TASK, DataGenStrat.DIFFERENT_TASK_INSTRUCTION):
            rejected_trajectory.target_progress = [0.0] * len(rejected_trajectory.target_progress)
            if self.config.progress_loss_type.lower() == "discrete":
                rejected_trajectory.target_progress = convert_continuous_to_discrete_bins(
                    rejected_trajectory.target_progress, self.config.progress_discrete_bins
                )
            # Also set success labels to 0.0 (predict 0 success for different task trajectories)
            if rejected_trajectory.success_label is not None:
                rejected_trajectory.success_label = [0.0] * len(rejected_trajectory.success_label)

        # Create preference sample structure
        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=strategy_used.value,
        )
        sample.resample_attempts = attempt
        return sample