"""
Deterministic graders for ForensicShell tasks.

Each grader takes a submitted ForensicReport (as dict) and the scenario
ground-truth dict and returns a float in [0.0, 1.0]. Partial credit is awarded
per correct subfield so the reward function has meaningful gradient, not just
0/1.

Design choices:
  - modified_files uses F0.5 (precision-weighted) instead of Jaccard:
    submitting false-positive files (claiming an unmodified file was attacked)
    is penalized more than missing a file. This mirrors real forensics where
    false positives waste incident response effort.
  - Timeline scoring is multiplicative (phase_F1 * ordering): having all 5
    phases in the wrong order scores 0, not ~0.30. Correct phases AND correct
    order required for full credit.
  - Malformed submissions never raise: a report that is not a mapping, or a
    report field that is not iterable where a collection is expected, is
    treated as absent and simply earns no credit for that subfield.
"""

from collections.abc import Mapping
from itertools import combinations
from typing import Dict, List, Set


def _safe_str(x) -> str:
    """Return *x* stripped and lower-cased, or "" when *x* is not a string."""
    return x.strip().lower() if isinstance(x, str) else ""


def _string_set(items) -> Set[str]:
    """
    Collect the non-blank string entries of *items*, stripped, into a set.

    Non-string entries are ignored. A non-iterable *items* (e.g. an int that a
    submission put where a list belongs) yields an empty set instead of
    raising, so a malformed field is graded like a missing one.
    """
    try:
        iterator = iter(items)
    except TypeError:
        return set()
    return {s.strip() for s in iterator if isinstance(s, str) and s.strip()}


def _fbeta(pred: List[str], truth: List[str], beta: float = 0.5) -> float:
    """
    F-beta score over string sets.

    beta < 1 weighs precision more than recall. F0.5 penalizes false positives
    (extra wrong files) harder than false negatives (missing files), matching
    real forensic triage priorities.

    Returns 1.0 when both sets are empty, 0.0 when exactly one is empty or
    when there is no overlap.
    """
    pred_set = _string_set(pred)
    truth_set = _string_set(truth)
    if not pred_set and not truth_set:
        return 1.0
    if not pred_set or not truth_set:
        return 0.0

    tp = len(pred_set & truth_set)
    precision = tp / len(pred_set)
    recall = tp / len(truth_set)
    if precision + recall == 0:
        return 0.0
    beta2 = beta * beta
    return (1 + beta2) * precision * recall / (beta2 * precision + recall)


def _kendall_tau_normalized(pred_order: List[str], true_order: List[str]) -> float:
    """
    Normalized Kendall-tau in [0, 1] where 1.0 == identical ordering
    restricted to the overlap set.

    If fewer than 2 shared phases, returns 1.0 (nothing to order).
    """
    # Keep only the first occurrence of each predicted item that also appears
    # in the true ordering.
    seen = set()
    pred_overlap: List[str] = []
    for p in pred_order:
        if p in true_order and p not in seen:
            seen.add(p)
            pred_overlap.append(p)

    if len(pred_overlap) < 2:
        return 1.0

    true_overlap = [p for p in true_order if p in seen]
    true_rank = {p: i for i, p in enumerate(true_overlap)}

    concordant = 0
    discordant = 0
    for earlier, later in combinations(pred_overlap, 2):
        a = true_rank[earlier]
        b = true_rank[later]
        if a < b:
            concordant += 1
        elif a > b:
            discordant += 1

    total = concordant + discordant
    if total == 0:
        return 1.0
    tau = (concordant - discordant) / total  # in [-1, 1]
    return (tau + 1.0) / 2.0  # normalize to [0, 1]


def _field_match(report: Dict, truth: Dict, key: str) -> float:
    """Return 1.0 when report[key] and truth[key] agree after normalization.

    Both sides go through ``_safe_str`` (strip + lower-case; non-strings
    become ""), so the comparison ignores case and surrounding whitespace.
    """
    return 1.0 if _safe_str(report.get(key)) == _safe_str(truth.get(key)) else 0.0


def _files_score(report: Dict, truth: Dict) -> float:
    """Score ``modified_files`` with F0.5 (false positives cost more)."""
    return _fbeta(
        report.get("modified_files") or [],
        truth.get("modified_files") or [],
        beta=0.5,
    )


def _submitted_phases(timeline) -> List[str]:
    """
    Extract the string ``phase`` of each submitted timeline entry, in order.

    Entries may be dicts (``entry["phase"]``) or objects (``entry.phase``);
    entries without a string phase are dropped. A non-iterable *timeline*
    yields an empty list instead of raising.
    """
    try:
        entries = iter(timeline)
    except TypeError:
        return []
    phases = [
        (e.get("phase") if isinstance(e, dict) else getattr(e, "phase", None))
        for e in entries
    ]
    return [p for p in phases if isinstance(p, str)]


def _grade_t1_login(report: Dict, truth: Dict) -> float:
    """Grade task 1: compromised user and initial IP, 50% each."""
    user_ok = _field_match(report, truth, "compromised_user")
    ip_ok = _field_match(report, truth, "initial_ip")
    return 0.5 * user_ok + 0.5 * ip_ok


def _grade_t2_modified(report: Dict, truth: Dict) -> float:
    """Grade task 2: user 20%, IP 20%, modified files (F0.5) 30%, SHA-256 30%."""
    user_ok = _field_match(report, truth, "compromised_user")
    ip_ok = _field_match(report, truth, "initial_ip")
    # F0.5: precision-weighted — false positives penalized harder than false negatives
    files_score = _files_score(report, truth)
    sha_ok = _field_match(report, truth, "backdoor_sha256")
    return 0.2 * user_ok + 0.2 * ip_ok + 0.3 * files_score + 0.3 * sha_ok


def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
    """
    Grade task 3: user, IP, modified files and SHA-256 at 15% each, plus the
    attack timeline at 40%.

    The timeline part is ``phase_F1 * ordering`` where phase_F1 is a standard
    F1 over the set of phase names and ordering is the normalized Kendall-tau
    of the submitted phase order against the true order.
    """
    user_ok = _field_match(report, truth, "compromised_user")
    ip_ok = _field_match(report, truth, "initial_ip")
    # F0.5 for files (same precision-weighting as t2)
    files_score = _files_score(report, truth)
    sha_ok = _field_match(report, truth, "backdoor_sha256")

    pred_phases = _submitted_phases(report.get("timeline") or [])
    true_phases = [e["phase"] for e in (truth.get("timeline") or [])]

    # F1 over phase set (standard F1 — we don't precision-weight phases)
    pred_set = set(pred_phases)
    true_set = set(true_phases)
    if not pred_set and not true_set:
        phase_f1 = 1.0
    elif not pred_set or not true_set:
        phase_f1 = 0.0
    else:
        tp = len(pred_set & true_set)
        precision = tp / len(pred_set)
        recall = tp / len(true_set)
        phase_f1 = (
            0.0
            if (precision + recall) == 0
            else 2 * precision * recall / (precision + recall)
        )

    # Ordering quality
    order_score = _kendall_tau_normalized(pred_phases, true_phases)

    # MULTIPLICATIVE timeline scoring: having all phases in wrong order gives
    # F1=1.0 * tau=0.0 = 0.0, not the ~0.30 an additive scheme would produce.
    # Correct phases AND correct order both required for full timeline credit.
    timeline_score = phase_f1 * order_score

    return (
        0.15 * user_ok
        + 0.15 * ip_ok
        + 0.15 * files_score
        + 0.15 * sha_ok
        + 0.40 * timeline_score
    )


GRADERS = {
    "t1_login": _grade_t1_login,
    "t2_modified": _grade_t2_modified,
    "t3_timeline": _grade_t3_timeline,
}


def _grade_generic(report: Dict, truth: Dict) -> float:
    """
    Dispatcher for procedurally generated scenarios.

    Picks the right sub-grader by inspecting which fields are present in the
    ground-truth dict.
    """
    if "timeline" in truth:
        return _grade_t3_timeline(report, truth)
    if "backdoor_sha256" in truth:
        return _grade_t2_modified(report, truth)
    return _grade_t1_login(report, truth)


def grade(task_id: str, report: Dict, truth: Dict) -> float:
    """
    Dispatch to the right grader for this task. Returns float in [0.0, 1.0].

    Task ids starting with ``"gen_"`` go to the generic dispatcher; other ids
    are looked up in ``GRADERS``. An unknown or non-string ``task_id`` scores
    0.0. A ``report`` that is not a mapping (including ``None``) is graded as
    an empty report rather than raising.
    """
    if not isinstance(task_id, str):
        return 0.0

    fn = _grade_generic if task_id.startswith("gen_") else GRADERS.get(task_id)
    if fn is None:
        return 0.0

    # Submissions are untrusted: anything that is not a mapping carries no
    # gradable fields.
    if not isinstance(report, Mapping):
        report = {}

    score = fn(report, truth or {})
    if score < 0.0:
        return 0.0
    if score > 1.0:
        return 1.0
    return float(score)