Spaces:
Sleeping
Sleeping
| """Golden-case tests for `schema.py`. | |
| Run with: pytest tests/test_schema.py -v | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import sys | |
| import pytest | |
| from pydantic import ValidationError | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) | |
| from schema import ( # noqa: E402 | |
| Action, | |
| Alert, | |
| CraftIncident, | |
| Event, | |
| EventType, | |
| IncidentCategory, | |
| IncidentParams, | |
| SubmitTriage, | |
| TriageAction, | |
| is_internal_ip, | |
| make_event, | |
| make_log_id, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Log id / timestamp formats | |
| # --------------------------------------------------------------------------- | |
class TestLogIdFormat:
    """Format checks for log ids and for the `Event` id/timestamp fields."""

    def test_make_log_id_canonical(self):
        """`make_log_id(2, 5)` produces the string ``"L2-5"``."""
        produced = make_log_id(2, 5)
        assert produced == "L2-5"

    def test_event_rejects_bad_log_id(self):
        """An `Event` built with ``log_id="bad-id"`` fails validation."""
        fields = {
            "log_id": "bad-id",
            "timestamp": "2026-04-25T10:00:00Z",
            "event_type": EventType.PROC_START,
        }
        with pytest.raises(ValidationError):
            Event(**fields)

    def test_event_rejects_bad_timestamp(self):
        """An `Event` whose timestamp has a space and no ``Z`` fails validation."""
        fields = {
            "log_id": "L1-1",
            # Space separator and missing trailing "Z", unlike the accepted
            # "2026-04-25T10:00:00Z" form used elsewhere in this module.
            "timestamp": "2026-04-25 10:00:00",
            "event_type": EventType.PROC_START,
        }
        with pytest.raises(ValidationError):
            Event(**fields)
| # --------------------------------------------------------------------------- | |
| # IncidentParams validation | |
| # --------------------------------------------------------------------------- | |
class TestIncidentParams:
    """Validation checks for `IncidentParams` (event list constraints)."""

    def _events(self, n: int = 2):
        """Return ``n`` login-failure events, one second apart.

        Each event is built with ``make_event(1, i, ...)`` for ``i`` in
        ``range(n)`` and a timestamp of ``2026-04-25T10:00:SSZ`` where ``SS``
        is ``i`` zero-padded to two digits.

        The seconds field is formatted with ``:02d`` so that values of ``i``
        from 10 upwards still give a two-digit field; a literal ``0`` prefix
        would have produced three digits (``10:00:010Z``). ``n`` must stay at
        or below 60 for the seconds field to remain a valid clock value.
        """
        return [
            make_event(
                1,
                i,
                EventType.AUTH_LOGIN_FAILURE,
                f"2026-04-25T10:00:{i:02d}Z",
            )
            for i in range(n)
        ]

    def test_accepts_valid_params(self):
        """Three distinct, time-ordered events are accepted and all retained."""
        params = IncidentParams(
            target_label=TriageAction.MONITOR,
            category=IncidentCategory.BRUTE_FORCE,
            events=self._events(3),
        )
        assert len(params.events) == 3

    def test_rejects_empty_events(self):
        """An empty ``events`` list fails validation."""
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.DISMISS,
                category=IncidentCategory.BENIGN_NOISE,
                events=[],
            )

    def test_rejects_duplicate_log_ids(self):
        """Two events sharing one ``log_id`` fail validation."""
        evts = self._events(2)
        # Reuse the first event's log_id but keep a later timestamp, so the
        # duplicated id is the only thing wrong with the list.
        evts[1] = Event(
            log_id=evts[0].log_id,
            timestamp="2026-04-25T10:00:05Z",
            event_type=EventType.AUTH_LOGIN_FAILURE,
        )
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.MONITOR,
                category=IncidentCategory.BRUTE_FORCE,
                events=evts,
            )

    def test_rejects_non_monotonic_timestamps(self):
        """Events listed with the later timestamp first fail validation."""
        evts = self._events(2)
        # Swap to make non-monotonic
        evts[0], evts[1] = evts[1], evts[0]
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.MONITOR,
                category=IncidentCategory.BRUTE_FORCE,
                events=evts,
            )
| # --------------------------------------------------------------------------- | |
| # Action union | |
| # --------------------------------------------------------------------------- | |
class TestActionUnion:
    """Checks on `Action` when exactly one of its two payload fields is set."""

    def test_attacker_action_round_trip(self):
        """An `Action` given only ``craft_incident`` leaves ``submit_triage`` unset."""
        beacon = make_event(1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z")
        incident = CraftIncident(
            target_label=TriageAction.BLOCK_IP,
            category=IncidentCategory.C2_BEACON,
            events=[beacon],
        )
        action = Action(craft_incident=incident)

        assert action.submit_triage is None
        assert action.craft_incident.target_label is TriageAction.BLOCK_IP

    def test_defender_action_round_trip(self):
        """An `Action` given only ``submit_triage`` leaves ``craft_incident`` unset."""
        triage = SubmitTriage(
            action=TriageAction.QUARANTINE_HOST,
            cited_log_id="L1-3",
            rationale="LOLBin spawned by Outlook.",
        )
        action = Action(submit_triage=triage)

        assert action.craft_incident is None
        assert action.submit_triage.action is TriageAction.QUARANTINE_HOST

    def test_defender_rejects_bad_cited_log_id(self):
        """`SubmitTriage` with ``cited_log_id="oops"`` fails validation."""
        fields = {"action": TriageAction.DISMISS, "cited_log_id": "oops"}
        with pytest.raises(ValidationError):
            SubmitTriage(**fields)
| # --------------------------------------------------------------------------- | |
| # Alert validation | |
| # --------------------------------------------------------------------------- | |
class TestAlert:
    """Checks on the ``severity`` field of `Alert`."""

    def test_severity_must_be_known(self):
        """An `Alert` with ``severity="catastrophic"`` fails validation."""
        fields = {
            "alert_id": "A1",
            "category": IncidentCategory.PHISHING,
            "severity": "catastrophic",  # invalid
            "summary": "x",
        }
        with pytest.raises(ValidationError):
            Alert(**fields)

    def test_known_severity_ok(self):
        """An `Alert` with ``severity="high"`` is accepted and keeps that value."""
        fields = {
            "alert_id": "A1",
            "category": IncidentCategory.PHISHING,
            "severity": "high",
            "summary": "suspicious email opened",
        }
        alert = Alert(**fields)
        assert alert.severity == "high"
| # --------------------------------------------------------------------------- | |
| # Helper: is_internal_ip | |
| # --------------------------------------------------------------------------- | |
class TestInternalIP:
    """Table-driven checks for the `is_internal_ip` helper."""

    # Without this decorator pytest treats ``ip`` and ``expected`` as fixture
    # names and errors out ("fixture 'ip' not found") instead of running.
    #
    # NOTE(review): the expected values assume `is_internal_ip` classifies
    # RFC 1918 private addresses as internal and well-known public resolver
    # addresses as external -- confirm against schema.py and extend the table
    # with any other ranges the helper is meant to cover.
    @pytest.mark.parametrize(
        ("ip", "expected"),
        [
            ("10.0.0.5", True),
            ("172.16.4.20", True),
            ("192.168.1.10", True),
            ("8.8.8.8", False),
            ("1.1.1.1", False),
        ],
    )
    def test_classifies(self, ip, expected):
        """`is_internal_ip(ip)` returns exactly the boolean ``expected``."""
        assert is_internal_ip(ip) is expected