| """ |
| Reward module - GRPO-compatible reward hook using Impact Oracle. |
| """ |
| import math |
| from typing import Any, Dict, List, Optional |
|
|
| import sys |
| from pathlib import Path |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
| from oracle.oracle import ImpactOracle |
|
|
|
|
class RewardHook:
    """
    Converts Impact Oracle scores into RL rewards.

    Every scored completion is also appended to ``trajectory_history`` as a
    dict with the keys ``prompt`` (first 100 items of the prompt), ``reward``,
    ``raw_score`` and ``failure_tags``.

    Intended to be handed to TRL GRPOTrainer via its ``reward_funcs``
    parameter.
    """

    def __init__(
        self,
        oracle: Optional["ImpactOracle"] = None,
        mode: str = "retrieval_qa",
        compute_budget: float = 10000.0,
        target_accuracy: float = 0.8,
    ):
        """
        Args:
            oracle: Object exposing ``score(mode=, action=, context=,
                result=, agent_id=)``. A default ``ImpactOracle()`` is
                created when omitted.
            mode: Forwarded unchanged as ``mode`` on every oracle call.
            compute_budget: Stored on the instance; not read by this class.
            target_accuracy: Stored on the instance; not read by this class.
        """
        # Explicit None check: a caller-supplied oracle that happens to be
        # falsy (e.g. defines __len__ or __bool__) must not be replaced.
        self.oracle = oracle if oracle is not None else ImpactOracle()
        self.mode = mode
        self.compute_budget = compute_budget
        self.target_accuracy = target_accuracy
        self.trajectory_history: List[Dict[str, Any]] = []

    def compute_rewards(
        self,
        prompts: List[str],
        completions: List[str],
        answers: List[Optional[str]],
        gold_answers: List[str],
        confidences: List[float],
        compute_costs: List[float],
        agent_ids: Optional[List[str]] = None,
        **kwargs,
    ) -> List[float]:
        """
        Compute rewards for a batch of completions.

        Args:
            prompts: One prompt per row; its length defines the batch size.
            completions: Accepted for signature compatibility; not read here.
            answers: Extracted answer per row; ``None`` marks an abstention.
            gold_answers: Reference answer per row.
            confidences: Confidence value per row, forwarded to the oracle.
            compute_costs: Compute cost per row, forwarded to the oracle.
            agent_ids: Agent id per row; ``"agent_default"`` is used for every
                row when omitted or empty.
            **kwargs: ``evidences`` (one entry per row) is forwarded as the
                ``evidence`` field; any other keyword is ignored.

        Returns:
            One float reward per prompt, in input order.

        Raises:
            ValueError: If any per-row argument has fewer entries than
                ``prompts``. Raised before the oracle is called, so
                ``trajectory_history`` is left untouched.
        """
        batch_size = len(prompts)
        agent_ids = agent_ids or ["agent_default"] * batch_size
        evidences = kwargs.get("evidences")

        per_row_args = {
            "answers": answers,
            "gold_answers": gold_answers,
            "confidences": confidences,
            "compute_costs": compute_costs,
            "agent_ids": agent_ids,
        }
        if evidences is not None:
            per_row_args["evidences"] = evidences

        # Validate up front so a short list cannot abort the batch half-way
        # through, after some rows were already scored and recorded.
        for arg_name, values in per_row_args.items():
            if len(values) < batch_size:
                raise ValueError(
                    f"{arg_name} has {len(values)} entries but "
                    f"{batch_size} prompts were given"
                )

        if evidences is None:
            # A fresh dict per row: rows must not share one mutable default.
            evidences = [{} for _ in range(batch_size)]

        # zip stops at len(prompts); surplus trailing entries in the other
        # lists are ignored, as they always were.
        rows = zip(
            prompts,
            answers,
            gold_answers,
            confidences,
            compute_costs,
            agent_ids,
            evidences,
        )
        return [self._score_and_record(*row) for row in rows]

    def compute_reward_single(
        self,
        prompt: str,
        completion: str,
        answer: Optional[str],
        gold_answer: str,
        confidence: float,
        compute_cost: float,
        agent_id: str = "agent_default",
        evidence: Optional[Dict[str, Any]] = None,
    ) -> float:
        """
        Compute the reward for a single completion.

        ``completion`` is accepted for signature parity with
        ``compute_rewards`` and is not read. A missing or empty ``evidence``
        is sent to the oracle as ``{}``.
        """
        return self._score_and_record(
            prompt,
            answer,
            gold_answer,
            confidence,
            compute_cost,
            agent_id,
            evidence or {},
        )

    def _score_and_record(
        self,
        prompt: str,
        answer: Optional[str],
        gold_answer: str,
        confidence: float,
        compute_cost: float,
        agent_id: str,
        evidence: Any,
    ) -> float:
        """Score one row with the oracle, log it to the history, return its reward."""
        oracle_res = self.oracle.score(
            mode=self.mode,
            action={"abstained": answer is None},
            context={"gold_answer": gold_answer},
            result={
                "answer": answer,
                "confidence": confidence,
                "evidence": evidence,
                "compute_cost": compute_cost,
            },
            agent_id=agent_id,
        )
        self.trajectory_history.append({
            # Truncated so long prompts do not bloat the history
            "prompt": prompt[:100],
            "reward": oracle_res.reward_value,
            "raw_score": oracle_res.raw_score,
            "failure_tags": oracle_res.failure_tags,
        })
        return oracle_res.reward_value
|
|
|
|
class OfflinePolicyComparator:
    """
    Compare two policies using offline trajectory data.
    Useful when full GRPO training is not feasible.
    """

    def __init__(self, reward_hook: "RewardHook"):
        """Store ``reward_hook`` on the instance; ``compare`` does not read it."""
        self.reward_hook = reward_hook

    def compare(
        self,
        policy_a_trajectories: List[Dict[str, Any]],
        policy_b_trajectories: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """
        Compare two policies on the same test set.

        Args:
            policy_a_trajectories: Dicts with a ``"reward"`` key and an
                optional ``"failure_tags"`` key, one per test item.
            policy_b_trajectories: Same shape, for the second policy.

        Returns:
            A dict with:
              - ``mean_reward_a`` / ``mean_reward_b``: average reward, or
                ``0.0`` when that policy has no trajectories.
              - ``win_rate``: fraction of position-aligned pairs where A's
                reward is strictly greater than B's, or ``0.0`` when there
                are no pairs.
              - ``improvement``: ``(sum_a - sum_b) / max(|sum_b|, 1e-6)``.
              - ``policy_a_failures`` / ``policy_b_failures``: number of
                trajectories whose ``failure_tags`` value is truthy.

        Raises:
            KeyError: If a trajectory has no ``"reward"`` key.
        """
        rewards_a = [t["reward"] for t in policy_a_trajectories]
        rewards_b = [t["reward"] for t in policy_b_trajectories]
        total_a = sum(rewards_a)
        total_b = sum(rewards_b)

        # zip pairs trajectories by position and stops at the shorter list,
        # so the win count must be normalised by that same number of pairs.
        paired = min(len(rewards_a), len(rewards_b))
        wins = sum(1 for a, b in zip(rewards_a, rewards_b) if a > b)

        return {
            # Guarded so empty input yields 0.0 instead of ZeroDivisionError
            "mean_reward_a": total_a / len(rewards_a) if rewards_a else 0.0,
            "mean_reward_b": total_b / len(rewards_b) if rewards_b else 0.0,
            "win_rate": wins / paired if paired else 0.0,
            # The 1e-6 floor keeps the ratio finite when B's total is ~0
            "improvement": (total_a - total_b) / max(abs(total_b), 1e-6),
            "policy_a_failures": sum(1 for t in policy_a_trajectories if t.get("failure_tags")),
            "policy_b_failures": sum(1 for t in policy_b_trajectories if t.get("failure_tags")),
        }
|
|