# model / grader.py
# Uploaded by MrHuman00 ("Upload 30 files", commit aee8fe7, verified).
"""Grader for RedTeam PentestLab - scores STRICTLY inside (0.0, 1.0) exclusive."""
import json
import re
import sys
from typing import Dict, List, Tuple
# Every reported score is confined to [SCORE_FLOOR, SCORE_CEIL]. The wide
# margins keep values strictly inside (0.0, 1.0) even after rounding.
SCORE_FLOOR = 0.10
SCORE_CEIL = 0.90

# Labels under which task scores are reported, in output order.
TASK_IDS = ["alpha", "bravo", "charlie"]


def strict_clamp(score: float) -> float:
    """Clamp *score* into [SCORE_FLOOR, SCORE_CEIL] and round to 4 places.

    This is the single place where score bounds are enforced, so the result
    is always strictly inside (0.0, 1.0).

    Args:
        score: Any value; it is converted with ``float()``.

    Returns:
        ``SCORE_FLOOR`` for values that cannot be converted, for NaN and for
        anything below the floor; ``SCORE_CEIL`` for anything above the
        ceiling (including +inf and integers too large for a float);
        otherwise the value rounded to 4 decimal places.

    Never asserts and never raises for conversion failures.
    """
    try:
        value = float(score)
    except OverflowError:
        # An exact number too large for a double (e.g. 10**400) cannot be
        # converted, but its sign still says which bound it belongs at.
        try:
            return SCORE_CEIL if score > 0 else SCORE_FLOOR
        except TypeError:
            return SCORE_FLOOR
    except (TypeError, ValueError):
        return SCORE_FLOOR

    if value != value:  # NaN is the only float that is unequal to itself.
        return SCORE_FLOOR

    # min/max also send +inf to the ceiling and -inf to the floor.
    clamped = max(SCORE_FLOOR, min(SCORE_CEIL, value))

    # Rounding cannot escape the range: both bounds are exact at 4 places.
    return round(clamped, 4)
def parse_inference_output(output: str) -> List[Dict]:
    """Turn inference stdout into a list of task records.

    A record is opened by a ``[START]`` line that carries ``task=``, ``env=``
    and ``model=`` fields, collects every well-formed ``[STEP]`` line that
    follows, and is emitted when the next ``[END]`` line is seen. Lines
    outside an open block, and lines whose fields do not match, are ignored.

    Args:
        output: Full captured stdout text.

    Returns:
        One dict per completed block with the keys ``task``, ``env``,
        ``model``, ``success``, ``steps``, ``rewards`` and ``step_details``.
    """
    records: List[Dict] = []
    record: Dict = {}
    in_block = False

    for text in map(str.strip, output.splitlines()):
        if text.startswith("[START]"):
            header = re.search(r"task=(\S+)\s+env=(\S+)\s+model=(\S+)", text)
            if header is None:
                # Malformed header: leave the parser state as it was.
                continue
            task_name, env_name, model_name = header.groups()
            record = {
                "task": task_name,
                "env": env_name,
                "model": model_name,
                "success": False,
                "steps": 0,
                "rewards": [],
                "step_details": [],
            }
            in_block = True
            continue

        if not in_block:
            # [STEP] / [END] only count inside an open block.
            continue

        if text.startswith("[STEP]"):
            step = re.search(
                r"step=(\S+)\s+action=(\w+)\s+reward=([\d.eE+-]+)\s+done=(\w+)\s+error=(\S+)",
                text,
            )
            if step is None:
                continue
            number, action, reward_text, done_text, error_text = step.groups()
            try:
                reward = float(reward_text)
            except ValueError:
                # The pattern admits strings such as "1.2.3"; use a default.
                reward = 0.10
            record["step_details"].append(
                {
                    "step": number,
                    "action": action,
                    "reward": reward,
                    "done": done_text.lower() == "true",
                    "error": None if error_text.lower() == "null" else error_text,
                }
            )
        elif text.startswith("[END]"):
            footer = re.search(r"success=(\w+)\s+rewards=([\d.,\s.eE+-]*)", text)
            if footer is not None:
                record["success"] = footer.group(1).lower() == "true"
                rewards: List[float] = []
                for piece in (footer.group(2) or "").split(","):
                    piece = piece.strip()
                    if not piece:
                        continue
                    try:
                        rewards.append(float(piece))
                    except ValueError:
                        # Skip tokens that are not valid numbers.
                        continue
                record["rewards"] = rewards
                record["steps"] = len(rewards)
            # The block is closed and emitted whether or not the fields parsed.
            records.append(record)
            record = {}
            in_block = False

    return records
def make_fallback_task(task_id: str) -> Dict:
    """Return an empty, unsuccessful task record labelled *task_id*.

    The record has the same keys as the ones produced by the parser, with no
    steps and no rewards. Each call builds fresh lists, so records never
    share mutable state.
    """
    placeholder: Dict = dict(
        task=task_id,
        env="redteam_pentest",
        model="unknown",
        success=False,
        steps=0,
    )
    placeholder["rewards"] = []
    placeholder["step_details"] = []
    return placeholder
def grade_task(data: Dict) -> Tuple[float, Dict]:
    """Score a single task record.

    The raw score is assembled from three parts and then passed through
    ``strict_clamp``:

    * Base: 0.35 when ``data["success"]`` is truthy, otherwise 0.15.
    * Reward bonus: the sum of the positive entries of ``data["rewards"]``,
      scaled so that a total of 0.80 earns the full 0.30; capped at 0.30.
    * Penalty: 0.03 for each entry of ``data["step_details"]`` whose
      ``reward`` is negative, capped at 0.09.

    The raw value therefore lies between 0.06 (failure, three or more
    penalised steps) and 0.65 (success, full bonus, no penalty); the clamp
    raises anything under its floor.

    Args:
        data: Task record; missing or empty fields count as failure with no
            rewards and no steps.

    Returns:
        ``(score, details)`` where ``details`` reports ``success``,
        ``steps_taken``, ``total_reward`` (signed sum of all rewards, rounded
        to 4 places), ``violations`` and ``final_score``.
    """
    succeeded = bool(data.get("success", False))
    reward_list = data.get("rewards", []) or []
    steps = data.get("step_details", []) or []

    base = 0.35 if succeeded else 0.15

    # Only positive rewards earn a bonus; negative ones are handled below.
    positive_total = sum(max(0.0, r) for r in reward_list)
    if positive_total > 0:
        bonus = min((positive_total / 0.80) * 0.30, 0.30)
    else:
        bonus = 0.0

    # Penalties are counted from the per-step records, not the rewards list.
    violations = len([1 for entry in steps if float(entry.get("reward", 0.0)) < 0.0])
    penalty = min(violations * 0.03, 0.09)

    score = strict_clamp(base + bonus - penalty)

    signed_total = round(sum(reward_list), 4) if reward_list else 0.0
    details = {
        "success": succeeded,
        "steps_taken": len(reward_list),
        "total_reward": signed_total,
        "violations": violations,
        "final_score": score,
    }
    return score, details
def _run() -> None:
    """Read inference output, grade it, and print the score report.

    Input comes from the file named by ``sys.argv[1]`` when given, otherwise
    from stdin. Unreadable input is treated as empty. The parsed tasks are
    truncated or padded with fallback records so that exactly one task is
    graded per entry of ``TASK_IDS``; scores are reported positionally under
    those IDs.

    Output on stdout, in order:
        ``TASK_SCORE:<id>:<score>`` for each task,
        ``OVERALL_SCORE:<mean score>``,
        ``JSON_OUTPUT:<json payload>``.
    Warnings go to stderr.
    """
    if len(sys.argv) >= 2:
        output_file = sys.argv[1]
        try:
            # errors="replace": a stray non-UTF-8 byte in the log must not
            # abort the read and push every task to the fallback score.
            with open(output_file, "r", encoding="utf-8", errors="replace") as f:
                output = f.read()
        except OSError as e:
            print(f"WARNING: unable to read '{output_file}': {e}", file=sys.stderr)
            output = ""
    else:
        try:
            output = sys.stdin.read()
        except Exception as e:
            print(f"WARNING: unable to read stdin: {e}", file=sys.stderr)
            output = ""

    try:
        tasks = parse_inference_output(output)
    except Exception as e:
        print(f"WARNING: parse error ({e}); using fallback tasks", file=sys.stderr)
        tasks = []

    # Grade exactly one task per known ID: drop extras, pad what is missing.
    expected = len(TASK_IDS)
    tasks = list(tasks[:expected])
    for tid in TASK_IDS[len(tasks):]:
        tasks.append(make_fallback_task(tid))

    scores: List[float] = []
    for i, task_data in enumerate(tasks):
        try:
            score, _details = grade_task(task_data)
        except Exception as e:
            print(f"WARNING: grading error on task {i}: {e}", file=sys.stderr)
            score = SCORE_FLOOR
        # Clamping once here bounds the score; clamping again would not change it.
        scores.append(strict_clamp(score))

    overall = strict_clamp(sum(scores) / expected)

    for tid, score in zip(TASK_IDS, scores):
        print(f"TASK_SCORE:{tid}:{score}")
    print(f"OVERALL_SCORE:{overall}")

    payload = {
        "overall_score": overall,
        "tasks": [
            {"task_id": tid, "score": score} for tid, score in zip(TASK_IDS, scores)
        ],
    }
    print(f"JSON_OUTPUT:{json.dumps(payload)}")
def main() -> None:
    """Run the grader and always exit with status 0.

    If ``_run`` raises, a warning is written to stderr and a complete
    fallback report is printed in which every task, and the overall score,
    is ``SCORE_FLOOR``. The fallback is derived from ``TASK_IDS`` and
    ``SCORE_FLOOR`` so it stays in step with the normal report.
    """
    try:
        _run()
    except Exception as e:
        print(f"WARNING: unhandled grader exception: {e}", file=sys.stderr)
        fallback_payload = {
            "overall_score": SCORE_FLOOR,
            "tasks": [{"task_id": tid, "score": SCORE_FLOOR} for tid in TASK_IDS],
        }
        for tid in TASK_IDS:
            print(f"TASK_SCORE:{tid}:{SCORE_FLOOR}")
        print(f"OVERALL_SCORE:{SCORE_FLOOR}")
        print(f"JSON_OUTPUT:{json.dumps(fallback_payload)}")
    finally:
        # Exiting from ``finally`` forces status 0 on every path; it also
        # replaces any exception still propagating at this point.
        sys.exit(0)


if __name__ == "__main__":
    main()