# opensoc-env / rubric.py
# shivam2k3's picture
# OpenSOC v1
# bb6a031
"""
rubric.py — Layered, composable reward for OpenSOC.
The reward is the task definition. This module exposes two pure functions
the env calls at episode end:
* `score_defender(action, ground_truth, triggering_log_id, cited_log_id)`
* `score_attacker(*, plausible, schema_violation, defender_correct, novelty=0.0)`
Each returns ``(reward, breakdown)``. Both functions are pure and easy to
unit-test (see `tests/test_rubric.py`). The numerical ranges deliberately
match `openenv.yaml`'s declared `[-1.5, 1.1]` reward range so the manifest
stays a faithful description of behaviour.
Reward design choices
---------------------
1. We grade `dismiss-on-malicious` (missed malicious) much more harshly than
over-reaction; in real SOC workflows missing a real attacker has a much
larger blast radius than briefly quarantining a benign host.
2. Containment over-reaction (`block_ip` / `quarantine_host` on benign) is
penalized but not catastrophically — these are reversible operationally.
3. The `+0.1` cited-log-id bonus is a structural verifier: a defender that
guesses the right action without citing the right event gets less than a
defender that cites the right event, which discourages "always pick the
most aggressive action" shortcuts.
4. Attacker reward is gated on plausibility, not on how angry the narrative
sounds. This is what neutralizes the classic self-play reward hack.
"""
from __future__ import annotations
from typing import Dict, Tuple
from schema import (
ACTION_COST,
CONTAINMENT_ACTIONS,
TriageAction,
)
# ---------------------------------------------------------------------------
# Defender scoring
# ---------------------------------------------------------------------------
def _grade_action(
    action: TriageAction,
    ground_truth: TriageAction,
) -> Tuple[str, float]:
    """Return the ``(breakdown_key, reward_delta)`` term for the chosen action.

    Exactly one term is produced per call; the checks run in priority order
    and the first one that applies wins.
    """
    if action is ground_truth:
        return "correct_action", 1.0

    # From here on the action differs from the ground truth, so a DISMISS
    # necessarily means the ground truth was something other than DISMISS.
    # Missed-malicious is the most heavily penalized error class.
    if action is TriageAction.DISMISS:
        return "missed_malicious", -1.0

    # Containment applied where the ground truth was only dismiss/monitor.
    if action in CONTAINMENT_ACTIONS and ground_truth in (
        TriageAction.DISMISS,
        TriageAction.MONITOR,
    ):
        return "over_react_containment", -0.3

    # Escalating when the ground-truth action is cheaper than escalation.
    if action is TriageAction.ESCALATE:
        if ACTION_COST[ground_truth] < ACTION_COST[TriageAction.ESCALATE]:
            return "unnecessary_escalation", -0.05

    # Every remaining mismatch (e.g. monitor-when-block-ip-was-needed) is a
    # partial-credit miss scaled by the absolute cost difference.
    gap = abs(ACTION_COST[action] - ACTION_COST[ground_truth])
    return "cost_gap_miss", -0.2 * gap


def score_defender(
    action: TriageAction,
    ground_truth: TriageAction,
    triggering_log_id: str,
    cited_log_id: str,
) -> Tuple[float, Dict[str, float]]:
    """Score a defender's ``submit_triage`` decision.

    The reward is the sum of one action term and an optional citation bonus:

    * ``correct_action`` (+1.0) when ``action`` is ``ground_truth``;
    * ``missed_malicious`` (-1.0) when the defender dismissed but the ground
      truth was not a dismiss;
    * ``over_react_containment`` (-0.3) when a containment action was taken
      but the ground truth was dismiss or monitor;
    * ``unnecessary_escalation`` (-0.05) when the defender escalated although
      the ground-truth action costs less than escalation;
    * ``cost_gap_miss`` (``-0.2 * |cost(action) - cost(ground_truth)|``) for
      any other mismatch.

    ``correct_citation_bonus`` (+0.1) is added whenever ``cited_log_id``
    equals ``triggering_log_id``, independently of the action term.

    Args:
        action: The triage action the defender submitted.
        ground_truth: The action considered correct for the incident.
        triggering_log_id: Id of the log event that triggers the incident.
        cited_log_id: Id of the log event the defender cited as evidence.

    Returns:
        ``(reward, breakdown)`` where ``breakdown`` maps each contributing
        term's name to its value.
    """
    term_name, term_value = _grade_action(action, ground_truth)

    # Start from 0.0 so the accumulated float matches a running total.
    total = 0.0 + term_value
    parts: Dict[str, float] = {term_name: term_value}

    if cited_log_id == triggering_log_id:
        total += 0.1
        parts["correct_citation_bonus"] = 0.1

    return total, parts
# ---------------------------------------------------------------------------
# Attacker scoring
# ---------------------------------------------------------------------------
def score_attacker(
    *,
    plausible: bool,
    schema_violation: bool,
    defender_correct: bool,
    novelty: float = 0.0,
) -> Tuple[float, Dict[str, float]]:
    """Score an attacker's ``craft_incident`` submission.

    Evaluation order:

    1. A schema violation short-circuits to ``-0.5``.
    2. An implausible (but schema-valid) incident short-circuits to ``0.0``.
    3. Otherwise the attacker earns ``+1.0`` if the defender was wrong, plus
       ``0.2 * novelty`` (novelty clamped to ``[0, 1]``) when novelty is
       positive.

    The largest value this function can return is therefore ``1.2``.
    NOTE(review): that is above the ``[-1.5, 1.1]`` range quoted in the module
    docstring — confirm whether that range is meant to cover the attacker.

    Args:
        plausible: Did the env's plausibility checker accept the incident?
        schema_violation: Did pydantic / model_validator reject the
            attacker's params (e.g. duplicate log ids, bad timestamps)? When
            true, ``plausible`` should be False; this flag is checked first.
        defender_correct: Did the defender pick the env-computed ground-truth
            label? The attacker is rewarded for fooling the defender.
        novelty: Optional [0, 1] score for how rare this incident's feature
            combination is in the recent rollout batch. Values outside the
            interval are clamped; non-positive values earn no bonus.

    Returns:
        ``(reward, breakdown)`` where ``breakdown`` maps each contributing
        term's name to its value.
    """
    # Hard failures first: neither can earn any further reward.
    if schema_violation:
        return -0.5, {"schema_violation": -0.5}
    if not plausible:
        # Gibberish that satisfies pydantic but fails plausibility.
        return 0.0, {"implausible": 0.0}

    total = 0.0
    parts: Dict[str, float] = {}

    if not defender_correct:
        total += 1.0
        parts["fooled_defender"] = 1.0

    if novelty > 0.0:
        clamped = max(0.0, min(1.0, novelty))
        novelty_term = 0.2 * clamped
        total += novelty_term
        parts["novelty_bonus"] = novelty_term

    return total, parts
# Public API of this module: the two scoring functions defined above.
__all__ = ["score_defender", "score_attacker"]