| """ |
| rubric.py — Layered, composable reward for OpenSOC. |
| |
| The reward is the task definition. This module exposes two pure functions |
| the env calls at episode end: |
| |
| * `score_defender(action, ground_truth, triggering_log_id, cited_log_id)` |
| * `score_attacker(plausible, schema_violation, defender_correct, novelty)` |
| |
| Each returns ``(reward, breakdown)``. Both functions are pure and easy to |
| unit-test (see `tests/test_rubric.py`). The numerical ranges deliberately |
| match `openenv.yaml`'s declared `[-1.5, 1.1]` reward range so the manifest |
| stays a faithful description of behaviour. |
| |
| Reward design choices |
| --------------------- |
| 1. We grade `dismiss-on-malicious` (missed malicious) much more harshly than |
| over-reaction; in real SOC workflows missing a real attacker has a much |
| larger blast radius than briefly quarantining a benign host. |
| 2. Containment over-reaction (`block_ip` / `quarantine_host` on benign) is |
| penalized but not catastrophically — these are reversible operationally. |
| 3. The `+0.1` cited-log-id bonus is a structural verifier: a defender that |
| guesses the right action without citing the right event gets less than a |
| defender that cites the right event, which discourages "always pick the |
| most aggressive action" shortcuts. |
| 4. Attacker reward is gated on plausibility, not on how angry the narrative |
| sounds. This is what neutralizes the classic self-play reward hack. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from typing import Dict, Tuple |
|
|
| from schema import ( |
| ACTION_COST, |
| CONTAINMENT_ACTIONS, |
| TriageAction, |
| ) |
|
|
|
|
| |
| |
| |
|
|
def score_defender(
    action: TriageAction,
    ground_truth: TriageAction,
    triggering_log_id: str,
    cited_log_id: str,
) -> Tuple[float, Dict[str, float]]:
    """Score a defender's triage decision against the ground truth.

    Exactly one action component is selected, then an independent citation
    bonus may be added on top:

    * ``correct_action`` (+1.0): ``action`` is the ground-truth member.
    * ``missed_malicious`` (-1.0): the defender dismissed although the
      ground truth was not DISMISS.
    * ``over_react_containment`` (-0.3): a containment action was taken
      while the ground truth was DISMISS or MONITOR.
    * ``unnecessary_escalation`` (-0.05): the defender escalated although
      the ground-truth action has a lower ``ACTION_COST`` than ESCALATE.
    * ``cost_gap_miss``: any other mismatch, penalized by ``-0.2`` times
      the absolute ``ACTION_COST`` difference between the two actions.
    * ``correct_citation_bonus`` (+0.1): ``cited_log_id`` equals
      ``triggering_log_id``; granted regardless of the action component.

    Args:
        action: The action the defender submitted.
        ground_truth: The expected action for the incident.
        triggering_log_id: Id of the log event that triggered the incident.
        cited_log_id: Id of the log event the defender cited.

    Returns:
        ``(reward, breakdown)`` where ``breakdown`` maps each applied
        component name to its contribution and ``reward`` is their sum.
    """
    # Step 1: classify the decision into a single (component, delta) pair.
    # Branch order matters: earlier, more specific penalties take precedence.
    if action is ground_truth:
        component, delta = "correct_action", 1.0
    elif action is TriageAction.DISMISS:
        # Reaching this branch means action is not ground_truth, so the
        # ground truth cannot be DISMISS here: a real incident was dismissed.
        component, delta = "missed_malicious", -1.0
    elif action in CONTAINMENT_ACTIONS and ground_truth in (
        TriageAction.DISMISS,
        TriageAction.MONITOR,
    ):
        component, delta = "over_react_containment", -0.3
    elif (
        action is TriageAction.ESCALATE
        and ACTION_COST[ground_truth] < ACTION_COST[TriageAction.ESCALATE]
    ):
        component, delta = "unnecessary_escalation", -0.05
    else:
        # Generic miss: the penalty scales with how far apart the two
        # actions are on the ACTION_COST scale.
        component = "cost_gap_miss"
        delta = -0.2 * abs(ACTION_COST[action] - ACTION_COST[ground_truth])

    total = 0.0 + delta
    parts: Dict[str, float] = {component: delta}

    # Step 2: citation bonus, independent of whether the action was right.
    if cited_log_id == triggering_log_id:
        total += 0.1
        parts["correct_citation_bonus"] = 0.1

    return total, parts
|
|
|
|
| |
| |
| |
|
|
def score_attacker(
    *,
    plausible: bool,
    schema_violation: bool,
    defender_correct: bool,
    novelty: float = 0.0,
) -> Tuple[float, Dict[str, float]]:
    """Score an attacker's crafted incident.

    The checks run in a fixed order and the first two short-circuit:

    1. A schema violation yields a flat ``-0.5`` and nothing else.
    2. An implausible incident yields ``0.0`` and nothing else, so the
       attacker cannot earn anything without passing the plausibility gate.
    3. Otherwise ``+1.0`` is granted when the defender was wrong, plus a
       novelty bonus of ``0.2 * novelty`` with ``novelty`` clamped to
       ``[0, 1]`` (only when ``novelty`` is strictly positive).

    Args:
        plausible: Whether the env's plausibility checker accepted the
            incident.
        schema_violation: Whether validation rejected the attacker's
            params (e.g. duplicate log ids, bad timestamps). Checked
            before ``plausible``.
        defender_correct: Whether the defender picked the ground-truth
            label; the attacker is rewarded when this is false.
        novelty: Optional rarity score for the incident; values above 1
            are clamped and non-positive values earn no bonus.

    Returns:
        ``(reward, breakdown)`` where ``breakdown`` maps each applied
        component name to its contribution and ``reward`` is their sum.
    """
    # Hard gates first: each returns immediately with a single-entry breakdown.
    if schema_violation:
        return -0.5, {"schema_violation": -0.5}
    if not plausible:
        return 0.0, {"implausible": 0.0}

    parts: Dict[str, float] = {}

    if not defender_correct:
        parts["fooled_defender"] = 1.0

    if novelty > 0.0:
        clamped = max(0.0, min(1.0, novelty))
        parts["novelty_bonus"] = 0.2 * clamped

    # Accumulate in insertion order with plain addition so the float result
    # matches step-by-step summation exactly.
    total = 0.0
    for contribution in parts.values():
        total += contribution

    return total, parts
|
|
|
|
| __all__ = ["score_defender", "score_attacker"] |
|
|