Spaces:
Sleeping
Sleeping
"""
Deterministic graders for ForensicShell tasks.

Each grader takes a submitted ForensicReport (as a dict) and the scenario ground-truth
dict and returns a float in [0.0, 1.0]. Partial credit is awarded per correct subfield
so the reward function has a meaningful gradient, not just 0/1.

Design choices:
  - modified_files uses F0.5 (precision-weighted) instead of Jaccard: submitting
    false-positive files (claiming an unmodified file was attacked) is penalized
    more than missing a file. This mirrors real forensics, where false positives
    waste incident-response effort.
  - Timeline scoring is multiplicative (phase_F1 * ordering): having all 5 phases
    in exactly reversed order scores 0, not ~0.30. Correct phases AND correct order
    are both required for full credit.
"""
| from typing import Dict, List | |
| def _safe_str(x) -> str: | |
| return (x or "").strip().lower() if isinstance(x, str) else "" | |
| def _fbeta(pred: List[str], truth: List[str], beta: float = 0.5) -> float: | |
| """ | |
| F-beta score over string sets. beta < 1 weighs precision more than recall. | |
| F0.5 penalizes false positives (extra wrong files) 2x harder than false | |
| negatives (missing files), matching real forensic triage priorities. | |
| """ | |
| pred_set = {s.strip() for s in pred if isinstance(s, str) and s.strip()} | |
| truth_set = {s.strip() for s in truth if isinstance(s, str) and s.strip()} | |
| if not pred_set and not truth_set: | |
| return 1.0 | |
| if not pred_set or not truth_set: | |
| return 0.0 | |
| tp = len(pred_set & truth_set) | |
| precision = tp / len(pred_set) | |
| recall = tp / len(truth_set) | |
| if precision + recall == 0: | |
| return 0.0 | |
| beta2 = beta * beta | |
| return (1 + beta2) * precision * recall / (beta2 * precision + recall) | |
| def _kendall_tau_normalized(pred_order: List[str], true_order: List[str]) -> float: | |
| """ | |
| Normalized Kendall-tau in [0, 1] where 1.0 == identical ordering restricted to the | |
| overlap set. If fewer than 2 shared phases, returns 1.0 (nothing to order). | |
| """ | |
| overlap = [p for p in pred_order if p in true_order] | |
| # Keep only first occurrence of each overlap item in prediction | |
| seen = set() | |
| pred_overlap: List[str] = [] | |
| for p in overlap: | |
| if p not in seen: | |
| pred_overlap.append(p) | |
| seen.add(p) | |
| true_overlap = [p for p in true_order if p in seen] | |
| n = len(pred_overlap) | |
| if n < 2: | |
| return 1.0 | |
| true_rank = {p: i for i, p in enumerate(true_overlap)} | |
| concordant = 0 | |
| discordant = 0 | |
| for i in range(n): | |
| for j in range(i + 1, n): | |
| a = true_rank[pred_overlap[i]] | |
| b = true_rank[pred_overlap[j]] | |
| if a < b: | |
| concordant += 1 | |
| elif a > b: | |
| discordant += 1 | |
| total = concordant + discordant | |
| if total == 0: | |
| return 1.0 | |
| tau = (concordant - discordant) / total # in [-1, 1] | |
| return (tau + 1.0) / 2.0 # normalize to [0, 1] | |
def _grade_t1_login(report: Dict, truth: Dict) -> float:
    """
    Score the login task: half credit each for the compromised user and the
    initial IP, compared after `_safe_str` normalization.
    """
    score = 0.0
    for field in ("compromised_user", "initial_ip"):
        if _safe_str(report.get(field)) == _safe_str(truth.get(field)):
            score += 0.5
    return score
def _grade_t2_modified(report: Dict, truth: Dict) -> float:
    """
    Score the modified-files task.

    Weights: compromised user 0.2, initial IP 0.2, modified-files F0.5 0.3,
    backdoor SHA-256 0.3. Scalar fields are compared after `_safe_str`
    normalization and score 1.0 on a match, 0.0 otherwise.
    """

    def field_matches(key: str) -> float:
        # 1.0 when the normalized report value equals the normalized truth value.
        return 1.0 if _safe_str(report.get(key)) == _safe_str(truth.get(key)) else 0.0

    user_score = field_matches("compromised_user")
    ip_score = field_matches("initial_ip")
    # F0.5 weighs precision above recall, so spurious files cost more than
    # missed ones.
    files_score = _fbeta(
        report.get("modified_files") or [],
        truth.get("modified_files") or [],
        beta=0.5,
    )
    hash_score = field_matches("backdoor_sha256")

    return 0.2 * user_score + 0.2 * ip_score + 0.3 * files_score + 0.3 * hash_score
def _grade_t3_timeline(report: Dict, truth: Dict) -> float:
    """
    Score the timeline task.

    Weights: compromised user 0.15, initial IP 0.15, modified-files F0.5 0.15,
    backdoor SHA-256 0.15, timeline 0.40. The timeline component is the
    product of the F1 over the set of phase names and the normalized
    Kendall-tau ordering score, so both the right phases and the right order
    are needed for full timeline credit.

    Submitted timeline entries may be dicts or objects with a `phase`
    attribute; entries whose phase is not a string are ignored. Ground-truth
    entries are indexed with ["phase"] directly.
    """

    def field_matches(key: str) -> float:
        # 1.0 when the normalized report value equals the normalized truth value.
        return 1.0 if _safe_str(report.get(key)) == _safe_str(truth.get(key)) else 0.0

    user_score = field_matches("compromised_user")
    ip_score = field_matches("initial_ip")
    # Same precision-weighted file scoring as the modified-files task.
    files_score = _fbeta(
        report.get("modified_files") or [],
        truth.get("modified_files") or [],
        beta=0.5,
    )
    hash_score = field_matches("backdoor_sha256")

    submitted_timeline = report.get("timeline") or []
    expected_timeline = truth.get("timeline") or []

    # Pull the phase name out of each submitted entry, keeping strings only.
    submitted_phases = []
    for entry in submitted_timeline:
        if isinstance(entry, dict):
            phase = entry.get("phase")
        else:
            phase = getattr(entry, "phase", None)
        if isinstance(phase, str):
            submitted_phases.append(phase)
    expected_phases = [entry["phase"] for entry in expected_timeline]

    # Plain F1 over the phase sets (no precision weighting for phases).
    submitted_set = set(submitted_phases)
    expected_set = set(expected_phases)
    if submitted_set and expected_set:
        common = len(submitted_set & expected_set)
        precision = common / len(submitted_set)
        recall = common / len(expected_set)
        if (precision + recall) == 0:
            phase_f1 = 0.0
        else:
            phase_f1 = 2 * precision * recall / (precision + recall)
    elif not submitted_set and not expected_set:
        phase_f1 = 1.0
    else:
        phase_f1 = 0.0

    # How well the submitted phase order matches the ground-truth order.
    order_score = _kendall_tau_normalized(submitted_phases, expected_phases)

    # Multiplicative on purpose: every phase present but in reversed order is
    # F1 = 1.0 times ordering = 0.0, i.e. no timeline credit at all.
    timeline_score = phase_f1 * order_score

    return (
        0.15 * user_score
        + 0.15 * ip_score
        + 0.15 * files_score
        + 0.15 * hash_score
        + 0.40 * timeline_score
    )
# Fixed task id -> grader function. Ids not listed here are resolved by grade().
GRADERS = dict(
    t1_login=_grade_t1_login,
    t2_modified=_grade_t2_modified,
    t3_timeline=_grade_t3_timeline,
)
def _grade_generic(report: Dict, truth: Dict) -> float:
    """
    Grade a procedurally generated scenario.

    The sub-grader is chosen from the keys present in the ground-truth dict:
    "timeline" selects the timeline grader, otherwise "backdoor_sha256"
    selects the modified-files grader, otherwise the login grader is used.
    """
    if "timeline" in truth:
        grader = _grade_t3_timeline
    elif "backdoor_sha256" in truth:
        grader = _grade_t2_modified
    else:
        grader = _grade_t1_login
    return grader(report, truth)
def grade(task_id: str, report: Dict, truth: Dict) -> float:
    """
    Grade a report for the given task and return a score in [0.0, 1.0].

    Task ids starting with "gen_" go to the generic dispatcher; any other id
    is looked up in GRADERS, and an unknown id scores 0.0. A falsy report or
    truth is replaced by an empty dict before grading. The grader's result is
    clamped to the [0.0, 1.0] range.
    """
    is_generated = bool(task_id) and task_id.startswith("gen_")
    grader = _grade_generic if is_generated else GRADERS.get(task_id)
    if grader is None:
        return 0.0

    raw = grader(report or {}, truth or {})
    # Clamp with explicit comparisons rather than min/max.
    if raw < 0.0:
        return 0.0
    if raw > 1.0:
        return 1.0
    return float(raw)