""" Base reward infrastructure — data classes, calculators, and transforms. Merged from the shared repo-level modules into a self-contained file: - Episode-level: RewardCalculator (custom mode) - Per-step: StepRewardTransform + OpenEnvRewardCalculator (openenv mode) Scoring formula (both modes): total = 0.25 * quality/structural + 0.15 * efficiency + 0.60 * ground_truth + penalty Usage: from rewards.base import RewardCalculator, Scenario, EpisodeLog calculator = RewardCalculator() breakdown = calculator.calculate(episode, scenario, outcome_results) """ from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Set from openenv.core.env_server.interfaces import Transform from openenv.core.env_server.mcp_types import CallToolObservation from openenv.core.env_server.types import Observation # ── Data Classes ── @dataclass class StepLog: """Record of a single tool call made by the agent.""" tool_name: str arguments: Dict[str, Any] success: bool result: Any = None error: Optional[str] = None timestamp: Optional[str] = None elapsed: float = 0.0 @dataclass class EpisodeLog: """Record of all tool calls in one episode.""" steps: List[StepLog] = field(default_factory=list) def add_step( self, tool_name: str, arguments: Dict[str, Any], success: bool, result: Any = None, error: Optional[str] = None, timestamp: Optional[str] = None, elapsed: float = 0.0, ) -> None: self.steps.append( StepLog( tool_name=tool_name, arguments=arguments, success=success, result=result, error=error, timestamp=timestamp, elapsed=elapsed, ) ) @property def tools_used(self) -> List[str]: return [s.tool_name for s in self.steps] @property def tools_used_set(self) -> Set[str]: return set(self.tools_used) @dataclass class Scenario: """Definition of a task for the agent.""" id: str prompt: str expected_tools: List[str] max_steps: int outcome_checks: List[Dict[str, Any]] @dataclass class RewardBreakdown: """Detailed reward breakdown — useful for debugging and logging.""" structural: float = 0.0 ground_truth: float = 0.0 efficiency: float = 0.0 penalty: float = 0.0 total: float = 0.0 details: Dict[str, Any] = field(default_factory=dict) def summary(self) -> str: mode = self.details.get("reward_mode", "custom") qual_label = "Quality" if mode == "openenv" else "Structural" lines = [ f" {qual_label + ':':14s}{self.structural:.2f} (weight 0.25)", f" Efficiency: {self.efficiency:.2f} (weight 0.15)", f" Ground Truth: {self.ground_truth:.2f} (weight 0.60)", ] if self.penalty < 0: lines.append(f" Penalty: {self.penalty:.2f} (hallucination)") lines.append(f" ────────────────────────") lines.append(f" TOTAL: {self.total:.2f}") return "\n".join(lines) # ── Episode-Level Reward Calculator (custom mode) ── class RewardCalculator: """ Computes episode-level reward from logs + scenario + verification results. Weights: structural (0.25), ground_truth (0.60), efficiency (0.15). """ def __init__( self, w_structural: float = 0.25, w_ground_truth: float = 0.60, w_efficiency: float = 0.15, ): self.w_structural = w_structural self.w_ground_truth = w_ground_truth self.w_efficiency = w_efficiency def calculate( self, episode: EpisodeLog, scenario: Scenario, outcome_results: List[float], ) -> RewardBreakdown: breakdown = RewardBreakdown() breakdown.structural = self._structural_score(episode, scenario) breakdown.ground_truth = self._ground_truth_score(outcome_results) breakdown.efficiency = self._efficiency_score(episode, scenario) breakdown.penalty = self._hallucination_penalty(episode, outcome_results) breakdown.total = ( self.w_structural * breakdown.structural + self.w_ground_truth * breakdown.ground_truth + self.w_efficiency * breakdown.efficiency + breakdown.penalty ) breakdown.total = max(-1.0, min(1.0, breakdown.total)) breakdown.details = { "tools_expected": scenario.expected_tools, "tools_used": episode.tools_used, "outcome_checks_score_sum": sum(outcome_results), "outcome_checks_total": len(outcome_results), "outcome_checks_avg": sum(outcome_results) / len(outcome_results) if outcome_results else 0.0, "steps_taken": len(episode.steps), "max_steps": scenario.max_steps, } return breakdown def _structural_score(self, episode: EpisodeLog, scenario: Scenario) -> float: if not episode.steps: return 0.0 expected = set(scenario.expected_tools) used = episode.tools_used_set intersection = expected & used precision = len(intersection) / len(used) if used else 0.0 recall = len(intersection) / len(expected) if expected else 0.0 f1 = ( 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0 ) success_rate = sum(1 for s in episode.steps if s.success) / len(episode.steps) unexpected_calls = sum( 1 for s in episode.steps if s.tool_name not in expected ) unexpected_ratio = unexpected_calls / len(episode.steps) return max(0.0, 0.6 * f1 + 0.4 * success_rate - unexpected_ratio * 0.3) def _ground_truth_score(self, outcome_results: List[float]) -> float: if not outcome_results: return 0.0 return sum(outcome_results) / len(outcome_results) def _efficiency_score(self, episode: EpisodeLog, scenario: Scenario) -> float: if not episode.steps: return 0.0 return max(0.0, 1.0 - len(episode.steps) / scenario.max_steps) def _hallucination_penalty( self, episode: EpisodeLog, outcome_results: List[float] ) -> float: if not episode.steps or not outcome_results: return 0.0 all_calls_succeeded = all(s.success for s in episode.steps) pass_rate = sum(outcome_results) / len(outcome_results) if all_calls_succeeded and pass_rate == 0.0: return -0.5 if all_calls_succeeded and pass_rate < 0.3: return -0.2 return 0.0 # ── Per-Step Reward Transform (openenv mode) ── class StepRewardTransform(Transform): """ Gym-agnostic per-step reward transform. Sets observation.reward based on tool call success/failure. Subclass for gym-specific logic (see transforms.py). """ def __call__(self, observation: Observation) -> Observation: reward = self._compute_reward(observation) observation.reward = reward return observation def _compute_reward(self, observation: Observation) -> float: if isinstance(observation, CallToolObservation): if observation.error is not None: return -0.5 return 1.0 return 0.0 class OpenEnvRewardCalculator: """ Combines per-step transform rewards with ground truth verification. Used as the alternative to RewardCalculator when --reward-mode openenv. Quality is sign-based: only the sign of per-step rewards matters (positive = productive, negative = harmful, zero = neutral). """ def __init__( self, w_quality: float = 0.25, w_efficiency: float = 0.15, w_ground_truth: float = 0.60, ): self.w_quality = w_quality self.w_efficiency = w_efficiency self.w_ground_truth = w_ground_truth def calculate( self, step_rewards: List[float], outcome_results: List[bool], max_steps: int = 0, actual_steps: int = 0, ) -> RewardBreakdown: productive = sum(1 for r in step_rewards if r > 0) harmful = sum(1 for r in step_rewards if r < 0) active = productive + harmful quality = productive / active if active > 0 else 0.0 if max_steps > 0 and actual_steps > 0: efficiency = max(0.0, 1.0 - actual_steps / max_steps) else: efficiency = 0.0 gt_score = sum(outcome_results) / len(outcome_results) if outcome_results else 0.0 penalty = 0.0 if step_rewards and outcome_results: no_harmful = all(r >= 0 for r in step_rewards) if no_harmful and gt_score == 0.0: penalty = -0.5 elif no_harmful and gt_score < 0.3: penalty = -0.2 total = ( self.w_quality * quality + self.w_efficiency * efficiency + self.w_ground_truth * gt_score + penalty ) total = max(-1.0, min(1.0, total)) return RewardBreakdown( structural=quality, ground_truth=gt_score, efficiency=efficiency, penalty=penalty, total=total, details={ "reward_mode": "openenv", "productive_steps": productive, "harmful_steps": harmful, "neutral_steps": len(step_rewards) - active, "actual_steps": actual_steps, "max_steps": max_steps, }, )