sentinelback / models.py
paulstevemithun's picture
Upload 56 files
1c85a69 verified
"""
Database models for insider threat detection system (MongoDB/Beanie)
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from beanie import Document, Link, PydanticObjectId
from pydantic import Field, EmailStr
class Employee(Document):
"""Employee document"""
employee_id: str = Field(..., description="Unique employee ID string")
name: str
email: str = Field(..., description="Unique email")
department: str
role: str
baseline_location: Optional[str] = None
is_isolated: bool = False
created_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "employees"
indexes = [
"employee_id",
"email",
"name"
]
class BehavioralEvent(Document):
"""Behavioral event document"""
employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")
event_type: str # login, file_access, network, firewall, privilege_escalation
timestamp: datetime = Field(default_factory=datetime.utcnow)
# Event details
location: Optional[str] = None
ip_address: Optional[str] = None
port: Optional[int] = None
file_path: Optional[str] = None
action: Optional[str] = None # read, write, delete, execute
success: bool = True
# Additional metadata
event_metadata: Optional[Dict[str, Any]] = None
# System Metrics
cpu_usage: float = 0.0
memory_usage: float = 0.0
class Settings:
name = "behavioral_events"
indexes = [
"employee_id",
"event_type",
"timestamp"
]
class BehavioralFingerprint(Document):
"""Behavioral fingerprint document (baseline)"""
employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")
computed_at: datetime = Field(default_factory=datetime.utcnow)
# Behavioral features (baseline)
avg_login_hour: float
login_hour_std: float
unique_locations_count: int
avg_location_distance: float
unique_ports_count: int
avg_port_number: float
file_access_rate: float # files per day
sensitive_file_access_rate: float
privilege_escalation_rate: float # sudo attempts per day
firewall_change_rate: float # changes per week
network_activity_volume: float # MB per day
failed_login_rate: float
# Time-based patterns
weekday_activity_ratio: float # weekday vs weekend
night_activity_ratio: float # night (10pm-6am) vs day
class Settings:
name = "behavioral_fingerprints"
indexes = [
"employee_id",
"computed_at"
]
class Anomaly(Document):
"""Anomaly document"""
employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID")
detected_at: datetime = Field(default_factory=datetime.utcnow)
# Anomaly details
anomaly_score: float # -1 to 1 (Isolation Forest score)
risk_level: str # low, medium, high, critical
risk_score: int # 0-100
# Event that triggered anomaly
trigger_event_id: Optional[PydanticObjectId] = None
# Anomaly description
description: str
anomaly_type: str # unusual_login, unusual_location, unusual_port, etc.
# SHAP explanation
shap_values: Optional[Dict[str, float]] = None # Feature contributions
top_features: Optional[List[Dict[str, Any]]] = None # Top contributing features
# Status
status: str = "open" # open, investigating, resolved, false_positive
resolved_at: Optional[datetime] = None
resolved_by: Optional[str] = None
resolution_notes: Optional[str] = None
class Settings:
name = "anomalies"
indexes = [
"employee_id",
"detected_at",
"risk_level",
"status"
]
class MitreMapping(Document):
"""MITRE ATT&CK mapping document"""
anomaly_id: PydanticObjectId = Field(..., description="Reference to Anomaly ID")
# MITRE ATT&CK details
technique_id: str # e.g., T1078
technique_name: str
tactic: str # e.g., Initial Access, Privilege Escalation
description: str
confidence: float # 0-1
class Settings:
name = "mitre_mappings"
indexes = [
"anomaly_id",
"technique_id"
]
class MitigationStrategy(Document):
"""Mitigation strategy document"""
anomaly_id: PydanticObjectId = Field(..., description="Reference to Anomaly ID")
# Strategy details
priority: int # 1 (highest) to 5 (lowest)
category: str # immediate, short_term, long_term
action: str
description: str
# Status
implemented: bool = False
implemented_at: Optional[datetime] = None
implemented_by: Optional[str] = None
class Settings:
name = "mitigation_strategies"
indexes = [
"anomaly_id",
"priority"
]