# opensoc-env / tests /test_schema.py
# shivam2k3's picture
# OpenSOC v1
# bb6a031
"""Golden-case tests for `schema.py`.
Run with: pytest tests/test_schema.py -v
"""
from __future__ import annotations
import os
import sys
import pytest
from pydantic import ValidationError
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from schema import ( # noqa: E402
Action,
Alert,
CraftIncident,
Event,
EventType,
IncidentCategory,
IncidentParams,
SubmitTriage,
TriageAction,
is_internal_ip,
make_event,
make_log_id,
)
# ---------------------------------------------------------------------------
# Log id / timestamp formats
# ---------------------------------------------------------------------------
class TestLogIdFormat:
    """Golden cases for `make_log_id` output and `Event` field validation."""

    def test_make_log_id_canonical(self):
        """`make_log_id(2, 5)` must render exactly as ``"L2-5"``."""
        rendered = make_log_id(2, 5)
        assert rendered == "L2-5"

    def test_event_rejects_bad_log_id(self):
        """An `Event` whose log_id is ``"bad-id"`` must fail validation."""
        fields = {
            "log_id": "bad-id",
            "timestamp": "2026-04-25T10:00:00Z",
            "event_type": EventType.PROC_START,
        }
        with pytest.raises(ValidationError):
            Event(**fields)

    def test_event_rejects_bad_timestamp(self):
        """An `Event` whose timestamp uses a space and no trailing ``Z`` must fail validation."""
        fields = {
            "log_id": "L1-1",
            # Same instant as the accepted form used elsewhere in this file,
            # but without the "T" separator and "Z" suffix.
            "timestamp": "2026-04-25 10:00:00",
            "event_type": EventType.PROC_START,
        }
        with pytest.raises(ValidationError):
            Event(**fields)
# ---------------------------------------------------------------------------
# IncidentParams validation
# ---------------------------------------------------------------------------
class TestIncidentParams:
    """Validation cases for `IncidentParams`: one accepted payload, three rejected."""

    def _events(self, n: int = 2):
        """Build `n` AUTH_LOGIN_FAILURE events spaced one second apart.

        Event ``i`` is created with ``make_event(1, i, ...)`` and a timestamp
        ``i`` seconds after 2026-04-25T10:00:00Z, so timestamps are strictly
        increasing in list order.

        Args:
            n: Number of events to build. Timestamps stay well-formed for
                ``n`` up to 3600 (the minute field rolls over at one hour).

        Returns:
            A list of `n` events as returned by `make_event`.
        """
        events = []
        for i in range(n):
            # Derive zero-padded minute/second fields instead of gluing the
            # index after a literal "0": that form is only a valid two-digit
            # seconds value for i in 0..9 and yields e.g. "10:00:010Z" beyond.
            minute, second = divmod(i, 60)
            stamp = f"2026-04-25T10:{minute:02d}:{second:02d}Z"
            events.append(
                make_event(1, i, EventType.AUTH_LOGIN_FAILURE, stamp)
            )
        return events

    def test_accepts_valid_params(self):
        """Three well-formed events are accepted and all three are retained."""
        params = IncidentParams(
            target_label=TriageAction.MONITOR,
            category=IncidentCategory.BRUTE_FORCE,
            events=self._events(3),
        )
        assert len(params.events) == 3

    def test_rejects_empty_events(self):
        """An empty `events` list must fail validation."""
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.DISMISS,
                category=IncidentCategory.BENIGN_NOISE,
                events=[],
            )

    def test_rejects_duplicate_log_ids(self):
        """Two events sharing one log_id must fail validation."""
        evts = self._events(2)
        # Reuse the first event's log_id but keep a later timestamp, so the
        # duplicate id is the only irregularity in the payload.
        evts[1] = Event(
            log_id=evts[0].log_id,
            timestamp="2026-04-25T10:00:05Z",
            event_type=EventType.AUTH_LOGIN_FAILURE,
        )
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.MONITOR,
                category=IncidentCategory.BRUTE_FORCE,
                events=evts,
            )

    def test_rejects_non_monotonic_timestamps(self):
        """Events listed with the later timestamp first must fail validation."""
        evts = self._events(2)
        # Swap to make non-monotonic
        evts[0], evts[1] = evts[1], evts[0]
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.MONITOR,
                category=IncidentCategory.BRUTE_FORCE,
                events=evts,
            )
# ---------------------------------------------------------------------------
# Action union
# ---------------------------------------------------------------------------
class TestActionUnion:
    """Cases for the `Action` wrapper and its two payload types."""

    def test_attacker_action_round_trip(self):
        """An `Action` built with only `craft_incident` keeps it and leaves `submit_triage` unset."""
        beacon = make_event(1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z")
        incident = CraftIncident(
            target_label=TriageAction.BLOCK_IP,
            category=IncidentCategory.C2_BEACON,
            events=[beacon],
        )
        wrapped = Action(craft_incident=incident)
        assert wrapped.submit_triage is None
        assert wrapped.craft_incident.target_label is TriageAction.BLOCK_IP

    def test_defender_action_round_trip(self):
        """An `Action` built with only `submit_triage` keeps it and leaves `craft_incident` unset."""
        verdict = SubmitTriage(
            action=TriageAction.QUARANTINE_HOST,
            cited_log_id="L1-3",
            rationale="LOLBin spawned by Outlook.",
        )
        wrapped = Action(submit_triage=verdict)
        assert wrapped.craft_incident is None
        assert wrapped.submit_triage.action is TriageAction.QUARANTINE_HOST

    def test_defender_rejects_bad_cited_log_id(self):
        """`SubmitTriage` with ``cited_log_id="oops"`` must fail validation."""
        fields = {"action": TriageAction.DISMISS, "cited_log_id": "oops"}
        with pytest.raises(ValidationError):
            SubmitTriage(**fields)
# ---------------------------------------------------------------------------
# Alert validation
# ---------------------------------------------------------------------------
class TestAlert:
    """Cases for the `severity` field of `Alert`."""

    def test_severity_must_be_known(self):
        """The severity string ``"catastrophic"`` must fail validation."""
        fields = {
            "alert_id": "A1",
            "category": IncidentCategory.PHISHING,
            "severity": "catastrophic",  # invalid
            "summary": "x",
        }
        with pytest.raises(ValidationError):
            Alert(**fields)

    def test_known_severity_ok(self):
        """The severity string ``"high"`` is accepted and stored as given."""
        fields = {
            "alert_id": "A1",
            "category": IncidentCategory.PHISHING,
            "severity": "high",
            "summary": "suspicious email opened",
        }
        alert = Alert(**fields)
        assert alert.severity == "high"
# ---------------------------------------------------------------------------
# Helper: is_internal_ip
# ---------------------------------------------------------------------------
class TestInternalIP:
    """Table-driven cases for `is_internal_ip`."""

    @pytest.mark.parametrize(
        ("ip", "expected"),
        [
            # Addresses expected to be classified as internal.
            ("10.0.0.1", True),
            ("172.16.5.4", True),
            ("192.168.1.20", True),
            # Addresses expected to be classified as not internal.
            ("8.8.8.8", False),
            ("203.0.113.5", False),
            # A string that is not an address at all is expected to yield False.
            ("not-an-ip", False),
        ],
    )
    def test_classifies(self, ip, expected):
        """`is_internal_ip(ip)` must return exactly the expected bool."""
        verdict = is_internal_ip(ip)
        assert verdict is expected