Spaces:
Sleeping
Sleeping
File size: 5,139 Bytes
"""
Database models for insider threat detection system (MongoDB/Beanie)
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from beanie import Document, Link, PydanticObjectId
from pydantic import Field, EmailStr
class Employee(Document):
    """Employee record persisted in the ``employees`` collection."""

    # Identity and organisational placement.
    # NOTE(review): the descriptions on employee_id and email say "Unique",
    # but Settings.indexes lists them as plain field names -- confirm that
    # uniqueness is actually enforced somewhere.
    employee_id: str = Field(..., description="Unique employee ID string")
    name: str
    email: str = Field(..., description="Unique email")
    department: str
    role: str

    # Optional baseline location; unset by default.
    baseline_location: Optional[str] = None

    # Isolation flag; new records start non-isolated.
    is_isolated: bool = False

    # Filled in at construction time via datetime.utcnow (naive datetime).
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Settings:
        # Collection name and the fields that get an index.
        name = "employees"
        indexes = ["employee_id", "email", "name"]
class BehavioralEvent(Document):
    """Single behavioral event persisted in ``behavioral_events``."""

    # NOTE(review): typed as an ObjectId, so this presumably points at the
    # Employee document id rather than Employee.employee_id (a str) -- confirm.
    employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")

    # Kind of event: login, file_access, network, firewall,
    # privilege_escalation.
    event_type: str

    # Defaults to the construction time via datetime.utcnow (naive datetime).
    timestamp: datetime = Field(default_factory=datetime.utcnow)

    # --- Event details (all optional) ---
    location: Optional[str] = None
    ip_address: Optional[str] = None
    port: Optional[int] = None
    file_path: Optional[str] = None
    # One of: read, write, delete, execute.
    action: Optional[str] = None
    success: bool = True

    # --- Additional metadata ---
    event_metadata: Optional[Dict[str, Any]] = None

    # --- System metrics (both default to 0.0) ---
    cpu_usage: float = 0.0
    memory_usage: float = 0.0

    class Settings:
        # Collection name and the fields that get an index.
        name = "behavioral_events"
        indexes = ["employee_id", "event_type", "timestamp"]
class BehavioralFingerprint(Document):
    """Baseline behavioral fingerprint persisted in ``behavioral_fingerprints``.

    Every feature field below is required; none of them has a default.
    """

    employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")

    # Defaults to the construction time via datetime.utcnow (naive datetime).
    computed_at: datetime = Field(default_factory=datetime.utcnow)

    # --- Behavioral features (baseline) ---
    avg_login_hour: float
    login_hour_std: float
    unique_locations_count: int
    avg_location_distance: float
    unique_ports_count: int
    avg_port_number: float
    # Files per day.
    file_access_rate: float
    sensitive_file_access_rate: float
    # Sudo attempts per day.
    privilege_escalation_rate: float
    # Changes per week.
    firewall_change_rate: float
    # MB per day.
    network_activity_volume: float
    failed_login_rate: float

    # --- Time-based patterns ---
    # Weekday vs weekend.
    weekday_activity_ratio: float
    # Night (10pm-6am) vs day.
    night_activity_ratio: float

    class Settings:
        # Collection name and the fields that get an index.
        name = "behavioral_fingerprints"
        indexes = ["employee_id", "computed_at"]
class Anomaly(Document):
    """Detected anomaly persisted in the ``anomalies`` collection."""

    employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")

    # Defaults to the construction time via datetime.utcnow (naive datetime).
    detected_at: datetime = Field(default_factory=datetime.utcnow)

    # --- Anomaly details ---
    # Isolation Forest score, -1 to 1.
    anomaly_score: float
    # One of: low, medium, high, critical.
    risk_level: str
    # 0-100.
    # NOTE(review): neither the range nor the risk_level values are
    # validated by this model; they are documented conventions only.
    risk_score: int

    # --- Event that triggered the anomaly (optional) ---
    trigger_event_id: Optional[PydanticObjectId] = None

    # --- Anomaly description ---
    description: str
    # e.g. unusual_login, unusual_location, unusual_port, etc.
    anomaly_type: str

    # --- SHAP explanation ---
    # Feature contributions.
    shap_values: Optional[Dict[str, float]] = None
    # Top contributing features.
    top_features: Optional[List[Dict[str, Any]]] = None

    # --- Status ---
    # One of: open, investigating, resolved, false_positive.
    status: str = "open"
    resolved_at: Optional[datetime] = None
    resolved_by: Optional[str] = None
    resolution_notes: Optional[str] = None

    class Settings:
        # Collection name and the fields that get an index.
        name = "anomalies"
        indexes = ["employee_id", "detected_at", "risk_level", "status"]
class MitreMapping(Document):
    """MITRE ATT&CK mapping for an anomaly, persisted in ``mitre_mappings``."""

    anomaly_id: PydanticObjectId = Field(..., description="Reference to Anomaly ID")

    # --- MITRE ATT&CK details ---
    # e.g. T1078.
    technique_id: str
    technique_name: str
    # e.g. Initial Access, Privilege Escalation.
    tactic: str
    description: str
    # 0-1 (documented convention; not validated by this model).
    confidence: float

    class Settings:
        # Collection name and the fields that get an index.
        name = "mitre_mappings"
        indexes = ["anomaly_id", "technique_id"]
class MitigationStrategy(Document):
    """Mitigation strategy for an anomaly, persisted in ``mitigation_strategies``."""

    anomaly_id: PydanticObjectId = Field(..., description="Reference to Anomaly ID")

    # --- Strategy details ---
    # 1 (highest) to 5 (lowest); documented convention, not validated here.
    priority: int
    # One of: immediate, short_term, long_term.
    category: str
    action: str
    description: str

    # --- Status ---
    # Starts as not implemented; the two fields below stay unset until then.
    implemented: bool = False
    implemented_at: Optional[datetime] = None
    implemented_by: Optional[str] = None

    class Settings:
        # Collection name and the fields that get an index.
        name = "mitigation_strategies"
        indexes = ["anomaly_id", "priority"]
|