File size: 5,139 Bytes
1c85a69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""

Database models for insider threat detection system (MongoDB/Beanie)

"""
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from beanie import Document, Link, PydanticObjectId
from pydantic import EmailStr, Field

class Employee(Document):
    """Employee document persisted under the ``employees`` settings name.

    Carries identity fields (``employee_id``, ``name``, ``email``), the
    employee's ``department`` and ``role``, an optional ``baseline_location``,
    and an ``is_isolated`` flag that defaults to ``False``.
    """

    # NOTE(review): the descriptions below call employee_id and email "unique",
    # but Settings.indexes lists them as plain field names with no unique
    # option visible here -- confirm whether uniqueness is enforced elsewhere.
    employee_id: str = Field(..., description="Unique employee ID string")
    name: str
    email: str = Field(..., description="Unique email")
    department: str
    role: str
    baseline_location: Optional[str] = None
    is_isolated: bool = False
    # datetime.utcnow() is deprecated since Python 3.12. This factory produces
    # the same naive-UTC value, so existing comparisons against naive
    # datetimes keep working.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    class Settings:
        name = "employees"
        indexes = [
            "employee_id",
            "email",
            "name",
        ]

class BehavioralEvent(Document):
    """Single behavioral event recorded for an employee.

    Persisted under the ``behavioral_events`` settings name. Only
    ``employee_id`` and ``event_type`` are required; every other field has a
    default.
    """

    employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")
    # Expected values per the original author: login, file_access, network,
    # firewall, privilege_escalation (free-form str, not validated here).
    event_type: str
    # datetime.utcnow() is deprecated since Python 3.12. This factory produces
    # the same naive-UTC value, so existing comparisons against naive
    # datetimes keep working.
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    # Event details
    location: Optional[str] = None
    ip_address: Optional[str] = None
    port: Optional[int] = None
    file_path: Optional[str] = None
    action: Optional[str] = None  # read, write, delete, execute
    success: bool = True

    # Additional metadata
    event_metadata: Optional[Dict[str, Any]] = None

    # System metrics (units are not specified in this file)
    cpu_usage: float = 0.0
    memory_usage: float = 0.0

    class Settings:
        name = "behavioral_events"
        indexes = [
            "employee_id",
            "event_type",
            "timestamp",
        ]

class BehavioralFingerprint(Document):
    """Baseline behavioral fingerprint computed for an employee.

    Persisted under the ``behavioral_fingerprints`` settings name. All feature
    fields are required; only ``computed_at`` has a default.
    """

    employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")
    # datetime.utcnow() is deprecated since Python 3.12. This factory produces
    # the same naive-UTC value, so existing comparisons against naive
    # datetimes keep working.
    computed_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    # Behavioral features (baseline)
    avg_login_hour: float
    login_hour_std: float
    unique_locations_count: int
    avg_location_distance: float
    unique_ports_count: int
    avg_port_number: float
    file_access_rate: float  # files per day
    sensitive_file_access_rate: float
    privilege_escalation_rate: float  # sudo attempts per day
    firewall_change_rate: float  # changes per week
    network_activity_volume: float  # MB per day
    failed_login_rate: float

    # Time-based patterns
    weekday_activity_ratio: float  # weekday vs weekend
    night_activity_ratio: float  # night (10pm-6am) vs day

    class Settings:
        name = "behavioral_fingerprints"
        indexes = [
            "employee_id",
            "computed_at",
        ]

class Anomaly(Document):
    """Detected anomaly for an employee, with its explanation and triage state.

    Persisted under the ``anomalies`` settings name. New documents start with
    ``status="open"`` and no resolution details.
    """

    employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")
    # datetime.utcnow() is deprecated since Python 3.12. This factory produces
    # the same naive-UTC value, so existing comparisons against naive
    # datetimes keep working.
    detected_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    # Anomaly details. The ranges and value sets in these comments come from
    # the original author; no validator in this model enforces them.
    anomaly_score: float  # -1 to 1 (Isolation Forest score)
    risk_level: str  # low, medium, high, critical
    risk_score: int  # 0-100

    # Event that triggered anomaly
    trigger_event_id: Optional[PydanticObjectId] = None

    # Anomaly description
    description: str
    anomaly_type: str  # unusual_login, unusual_location, unusual_port, etc.

    # SHAP explanation
    shap_values: Optional[Dict[str, float]] = None  # Feature contributions
    top_features: Optional[List[Dict[str, Any]]] = None  # Top contributing features

    # Status
    status: str = "open"  # open, investigating, resolved, false_positive
    resolved_at: Optional[datetime] = None
    resolved_by: Optional[str] = None
    resolution_notes: Optional[str] = None

    class Settings:
        name = "anomalies"
        indexes = [
            "employee_id",
            "detected_at",
            "risk_level",
            "status",
        ]

class MitreMapping(Document):
    """Links one anomaly to a MITRE ATT&CK technique.

    Persisted under the ``mitre_mappings`` settings name. Every field is
    required.
    """

    anomaly_id: PydanticObjectId = Field(
        ...,
        description="Reference to Anomaly ID",
    )

    # MITRE ATT&CK details. The example values and the 0-1 range are the
    # original author's notes; this model does not validate them.
    technique_id: str  # e.g., T1078
    technique_name: str
    tactic: str  # e.g., Initial Access, Privilege Escalation
    description: str
    confidence: float  # 0-1

    class Settings:
        name = "mitre_mappings"
        indexes = ["anomaly_id", "technique_id"]

class MitigationStrategy(Document):
    """Recommended mitigation action attached to one anomaly.

    Persisted under the ``mitigation_strategies`` settings name. A new
    strategy starts as not implemented, with no implementer or timestamp.
    """

    anomaly_id: PydanticObjectId = Field(
        ...,
        description="Reference to Anomaly ID",
    )

    # Strategy details. The value ranges in these comments are the original
    # author's notes; this model does not validate them.
    priority: int  # 1 (highest) to 5 (lowest)
    category: str  # immediate, short_term, long_term
    action: str
    description: str

    # Implementation tracking
    implemented: bool = False
    implemented_at: Optional[datetime] = None
    implemented_by: Optional[str] = None

    class Settings:
        name = "mitigation_strategies"
        indexes = ["anomaly_id", "priority"]