Spaces:
Sleeping
Sleeping
Your Name
fix(OpenEnv): implement system-wide [0.1, 0.9] boundary scrub for Phase 2 compliance
efa2d2a | """ | |
| TeamForge Grader | |
| Deterministic scoring of a completed episode. | |
| Final score formula (0.0β1.0): | |
| score = 0.40 * test_pass_rate | |
| + 0.25 * lint_score | |
| + 0.20 * efficiency_score | |
| + 0.10 * review_quality | |
| + 0.05 * reflection_quality | |
| Anti-exploit measures: | |
| - Test tampering detection (trivial pass/assert True bodies) | |
| - Implementation existence check (no empty stubs) | |
| - Minimum review length requirement | |
| - Lint-fix-only exploit detection | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import math | |
| import re | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from models import EpisodeResult, ReviewArtifact, ReflectionArtifact | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # SCORING CONFIG | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| _SCORE_MIN = 0.10 # deep interior to avoid all boundary issues | |
| _SCORE_MAX = 0.90 # deep interior to avoid all boundary issues | |
| def _clamp(score: float) -> float: | |
| """Ensure score is strictly within the open interval (0, 1).""" | |
| return round(max(_SCORE_MIN, min(_SCORE_MAX, score)), 4) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # ANTI-EXPLOIT GUARDS | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def _detect_test_tampering(repo_path: str) -> bool: | |
| """ | |
| Detect trivially-passing test rewrites. | |
| Flags: single-statement test with `pass`, `return`, or `assert True`. | |
| """ | |
| for test_file in Path(repo_path).rglob("test_*.py"): | |
| try: | |
| src = test_file.read_text() | |
| tree = ast.parse(src) | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"): | |
| body = node.body | |
| if len(body) == 1: | |
| stmt = body[0] | |
| if isinstance(stmt, (ast.Pass, ast.Return)): | |
| return True | |
| if isinstance(stmt, ast.Assert): | |
| val = stmt.test | |
| if isinstance(val, ast.Constant) and val.value is True: | |
| return True | |
| except Exception: | |
| pass | |
| return False | |
| def _implementation_exists(repo_path: str) -> bool: | |
| """Verify at least some non-test code was written (>5 non-blank lines).""" | |
| total = 0 | |
| for f in Path(repo_path).rglob("*.py"): | |
| if "test" in f.name or "__pycache__" in str(f): | |
| continue | |
| try: | |
| lines = [l for l in f.read_text().splitlines() if l.strip()] | |
| total += len(lines) | |
| except Exception: | |
| pass | |
| return total >= 5 | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # SUB-SCORERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def score_tests(repo_path: str, timeout: int = 60) -> tuple[float, str]: | |
| """Run pytest and return (pass_rate 0-1, output).""" | |
| result = subprocess.run( | |
| [sys.executable, "-m", "pytest", "--tb=short", "-q", "--no-header"], | |
| cwd=repo_path, | |
| capture_output=True, | |
| text=True, | |
| timeout=timeout, | |
| ) | |
| output = result.stdout + result.stderr | |
| passed = failed = errors = 0 | |
| m_p = re.search(r"(\d+) passed", output) | |
| m_f = re.search(r"(\d+) failed", output) | |
| m_e = re.search(r"(\d+) error", output) | |
| if m_p: passed = int(m_p.group(1)) | |
| if m_f: failed = int(m_f.group(1)) | |
| if m_e: errors = int(m_e.group(1)) | |
| total = passed + failed + errors | |
| if total == 0: | |
| return _SCORE_MIN, output | |
| pass_rate = passed / total | |
| return _clamp(pass_rate), output | |
| def score_lint(repo_path: str) -> tuple[float, str]: | |
| """Run ruff and return (lint_score 0-1, output).""" | |
| result = subprocess.run( | |
| [sys.executable, "-m", "ruff", "check", "."], | |
| cwd=repo_path, | |
| capture_output=True, | |
| text=True, | |
| ) | |
| output = result.stdout + result.stderr | |
| if result.returncode == 0: | |
| return _SCORE_MAX, "No lint violations." | |
| violations = len([ | |
| ln for ln in output.splitlines() | |
| if re.match(r".+:\d+:\d+:", ln) | |
| ]) | |
| # Strictly interior [SCORE_MIN, SCORE_MAX] | |
| raw_score = 1.0 - violations * 0.07 | |
| return _clamp(raw_score), output | |
| def score_review_quality( | |
| reviews: List[ReviewArtifact], | |
| required_keywords: List[str], | |
| ) -> float: | |
| """Keyword-based review quality with minimum length requirement.""" | |
| if not reviews: | |
| return _SCORE_MIN | |
| combined = " ".join(r.text.lower() for r in reviews) | |
| # Anti-exploit: minimum meaningful length | |
| if len(combined.strip()) < 40: | |
| return _SCORE_MIN + 0.05 | |
| # Keyword coverage | |
| if not required_keywords: | |
| kw_score = 0.5 | |
| else: | |
| found = sum(1 for kw in required_keywords if kw.lower() in combined) | |
| kw_score = found / len(required_keywords) | |
| # Length bonus (up to 0.2 for thorough reviews) | |
| avg_len = sum(len(r.text) for r in reviews) / len(reviews) | |
| length_bonus = min(0.2, avg_len / 600) | |
| # Specificity bonus: mentions actual code identifiers | |
| code_words = re.findall(r'\b[a-z_]{3,}\(\)', combined) | |
| specificity = min(0.1, len(set(code_words)) * 0.025) | |
| return _clamp(kw_score * 0.7 + length_bonus + specificity) | |
| def score_reflection_quality(reflections: List[ReflectionArtifact]) -> float: | |
| """Score reflections on depth and actionability.""" | |
| if not reflections: | |
| return _SCORE_MIN | |
| total = 0.0 | |
| for ref in reflections: | |
| depth = 0.0 | |
| if len(ref.what_went_well.strip()) > 40: | |
| depth += 0.5 | |
| if len(ref.what_to_improve.strip()) > 40: | |
| depth += 0.5 | |
| # Bonus if adjusted_plan provided | |
| if ref.adjusted_plan and len(ref.adjusted_plan.strip()) > 20: | |
| depth = min(0.9, depth + 0.2) | |
| total += depth | |
| # Strictly (0, 1) - Safer interior | |
| return _clamp(total / max(1, len(reflections))) | |
| def score_efficiency(total_steps: int, max_steps: int) -> float: | |
| """Reward solving in fewer steps with smooth decay curve.""" | |
| ratio = total_steps / max_steps | |
| # Strictly (0, 1) - Safer interior | |
| return _clamp(math.exp(-2.0 * max(0, ratio - 0.25))) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN GRADER | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| def grade_episode( | |
| repo_path: str, | |
| task_id: str, | |
| total_steps: int, | |
| max_steps: int, | |
| reviews: List[ReviewArtifact], | |
| reflections: List[ReflectionArtifact], | |
| required_keywords: Optional[List[str]] = None, | |
| ) -> EpisodeResult: | |
| """ | |
| Run full deterministic grading pipeline. | |
| Returns EpisodeResult with final_score in [0.0, 1.0]. | |
| """ | |
| log: List[str] = [] | |
| required_keywords = required_keywords or [] | |
| # ββ Anti-exploit checks ββ | |
| if _detect_test_tampering(repo_path): | |
| log.append("[GRADER] β TEST TAMPERING DETECTED β score zeroed") | |
| return EpisodeResult( | |
| task_id=task_id, total_steps=total_steps, | |
| final_score=_SCORE_MIN, passed=False, | |
| log=log + ["Test files were trivially rewritten to force passes."], | |
| ) | |
| if not _implementation_exists(repo_path): | |
| log.append("[GRADER] β NO IMPLEMENTATION FOUND β score zeroed") | |
| return EpisodeResult( | |
| task_id=task_id, total_steps=total_steps, | |
| final_score=_SCORE_MIN, passed=False, | |
| log=log + ["No non-test code was written."], | |
| ) | |
| # ββ Tests ββ | |
| test_pass_rate, test_output = score_tests(repo_path) | |
| log.append(f"[GRADER] test_pass_rate={test_pass_rate:.3f}") | |
| log.append(f"[GRADER] test_output=\n{test_output[:800]}") | |
| # ββ Lint ββ | |
| lint_score, lint_output = score_lint(repo_path) | |
| log.append(f"[GRADER] lint_score={lint_score:.3f}") | |
| # ββ Efficiency ββ | |
| efficiency = score_efficiency(total_steps, max_steps) | |
| log.append(f"[GRADER] efficiency={efficiency:.3f} ({total_steps}/{max_steps} steps)") | |
| # ββ Review quality ββ | |
| review_q = score_review_quality(reviews, required_keywords) | |
| log.append(f"[GRADER] review_quality={review_q:.3f}") | |
| # ββ Reflection quality ββ | |
| reflect_q = score_reflection_quality(reflections) | |
| log.append(f"[GRADER] reflection_quality={reflect_q:.3f}") | |
| # ββ Final weighted score ββ | |
| final = ( | |
| 0.40 * test_pass_rate | |
| + 0.25 * lint_score | |
| + 0.20 * efficiency | |
| + 0.10 * review_q | |
| + 0.05 * reflect_q | |
| ) | |
| # Strictly (0, 1) interior range to satisfy Phase 2 validator | |
| final = _clamp(final) | |
| log.append(f"[GRADER] FINAL_SCORE={final:.4f}") | |
| return EpisodeResult( | |
| task_id=task_id, | |
| total_steps=total_steps, | |
| test_pass_rate=test_pass_rate, | |
| lint_score=lint_score, | |
| efficiency_score=efficiency, | |
| review_quality=review_q, | |
| reflection_quality=reflect_q, | |
| final_score=final, | |
| passed=test_pass_rate >= 0.9 and lint_score >= 0.7, | |
| log=log, | |
| ) | |
| def grade_task(repo_path: str, **kwargs) -> float: | |
| """ | |
| OpenEnv standard grader bridge β entry point from YAML. | |
| Returns ONLY a float strictly between 0 and 1. | |
| """ | |
| import json | |
| import os | |
| from typing import List | |
| from pydantic import TypeAdapter | |
| metadata_path = os.path.join(repo_path, "grading_metadata.json") | |
| # Default fallback values for out-of-band grading | |
| task_id = "unknown" | |
| total_steps = 1 | |
| max_steps = 20 | |
| reviews = [] | |
| reflections = [] | |
| required_keywords = [] | |
| if os.path.exists(metadata_path): | |
| try: | |
| with open(metadata_path, "r") as f: | |
| meta = json.load(f) | |
| task_id = meta.get("task_id", task_id) | |
| total_steps = meta.get("total_steps", total_steps) | |
| max_steps = meta.get("max_steps", max_steps) | |
| # Use TypeAdapter for robust Pydantic deserialization | |
| from models import ReviewArtifact, ReflectionArtifact | |
| reviews = TypeAdapter(List[ReviewArtifact]).validate_python(meta.get("reviews", [])) | |
| reflections = TypeAdapter(List[ReflectionArtifact]).validate_python(meta.get("reflections", [])) | |
| required_keywords = meta.get("required_keywords", []) | |
| except Exception: | |
| pass | |
| try: | |
| result = grade_episode( | |
| repo_path=repo_path, | |
| task_id=task_id, | |
| total_steps=total_steps, | |
| max_steps=max_steps, | |
| reviews=reviews, | |
| reflections=reflections, | |
| required_keywords=required_keywords, | |
| ) | |
| return float(result.final_score) | |
| except Exception: | |
| return _SCORE_MIN | |