# Source: kdemon1011 — "Upload folder using huggingface_hub" (commit 599c9bd, verified)
"""
Base reward infrastructure — data classes, calculators, and transforms.
Merged from the shared repo-level modules into a self-contained file:
- Episode-level: RewardCalculator (custom mode)
- Per-step: StepRewardTransform + OpenEnvRewardCalculator (openenv mode)
Scoring formula (both modes):
total = 0.25 * quality/structural + 0.15 * efficiency + 0.60 * ground_truth + penalty
Usage:
from rewards.base import RewardCalculator, Scenario, EpisodeLog
calculator = RewardCalculator()
breakdown = calculator.calculate(episode, scenario, outcome_results)
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set
from openenv.core.env_server.interfaces import Transform
from openenv.core.env_server.mcp_types import CallToolObservation
from openenv.core.env_server.types import Observation
# ── Data Classes ──
@dataclass
class StepLog:
    """Record of a single tool call made by the agent."""

    tool_name: str  # name of the tool that was invoked
    arguments: Dict[str, Any]  # arguments the agent passed to the tool
    success: bool  # whether the call completed without error
    result: Any = None  # raw tool output, when the call produced one
    error: Optional[str] = None  # error description when success is False
    timestamp: Optional[str] = None  # call time; format chosen by the caller — TODO confirm
    elapsed: float = 0.0  # call duration — presumably seconds; verify against caller
@dataclass
class EpisodeLog:
    """Record of all tool calls in one episode."""

    steps: List[StepLog] = field(default_factory=list)

    def add_step(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        success: bool,
        result: Any = None,
        error: Optional[str] = None,
        timestamp: Optional[str] = None,
        elapsed: float = 0.0,
    ) -> None:
        """Append one tool-call record to this episode's log."""
        entry = StepLog(
            tool_name=tool_name,
            arguments=arguments,
            success=success,
            result=result,
            error=error,
            timestamp=timestamp,
            elapsed=elapsed,
        )
        self.steps.append(entry)

    @property
    def tools_used(self) -> List[str]:
        """Tool names in call order (duplicates preserved)."""
        return [step.tool_name for step in self.steps]

    @property
    def tools_used_set(self) -> Set[str]:
        """Unique tool names called during the episode."""
        return {step.tool_name for step in self.steps}
@dataclass
class Scenario:
    """Definition of a task for the agent."""

    id: str  # unique scenario identifier
    prompt: str  # task prompt given to the agent
    expected_tools: List[str]  # tools the agent is expected to call
    max_steps: int  # step budget; used by the efficiency score
    outcome_checks: List[Dict[str, Any]]  # ground-truth check specs — schema defined by the verifier; TODO confirm
@dataclass
class RewardBreakdown:
    """Detailed reward breakdown — useful for debugging and logging."""

    structural: float = 0.0  # quality/structural component (weight 0.25)
    ground_truth: float = 0.0  # verification component (weight 0.60)
    efficiency: float = 0.0  # step-budget component (weight 0.15)
    penalty: float = 0.0  # hallucination penalty, zero or negative
    total: float = 0.0  # clamped weighted sum
    details: Dict[str, Any] = field(default_factory=dict)  # free-form diagnostics

    def summary(self) -> str:
        """Render the breakdown as a short human-readable report."""
        # openenv mode labels the first component "Quality"; the custom
        # episode-level mode calls it "Structural".
        is_openenv = self.details.get("reward_mode", "custom") == "openenv"
        qual_label = "Quality" if is_openenv else "Structural"
        parts = [
            f" {qual_label + ':':14s}{self.structural:.2f} (weight 0.25)",
            f" Efficiency: {self.efficiency:.2f} (weight 0.15)",
            f" Ground Truth: {self.ground_truth:.2f} (weight 0.60)",
        ]
        if self.penalty < 0:
            parts.append(f" Penalty: {self.penalty:.2f} (hallucination)")
        parts.append(" ────────────────────────")
        parts.append(f" TOTAL: {self.total:.2f}")
        return "\n".join(parts)
# ── Episode-Level Reward Calculator (custom mode) ──
class RewardCalculator:
    """
    Computes episode-level reward from logs + scenario + verification results.

    Weights: structural (0.25), ground_truth (0.60), efficiency (0.15).
    The final total is clamped to [-1.0, 1.0].
    """

    def __init__(
        self,
        w_structural: float = 0.25,
        w_ground_truth: float = 0.60,
        w_efficiency: float = 0.15,
    ):
        self.w_structural = w_structural
        self.w_ground_truth = w_ground_truth
        self.w_efficiency = w_efficiency

    def calculate(
        self,
        episode: EpisodeLog,
        scenario: Scenario,
        outcome_results: List[float],
    ) -> RewardBreakdown:
        """Score one episode and return a detailed breakdown.

        Args:
            episode: Log of the tool calls made by the agent.
            scenario: Task definition (expected tools, step budget).
            outcome_results: Per-check verification scores.
        """
        breakdown = RewardBreakdown()
        breakdown.structural = self._structural_score(episode, scenario)
        breakdown.ground_truth = self._ground_truth_score(outcome_results)
        breakdown.efficiency = self._efficiency_score(episode, scenario)
        breakdown.penalty = self._hallucination_penalty(episode, outcome_results)
        breakdown.total = (
            self.w_structural * breakdown.structural
            + self.w_ground_truth * breakdown.ground_truth
            + self.w_efficiency * breakdown.efficiency
            + breakdown.penalty
        )
        # Keep the reward bounded regardless of weight configuration.
        breakdown.total = max(-1.0, min(1.0, breakdown.total))
        checks_sum = sum(outcome_results)
        breakdown.details = {
            "tools_expected": scenario.expected_tools,
            "tools_used": episode.tools_used,
            "outcome_checks_score_sum": checks_sum,
            "outcome_checks_total": len(outcome_results),
            "outcome_checks_avg": checks_sum / len(outcome_results) if outcome_results else 0.0,
            "steps_taken": len(episode.steps),
            "max_steps": scenario.max_steps,
        }
        return breakdown

    def _structural_score(self, episode: EpisodeLog, scenario: Scenario) -> float:
        """Tool-choice F1 (0.6) + call success rate (0.4), minus
        0.3 * fraction of calls to unexpected tools; floored at 0."""
        if not episode.steps:
            return 0.0
        expected = set(scenario.expected_tools)
        used = episode.tools_used_set
        intersection = expected & used
        precision = len(intersection) / len(used) if used else 0.0
        recall = len(intersection) / len(expected) if expected else 0.0
        f1 = (
            2 * precision * recall / (precision + recall)
            if (precision + recall) > 0
            else 0.0
        )
        success_rate = sum(1 for s in episode.steps if s.success) / len(episode.steps)
        unexpected_calls = sum(
            1 for s in episode.steps if s.tool_name not in expected
        )
        unexpected_ratio = unexpected_calls / len(episode.steps)
        return max(0.0, 0.6 * f1 + 0.4 * success_rate - unexpected_ratio * 0.3)

    def _ground_truth_score(self, outcome_results: List[float]) -> float:
        """Mean of the verification scores; 0.0 when there are no checks."""
        if not outcome_results:
            return 0.0
        return sum(outcome_results) / len(outcome_results)

    def _efficiency_score(self, episode: EpisodeLog, scenario: Scenario) -> float:
        """1 - steps/max_steps, floored at 0.

        FIX: guard max_steps <= 0 (previously a ZeroDivisionError);
        mirrors the max_steps > 0 guard in OpenEnvRewardCalculator.
        """
        if not episode.steps or scenario.max_steps <= 0:
            return 0.0
        return max(0.0, 1.0 - len(episode.steps) / scenario.max_steps)

    def _hallucination_penalty(
        self, episode: EpisodeLog, outcome_results: List[float]
    ) -> float:
        """Penalize "all tool calls succeeded but verification disagrees":
        -0.5 when nothing passed, -0.2 when the pass rate is below 0.3."""
        if not episode.steps or not outcome_results:
            return 0.0
        all_calls_succeeded = all(s.success for s in episode.steps)
        pass_rate = sum(outcome_results) / len(outcome_results)
        if all_calls_succeeded and pass_rate == 0.0:
            return -0.5
        if all_calls_succeeded and pass_rate < 0.3:
            return -0.2
        return 0.0
# ── Per-Step Reward Transform (openenv mode) ──
class StepRewardTransform(Transform):
    """
    Gym-agnostic per-step reward transform.

    Sets observation.reward based on tool call success/failure.
    Subclass for gym-specific logic (see transforms.py).
    """

    def __call__(self, observation: Observation) -> Observation:
        """Attach a scalar reward to the observation and return it."""
        observation.reward = self._compute_reward(observation)
        return observation

    def _compute_reward(self, observation: Observation) -> float:
        """+1.0 for a successful tool call, -0.5 for a failed one,
        0.0 for any non-tool-call observation."""
        if not isinstance(observation, CallToolObservation):
            return 0.0
        return -0.5 if observation.error is not None else 1.0
class OpenEnvRewardCalculator:
    """
    Combines per-step transform rewards with ground truth verification.

    Used as the alternative to RewardCalculator when --reward-mode openenv.
    Quality is sign-based: only the sign of per-step rewards matters
    (positive = productive, negative = harmful, zero = neutral).
    """

    def __init__(
        self,
        w_quality: float = 0.25,
        w_efficiency: float = 0.15,
        w_ground_truth: float = 0.60,
    ):
        self.w_quality = w_quality
        self.w_efficiency = w_efficiency
        self.w_ground_truth = w_ground_truth

    def calculate(
        self,
        step_rewards: List[float],
        outcome_results: List[bool],
        max_steps: int = 0,
        actual_steps: int = 0,
    ) -> RewardBreakdown:
        """Build a RewardBreakdown from per-step rewards and outcome checks."""
        # Sign-based quality: fraction of non-neutral steps that were productive.
        n_productive = len([r for r in step_rewards if r > 0])
        n_harmful = len([r for r in step_rewards if r < 0])
        n_active = n_productive + n_harmful
        quality = n_productive / n_active if n_active else 0.0

        # Efficiency applies only when a positive step budget was supplied.
        efficiency = 0.0
        if max_steps > 0 and actual_steps > 0:
            efficiency = max(0.0, 1.0 - actual_steps / max_steps)

        if outcome_results:
            gt_score = sum(outcome_results) / len(outcome_results)
        else:
            gt_score = 0.0

        # Hallucination penalty: no step looked harmful, yet verification
        # (mostly) failed.
        penalty = 0.0
        if step_rewards and outcome_results and all(r >= 0 for r in step_rewards):
            if gt_score == 0.0:
                penalty = -0.5
            elif gt_score < 0.3:
                penalty = -0.2

        raw_total = (
            self.w_quality * quality
            + self.w_efficiency * efficiency
            + self.w_ground_truth * gt_score
            + penalty
        )
        return RewardBreakdown(
            structural=quality,
            ground_truth=gt_score,
            efficiency=efficiency,
            penalty=penalty,
            total=min(1.0, max(-1.0, raw_total)),
            details={
                "reward_mode": "openenv",
                "productive_steps": n_productive,
                "harmful_steps": n_harmful,
                "neutral_steps": len(step_rewards) - n_active,
                "actual_steps": actual_steps,
                "max_steps": max_steps,
            },
        )