Spaces:
Sleeping
Sleeping
| """ | |
| Database models for insider threat detection system (MongoDB/Beanie) | |
| """ | |
| from typing import Optional, List, Dict, Any | |
| from datetime import datetime | |
| from beanie import Document, Link, PydanticObjectId | |
| from pydantic import Field, EmailStr | |
| class Employee(Document): | |
| """Employee document""" | |
| employee_id: str = Field(..., description="Unique employee ID string") | |
| name: str | |
| email: str = Field(..., description="Unique email") | |
| department: str | |
| role: str | |
| baseline_location: Optional[str] = None | |
| is_isolated: bool = False | |
| created_at: datetime = Field(default_factory=datetime.utcnow) | |
| class Settings: | |
| name = "employees" | |
| indexes = [ | |
| "employee_id", | |
| "email", | |
| "name" | |
| ] | |
| class BehavioralEvent(Document): | |
| """Behavioral event document""" | |
| employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID") | |
| event_type: str # login, file_access, network, firewall, privilege_escalation | |
| timestamp: datetime = Field(default_factory=datetime.utcnow) | |
| # Event details | |
| location: Optional[str] = None | |
| ip_address: Optional[str] = None | |
| port: Optional[int] = None | |
| file_path: Optional[str] = None | |
| action: Optional[str] = None # read, write, delete, execute | |
| success: bool = True | |
| # Additional metadata | |
| event_metadata: Optional[Dict[str, Any]] = None | |
| # System Metrics | |
| cpu_usage: float = 0.0 | |
| memory_usage: float = 0.0 | |
| class Settings: | |
| name = "behavioral_events" | |
| indexes = [ | |
| "employee_id", | |
| "event_type", | |
| "timestamp" | |
| ] | |
| class BehavioralFingerprint(Document): | |
| """Behavioral fingerprint document (baseline)""" | |
| employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID") | |
| computed_at: datetime = Field(default_factory=datetime.utcnow) | |
| # Behavioral features (baseline) | |
| avg_login_hour: float | |
| login_hour_std: float | |
| unique_locations_count: int | |
| avg_location_distance: float | |
| unique_ports_count: int | |
| avg_port_number: float | |
| file_access_rate: float # files per day | |
| sensitive_file_access_rate: float | |
| privilege_escalation_rate: float # sudo attempts per day | |
| firewall_change_rate: float # changes per week | |
| network_activity_volume: float # MB per day | |
| failed_login_rate: float | |
| # Time-based patterns | |
| weekday_activity_ratio: float # weekday vs weekend | |
| night_activity_ratio: float # night (10pm-6am) vs day | |
| class Settings: | |
| name = "behavioral_fingerprints" | |
| indexes = [ | |
| "employee_id", | |
| "computed_at" | |
| ] | |
| class Anomaly(Document): | |
| """Anomaly document""" | |
| employee_id: PydanticObjectId = Field(..., description="Reference to Employee ID") | |
| detected_at: datetime = Field(default_factory=datetime.utcnow) | |
| # Anomaly details | |
| anomaly_score: float # -1 to 1 (Isolation Forest score) | |
| risk_level: str # low, medium, high, critical | |
| risk_score: int # 0-100 | |
| # Event that triggered anomaly | |
| trigger_event_id: Optional[PydanticObjectId] = None | |
| # Anomaly description | |
| description: str | |
| anomaly_type: str # unusual_login, unusual_location, unusual_port, etc. | |
| # SHAP explanation | |
| shap_values: Optional[Dict[str, float]] = None # Feature contributions | |
| top_features: Optional[List[Dict[str, Any]]] = None # Top contributing features | |
| # Status | |
| status: str = "open" # open, investigating, resolved, false_positive | |
| resolved_at: Optional[datetime] = None | |
| resolved_by: Optional[str] = None | |
| resolution_notes: Optional[str] = None | |
| class Settings: | |
| name = "anomalies" | |
| indexes = [ | |
| "employee_id", | |
| "detected_at", | |
| "risk_level", | |
| "status" | |
| ] | |
| class MitreMapping(Document): | |
| """MITRE ATT&CK mapping document""" | |
| anomaly_id: PydanticObjectId = Field(..., description="Reference to Anomaly ID") | |
| # MITRE ATT&CK details | |
| technique_id: str # e.g., T1078 | |
| technique_name: str | |
| tactic: str # e.g., Initial Access, Privilege Escalation | |
| description: str | |
| confidence: float # 0-1 | |
| class Settings: | |
| name = "mitre_mappings" | |
| indexes = [ | |
| "anomaly_id", | |
| "technique_id" | |
| ] | |
| class MitigationStrategy(Document): | |
| """Mitigation strategy document""" | |
| anomaly_id: PydanticObjectId = Field(..., description="Reference to Anomaly ID") | |
| # Strategy details | |
| priority: int # 1 (highest) to 5 (lowest) | |
| category: str # immediate, short_term, long_term | |
| action: str | |
| description: str | |
| # Status | |
| implemented: bool = False | |
| implemented_at: Optional[datetime] = None | |
| implemented_by: Optional[str] = None | |
| class Settings: | |
| name = "mitigation_strategies" | |
| indexes = [ | |
| "anomaly_id", | |
| "priority" | |
| ] | |