""" TeamForge Grader Deterministic scoring of a completed episode. Final score formula (0.0–1.0): score = 0.40 * test_pass_rate + 0.25 * lint_score + 0.20 * efficiency_score + 0.10 * review_quality + 0.05 * reflection_quality Anti-exploit measures: - Test tampering detection (trivial pass/assert True bodies) - Implementation existence check (no empty stubs) - Minimum review length requirement - Lint-fix-only exploit detection """ from __future__ import annotations import ast import math import re import subprocess import sys from pathlib import Path from typing import List, Optional from models import EpisodeResult, ReviewArtifact, ReflectionArtifact # ───────────────────────────────────────────── # SCORING CONFIG # ───────────────────────────────────────────── _SCORE_MIN = 0.10 # deep interior to avoid all boundary issues _SCORE_MAX = 0.90 # deep interior to avoid all boundary issues def _clamp(score: float) -> float: """Ensure score is strictly within the open interval (0, 1).""" return round(max(_SCORE_MIN, min(_SCORE_MAX, score)), 4) # ───────────────────────────────────────────── # ANTI-EXPLOIT GUARDS # ───────────────────────────────────────────── def _detect_test_tampering(repo_path: str) -> bool: """ Detect trivially-passing test rewrites. Flags: single-statement test with `pass`, `return`, or `assert True`. """ for test_file in Path(repo_path).rglob("test_*.py"): try: src = test_file.read_text() tree = ast.parse(src) for node in ast.walk(tree): if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"): body = node.body if len(body) == 1: stmt = body[0] if isinstance(stmt, (ast.Pass, ast.Return)): return True if isinstance(stmt, ast.Assert): val = stmt.test if isinstance(val, ast.Constant) and val.value is True: return True except Exception: pass return False def _implementation_exists(repo_path: str) -> bool: """Verify at least some non-test code was written (>5 non-blank lines).""" total = 0 for f in Path(repo_path).rglob("*.py"): if "test" in f.name or "__pycache__" in str(f): continue try: lines = [l for l in f.read_text().splitlines() if l.strip()] total += len(lines) except Exception: pass return total >= 5 # ───────────────────────────────────────────── # SUB-SCORERS # ───────────────────────────────────────────── def score_tests(repo_path: str, timeout: int = 60) -> tuple[float, str]: """Run pytest and return (pass_rate 0-1, output).""" result = subprocess.run( [sys.executable, "-m", "pytest", "--tb=short", "-q", "--no-header"], cwd=repo_path, capture_output=True, text=True, timeout=timeout, ) output = result.stdout + result.stderr passed = failed = errors = 0 m_p = re.search(r"(\d+) passed", output) m_f = re.search(r"(\d+) failed", output) m_e = re.search(r"(\d+) error", output) if m_p: passed = int(m_p.group(1)) if m_f: failed = int(m_f.group(1)) if m_e: errors = int(m_e.group(1)) total = passed + failed + errors if total == 0: return _SCORE_MIN, output pass_rate = passed / total return _clamp(pass_rate), output def score_lint(repo_path: str) -> tuple[float, str]: """Run ruff and return (lint_score 0-1, output).""" result = subprocess.run( [sys.executable, "-m", "ruff", "check", "."], cwd=repo_path, capture_output=True, text=True, ) output = result.stdout + result.stderr if result.returncode == 0: return _SCORE_MAX, "No lint violations." violations = len([ ln for ln in output.splitlines() if re.match(r".+:\d+:\d+:", ln) ]) # Strictly interior [SCORE_MIN, SCORE_MAX] raw_score = 1.0 - violations * 0.07 return _clamp(raw_score), output def score_review_quality( reviews: List[ReviewArtifact], required_keywords: List[str], ) -> float: """Keyword-based review quality with minimum length requirement.""" if not reviews: return _SCORE_MIN combined = " ".join(r.text.lower() for r in reviews) # Anti-exploit: minimum meaningful length if len(combined.strip()) < 40: return _SCORE_MIN + 0.05 # Keyword coverage if not required_keywords: kw_score = 0.5 else: found = sum(1 for kw in required_keywords if kw.lower() in combined) kw_score = found / len(required_keywords) # Length bonus (up to 0.2 for thorough reviews) avg_len = sum(len(r.text) for r in reviews) / len(reviews) length_bonus = min(0.2, avg_len / 600) # Specificity bonus: mentions actual code identifiers code_words = re.findall(r'\b[a-z_]{3,}\(\)', combined) specificity = min(0.1, len(set(code_words)) * 0.025) return _clamp(kw_score * 0.7 + length_bonus + specificity) def score_reflection_quality(reflections: List[ReflectionArtifact]) -> float: """Score reflections on depth and actionability.""" if not reflections: return _SCORE_MIN total = 0.0 for ref in reflections: depth = 0.0 if len(ref.what_went_well.strip()) > 40: depth += 0.5 if len(ref.what_to_improve.strip()) > 40: depth += 0.5 # Bonus if adjusted_plan provided if ref.adjusted_plan and len(ref.adjusted_plan.strip()) > 20: depth = min(0.9, depth + 0.2) total += depth # Strictly (0, 1) - Safer interior return _clamp(total / max(1, len(reflections))) def score_efficiency(total_steps: int, max_steps: int) -> float: """Reward solving in fewer steps with smooth decay curve.""" ratio = total_steps / max_steps # Strictly (0, 1) - Safer interior return _clamp(math.exp(-2.0 * max(0, ratio - 0.25))) # ───────────────────────────────────────────── # MAIN GRADER # ───────────────────────────────────────────── def grade_episode( repo_path: str, task_id: str, total_steps: int, max_steps: int, reviews: List[ReviewArtifact], reflections: List[ReflectionArtifact], required_keywords: Optional[List[str]] = None, ) -> EpisodeResult: """ Run full deterministic grading pipeline. Returns EpisodeResult with final_score in [0.0, 1.0]. """ log: List[str] = [] required_keywords = required_keywords or [] # ── Anti-exploit checks ── if _detect_test_tampering(repo_path): log.append("[GRADER] ⚠ TEST TAMPERING DETECTED — score zeroed") return EpisodeResult( task_id=task_id, total_steps=total_steps, final_score=_SCORE_MIN, passed=False, log=log + ["Test files were trivially rewritten to force passes."], ) if not _implementation_exists(repo_path): log.append("[GRADER] ⚠ NO IMPLEMENTATION FOUND — score zeroed") return EpisodeResult( task_id=task_id, total_steps=total_steps, final_score=_SCORE_MIN, passed=False, log=log + ["No non-test code was written."], ) # ── Tests ── test_pass_rate, test_output = score_tests(repo_path) log.append(f"[GRADER] test_pass_rate={test_pass_rate:.3f}") log.append(f"[GRADER] test_output=\n{test_output[:800]}") # ── Lint ── lint_score, lint_output = score_lint(repo_path) log.append(f"[GRADER] lint_score={lint_score:.3f}") # ── Efficiency ── efficiency = score_efficiency(total_steps, max_steps) log.append(f"[GRADER] efficiency={efficiency:.3f} ({total_steps}/{max_steps} steps)") # ── Review quality ── review_q = score_review_quality(reviews, required_keywords) log.append(f"[GRADER] review_quality={review_q:.3f}") # ── Reflection quality ── reflect_q = score_reflection_quality(reflections) log.append(f"[GRADER] reflection_quality={reflect_q:.3f}") # ── Final weighted score ── final = ( 0.40 * test_pass_rate + 0.25 * lint_score + 0.20 * efficiency + 0.10 * review_q + 0.05 * reflect_q ) # Strictly (0, 1) interior range to satisfy Phase 2 validator final = _clamp(final) log.append(f"[GRADER] FINAL_SCORE={final:.4f}") return EpisodeResult( task_id=task_id, total_steps=total_steps, test_pass_rate=test_pass_rate, lint_score=lint_score, efficiency_score=efficiency, review_quality=review_q, reflection_quality=reflect_q, final_score=final, passed=test_pass_rate >= 0.9 and lint_score >= 0.7, log=log, ) def grade_task(repo_path: str, **kwargs) -> float: """ OpenEnv standard grader bridge – entry point from YAML. Returns ONLY a float strictly between 0 and 1. """ import json import os from typing import List from pydantic import TypeAdapter metadata_path = os.path.join(repo_path, "grading_metadata.json") # Default fallback values for out-of-band grading task_id = "unknown" total_steps = 1 max_steps = 20 reviews = [] reflections = [] required_keywords = [] if os.path.exists(metadata_path): try: with open(metadata_path, "r") as f: meta = json.load(f) task_id = meta.get("task_id", task_id) total_steps = meta.get("total_steps", total_steps) max_steps = meta.get("max_steps", max_steps) # Use TypeAdapter for robust Pydantic deserialization from models import ReviewArtifact, ReflectionArtifact reviews = TypeAdapter(List[ReviewArtifact]).validate_python(meta.get("reviews", [])) reflections = TypeAdapter(List[ReflectionArtifact]).validate_python(meta.get("reflections", [])) required_keywords = meta.get("required_keywords", []) except Exception: pass try: result = grade_episode( repo_path=repo_path, task_id=task_id, total_steps=total_steps, max_steps=max_steps, reviews=reviews, reflections=reflections, required_keywords=required_keywords, ) return float(result.final_score) except Exception: return _SCORE_MIN