loo / grader.py
MrHuman00's picture
Upload 17 files
c44dbf3 verified
Raw
History Blame Contribute Delete
7.03 kB
"""Grader for RedTeam PentestLab Environment."""
import sys
import re
import json
from typing import Dict, List, Tuple
# Canonical task identifiers used when reporting scores. main() assigns them
# to tasks by position and uses the label "fallback" for any position past
# the end of this list.
SAFE_TASK_IDS = ["task_1", "task_2", "task_3", "task_4", "task_5", "task_6"]
def clamp_score(score: float) -> float:
    """Clamp a score to lie strictly within the open interval (0, 1).

    This is the single source of truth for score bounds: every score,
    per-task and overall, must pass through here before being stored,
    printed, or serialised.

    Args:
        score: Raw score. Any real number, +/-infinity, or NaN is accepted.

    Returns:
        ``score`` limited to the closed range ``[1e-6, 1 - 1e-6]``.
        NaN is mapped to the lower bound.
    """
    lower = 1e-6
    upper = 1 - 1e-6
    # Phrased as a negated ">" on purpose: every comparison with NaN is
    # False, so NaN falls into this branch and gets the *lowest* score.
    # A plain max(lower, min(upper, score)) would instead turn NaN into
    # the upper bound, i.e. a near-perfect score.
    if not score > lower:
        return lower
    return min(upper, score)
# Field patterns for the three marker lines, compiled once at import time.
_START_RE = re.compile(r"task=(\S+)\s+env=(\S+)\s+model=(\S+)")
_STEP_RE = re.compile(
    r"step=(\S+)\s+action=(\w+)\s+reward=([\d.-]+)\s+done=(\w+)\s+error=(\w+)"
)
# The rewards list may be empty (a task that ended without any step), hence
# "*" rather than "+" on the final group.
_END_RE = re.compile(r"success=(\w+)\s+(?:steps=(\d+)\s+)?rewards=([\d.,\s-]*)")


def _parse_float(text: str) -> "float | None":
    """Return *text* as a float, or None when it is not a valid number."""
    try:
        return float(text)
    except ValueError:
        # The line patterns are deliberately loose ("-", "1.2.3" and ""
        # all get through), so bad tokens are rejected here instead of
        # crashing the grader.
        return None


def parse_inference_output(output: str) -> List[Dict]:
    """Parse inference.py output into one record per task block.

    A task block opens with a ``[START]`` line, may contain ``[STEP]``
    lines, and is closed by an ``[END]`` line. Lines that do not start with
    one of these markers, or whose fields do not match the expected layout,
    are ignored.

    Args:
        output: Full text printed by the inference run.

    Returns:
        One dict per closed task block, in order of appearance, with keys
        ``task``, ``env``, ``model``, ``success``, ``steps``, ``rewards``
        (list of floats) and ``step_details`` (list of dicts with ``step``,
        ``action``, ``reward``, ``done``, ``error``).

    Notes:
        * A block that never reaches ``[END]`` is not returned.
        * An ``[END]`` line always closes the current block; if its fields
          cannot be parsed the record keeps ``success=False`` and no rewards.
        * Reward tokens that are not valid numbers are skipped rather than
          raising ``ValueError``.
    """
    tasks: List[Dict] = []
    current: Dict | None = None

    for raw_line in output.split("\n"):
        line = raw_line.strip()

        if line.startswith("[START]"):
            match = _START_RE.search(line)
            if match:
                current = {
                    "task": match.group(1),
                    "env": match.group(2),
                    "model": match.group(3),
                    "success": False,
                    "steps": 0,
                    "rewards": [],
                    "step_details": [],
                }
        elif current is None:
            # [STEP] / [END] lines are meaningless outside an open block.
            continue
        elif line.startswith("[STEP]"):
            match = _STEP_RE.search(line)
            reward = _parse_float(match.group(3)) if match else None
            if reward is not None:
                current["step_details"].append(
                    {
                        "step": match.group(1),
                        "action": match.group(2),
                        "reward": reward,
                        "done": match.group(4) == "true",
                        "error": None if match.group(5) == "null" else match.group(5),
                    }
                )
        elif line.startswith("[END]"):
            match = _END_RE.search(line)
            if match:
                current["success"] = match.group(1) == "true"
                current["rewards"] = [
                    value
                    for value in map(_parse_float, match.group(3).split(","))
                    if value is not None
                ]
                # Fall back to the number of rewards when "steps=" is absent.
                current["steps"] = (
                    int(match.group(2)) if match.group(2) else len(current["rewards"])
                )
            tasks.append(current)
            current = None

    return tasks
def grade_task(data: Dict) -> Tuple[float, Dict]:
    """Score one parsed task record.

    The score is a base value (0.45 on success, 0.20 on failure), plus a
    reward bonus of at most 0.25, minus 0.03 per violating step (capped at
    0.09), and is finally passed through ``clamp_score``.

    Args:
        data: Task record with ``success`` and ``rewards`` keys and an
            optional ``step_details`` list of per-step dicts.

    Returns:
        ``(score, details)``. ``details`` holds ``success``, ``steps_taken``,
        ``total_reward``, ``penalties``, ``violations`` and ``final_score``.
    """
    succeeded = data["success"]
    rewards = data["rewards"]
    reward_sum = sum(rewards) if rewards else 0.0

    # A step counts as a violation when its recorded reward is negative.
    violations = [
        f"Step {entry.get('step', '?')}: {entry.get('action', '?')}"
        for entry in data.get("step_details", [])
        if entry.get("reward", 0) < 0
    ]
    violation_count = len(violations)

    base = 0.45 if succeeded else 0.20

    # 0.75 is a ceiling chosen just above the largest per-task reward total
    # (the original comment quotes roughly 0.62 / 0.63 / 0.72 for easy /
    # medium / hard).
    # NOTE(review): only the upper end of the ratio is capped, so a negative
    # reward total yields a negative "bonus" -- confirm that is intended.
    bonus_share = min(reward_sum / 0.75, 1.0)

    penalty = min(violation_count * 0.03, 0.09)

    final = clamp_score(base + bonus_share * 0.25 - penalty)

    details = {
        "success": succeeded,
        "steps_taken": len(rewards),
        "total_reward": reward_sum,
        "penalties": violation_count,
        "violations": violations,
        "final_score": final,
    }
    return final, details
def _safe_task_id(position: int) -> str:
    """Return the reporting ID for the task at zero-based *position*."""
    return SAFE_TASK_IDS[position] if position < len(SAFE_TASK_IDS) else "fallback"


def _fallback_task(task_id: str) -> Dict:
    """Build a fresh failed-task record used to pad a short run."""
    # A new dict with new lists on every call: copying a shared template
    # shallowly would make all fallbacks share the same list objects.
    return {
        "task": task_id,
        "env": "redteam_pentest",
        "model": "unknown",
        "success": False,
        "steps": 0,
        "rewards": [],
        "step_details": [],
    }


def main() -> None:
    """Grade an inference log and print the scores.

    Reads the file named by ``sys.argv[1]``, parses it into task records,
    pads the list with failed fallback records up to three tasks, grades
    each task and prints:

    * one ``TASK_SCORE:<task_id>:<score>`` line per task,
    * one ``OVERALL_SCORE:<score>`` line (mean of the task scores),
    * one ``JSON_OUTPUT:<json>`` line carrying the same numbers.

    Task IDs in the output are assigned by position, not taken from the
    parsed task names. Every printed score has been passed through
    ``clamp_score``.

    Exits with status 1 when the argument is missing or the file cannot be
    read, and with status 0 after a completed grading run.

    Raises:
        AssertionError: If a clamped score is somehow outside (0, 1).
    """
    if len(sys.argv) < 2:
        print("Usage: python grader.py <inference_output_file>")
        sys.exit(1)

    output_file = sys.argv[1]
    try:
        # Explicit encoding keeps the result platform-independent;
        # errors="replace" keeps stray bytes from aborting the grader.
        with open(output_file, "r", encoding="utf-8", errors="replace") as f:
            output = f.read()
    except FileNotFoundError:
        print(f"ERROR: File not found: {output_file}")
        sys.exit(1)
    except OSError as exc:
        # E.g. a directory or an unreadable file: report it instead of
        # dying with a traceback.
        print(f"ERROR: Cannot read {output_file}: {exc}")
        sys.exit(1)

    tasks = parse_inference_output(output)

    # Ensure we always have at least 3 tasks (contest requirement).
    if len(tasks) < 3:
        print(f"WARNING: Only parsed {len(tasks)} tasks, creating fallbacks to reach 3 tasks", file=sys.stderr)
        while len(tasks) < 3:
            tasks.append(_fallback_task(_safe_task_id(len(tasks))))

    # Grade each task independently. grade_task already clamps; clamping
    # again is idempotent and keeps the guarantee local to this function.
    scores = [clamp_score(grade_task(task_data)[0]) for task_data in tasks]

    overall_score = clamp_score(sum(scores) / len(scores))

    # Line-oriented, machine-readable output.
    for position, score in enumerate(scores):
        # Explicit raise rather than `assert`, so the check survives -O.
        if not 0.0 < score < 1.0:
            raise AssertionError(f"Score {score} is out of (0,1) range!")
        print(f"TASK_SCORE:{_safe_task_id(position)}:{score}")

    print(f"OVERALL_SCORE:{overall_score}")

    # The same numbers again as a single JSON document.
    json_output = {
        "overall_score": float(overall_score),
        "tasks": [
            {"task_id": _safe_task_id(position), "score": float(score)}
            for position, score in enumerate(scores)
        ],
    }
    print(f"\nJSON_OUTPUT:{json.dumps(json_output)}")

    # Exit with 0 so the evaluation platform does not treat the grader as crashed.
    sys.exit(0)
# Run the grader only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()