File size: 17,333 Bytes
3e462dd
 
 
 
 
 
 
 
 
88e2e89
 
 
3e462dd
 
 
88e2e89
 
3e462dd
 
 
 
88e2e89
3e462dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88e2e89
 
 
3e462dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
#!/usr/bin/env python3
"""
PrefSampler class for producing batches of preference data.
"""

from typing import Dict, List, Optional, Any

import random

from robometer.data.dataset_types import PreferenceSample, Trajectory
from robometer.data.samplers.base import RBMBaseSampler
from robometer.data.datasets.helpers import (
    DataGenStrat,
    convert_continuous_to_discrete_bins,
)
from robometer.utils.logger import get_logger, rank_0_info, trace
from robometer.utils.timer import timer

logger = get_logger()


class PrefSampler(RBMBaseSampler):
    """Data generator for producing batches of preference prediction data."""

    def __init__(self, is_evaluation=False, **kwargs):
        """Initialize the preference sampler.

        Reads the preference-specific ratios from the shared config, records
        whether any suboptimal trajectories are available, and loads the
        (currently empty) preference dataset.

        Args:
            is_evaluation: Accepted for interface compatibility with callers;
                not used by this sampler. TODO confirm whether eval-mode
                behavior should differ.
            **kwargs: Forwarded unchanged to ``RBMBaseSampler.__init__``.
        """
        super().__init__(**kwargs)

        self.dataset_preference_ratio = self.config.dataset_preference_ratio
        self.preference_strategy_ratio: List[float] = self.config.preference_strategy_ratio

        # Suboptimal data exists when at least one task maps to a non-empty
        # index list in the (possibly absent) suboptimal index map.
        self._has_suboptimal = False
        if self.suboptimal_by_task:
            self._has_suboptimal = any(len(idx_list) > 0 for idx_list in self.suboptimal_by_task.values())
        rank_0_info(f"[PREF SAMPLER] Has suboptimal: {self._has_suboptimal}")

        # Initialize preference dataset
        self._load_preference_dataset()

    def _generate_sample(self, item: dict, preferred_strategy: Optional[DataGenStrat] = None):
        """Generate a preference sample from a trajectory item.

        A non-successful item (without a ``partial_success`` score) is used as
        the rejected trajectory, paired against a random optimal trajectory
        from the same task as the chosen one. All other items go through the
        normal preference-sampling logic in ``_create_pref_sample``.

        Args:
            item: The trajectory item (dict with at least ``quality_label``,
                ``id``, and ``task`` keys — schema assumed from usage here).
            preferred_strategy: Optional strategy to force; when None the
                strategy is selected from the configured ratios.
        """
        quality_label = item["quality_label"]
        use_partial_success = item.get("partial_success") is not None

        # Normal path: successful items, and items carrying a partial_success
        # score (those are handled by the partial-success logic downstream).
        if quality_label == "successful" or use_partial_success:
            return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

        # Non-successful item: use it as the rejected trajectory and look for
        # an optimal trajectory from the same task to serve as chosen.
        traj_id = item["id"]
        task_name = item["task"]

        logger.trace(
            f"[PREF SAMPLER] Non-successful quality detected for ID={traj_id}, using as rejected trajectory, task={task_name}"
        )

        same_task_optimal_indices = self.optimal_by_task.get(task_name, [])
        if not same_task_optimal_indices:
            # No optimal counterpart available — defer to normal sampling.
            logger.trace(
                f"[PREF SAMPLER] No optimal trajectories found for task '{task_name}', falling through to normal sampling"
            )
            return self._create_pref_sample(item, preferred_strategy=preferred_strategy)

        # Pick a random same-task optimal trajectory as the chosen side.
        chosen_idx = random.choice(same_task_optimal_indices)
        chosen_traj_dict = self.dataset[chosen_idx]

        chosen_trajectory = self._get_traj_from_data(chosen_traj_dict)
        rejected_trajectory = self._get_traj_from_data(item)

        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=DataGenStrat.SUBOPTIMAL.value,
        )

        logger.trace(
            f"[PREF SAMPLER] Created preference sample for non-successful traj ID={traj_id} with optimal traj from same task"
        )
        return sample

    def _execute_strategy(
        self, strategy: DataGenStrat, chosen_traj: Dict[str, Any], use_partial_success: bool
    ) -> tuple[Dict[str, Any], str, Dict[str, Any]] | None:
        """Execute a rejected-trajectory generation strategy.

        Args:
            strategy: The strategy to execute.
            chosen_traj: The chosen trajectory dict.
            use_partial_success: Whether this trajectory carries a
                partial_success score.

        Returns:
            Tuple of (rejected_traj, rejected_subsample_strategy, chosen_traj),
            or None when the strategy failed or is unknown. For
            partial_success trajectories the chosen/rejected roles may be
            swapped so the higher-scoring trajectory ends up as chosen.
        """
        max_retries = 3

        # REWIND / REVERSE_PROGRESS both derive the rejected trajectory from a
        # copy of the chosen one; only the subsample strategy differs.
        if strategy == DataGenStrat.REWIND:
            return (chosen_traj.copy(), "subsample_rewind", chosen_traj)

        if strategy == DataGenStrat.REVERSE_PROGRESS:
            return (chosen_traj.copy(), "subsample_reverse", chosen_traj)

        if strategy == DataGenStrat.SUBOPTIMAL:
            rejected_traj = None
            for _ in range(max_retries):
                rejected_traj = self._get_same_task_suboptimal(chosen_traj)
                if rejected_traj is not None:
                    # If the candidate actually has the higher partial_success,
                    # swap roles so the better trajectory is the chosen one.
                    if use_partial_success:
                        chosen_score = chosen_traj.get("partial_success")
                        rejected_score = rejected_traj.get("partial_success")
                        if rejected_score is not None and chosen_score is not None and rejected_score > chosen_score:
                            logger.trace(
                                f"[PREF SAMPLER] Swapping trajectories: found higher partial_success "
                                f"({rejected_score} > {chosen_score})"
                            )
                            rejected_traj, chosen_traj = chosen_traj, rejected_traj
                    break
            if rejected_traj is None:
                return None
            return (rejected_traj, "subsample_forward", chosen_traj)

        if strategy == DataGenStrat.DIFFERENT_TASK:
            for _ in range(max_retries):
                rejected_traj = self._get_different_video_traj(chosen_traj)
                if rejected_traj is not None:
                    return (rejected_traj, "subsample_forward", chosen_traj)
            return None

        # Unknown strategy value.
        return None

    def _create_pref_sample_from_dataset(self) -> Optional[PreferenceSample]:
        """Create a preference sample from the loaded preference dataset.

        Currently a placeholder: ``_load_preference_dataset`` always leaves
        ``self.preferences`` empty, so this method always returns None and
        callers must fall back to generated preference samples.

        Returns:
            None until real preference-dataset support is implemented.
        """
        if not self.preferences:
            return None

        # TODO: build a PreferenceSample from a random entry of
        # self.preferences once the preference data structure is defined.
        # (The previous placeholder drew random.choice(self.preferences) and
        # discarded the result, which only consumed RNG state.)
        return None

    def _load_preference_dataset(self):
        """Load the preference dataset from disk or hub if provided.

        The current config structure carries no preference dataset, so this
        simply initializes an empty list; random sampling is used instead.
        """
        # Placeholder until the config exposes a preference dataset again.
        self.preferences = []
        rank_0_info("[PREF SAMPLER] No preference dataset provided, will use random sampling for preferences")

    def _create_preference_sample(self) -> Optional[PreferenceSample]:
        """Create a preference prediction sample: chosen vs rejected where chosen is preferred.
        Either from dataset or from generated trajectories.

        With probability ``dataset_preference_ratio`` (and a non-empty
        preference dataset) the sample is drawn from the dataset; otherwise it
        is generated. Since the dataset path is still a placeholder that can
        return None, we fall back to generation in that case instead of
        wasting the draw.

        Returns:
            A preference sample with chosen (preferred) vs rejected
            (suboptimal) trajectories and associated metadata, or None if
            generation failed.
        """
        with timer("create_preference_sample", verbose=False):
            if random.random() < self.dataset_preference_ratio and self.preferences:
                # Use preference trajectories from dataset when available.
                sample = self._create_pref_sample_from_dataset()
                if sample is not None:
                    return sample
                # Dataset path is a placeholder and yielded nothing — fall
                # back to generated sampling rather than returning None.
            return self._create_pref_sample()

    def _create_pref_sample(
        self, chosen_traj: Optional[Dict[str, Any]] = None, preferred_strategy: Optional[DataGenStrat] = None
    ) -> Optional[PreferenceSample]:
        """Create a preference prediction sample using various rejected trajectory generation strategies.

        Rewind Same Task
        - Creates a suboptimal trajectory by rewinding the chosen trajectory

        Suboptimal/Failure Same Task
        - Uses existing suboptimal/failure trajectories from the same task

        Different Task
        - Uses trajectories from completely different tasks

        Args:
            chosen_traj: Optional pre-selected chosen trajectory dict; when
                None, one is sampled from the optimal-by-task index maps.
            preferred_strategy: Optional strategy to force; when None, a
                strategy is drawn from ``preference_strategy_ratio`` with
                rebalancing as strategies fail.

        Returns:
            PreferenceSample: A preference sample with chosen (preferred) vs rejected
            (suboptimal) trajectories and associated metadata, or None when no
            chosen trajectory is available or every strategy fails.

        Raises:
            ValueError: If no chosen trajectories are available for preference generation
            RuntimeError: If all strategies fail and fallback rewind also fails
        """
        # Log when preference sampler is called
        traj_id = chosen_traj["id"] if chosen_traj is not None else "sampling_new"
        logger.trace(f"[PREF SAMPLER] Creating preference sample for trajectory ID: {traj_id}")

        # Use provided chosen trajectory if given; otherwise sample one
        if chosen_traj is None:
            # Use preprocessed chosen trajectories from index maps
            if not self.optimal_by_task:
                return None

            # Filter out tasks with empty optimal_indices to avoid infinite loop
            valid_tasks = {
                task: indices
                for task, indices in self.optimal_by_task.items()
                if indices  # Only include tasks with non-empty indices
            }

            if not valid_tasks:
                # No valid tasks with optimal trajectories available
                return None

            # Get a random task and chosen trajectory from it
            # NOTE: sampling a task first (uniformly), then a trajectory within
            # it, weights tasks equally regardless of trajectory count.
            task_name = random.choice(list(valid_tasks.keys()))
            optimal_indices = valid_tasks[task_name]

            # Double-check that we have valid indices (should always be true now)
            if not optimal_indices:
                return None

            chosen_idx = random.choice(optimal_indices)
            chosen_traj = self.dataset[chosen_idx]

        # Initialize variables for strategy selection
        rejected_traj = None
        strategy_used = None
        rejected_subsample_strategy = None

        # Check if this trajectory uses partial_success
        use_partial_success = chosen_traj.get("partial_success") is not None
        if use_partial_success:
            partial_success = chosen_traj.get("partial_success")
            logger.trace(
                f"[PREF SAMPLER] Trajectory with partial_success detected (ID: {chosen_traj.get('id', 'unknown')}, partial_success: {partial_success})"
            )

        # Strategy selection: use preferred_strategy if provided, otherwise select based on ratios
        if preferred_strategy is not None:
            # Use the preferred strategy directly
            logger.trace(f"[PREF SAMPLER] Using preferred strategy: {preferred_strategy.value}")
            result = self._execute_strategy(preferred_strategy, chosen_traj, use_partial_success)
            if result is None:
                logger.trace(f"[PREF SAMPLER] Preferred strategy {preferred_strategy.value} failed, returning None")
                return None
            # _execute_strategy may have swapped chosen/rejected for
            # partial_success trajectories, so rebind chosen_traj here.
            rejected_traj, rejected_subsample_strategy, chosen_traj = result
            strategy_used = preferred_strategy
            attempt = 1  # Set attempt for preferred strategy path
        else:
            # Strategy selection with rebalancing on failure
            # Candidate pool, filtered by configured ratio and data availability.
            # Index convention in preference_strategy_ratio:
            # [0]=REWIND, [1]=SUBOPTIMAL, [2]=DIFFERENT_TASK, [3]=REVERSE_PROGRESS.
            strategies = []
            if self.preference_strategy_ratio[0] > 0:
                strategies.append((DataGenStrat.REWIND, self.preference_strategy_ratio[0]))
            if self._has_suboptimal and self.preference_strategy_ratio[1] > 0:
                strategies.append((DataGenStrat.SUBOPTIMAL, self.preference_strategy_ratio[1]))
            if self.preference_strategy_ratio[2] > 0:
                strategies.append((DataGenStrat.DIFFERENT_TASK, self.preference_strategy_ratio[2]))
            if self.preference_strategy_ratio[3] > 0:
                strategies.append((DataGenStrat.REVERSE_PROGRESS, self.preference_strategy_ratio[3]))

            max_attempts = 10  # Limit retry attempts to prevent infinite loops
            max_strategy_attempts = 3  # Maximum attempts per strategy before removing it
            attempt = 0

            # Track attempts per strategy
            strategy_attempt_counts = {strat: 0 for strat, _ in strategies}

            while rejected_traj is None and attempt < max_attempts:
                attempt += 1

                # Check if we have any strategies left
                if not strategies:
                    return None

                # Rebalance probabilities based on remaining strategies
                total_prob = sum(prob for _, prob in strategies)
                if total_prob == 0:
                    return None

                # Normalize probabilities
                normalized_strategies = [(strat, prob / total_prob) for strat, prob in strategies]

                # Select strategy based on rebalanced probabilities
                # (manual inverse-CDF sampling over the normalized weights)
                prob = random.random()
                cumulative_prob = 0.0
                selected_strategy = None

                for strat, normalized_prob in normalized_strategies:
                    cumulative_prob += normalized_prob
                    if prob <= cumulative_prob:
                        selected_strategy = strat
                        break

                # NOTE(review): if float rounding leaves cumulative_prob just
                # below `prob`, selected_strategy stays None; _execute_strategy
                # then returns None and the attempt is counted as a failure
                # rather than crashing — confirm this edge is acceptable.

                # Log strategy attempt
                logger.trace(
                    f"[PREF SAMPLER] Attempt {attempt}/{max_attempts}: Trying strategy {selected_strategy.value if selected_strategy else 'None'}"
                )

                # Execute selected strategy
                result = self._execute_strategy(selected_strategy, chosen_traj, use_partial_success)
                if result is not None:
                    # Success: _execute_strategy may swap chosen/rejected for
                    # partial_success trajectories, so rebind chosen_traj too.
                    rejected_traj, rejected_subsample_strategy, chosen_traj = result
                    strategy_used = selected_strategy
                    logger.trace(f"[PREF SAMPLER] Strategy {selected_strategy.value} succeeded on attempt {attempt}")
                else:
                    # Strategy failed - increment attempt count
                    strategy_attempt_counts[selected_strategy] = strategy_attempt_counts.get(selected_strategy, 0) + 1
                    failed_count = strategy_attempt_counts[selected_strategy]

                    logger.trace(
                        f"[PREF SAMPLER] Strategy {selected_strategy.value} failed (failure count: {failed_count}/{max_strategy_attempts})"
                    )

                    # Only remove strategy if it has failed max_strategy_attempts times
                    if strategy_attempt_counts[selected_strategy] >= max_strategy_attempts:
                        logger.trace(
                            f"[PREF SAMPLER] Removing strategy {selected_strategy.value} after {max_strategy_attempts} consecutive failures"
                        )
                        strategies = [(strat, prob) for strat, prob in strategies if strat != selected_strategy]
                        # NOTE(review): this `continue` is redundant — the loop
                        # re-enters here regardless; kept for clarity.
                        continue

            # If we still don't have a sample after all attempts, return None
            if rejected_traj is None:
                logger.trace(
                    f"[PREF SAMPLER] Failed to generate preference sample after {max_attempts} attempts - all strategies exhausted"
                )
                return None

        # The chosen side is always subsampled forward; the rejected side uses
        # the strategy-specific subsampling chosen by _execute_strategy.
        chosen_subsample_strategy = "subsample_forward"
        chosen_trajectory = self._get_traj_from_data(chosen_traj, subsample_strategy=chosen_subsample_strategy)

        rejected_trajectory = self._get_traj_from_data(rejected_traj, subsample_strategy=rejected_subsample_strategy)

        if rejected_trajectory is None or chosen_trajectory is None:
            return None

        # If our strategy is different task, make sure the rejected trajectory has 0 progress and 0 success labels
        if strategy_used in [
            DataGenStrat.DIFFERENT_TASK,
            DataGenStrat.DIFFERENT_TASK_INSTRUCTION,
        ]:
            rejected_trajectory.target_progress = [0.0] * len(rejected_trajectory.target_progress)
            if self.config.progress_loss_type.lower() == "discrete":
                # Discrete progress heads expect bin indices, not floats.
                rejected_trajectory.target_progress = convert_continuous_to_discrete_bins(
                    rejected_trajectory.target_progress, self.config.progress_discrete_bins
                )
            # Also set success labels to 0.0 (predict 0 success for different task trajectories)
            if rejected_trajectory.success_label is not None:
                rejected_trajectory.success_label = [0.0] * len(rejected_trajectory.success_label)

        # Create preference sample structure
        sample = PreferenceSample(
            chosen_trajectory=chosen_trajectory,
            rejected_trajectory=rejected_trajectory,
            data_gen_strategy=strategy_used.value,
        )
        # Record how many selection attempts were needed (1 on the
        # preferred-strategy path).
        sample.resample_attempts = attempt
        return sample