| """Golden-case tests for `schema.py`. |
| |
| Run with: pytest tests/test_schema.py -v |
| """ |
| from __future__ import annotations |
|
|
| import os |
| import sys |
|
|
| import pytest |
| from pydantic import ValidationError |
|
|
| sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) |
|
|
| from schema import ( |
| Action, |
| Alert, |
| CraftIncident, |
| Event, |
| EventType, |
| IncidentCategory, |
| IncidentParams, |
| SubmitTriage, |
| TriageAction, |
| is_internal_ip, |
| make_event, |
| make_log_id, |
| ) |
|
|
|
|
| |
| |
| |
|
|
class TestLogIdFormat:
    """Checks on the log-id string format and on `Event` field validation."""

    def test_make_log_id_canonical(self):
        """`make_log_id(2, 5)` is expected to produce the string "L2-5"."""
        produced = make_log_id(2, 5)
        assert produced == "L2-5"

    def test_event_rejects_bad_log_id(self):
        """Building an `Event` with log_id "bad-id" must raise ValidationError."""
        fields = dict(
            log_id="bad-id",
            timestamp="2026-04-25T10:00:00Z",
            event_type=EventType.PROC_START,
        )
        with pytest.raises(ValidationError):
            Event(**fields)

    def test_event_rejects_bad_timestamp(self):
        """Building an `Event` with a space-separated timestamp must raise.

        The timestamp here uses a space instead of "T" and has no trailing
        "Z", unlike the "...T...Z" strings used by the other tests in this
        file; the log_id "L1-1" has the same shape as the canonical id above.
        """
        fields = dict(
            log_id="L1-1",
            timestamp="2026-04-25 10:00:00",
            event_type=EventType.PROC_START,
        )
        with pytest.raises(ValidationError):
            Event(**fields)
|
|
|
|
| |
| |
| |
|
|
class TestIncidentParams:
    """Construction and validation checks for `IncidentParams`."""

    def _events(self, n: int = 2):
        """Return `n` AUTH_LOGIN_FAILURE events built with `make_event(1, i, ...)`.

        Event `i` receives the timestamp string ``2026-04-25T10:00:SSZ`` where
        ``SS`` is `i` zero-padded to two digits, so the strings increase with
        `i`. Zero-padding (rather than a literal ``0`` prefix) keeps the
        seconds field two digits wide once `i` reaches 10. Seconds are not
        wrapped into minutes, so keep `n` at or below 60.
        """
        return [
            make_event(
                1,
                i,
                EventType.AUTH_LOGIN_FAILURE,
                f"2026-04-25T10:00:{i:02d}Z",
            )
            for i in range(n)
        ]

    def test_accepts_valid_params(self):
        """Three helper-built events are accepted and all three are kept."""
        params = IncidentParams(
            target_label=TriageAction.MONITOR,
            category=IncidentCategory.BRUTE_FORCE,
            events=self._events(3),
        )
        assert len(params.events) == 3

    def test_rejects_empty_events(self):
        """An empty `events` list must raise ValidationError."""
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.DISMISS,
                category=IncidentCategory.BENIGN_NOISE,
                events=[],
            )

    def test_rejects_duplicate_log_ids(self):
        """Two events sharing one log_id must raise ValidationError."""
        evts = self._events(2)
        # Replace the second event with one that reuses the first event's
        # log_id. Its timestamp string (…:05Z) is later than the one passed
        # for the first event (…:00Z), so ordering is not what is at fault.
        evts[1] = Event(
            log_id=evts[0].log_id,
            timestamp="2026-04-25T10:00:05Z",
            event_type=EventType.AUTH_LOGIN_FAILURE,
        )
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.MONITOR,
                category=IncidentCategory.BRUTE_FORCE,
                events=evts,
            )

    def test_rejects_non_monotonic_timestamps(self):
        """Events listed in descending timestamp order must raise."""
        evts = self._events(2)
        # Swap the pair so the later timestamp comes first.
        evts[0], evts[1] = evts[1], evts[0]
        with pytest.raises(ValidationError):
            IncidentParams(
                target_label=TriageAction.MONITOR,
                category=IncidentCategory.BRUTE_FORCE,
                events=evts,
            )
|
|
|
|
| |
| |
| |
|
|
class TestActionUnion:
    """Checks that `Action` carries exactly one of its two payload fields."""

    def test_attacker_action_round_trip(self):
        """An `Action` built from a `CraftIncident` leaves `submit_triage` unset."""
        beacon = make_event(1, 0, EventType.NET_BEACON, "2026-04-25T10:00:00Z")
        incident = CraftIncident(
            target_label=TriageAction.BLOCK_IP,
            category=IncidentCategory.C2_BEACON,
            events=[beacon],
        )
        action = Action(craft_incident=incident)

        assert action.submit_triage is None
        assert action.craft_incident.target_label is TriageAction.BLOCK_IP

    def test_defender_action_round_trip(self):
        """An `Action` built from a `SubmitTriage` leaves `craft_incident` unset."""
        triage = SubmitTriage(
            action=TriageAction.QUARANTINE_HOST,
            cited_log_id="L1-3",
            rationale="LOLBin spawned by Outlook.",
        )
        action = Action(submit_triage=triage)

        assert action.craft_incident is None
        assert action.submit_triage.action is TriageAction.QUARANTINE_HOST

    def test_defender_rejects_bad_cited_log_id(self):
        """`SubmitTriage` with cited_log_id "oops" must raise ValidationError."""
        # NOTE(review): `rationale` is omitted here, unlike the accepted case
        # above. If `SubmitTriage` requires it, this test could pass for that
        # reason instead of the bad id — confirm against the schema.
        fields = dict(action=TriageAction.DISMISS, cited_log_id="oops")
        with pytest.raises(ValidationError):
            SubmitTriage(**fields)
|
|
|
|
| |
| |
| |
|
|
class TestAlert:
    """Validation of the `severity` field on `Alert`."""

    def test_severity_must_be_known(self):
        """A severity of "catastrophic" must raise ValidationError."""
        fields = dict(
            alert_id="A1",
            category=IncidentCategory.PHISHING,
            severity="catastrophic",
            summary="x",
        )
        with pytest.raises(ValidationError):
            Alert(**fields)

    def test_known_severity_ok(self):
        """A severity of "high" is accepted and readable back from the model."""
        alert = Alert(
            alert_id="A1",
            category=IncidentCategory.PHISHING,
            severity="high",
            summary="suspicious email opened",
        )
        assert alert.severity == "high"
|
|
|
|
| |
| |
| |
|
|
class TestInternalIP:
    """Table-driven check of `is_internal_ip` against fixed inputs."""

    # (input string, expected result) pairs; order is preserved so the
    # generated parametrized test ids stay the same.
    _CASES = [
        ("10.0.0.1", True),
        ("172.16.5.4", True),
        ("192.168.1.20", True),
        ("8.8.8.8", False),
        ("203.0.113.5", False),
        # A string that is not an address is expected to yield False.
        ("not-an-ip", False),
    ]

    @pytest.mark.parametrize(("ip", "expected"), _CASES)
    def test_classifies(self, ip, expected):
        """`is_internal_ip(ip)` must be exactly the expected bool (identity check)."""
        verdict = is_internal_ip(ip)
        assert verdict is expected
|
|