rewardeval_ui / samplers /eval /similarity_score.py
Anthony Liang
getting rid of dependencies
3e462dd
#!/usr/bin/env python3
"""
Data generator for similarity score evaluation.
This generator creates similarity samples for evaluation:
- For each human-robot trajectory pair from the same task, creates similarity samples
- For each pair, samples N negative tasks; the negative trajectory is drawn from that task when the sample is generated
- Each similarity sample has: ref=human, traj_sim=robot (same task), traj_diff=negative (different task)
"""
from typing import Any, Dict, List, Optional

from rfm.data.dataset_types import SimilaritySample, Trajectory
from rfm.data.samplers.base import RFMBaseSampler
from rfm.utils.distributed import rank_0_print
class SimilarityScoreSampler(RFMBaseSampler):
    """
    Data generator that creates similarity samples for evaluation.

    For each human-robot trajectory pair from the same task:
    - Creates similarity samples with ref=human, traj_sim=robot, traj_diff=negative (from a different task)
    - Draws ``num_negatives`` negative tasks per pair; the concrete negative trajectory is
      picked from that task each time the sample is generated

    NOTE(review): this class reads ``self.dataset``, ``self.task_indices``,
    ``self.paired_human_robot_by_task``, ``self._local_random``, ``self.verbose`` and calls
    ``self._get_traj_from_data``; none of them are defined here, so they are presumably
    provided by ``RFMBaseSampler`` -- confirm against the base class.
    """

    def __init__(self, num_negatives: int = 2, *, max_trajs_per_task: int = 2, **kwargs):
        """
        Build the sampler and precompute every sample index entry.

        Args:
            num_negatives: Number of negative tasks drawn for each human-robot pair.
            max_trajs_per_task: Upper bound on how many human (and, separately, robot)
                trajectories are considered per task, to limit combinatorics.
            **kwargs: Forwarded unchanged to ``RFMBaseSampler.__init__``.

        Raises:
            ValueError: If ``max_trajs_per_task`` is smaller than 1.
        """
        if max_trajs_per_task < 1:
            raise ValueError(f"max_trajs_per_task must be >= 1, got {max_trajs_per_task}")
        super().__init__(**kwargs)
        self.num_negatives = num_negatives
        self.max_trajs_per_task = max_trajs_per_task
        self.sample_indices = self._generate_all_sample_indices()
        rank_0_print(
            f"Generated {len(self.sample_indices)} similarity score sample indices from {len(self.paired_human_robot_by_task)} tasks",
            verbose=self.verbose,
        )

    def _cap_selection(self, indices: List[Any]) -> List[Any]:
        """Return ``indices`` unchanged, or a random subset of ``max_trajs_per_task`` items if it is longer."""
        if len(indices) > self.max_trajs_per_task:
            return self._local_random.sample(indices, self.max_trajs_per_task)
        return indices

    def _generate_all_sample_indices(self) -> List[Dict[str, Any]]:
        """
        Generate all similarity score sample index entries.

        Returns:
            A list of dicts with keys ``human_idx``, ``robot_idx``, ``task``,
            ``negative_task`` and ``negative_task_indices``. Each selected human-robot
            pair contributes exactly ``num_negatives`` entries.
        """
        sample_indices: List[Dict[str, Any]] = []

        # Iterate through all tasks with paired human-robot data
        for task, paired_info in self.paired_human_robot_by_task.items():
            human_indices = paired_info["human"]
            robot_indices = paired_info["robot"]
            if not human_indices or not robot_indices:
                continue

            # Candidate negative tasks: every other task that actually has trajectories.
            # Filtering out empty tasks here (instead of discarding them after the draw)
            # guarantees each pair really gets num_negatives entries.
            other_tasks = [
                other_task
                for other_task, indices in self.task_indices.items()
                if other_task != task and indices
            ]
            if not other_tasks:
                continue

            # Limit number of human/robot trajectories considered per task to reduce combinatorics.
            # Humans are capped before robots so the random stream is consumed in a fixed order.
            selected_humans = self._cap_selection(human_indices)
            selected_robots = self._cap_selection(robot_indices)

            # For each selected human-robot pair, create N entries (one per negative task)
            for human_idx in selected_humans:
                for robot_idx in selected_robots:
                    # choices() draws with replacement, so a negative task may repeat
                    negative_tasks = self._local_random.choices(other_tasks, k=self.num_negatives)
                    for negative_task in negative_tasks:
                        # Store only the negative task and its candidate indices;
                        # the specific negative trajectory is sampled during generation.
                        sample_indices.append(
                            {
                                "human_idx": human_idx,
                                "robot_idx": robot_idx,
                                "task": task,
                                "negative_task": negative_task,
                                "negative_task_indices": self.task_indices[negative_task],
                            }
                        )
        return sample_indices

    def _generate_sample_from_indices(self, sample_idx_info: dict) -> Optional[SimilaritySample]:
        """
        Generate a single similarity sample from a stored index entry.

        Args:
            sample_idx_info: One entry in the format produced by
                ``_generate_all_sample_indices``.

        Returns:
            The assembled ``SimilaritySample``, or ``None`` when the entry carries no
            candidate negative indices.
        """
        negative_task_indices = sample_idx_info["negative_task_indices"]
        # Without candidates there is no negative to draw; bail out before loading anything.
        if not negative_task_indices:
            return None

        task = sample_idx_info["task"]
        negative_task = sample_idx_info["negative_task"]

        # Get human and robot trajectories
        human_traj = self.dataset[sample_idx_info["human_idx"]]
        robot_traj = self.dataset[sample_idx_info["robot_idx"]]

        # Sample a negative trajectory from the specified different task.
        # This draw happens on every call, so the same entry can yield different negatives.
        negative_idx = self._local_random.choice(negative_task_indices)
        negative_traj = self.dataset[negative_idx]

        # Create trajectories for the similarity sample
        ref_traj = self._create_trajectory_from_data(human_traj)
        sim_traj = self._create_trajectory_from_data(robot_traj)
        diff_traj = self._create_trajectory_from_data(negative_traj)

        # Provenance metadata; attached to the reference trajectory only
        metadata = {
            "task": task,
            "negative_task": negative_task,
            "human_id": human_traj["id"],
            "robot_id": robot_traj["id"],
            "negative_id": negative_traj["id"],
        }
        if ref_traj.metadata is None:
            ref_traj.metadata = {}
        ref_traj.metadata.update(metadata)

        return SimilaritySample(
            ref_trajectory=ref_traj,
            sim_trajectory=sim_traj,
            diff_trajectory=diff_traj,
            data_gen_strategy="similarity_score_eval",
        )

    def _create_trajectory_from_data(self, traj_data: dict) -> Trajectory:
        """Create a Trajectory from a dataset entry, tagged with this sampler's data_gen_strategy."""
        return self._get_traj_from_data(
            traj=traj_data,
            metadata={"data_gen_strategy": "similarity_score_eval"},
        )

    def __len__(self):
        """Return the number of precomputed sample index entries."""
        return len(self.sample_indices)

    def __getitem__(self, idx):
        """Generate the similarity sample for the ``idx``-th precomputed entry."""
        return self._generate_sample_from_indices(self.sample_indices[idx])