#!/usr/bin/env python3
"""
PrefSampler class for producing batches of preference data.
"""
from typing import Dict, List, Optional, Any
import random
from robometer.data.dataset_types import PreferenceSample, Trajectory
from robometer.data.samplers.base import RBMBaseSampler
from robometer.data.datasets.helpers import (
DataGenStrat,
convert_continuous_to_discrete_bins,
)
from robometer.utils.logger import get_logger, rank_0_info, trace
from robometer.utils.timer import timer
logger = get_logger()


class PrefSampler(RBMBaseSampler):
    """Data generator for producing batches of preference prediction data."""

    def __init__(self, is_evaluation=False, **kwargs):
        super().__init__(**kwargs)
        self.dataset_preference_ratio = self.config.dataset_preference_ratio
        self.preference_strategy_ratio: List[float] = self.config.preference_strategy_ratio
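        # The four ratios index the strategies in order (see _create_pref_sample):
        # [REWIND, SUBOPTIMAL, DIFFERENT_TASK, REVERSE_PROGRESS].
        # Example (illustrative values, not a recommendation):
        #   preference_strategy_ratio = [0.4, 0.3, 0.2, 0.1]
        # would rewind 40% of the time, draw a same-task suboptimal 30%, and so on.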
        self._has_suboptimal = (
            any(len(indices) > 0 for indices in self.suboptimal_by_task.values()) if self.suboptimal_by_task else False
        )
        rank_0_info(f"[PREF SAMPLER] Has suboptimal: {self._has_suboptimal}")
        # Initialize the preference dataset
        self._load_preference_dataset()

    def _generate_sample(self, item: dict, preferred_strategy: Optional[DataGenStrat] = None):
        """Generate a preference sample from an item.

        If the item has a non-successful quality label, it is used as the rejected
        trajectory, and an optimal trajectory from the same task is selected as the chosen one.
        Otherwise, normal preference sampling logic is used.

        Args:
            item: The trajectory item
            preferred_strategy: Optional strategy to use (if None, a strategy is selected based on the ratios)
        """
        quality_label = item["quality_label"]
        use_partial_success = item.get("partial_success") is not None
        # Handle non-successful trajectories: use as rejected, find an optimal one from the same task as chosen.
        # Skip this for trajectories with partial_success, which are handled by the partial-success logic.
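        # Example: an item labeled, say, quality_label="failed" (any value other than
        # "successful") becomes the rejected side and is paired with a randomly chosen
        # optimal trajectory from the same task below.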
        if quality_label != "successful" and not use_partial_success:
            traj_id = item["id"]
            task_name = item["task"]
            logger.trace(
                f"[PREF SAMPLER] Non-successful quality detected for ID={traj_id}, using as rejected trajectory, task={task_name}"
            )
            # Find optimal trajectories from the same task
            same_task_optimal_indices = self.optimal_by_task.get(task_name, [])
            if not same_task_optimal_indices:
                logger.trace(
                    f"[PREF SAMPLER] No optimal trajectories found for task '{task_name}', falling through to normal sampling"
                )
                return self._create_pref_sample(item, preferred_strategy=preferred_strategy)
            # Select a random optimal trajectory from the same task as chosen
            chosen_idx = random.choice(same_task_optimal_indices)
            chosen_traj_dict = self.dataset[chosen_idx]
            chosen_trajectory = self._get_traj_from_data(chosen_traj_dict)
            rejected_trajectory = self._get_traj_from_data(item)
            sample = PreferenceSample(
                chosen_trajectory=chosen_trajectory,
                rejected_trajectory=rejected_trajectory,
                data_gen_strategy=DataGenStrat.SUBOPTIMAL.value,
            )
            logger.trace(
                f"[PREF SAMPLER] Created preference sample for non-successful traj ID={traj_id} with optimal traj from same task"
            )
            return sample
        return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

    def _execute_strategy(
        self, strategy: DataGenStrat, chosen_traj: Dict[str, Any], use_partial_success: bool
    ) -> tuple[Dict[str, Any], str, Dict[str, Any]] | None:
        """Execute a strategy to get a rejected trajectory.

        Args:
            strategy: The strategy to execute
            chosen_traj: The chosen trajectory
            use_partial_success: Whether this trajectory uses partial_success

        Returns:
            Tuple of (rejected_traj, rejected_subsample_strategy, chosen_traj), or None if the
            strategy failed. Note: chosen_traj may be swapped with rejected_traj for
            partial_success trajectories.
        """
        max_retries = 3
        rejected_subsample_strategy = None
        rejected_traj = None
        if strategy == DataGenStrat.REWIND:
            rejected_traj = chosen_traj.copy()
            rejected_subsample_strategy = "subsample_rewind"
        elif strategy == DataGenStrat.SUBOPTIMAL:
            for _ in range(max_retries):
                rejected_traj = self._get_same_task_suboptimal(chosen_traj)
                if rejected_traj is not None:
                    # For trajectories with partial_success: if the sampled trajectory has a
                    # higher partial_success than the chosen one, swap them.
                    if use_partial_success:
                        chosen_partial_success = chosen_traj.get("partial_success")
                        rejected_partial_success = rejected_traj.get("partial_success")
                        if rejected_partial_success is not None and chosen_partial_success is not None:
                            if rejected_partial_success > chosen_partial_success:
                                logger.trace(
                                    f"[PREF SAMPLER] Swapping trajectories: found higher partial_success "
                                    f"({rejected_partial_success} > {chosen_partial_success})"
                                )
                                rejected_traj, chosen_traj = chosen_traj, rejected_traj
                    break
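            # Example: if the chosen trajectory has partial_success=0.4 and the sampled
            # same-task trajectory has partial_success=0.7, the swap above makes the
            # 0.7 trajectory the chosen side, so "chosen" always carries the higher score.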
            rejected_subsample_strategy = "subsample_forward"
        elif strategy == DataGenStrat.DIFFERENT_TASK:
            for _ in range(max_retries):
                rejected_traj = self._get_different_video_traj(chosen_traj)
                if rejected_traj is not None:
                    break
            rejected_subsample_strategy = "subsample_forward"
        elif strategy == DataGenStrat.REVERSE_PROGRESS:
            rejected_traj = chosen_traj.copy()
            rejected_subsample_strategy = "subsample_reverse"
        else:
            return None
        if rejected_traj is None:
            return None
        return (rejected_traj, rejected_subsample_strategy, chosen_traj)

    def _create_pref_sample_from_dataset(self) -> Optional[PreferenceSample]:
        """Create a preference sample from the loaded preference dataset."""
        if not self.preferences:
            return None
        # Placeholder: draw an entry so the sampling path is exercised, then discard it.
        # This needs a real implementation once the actual preference data structure exists.
        _ = random.choice(self.preferences)
        return None

    def _load_preference_dataset(self):
        """Load the preference dataset from disk or hub, if provided."""
        self.preferences = []
        # For now we use empty preferences, since the config structure has changed;
        # this can be updated later if needed.
        rank_0_info("[PREF SAMPLER] No preference dataset provided, will use random sampling for preferences")

    def _create_preference_sample(self) -> Optional[PreferenceSample]:
        """Create a preference prediction sample: chosen vs rejected, where chosen is preferred.

        Samples either from the preference dataset or from generated trajectories.

        Returns:
            PreferenceSample: A preference sample with chosen (preferred) vs rejected
                (suboptimal) trajectories and associated metadata, or None on failure
        """
        with timer("create_preference_sample", verbose=False):
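            # Example: assuming dataset_preference_ratio = 0.3 in the config, roughly 30% of
            # calls would draw from a loaded preference dataset; since _load_preference_dataset
            # currently leaves self.preferences empty, we always fall through to generation.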
            if random.random() < self.dataset_preference_ratio and self.preferences:
                # Use preference trajectories from the dataset
                return self._create_pref_sample_from_dataset()
            else:
                return self._create_pref_sample()

    def _create_pref_sample(
        self, chosen_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None
    ) -> Optional[PreferenceSample]:
        """Create a preference prediction sample using one of several rejected-trajectory generation strategies.

        Rewind Same Task
            - Creates a suboptimal trajectory by rewinding the chosen trajectory
        Suboptimal/Failure Same Task
            - Uses existing suboptimal/failure trajectories from the same task
        Different Task
            - Uses trajectories from completely different tasks
        Reverse Progress
            - Creates a suboptimal trajectory by reversing the chosen trajectory

        Args:
            chosen_traj: Optional chosen trajectory; if None, one is sampled from the index maps
            preferred_strategy: Optional strategy to use; if None, one is selected based on the ratios

        Returns:
            PreferenceSample: A preference sample with chosen (preferred) vs rejected
                (suboptimal) trajectories and associated metadata, or None if no chosen
                trajectory is available or all strategies fail
        """
        # Log when the preference sampler is called
        traj_id = chosen_traj["id"] if chosen_traj is not None else "sampling_new"
        logger.trace(f"[PREF SAMPLER] Creating preference sample for trajectory ID: {traj_id}")
        # Use the provided chosen trajectory if given; otherwise sample one
        if chosen_traj is None:
            # Use preprocessed chosen trajectories from the index maps
            if not self.optimal_by_task:
                return None
            # Filter out tasks with empty optimal_indices to avoid an infinite loop
            valid_tasks = {
                task: indices
                for task, indices in self.optimal_by_task.items()
                if indices  # Only include tasks with non-empty indices
            }
            if not valid_tasks:
                # No valid tasks with optimal trajectories available
                return None
            # Get a random task and a chosen trajectory from it
            task_name = random.choice(list(valid_tasks.keys()))
            optimal_indices = valid_tasks[task_name]
            # Double-check that we have valid indices (should always be true now)
            if not optimal_indices:
                return None
            chosen_idx = random.choice(optimal_indices)
            chosen_traj = self.dataset[chosen_idx]
        # Initialize variables for strategy selection
        rejected_traj = None
        strategy_used = None
        rejected_subsample_strategy = None
        # Check whether this trajectory uses partial_success
        use_partial_success = chosen_traj.get("partial_success") is not None
        if use_partial_success:
            partial_success = chosen_traj.get("partial_success")
            logger.trace(
                f"[PREF SAMPLER] Trajectory with partial_success detected (ID: {chosen_traj.get('id', 'unknown')}, partial_success: {partial_success})"
            )
        # Strategy selection: use preferred_strategy if provided, otherwise select based on the ratios
        if preferred_strategy is not None:
            # Use the preferred strategy directly
            logger.trace(f"[PREF SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, chosen_traj, use_partial_success)
            if result is None:
                logger.trace(f"[PREF SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            rejected_traj, rejected_subsample_strategy, chosen_traj = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for the preferred-strategy path
        else:
            # Strategy selection with rebalancing on failure
            strategies = []
            if self.preference_strategy_ratio[0] > 0:
                strategies.append((DataGenStrat.REWIND, self.preference_strategy_ratio[0]))
            if self._has_suboptimal and self.preference_strategy_ratio[1] > 0:
                strategies.append((DataGenStrat.SUBOPTIMAL, self.preference_strategy_ratio[1]))
            if self.preference_strategy_ratio[2] > 0:
                strategies.append((DataGenStrat.DIFFERENT_TASK, self.preference_strategy_ratio[2]))
            if self.preference_strategy_ratio[3] > 0:
                strategies.append((DataGenStrat.REVERSE_PROGRESS, self.preference_strategy_ratio[3]))
            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            max_strategy_attempts = 3  # Maximum attempts per strategy before removing it
            attempt = 0
            # Track attempts per strategy
            strategy_attempt_counts = {strat: 0 for strat, _ in strategies}
            while rejected_traj is None and attempt < max_attempts:
                attempt += 1
                # Check whether we have any strategies left
                if not strategies:
                    return None
                # Rebalance probabilities based on the remaining strategies
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None
                # Normalize probabilities
                normalized_strategies = [(strat, prob / total_prob) for strat, prob in strategies]
                # Select a strategy based on the rebalanced probabilities
                prob = random.random()
                cumulative_prob = 0.0
                selected_strategy = None
                for strat, normalized_prob in normalized_strategies:
                    cumulative_prob += normalized_prob
                    if prob <= cumulative_prob:
                        selected_strategy = strat
                        break
                if selected_strategy is None:
                    # Guard against floating-point rounding leaving the cumulative sum
                    # fractionally below `prob`; fall back to the last strategy.
                    selected_strategy = normalized_strategies[-1][0]
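                # Note: this cumulative-sum walk is inverse-CDF sampling over the rebalanced
                # weights; an equivalent (untested here) one-liner would be
                #   selected_strategy = random.choices(
                #       [s for s, _ in strategies], weights=[p for _, p in strategies]
                #   )[0]
                # since random.choices normalizes the weights internally.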
                # Log the strategy attempt
                logger.trace(
                    f"[PREF SAMPLER] Attempt {attempt}/{max_attempts}: Trying strategy {selected_strategy.value}"
                )
                # Execute the selected strategy
                result = self._execute_strategy(selected_strategy, chosen_traj, use_partial_success)
                if result is not None:
                    rejected_traj, rejected_subsample_strategy, chosen_traj = result
                    strategy_used = selected_strategy
                    logger.trace(f"[PREF SAMPLER] Strategy {selected_strategy.value} succeeded on attempt {attempt}")
                else:
                    # Strategy failed: increment its failure count
                    strategy_attempt_counts[selected_strategy] = strategy_attempt_counts.get(selected_strategy, 0) + 1
                    failed_count = strategy_attempt_counts[selected_strategy]
                    logger.trace(
                        f"[PREF SAMPLER] Strategy {selected_strategy.value} failed (failure count: {failed_count}/{max_strategy_attempts})"
                    )
                    # Only remove the strategy once it has failed max_strategy_attempts times
                    if strategy_attempt_counts[selected_strategy] >= max_strategy_attempts:
                        logger.trace(
                            f"[PREF SAMPLER] Removing strategy {selected_strategy.value} after {max_strategy_attempts} failures"
                        )
                        strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]
                    continue
        # If we still don't have a sample after all attempts, return None
        if rejected_traj is None:
            logger.trace(
                f"[PREF SAMPLER] Failed to generate preference sample after {max_attempts} attempts - all strategies exhausted"
            )
            return None
        chosen_subsample_strategy = "subsample_forward"
        chosen_trajectory = self._get_traj_from_data(chosen_traj, subsample_strategy=chosen_subsample_strategy)
        rejected_trajectory = self._get_traj_from_data(rejected_traj, subsample_strategy=rejected_subsample_strategy)
        if rejected_trajectory is None or chosen_trajectory is None:
            return None
        # If our strategy is different task, make sure the rejected trajectory has 0 progress and 0 success labels
        if strategy_used in [
            DataGenStrat.DIFFERENT_TASK,
            DataGenStrat.DIFFERENT_TASK_INSTRUCTION,
        ]:
            rejected_trajectory.target_progress = [0.0] * len(rejected_trajectory.target_progress)
            if self.config.progress_loss_type.lower() == "discrete":
                rejected_trajectory.target_progress = convert_continuous_to_discrete_bins(
                    rejected_trajectory.target_progress, self.config.progress_discrete_bins
                )
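                # With discrete progress, the zeroed targets are re-encoded as bin labels;
                # e.g., assuming progress_discrete_bins=10, a continuous 0.0 presumably maps
                # to the lowest bin (the helper's exact binning scheme is defined elsewhere).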
            # Also set success labels to 0.0 (predict 0 success for different-task trajectories)
            if rejected_trajectory.success_label is not None:
                rejected_trajectory.success_label = [0.0] * len(rejected_trajectory.success_label)
        # Create the preference sample structure
        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=strategy_used.value,
        )
        sample.resample_attempts = attempt
        return sample
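

# Hypothetical usage sketch. The constructor kwargs below (`config`, `dataset`) are
# assumptions about what RBMBaseSampler expects, not confirmed by this module:
#
#   sampler = PrefSampler(is_evaluation=False, config=cfg, dataset=train_dataset)
#   sample = sampler._create_preference_sample()
#   if sample is not None:
#       print(sample.data_gen_strategy, sample.resample_attempts)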
|