""" rubric.py — Layered, composable reward for OpenSOC. The reward is the task definition. This module exposes two pure functions the env calls at episode end: * `score_defender(action, ground_truth, triggering_log_id, cited_log_id)` * `score_attacker(plausible, defender_correct, novelty)` Each returns ``(reward, breakdown)``. Both functions are pure and easy to unit-test (see `tests/test_rubric.py`). The numerical ranges deliberately match `openenv.yaml`'s declared `[-1.5, 1.1]` reward range so the manifest stays a faithful description of behaviour. Reward design choices --------------------- 1. We grade `dismiss-on-malicious` (missed malicious) much more harshly than over-reaction; in real SOC workflows missing a real attacker has a much larger blast radius than briefly quarantining a benign host. 2. Containment over-reaction (`block_ip` / `quarantine_host` on benign) is penalized but not catastrophically — these are reversible operationally. 3. The `+0.1` cited-log-id bonus is a structural verifier: a defender that guesses the right action without citing the right event gets less than a defender that cites the right event, which discourages "always pick the most aggressive action" shortcuts. 4. Attacker reward is gated on plausibility, not on how angry the narrative sounds. This is what neutralizes the classic self-play reward hack. """ from __future__ import annotations from typing import Dict, Tuple from schema import ( ACTION_COST, CONTAINMENT_ACTIONS, TriageAction, ) # --------------------------------------------------------------------------- # Defender scoring # --------------------------------------------------------------------------- def score_defender( action: TriageAction, ground_truth: TriageAction, triggering_log_id: str, cited_log_id: str, ) -> Tuple[float, Dict[str, float]]: """Return ``(reward, breakdown)`` for a defender's submit_triage.""" breakdown: Dict[str, float] = {} reward = 0.0 if action is ground_truth: reward += 1.0 breakdown["correct_action"] = 1.0 else: # Missed-malicious is by far the worst error class. if action is TriageAction.DISMISS and ground_truth is not TriageAction.DISMISS: reward -= 1.0 breakdown["missed_malicious"] = -1.0 elif ( action in CONTAINMENT_ACTIONS and ground_truth in (TriageAction.DISMISS, TriageAction.MONITOR) ): reward -= 0.3 breakdown["over_react_containment"] = -0.3 elif ( action is TriageAction.ESCALATE and ACTION_COST[ground_truth] < ACTION_COST[TriageAction.ESCALATE] ): reward -= 0.05 breakdown["unnecessary_escalation"] = -0.05 else: # Any other mismatch (e.g. monitor-when-block-ip-was-needed) is # graded as a partial-credit miss using the cost gap. cost_gap = abs(ACTION_COST[action] - ACTION_COST[ground_truth]) penalty = -0.2 * cost_gap reward += penalty breakdown["cost_gap_miss"] = penalty if cited_log_id == triggering_log_id: reward += 0.1 breakdown["correct_citation_bonus"] = 0.1 return reward, breakdown # --------------------------------------------------------------------------- # Attacker scoring # --------------------------------------------------------------------------- def score_attacker( *, plausible: bool, schema_violation: bool, defender_correct: bool, novelty: float = 0.0, ) -> Tuple[float, Dict[str, float]]: """Return ``(reward, breakdown)`` for an attacker's craft_incident. Args: plausible: Did the env's plausibility checker accept the incident? schema_violation: Did pydantic / model_validator reject the attacker's params (e.g. duplicate log ids, bad timestamps)? When true, `plausible` should be False. defender_correct: Did the defender pick the env-computed ground-truth label? The attacker is rewarded for fooling the defender. novelty: Optional [0, 1] score for how rare this incident's feature combination is in the recent rollout batch (drives curriculum). """ breakdown: Dict[str, float] = {} reward = 0.0 if schema_violation: breakdown["schema_violation"] = -0.5 return -0.5, breakdown if not plausible: # Gibberish that satisfies pydantic but fails plausibility. breakdown["implausible"] = 0.0 return 0.0, breakdown if not defender_correct: reward += 1.0 breakdown["fooled_defender"] = 1.0 if novelty > 0.0: bonus = 0.2 * max(0.0, min(1.0, novelty)) reward += bonus breakdown["novelty_bonus"] = bonus return reward, breakdown __all__ = ["score_defender", "score_attacker"]