Spaces:
Sleeping
Sleeping
| """ | |
| Base reward infrastructure β data classes, calculators, and transforms. | |
| Merged from the shared repo-level modules into a self-contained file: | |
| - Episode-level: RewardCalculator (custom mode) | |
| - Per-step: StepRewardTransform + OpenEnvRewardCalculator (openenv mode) | |
| Scoring formula (both modes): | |
| total = 0.25 * quality/structural + 0.15 * efficiency + 0.60 * ground_truth + penalty | |
| Usage: | |
| from rewards.base import RewardCalculator, Scenario, EpisodeLog | |
| calculator = RewardCalculator() | |
| breakdown = calculator.calculate(episode, scenario, outcome_results) | |
| """ | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional, Set | |
| from openenv.core.env_server.interfaces import Transform | |
| from openenv.core.env_server.mcp_types import CallToolObservation | |
| from openenv.core.env_server.types import Observation | |
# ── Data Classes ──
@dataclass
class StepLog:
    """Record of a single tool call made by the agent.

    Attributes:
        tool_name: Name of the tool that was invoked.
        arguments: Keyword arguments passed to the tool.
        success: Whether the call completed without error.
        result: Raw tool result, if any.
        error: Error message when the call failed, else None.
        timestamp: Optional timestamp string for the call.
        elapsed: Wall-clock duration of the call in seconds.
    """

    # Bug fix: @dataclass was missing. Without it the annotated fields
    # generate no __init__, so StepLog(tool_name=..., ...) — as used by
    # EpisodeLog.add_step — would raise TypeError.
    tool_name: str
    arguments: Dict[str, Any]
    success: bool
    result: Any = None
    error: Optional[str] = None
    timestamp: Optional[str] = None
    elapsed: float = 0.0
@dataclass
class EpisodeLog:
    """Record of all tool calls in one episode."""

    # Bug fix: @dataclass was missing — field(default_factory=list) is a
    # no-op outside a dataclass and EpisodeLog() would not get a fresh list.
    steps: List[StepLog] = field(default_factory=list)

    def add_step(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        success: bool,
        result: Any = None,
        error: Optional[str] = None,
        timestamp: Optional[str] = None,
        elapsed: float = 0.0,
    ) -> None:
        """Append one StepLog entry describing a single tool call."""
        self.steps.append(
            StepLog(
                tool_name=tool_name,
                arguments=arguments,
                success=success,
                result=result,
                error=error,
                timestamp=timestamp,
                elapsed=elapsed,
            )
        )

    def tools_used(self) -> List[str]:
        """Return tool names in call order (duplicates preserved)."""
        return [s.tool_name for s in self.steps]

    def tools_used_set(self) -> Set[str]:
        """Return the unique set of tool names used in this episode."""
        # Bug fix: tools_used is a method, so it must be called;
        # set(self.tools_used) tried to iterate the bound method (TypeError).
        return set(self.tools_used())
@dataclass
class Scenario:
    """Definition of a task for the agent.

    Attributes:
        id: Unique scenario identifier.
        prompt: Natural-language task given to the agent.
        expected_tools: Tool names the agent is expected to use.
        max_steps: Step budget for the episode.
        outcome_checks: Declarative ground-truth checks to verify outcomes.
    """

    # Bug fix: @dataclass was missing — without it these annotations
    # generate no __init__ and the class cannot be instantiated with values.
    id: str
    prompt: str
    expected_tools: List[str]
    max_steps: int
    outcome_checks: List[Dict[str, Any]]
@dataclass
class RewardBreakdown:
    """Detailed reward breakdown — useful for debugging and logging.

    Attributes:
        structural: Quality/structural component in [0, 1].
        ground_truth: Mean outcome-check score in [0, 1].
        efficiency: Step-budget efficiency in [0, 1].
        penalty: Hallucination penalty (<= 0).
        total: Weighted, clamped final reward in [-1, 1].
        details: Free-form diagnostic data (e.g. "reward_mode").
    """

    # Bug fix: @dataclass was missing — field(default_factory=dict) is a
    # no-op outside a dataclass, and RewardCalculator/OpenEnvRewardCalculator
    # construct this class with keyword arguments.
    structural: float = 0.0
    ground_truth: float = 0.0
    efficiency: float = 0.0
    penalty: float = 0.0
    total: float = 0.0
    details: Dict[str, Any] = field(default_factory=dict)

    def summary(self) -> str:
        """Return a human-readable multi-line summary of the breakdown."""
        # "openenv" mode labels the first component "Quality"; custom mode
        # (the default) labels it "Structural".
        mode = self.details.get("reward_mode", "custom")
        qual_label = "Quality" if mode == "openenv" else "Structural"
        lines = [
            f"  {qual_label + ':':14s}{self.structural:.2f}  (weight 0.25)",
            f"  Efficiency:   {self.efficiency:.2f}  (weight 0.15)",
            f"  Ground Truth: {self.ground_truth:.2f}  (weight 0.60)",
        ]
        # Only show the penalty line when a penalty was actually applied.
        if self.penalty < 0:
            lines.append(f"  Penalty:      {self.penalty:.2f}  (hallucination)")
        lines.append(f"  ββββββββββββββββββββββββ")
        lines.append(f"  TOTAL:        {self.total:.2f}")
        return "\n".join(lines)
# ── Episode-Level Reward Calculator (custom mode) ──
class RewardCalculator:
    """
    Computes episode-level reward from logs + scenario + verification results.

    Weights: structural (0.25), ground_truth (0.60), efficiency (0.15).
    The weighted sum plus a hallucination penalty is clamped to [-1, 1].
    """

    def __init__(
        self,
        w_structural: float = 0.25,
        w_ground_truth: float = 0.60,
        w_efficiency: float = 0.15,
    ):
        self.w_structural = w_structural
        self.w_ground_truth = w_ground_truth
        self.w_efficiency = w_efficiency

    def calculate(
        self,
        episode: EpisodeLog,
        scenario: Scenario,
        outcome_results: List[float],
    ) -> RewardBreakdown:
        """Score one episode against a scenario and return the breakdown.

        Args:
            episode: Log of every tool call the agent made.
            scenario: Task definition (expected tools, step budget, checks).
            outcome_results: Per-check ground-truth scores in [0, 1].

        Returns:
            RewardBreakdown with per-component scores, clamped total,
            and a diagnostics dict in `details`.
        """
        breakdown = RewardBreakdown()
        breakdown.structural = self._structural_score(episode, scenario)
        breakdown.ground_truth = self._ground_truth_score(outcome_results)
        breakdown.efficiency = self._efficiency_score(episode, scenario)
        breakdown.penalty = self._hallucination_penalty(episode, outcome_results)
        breakdown.total = (
            self.w_structural * breakdown.structural
            + self.w_ground_truth * breakdown.ground_truth
            + self.w_efficiency * breakdown.efficiency
            + breakdown.penalty
        )
        breakdown.total = max(-1.0, min(1.0, breakdown.total))
        breakdown.details = {
            "tools_expected": scenario.expected_tools,
            # Bug fix: tools_used is a method; previously the bound method
            # object was stored here instead of the list of tool names.
            "tools_used": episode.tools_used(),
            "outcome_checks_score_sum": sum(outcome_results),
            "outcome_checks_total": len(outcome_results),
            "outcome_checks_avg": sum(outcome_results) / len(outcome_results) if outcome_results else 0.0,
            "steps_taken": len(episode.steps),
            "max_steps": scenario.max_steps,
        }
        return breakdown

    def _structural_score(self, episode: EpisodeLog, scenario: Scenario) -> float:
        """Blend tool-choice F1 (0.6) with call success rate (0.4), minus a
        penalty proportional to the fraction of unexpected tool calls."""
        if not episode.steps:
            return 0.0
        expected = set(scenario.expected_tools)
        # Bug fix: tools_used_set is a method and must be called —
        # `expected & <bound method>` raised TypeError.
        used = episode.tools_used_set()
        intersection = expected & used
        precision = len(intersection) / len(used) if used else 0.0
        recall = len(intersection) / len(expected) if expected else 0.0
        f1 = (
            2 * precision * recall / (precision + recall)
            if (precision + recall) > 0
            else 0.0
        )
        success_rate = sum(1 for s in episode.steps if s.success) / len(episode.steps)
        unexpected_calls = sum(
            1 for s in episode.steps if s.tool_name not in expected
        )
        unexpected_ratio = unexpected_calls / len(episode.steps)
        return max(0.0, 0.6 * f1 + 0.4 * success_rate - unexpected_ratio * 0.3)

    def _ground_truth_score(self, outcome_results: List[float]) -> float:
        """Mean of the outcome-check scores; 0.0 when there are no checks."""
        if not outcome_results:
            return 0.0
        return sum(outcome_results) / len(outcome_results)

    def _efficiency_score(self, episode: EpisodeLog, scenario: Scenario) -> float:
        """Reward fewer steps: 1 - steps/max_steps, floored at 0."""
        if not episode.steps:
            return 0.0
        return max(0.0, 1.0 - len(episode.steps) / scenario.max_steps)

    def _hallucination_penalty(
        self, episode: EpisodeLog, outcome_results: List[float]
    ) -> float:
        """Penalize episodes where every tool call 'succeeded' yet ground
        truth checks failed — the signature of hallucinated progress."""
        if not episode.steps or not outcome_results:
            return 0.0
        all_calls_succeeded = all(s.success for s in episode.steps)
        pass_rate = sum(outcome_results) / len(outcome_results)
        if all_calls_succeeded and pass_rate == 0.0:
            return -0.5
        if all_calls_succeeded and pass_rate < 0.3:
            return -0.2
        return 0.0
# ── Per-Step Reward Transform (openenv mode) ──
class StepRewardTransform(Transform):
    """
    Gym-agnostic per-step reward transform.

    Sets observation.reward based on tool call success/failure.
    Subclass for gym-specific logic (see transforms.py).
    """

    def __call__(self, observation: Observation) -> Observation:
        """Attach a scalar reward to the observation and return it."""
        observation.reward = self._compute_reward(observation)
        return observation

    def _compute_reward(self, observation: Observation) -> float:
        """Score a step: +1.0 for a clean tool call, -0.5 when the call
        carried an error, 0.0 for any non-tool-call observation."""
        if not isinstance(observation, CallToolObservation):
            return 0.0
        return -0.5 if observation.error is not None else 1.0
class OpenEnvRewardCalculator:
    """
    Combines per-step transform rewards with ground truth verification.

    Used as the alternative to RewardCalculator when --reward-mode openenv.
    Quality is sign-based: only the sign of per-step rewards matters
    (positive = productive, negative = harmful, zero = neutral).
    """

    def __init__(
        self,
        w_quality: float = 0.25,
        w_efficiency: float = 0.15,
        w_ground_truth: float = 0.60,
    ):
        self.w_quality = w_quality
        self.w_efficiency = w_efficiency
        self.w_ground_truth = w_ground_truth

    def calculate(
        self,
        step_rewards: List[float],
        outcome_results: List[bool],
        max_steps: int = 0,
        actual_steps: int = 0,
    ) -> RewardBreakdown:
        """Blend sign-based step quality, step efficiency, and ground truth
        into a single clamped reward with a full diagnostic breakdown."""
        good_steps = len([r for r in step_rewards if r > 0])
        bad_steps = len([r for r in step_rewards if r < 0])
        scored_steps = good_steps + bad_steps
        # Quality = fraction of non-neutral steps that were productive.
        quality = good_steps / scored_steps if scored_steps > 0 else 0.0

        # Efficiency only applies when both step counts are known.
        efficiency = 0.0
        if max_steps > 0 and actual_steps > 0:
            efficiency = max(0.0, 1.0 - actual_steps / max_steps)

        if outcome_results:
            gt_score = sum(outcome_results) / len(outcome_results)
        else:
            gt_score = 0.0

        # Penalize "confident failure": no harmful steps, yet verification
        # (mostly) failed — the openenv analogue of hallucinated progress.
        penalty = 0.0
        if step_rewards and outcome_results and all(r >= 0 for r in step_rewards):
            if gt_score == 0.0:
                penalty = -0.5
            elif gt_score < 0.3:
                penalty = -0.2

        weighted_sum = (
            self.w_quality * quality
            + self.w_efficiency * efficiency
            + self.w_ground_truth * gt_score
            + penalty
        )
        clamped_total = min(1.0, max(-1.0, weighted_sum))

        return RewardBreakdown(
            structural=quality,
            ground_truth=gt_score,
            efficiency=efficiency,
            penalty=penalty,
            total=clamped_total,
            details={
                "reward_mode": "openenv",
                "productive_steps": good_steps,
                "harmful_steps": bad_steps,
                "neutral_steps": len(step_rewards) - scored_steps,
                "actual_steps": actual_steps,
                "max_steps": max_steps,
            },
        )