| """Grader for RedTeam PentestLab - scores STRICTLY inside (0.0, 1.0) exclusive.""" |
|
|
| import json |
| import re |
| import sys |
| from typing import Dict, List, Tuple |
|
|
|
|
| SCORE_FLOOR = 0.10 |
| SCORE_CEIL = 0.90 |
| TASK_IDS = ["alpha", "bravo", "charlie"] |
|
|
|
|
| def strict_clamp(score: float) -> float: |
| """ |
| Clamp score to STRICTLY inside (0.0, 1.0). |
| |
| This is the ONLY function that sets score bounds. |
| Every score - per-step, per-task, overall - passes through here. |
| Uses wide margins (0.10 to 0.90) to survive float rounding in any context. |
| Never asserts. Never raises. Always returns a valid float. |
| """ |
| try: |
| s = float(score) |
| except (TypeError, ValueError): |
| return SCORE_FLOOR |
|
|
| if s != s: |
| return SCORE_FLOOR |
| if s == float("inf"): |
| return SCORE_CEIL |
| if s == float("-inf"): |
| return SCORE_FLOOR |
|
|
| s = max(SCORE_FLOOR, min(SCORE_CEIL, s)) |
|
|
| if s <= 0.0: |
| return SCORE_FLOOR |
| if s >= 1.0: |
| return SCORE_CEIL |
|
|
| s = round(s, 4) |
|
|
| if s <= 0.0: |
| return SCORE_FLOOR |
| if s >= 1.0: |
| return SCORE_CEIL |
|
|
| return s |
|
|
|
|
| def parse_inference_output(output: str) -> List[Dict]: |
| """Parse inference.py stdout into one record per [START]..[END] block.""" |
| tasks: List[Dict] = [] |
| current: Dict = {} |
| active = False |
|
|
| for raw_line in output.splitlines(): |
| line = raw_line.strip() |
|
|
| if line.startswith("[START]"): |
| m = re.search(r"task=(\S+)\s+env=(\S+)\s+model=(\S+)", line) |
| if m: |
| current = { |
| "task": m.group(1), |
| "env": m.group(2), |
| "model": m.group(3), |
| "success": False, |
| "steps": 0, |
| "rewards": [], |
| "step_details": [], |
| } |
| active = True |
|
|
| elif line.startswith("[STEP]") and active: |
| m = re.search( |
| r"step=(\S+)\s+action=(\w+)\s+reward=([\d.eE+-]+)\s+done=(\w+)\s+error=(\S+)", |
| line, |
| ) |
| if m: |
| try: |
| rew = float(m.group(3)) |
| except ValueError: |
| rew = 0.10 |
| current["step_details"].append( |
| { |
| "step": m.group(1), |
| "action": m.group(2), |
| "reward": rew, |
| "done": m.group(4).lower() == "true", |
| "error": None if m.group(5).lower() == "null" else m.group(5), |
| } |
| ) |
|
|
| elif line.startswith("[END]") and active: |
| m = re.search(r"success=(\w+)\s+rewards=([\d.,\s.eE+-]*)", line) |
| if m: |
| current["success"] = m.group(1).lower() == "true" |
| raw_rewards = m.group(2) or "" |
| parsed_rewards: List[float] = [] |
| for tok in raw_rewards.split(","): |
| tok = tok.strip() |
| if not tok: |
| continue |
| try: |
| parsed_rewards.append(float(tok)) |
| except ValueError: |
| continue |
| current["rewards"] = parsed_rewards |
| current["steps"] = len(parsed_rewards) |
| tasks.append(current) |
| current = {} |
| active = False |
|
|
| return tasks |
|
|
|
|
| def make_fallback_task(task_id: str) -> Dict: |
| return { |
| "task": task_id, |
| "env": "redteam_pentest", |
| "model": "unknown", |
| "success": False, |
| "steps": 0, |
| "rewards": [], |
| "step_details": [], |
| } |
|
|
|
|
| def grade_task(data: Dict) -> Tuple[float, Dict]: |
| """ |
| Grade one task. Returns (score, details) where score is strictly in (0.0, 1.0). |
| |
| Scoring breakdown (designed so theoretical max < 0.90, min > 0.10): |
| Base: 0.35 (success) or 0.15 (failure) |
| Reward bonus: up to 0.30 (scaled to max_possible=0.80) |
| Chain penalty: up to -0.09 (0.03 per negative-reward step, max 3) |
| Max possible: 0.65 |
| Min possible: 0.06 before strict clamp |
| """ |
| success = bool(data.get("success", False)) |
| rewards = data.get("rewards", []) or [] |
| step_details = data.get("step_details", []) or [] |
|
|
| score = 0.35 if success else 0.15 |
|
|
| total_reward = sum(max(0.0, r) for r in rewards) |
| reward_bonus = min((total_reward / 0.80) * 0.30, 0.30) if total_reward > 0 else 0.0 |
| score += reward_bonus |
|
|
| violations = sum(1 for s in step_details if float(s.get("reward", 0.0)) < 0.0) |
| score -= min(violations * 0.03, 0.09) |
|
|
| score = strict_clamp(score) |
| details = { |
| "success": success, |
| "steps_taken": len(rewards), |
| "total_reward": round(sum(rewards), 4) if rewards else 0.0, |
| "violations": violations, |
| "final_score": score, |
| } |
| return score, details |
|
|
|
|
| def _run() -> None: |
| output = "" |
|
|
| if len(sys.argv) >= 2: |
| output_file = sys.argv[1] |
| try: |
| with open(output_file, "r", encoding="utf-8") as f: |
| output = f.read() |
| except OSError as e: |
| print(f"WARNING: unable to read '{output_file}': {e}", file=sys.stderr) |
| output = "" |
| else: |
| try: |
| output = sys.stdin.read() |
| except Exception: |
| output = "" |
|
|
| try: |
| tasks = parse_inference_output(output) |
| except Exception as e: |
| print(f"WARNING: parse error ({e}); using fallback tasks", file=sys.stderr) |
| tasks = [] |
|
|
| while len(tasks) < 3: |
| idx = len(tasks) |
| tid = TASK_IDS[idx] if idx < len(TASK_IDS) else f"task_{idx}" |
| tasks.append(make_fallback_task(tid)) |
|
|
| graded: List[Tuple[Dict, float, Dict]] = [] |
| for i, task_data in enumerate(tasks[:3]): |
| try: |
| score, details = grade_task(task_data) |
| except Exception as e: |
| print(f"WARNING: grading error on task {i}: {e}", file=sys.stderr) |
| score = SCORE_FLOOR |
| details = {"final_score": SCORE_FLOOR, "success": False} |
|
|
| score = strict_clamp(score) |
| if not (0.0 < score < 1.0): |
| print(f"WARNING: out-of-range score {score} on task {i}; forcing floor", file=sys.stderr) |
| score = SCORE_FLOOR |
|
|
| details["final_score"] = strict_clamp(score) |
| graded.append((task_data, strict_clamp(score), details)) |
|
|
| overall = strict_clamp(sum(score for _, score, _ in graded) / 3.0) |
|
|
| for i, (_, score, _) in enumerate(graded): |
| tid = TASK_IDS[i] if i < len(TASK_IDS) else f"task_{i}" |
| out_score = strict_clamp(score) |
| print(f"TASK_SCORE:{tid}:{out_score}") |
|
|
| print(f"OVERALL_SCORE:{overall}") |
|
|
| json_tasks = [] |
| for i, (_, score, _) in enumerate(graded): |
| tid = TASK_IDS[i] if i < len(TASK_IDS) else f"task_{i}" |
| json_tasks.append({"task_id": tid, "score": strict_clamp(score)}) |
|
|
| payload = { |
| "overall_score": strict_clamp(overall), |
| "tasks": json_tasks, |
| } |
| print(f"JSON_OUTPUT:{json.dumps(payload)}") |
|
|
|
|
| def main() -> None: |
| try: |
| _run() |
| except Exception as e: |
| print(f"WARNING: unhandled grader exception: {e}", file=sys.stderr) |
| fallback_payload = { |
| "overall_score": SCORE_FLOOR, |
| "tasks": [ |
| {"task_id": "alpha", "score": SCORE_FLOOR}, |
| {"task_id": "bravo", "score": SCORE_FLOOR}, |
| {"task_id": "charlie", "score": SCORE_FLOOR}, |
| ], |
| } |
| print("TASK_SCORE:alpha:0.1") |
| print("TASK_SCORE:bravo:0.1") |
| print("TASK_SCORE:charlie:0.1") |
| print("OVERALL_SCORE:0.1") |
| print(f"JSON_OUTPUT:{json.dumps(fallback_payload)}") |
| finally: |
| sys.exit(0) |
|
|
|
|
| if __name__ == "__main__": |
| main() |