File size: 4,496 Bytes
f93577c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""api/models.py β€” Pydantic contracts matching schema_contract.json v3"""

from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional


# ── Inbound ───────────────────────────────────────────────────

class WorkerActionIn(BaseModel):
    intent:      str
    raw_payload: str


class InboundMessage(BaseModel):
    domain: str
    action: WorkerActionIn
    actor: str = "unknown"
    session_id: str = "default"
    service: str = ""
    environment: str = "production"
    provided_evidence: List[str] = Field(default_factory=list)


# ── Outbound sub-models ───────────────────────────────────────

class WorkerActionOut(BaseModel):
    intent:       str
    raw_payload:  str
    is_malicious: bool


class SupervisorDecision(BaseModel):
    action_taken:               str        # ALLOW | BLOCK | FORK | QUARANTINE
    risk_vector:                List[float] = Field(..., min_length=16, max_length=16)  # 16-dim risk feature vector
    ambiguity_score:            float  # [0,1] β€” how close to 0.5 risk midpoint
    quarantine_steps_remaining: int    # 0 if no active hold
    decision:                   Optional[str] = None
    confidence:                 Optional[float] = None
    uncertainty:                Optional[float] = None
    risk_score:                 Optional[float] = None
    cumulative_risk_score:      Optional[float] = None
    missing_evidence:           List[str] = Field(default_factory=list)
    required_evidence:          List[str] = Field(default_factory=list)
    explanation:                Optional[str] = None
    safe_outcome:               Optional[str] = None
    policy_name:                Optional[str] = None
    domain:                     Optional[str] = None
    mitre_tactic:               Optional[str] = None
    mitre_technique:            Optional[str] = None
    evidence_plan:              List[Dict[str, Any]] = Field(default_factory=list)
    structured_safe_outcome:    Dict[str, Any] = Field(default_factory=dict)
    decision_trace:             Dict[str, Any] = Field(default_factory=dict)
    memory_context:             Dict[str, Any] = Field(default_factory=dict)
    cumulative_risk_reason:     Optional[str] = None
    risk_indicators:            List[str] = Field(default_factory=list)
    safe_indicators:            List[str] = Field(default_factory=list)


class EnvironmentState(BaseModel):
    is_shadow_active: bool
    domain_data:      Dict[str, Any]


class MitreInfo(BaseModel):
    tactic:         str
    technique_id:   str
    technique_name: str
    confidence:     float


class IncidentReport(BaseModel):
    report_id:       str
    timestamp:       str
    domain:          str
    intent:          str
    severity:        str
    confidence:      float
    mitre:           MitreInfo
    blast_radius:    Dict[str, Any]
    cloudtrail:      List[Dict[str, Any]]
    recommendation:  str
    payload_snippet: str


class ForensicEvent(BaseModel):
    domain:    str
    intent:    str
    payload:   str
    step:      int
    timestamp: str


class QuarantineHoldStatus(BaseModel):
    """Per-domain quarantine hold observable state β€” sent to frontend every step."""
    active:             bool
    steps_remaining:    int          # 0-3
    context_signals:    List[Dict[str, Any]]  # signals emitted during hold
    latest_signal:      Optional[Dict[str, Any]] = None


class DomainQuarantineStatus(BaseModel):
    """Combines domain-level auto-quarantine + action-level hold."""
    domain_quarantined:    bool   # auto-quarantine after 3 consecutive hits
    hold_active:           bool   # QUARANTINE action hold in progress
    hold_steps_remaining:  int    # 0 if no hold


class OutboundMessage(BaseModel):
    domain:              str
    worker_action:       WorkerActionOut
    supervisor_decision: SupervisorDecision
    environment_state:   EnvironmentState
    health_scores:       Dict[str, float]
    # quarantine_status: one entry per domain with full observable state
    quarantine_status:   Dict[str, Any]
    # quarantine_hold: only populated when decision == QUARANTINE
    quarantine_hold:     Optional[QuarantineHoldStatus] = None
    forensic_log:        List[Dict[str, Any]] = Field(default_factory=list)
    incident_report:     Optional[IncidentReport] = None