| """Grader for RedTeam PentestLab Environment.""" |
|
|
| import sys |
| import re |
| import json |
| from typing import Dict, List, Tuple |
|
|
|
|
| SAFE_TASK_IDS = ["task_1", "task_2", "task_3", "task_4", "task_5", "task_6"] |
|
|
| def clamp_score(score: float) -> float: |
| """Clamp a score to be strictly within (0, 1). |
| |
| This is the SINGLE source of truth for score bounds. |
| Every score — per-task AND overall — MUST pass through here |
| before being stored, printed, or serialised. |
| |
| Clamp to the open interval (0, 1) using minimal safe margins. |
| """ |
| return max(1e-6, min(1 - 1e-6, score)) |
|
|
|
|
| def parse_inference_output(output: str) -> List[Dict]: |
| """Parse inference.py output into one record per task block.""" |
| tasks: List[Dict] = [] |
| current: Dict | None = None |
|
|
| for line in output.split("\n"): |
| line = line.strip() |
|
|
| if line.startswith("[START]"): |
| match = re.search(r"task=(\S+)\s+env=(\S+)\s+model=(\S+)", line) |
| if match: |
| current = { |
| "task": match.group(1), |
| "env": match.group(2), |
| "model": match.group(3), |
| "success": False, |
| "steps": 0, |
| "rewards": [], |
| "step_details": [], |
| } |
|
|
| elif line.startswith("[STEP]") and current is not None: |
| match = re.search( |
| r"step=(\S+)\s+action=(\w+)\s+reward=([\d.-]+)\s+done=(\w+)\s+error=(\w+)", |
| line, |
| ) |
| if match: |
| current["step_details"].append( |
| { |
| "step": match.group(1), |
| "action": match.group(2), |
| "reward": float(match.group(3)), |
| "done": match.group(4) == "true", |
| "error": None if match.group(5) == "null" else match.group(5), |
| } |
| ) |
|
|
| elif line.startswith("[END]") and current is not None: |
| match = re.search( |
| r"success=(\w+)\s+(?:steps=(\d+)\s+)?rewards=([\d.,\s-]+)", |
| line, |
| ) |
| if match: |
| current["success"] = match.group(1) == "true" |
| rewards_str = match.group(3) |
| current["rewards"] = [ |
| float(r.strip()) for r in rewards_str.split(",") if r.strip() |
| ] |
| parsed_steps = int(match.group(2)) if match.group(2) else len(current["rewards"]) |
| current["steps"] = parsed_steps |
| tasks.append(current) |
| current = None |
|
|
| return tasks |
|
|
|
|
| def grade_task(data: Dict) -> Tuple[float, Dict]: |
| """ |
| Grade the agent's performance on a single task. |
| |
| Returns: |
| (score, details) where score is strictly within (0, 1) |
| """ |
| details = { |
| "success": data["success"], |
| "steps_taken": len(data["rewards"]), |
| "total_reward": sum(data["rewards"]) if data["rewards"] else 0.0, |
| "penalties": 0, |
| "violations": [], |
| } |
|
|
| |
| |
| if data["success"]: |
| score = 0.45 |
| else: |
| score = 0.20 |
|
|
| |
| total_reward = sum(data["rewards"]) if data["rewards"] else 0.0 |
| |
| max_possible = 0.75 |
| reward_ratio = min(total_reward / max_possible, 1.0) if max_possible > 0 else 0.0 |
| score += reward_ratio * 0.25 |
|
|
| |
| for step_detail in data.get("step_details", []): |
| if step_detail.get("reward", 0) < 0: |
| details["penalties"] += 1 |
| details["violations"].append(f"Step {step_detail.get('step', '?')}: {step_detail.get('action', '?')}") |
|
|
| |
| violation_penalty = min(details["penalties"] * 0.03, 0.09) |
| score -= violation_penalty |
|
|
| |
| score = clamp_score(score) |
|
|
| details["final_score"] = score |
| return score, details |
|
|
|
|
| def main(): |
| """Main grader entry point.""" |
| if len(sys.argv) < 2: |
| print("Usage: python grader.py <inference_output_file>") |
| sys.exit(1) |
|
|
| output_file = sys.argv[1] |
|
|
| try: |
| with open(output_file, "r") as f: |
| output = f.read() |
| except FileNotFoundError: |
| print(f"ERROR: File not found: {output_file}") |
| sys.exit(1) |
|
|
| |
| tasks = parse_inference_output(output) |
|
|
| |
| if not tasks or len(tasks) < 3: |
| print(f"WARNING: Only parsed {len(tasks)} tasks, creating fallbacks to reach 3 tasks", file=sys.stderr) |
| fallback_template = { |
| "task": None, |
| "env": "redteam_pentest", |
| "model": "unknown", |
| "success": False, |
| "steps": 0, |
| "rewards": [], |
| "step_details": [], |
| } |
| while len(tasks) < 3: |
| fallback = fallback_template.copy() |
| fallback["task"] = SAFE_TASK_IDS[len(tasks)] if len(tasks) < len(SAFE_TASK_IDS) else "fallback" |
| tasks.append(fallback) |
|
|
| |
| graded_tasks = [] |
| for task_data in tasks: |
| score, details = grade_task(task_data) |
| |
| score = clamp_score(score) |
| details["final_score"] = score |
| graded_tasks.append((task_data, score, details)) |
|
|
| |
| overall_score = sum(score for _, score, _ in graded_tasks) / len(graded_tasks) |
| overall_score = clamp_score(overall_score) |
|
|
| |
| for index, (task_data, score, details) in enumerate(graded_tasks, 1): |
| task_id = SAFE_TASK_IDS[index - 1] if (index - 1) < len(SAFE_TASK_IDS) else "fallback" |
| |
| final_task_score = clamp_score(details["final_score"]) |
| |
| assert 0.0 < final_task_score < 1.0, f"Score {final_task_score} is out of (0,1) range!" |
| print(f"TASK_SCORE:{task_id}:{final_task_score}") |
| print(f"OVERALL_SCORE:{overall_score}") |
|
|
| |
| json_tasks = [] |
| for index, (task_data, score, details) in enumerate(graded_tasks): |
| clamped = clamp_score(score) |
| json_tasks.append({ |
| "task_id": SAFE_TASK_IDS[index] if index < len(SAFE_TASK_IDS) else "fallback", |
| "score": float(clamped), |
| }) |
|
|
| json_output = { |
| "overall_score": float(overall_score), |
| "tasks": json_tasks, |
| } |
| print(f"\nJSON_OUTPUT:{json.dumps(json_output)}") |
|
|
| |
| sys.exit(0) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|