""" Feature engineering for behavioral fingerprinting Extracts behavioral features from raw events to create employee baselines """ import pandas as pd import numpy as np from datetime import datetime, timedelta, timezone from typing import List, Dict, Optional import models async def calculate_behavioral_fingerprint(employee_id: str, days_back: int = 30) -> Optional[Dict[str, float]]: """ Calculate behavioral fingerprint for an employee based on historical events Args: employee_id: Employee ID days_back: Number of days to look back for baseline calculation Returns: Dictionary of behavioral features """ cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back) # Get employee employee = await models.Employee.get(employee_id) if not employee: return None # Get events # Note: Beanie find returns a cursor, to_list executes it events = await models.BehavioralEvent.find( models.BehavioralEvent.employee_id == employee.id, models.BehavioralEvent.timestamp >= cutoff_date.replace(tzinfo=None) # naive datetime for mongo helper usually ).to_list() if not events: # Return default fingerprint for new employees return get_default_fingerprint() # Convert to DataFrame for easier analysis events_df = pd.DataFrame([{ 'event_type': e.event_type, 'timestamp': e.timestamp, 'location': e.location, 'ip_address': e.ip_address, 'port': e.port, 'file_path': e.file_path, 'action': e.action, 'success': e.success, 'cpu_usage': getattr(e, 'cpu_usage', 0.0), 'memory_usage': getattr(e, 'memory_usage', 0.0) } for e in events]) # Extract features features = {} # 1. Login time patterns login_events = events_df[events_df['event_type'] == 'login'] if len(login_events) > 0: login_hours = login_events['timestamp'].dt.hour features['avg_login_hour'] = float(login_hours.mean()) features['login_hour_std'] = float(login_hours.std()) if len(login_hours) > 1 else 0.0 else: features['avg_login_hour'] = 9.0 # Default 9 AM features['login_hour_std'] = 2.0 # 2. Location patterns unique_locations = events_df['location'].dropna().nunique() features['unique_locations_count'] = unique_locations # Calculate average distance from baseline location if employee.baseline_location: # Simplified: count how often location differs from baseline location_events = events_df[events_df['location'].notna()] if len(location_events) > 0: different_locations = (location_events['location'] != employee.baseline_location).sum() features['avg_location_distance'] = float(different_locations / len(location_events)) else: features['avg_location_distance'] = 0.0 else: features['avg_location_distance'] = 0.0 # 3. Port usage patterns port_events = events_df[events_df['port'].notna()] if len(port_events) > 0: features['unique_ports_count'] = int(port_events['port'].nunique()) features['avg_port_number'] = float(port_events['port'].mean()) else: features['unique_ports_count'] = 0 features['avg_port_number'] = 0.0 # 4. File access patterns file_events = events_df[events_df['event_type'] == 'file_access'] features['file_access_rate'] = len(file_events) / max(days_back, 1) # Sensitive file access (files in /etc, /root, or containing 'secret', 'password', etc.) if len(file_events) > 0: sensitive_keywords = ['secret', 'password', 'credential', 'key', '/etc/', '/root/', 'config'] sensitive_files = file_events[ file_events['file_path'].str.contains('|'.join(sensitive_keywords), case=False, na=False) ] features['sensitive_file_access_rate'] = len(sensitive_files) / max(days_back, 1) else: features['sensitive_file_access_rate'] = 0.0 # 5. Privilege escalation patterns priv_events = events_df[events_df['event_type'] == 'privilege_escalation'] features['privilege_escalation_rate'] = len(priv_events) / max(days_back, 1) # 6. Firewall changes firewall_events = events_df[events_df['event_type'] == 'firewall'] features['firewall_change_rate'] = len(firewall_events) / max(days_back / 7, 1) # per week # 7. Network activity volume (simplified: count of network events) network_events = events_df[events_df['event_type'] == 'network'] features['network_activity_volume'] = len(network_events) / max(days_back, 1) # 8. Failed login rate failed_logins = login_events[login_events['success'] == False] if len(login_events) > 0 else pd.DataFrame() features['failed_login_rate'] = len(failed_logins) / max(days_back, 1) # 9. Time-based patterns if len(events_df) > 0: events_df['day_of_week'] = events_df['timestamp'].dt.dayofweek events_df['hour'] = events_df['timestamp'].dt.hour # Weekday vs weekend activity weekday_events = events_df[events_df['day_of_week'] < 5] weekend_events = events_df[events_df['day_of_week'] >= 5] total_events = len(events_df) features['weekday_activity_ratio'] = len(weekday_events) / total_events if total_events > 0 else 0.7 # Night vs day activity (night: 22:00 - 06:00) night_events = events_df[(events_df['hour'] >= 22) | (events_df['hour'] < 6)] features['night_activity_ratio'] = len(night_events) / total_events if total_events > 0 else 0.0 else: features['weekday_activity_ratio'] = 0.7 features['night_activity_ratio'] = 0.0 # 10. System Resource Patterns if len(events_df) > 0: features['avg_cpu_usage'] = float(events_df['cpu_usage'].mean()) features['std_cpu_usage'] = float(events_df['cpu_usage'].std()) if len(events_df) > 1 else 0.0 features['avg_memory_usage'] = float(events_df['memory_usage'].mean()) features['std_memory_usage'] = float(events_df['memory_usage'].std()) if len(events_df) > 1 else 0.0 else: features['avg_cpu_usage'] = 0.0 features['std_cpu_usage'] = 0.0 features['avg_memory_usage'] = 0.0 features['std_memory_usage'] = 0.0 return features def get_default_fingerprint() -> Dict[str, float]: """Return default fingerprint for employees with no history""" return { 'avg_login_hour': 9.0, 'login_hour_std': 2.0, 'unique_locations_count': 1, 'avg_location_distance': 0.0, 'unique_ports_count': 3, 'avg_port_number': 443.0, 'file_access_rate': 5.0, 'sensitive_file_access_rate': 0.1, 'privilege_escalation_rate': 0.5, 'firewall_change_rate': 0.0, 'network_activity_volume': 10.0, 'failed_login_rate': 0.0, 'weekday_activity_ratio': 0.8, 'weekday_activity_ratio': 0.8, 'night_activity_ratio': 0.05, 'avg_cpu_usage': 10.0, 'std_cpu_usage': 5.0, 'avg_memory_usage': 40.0, 'std_memory_usage': 5.0 } async def extract_features_from_recent_events(employee_id: str, hours_back: int = 24) -> Dict[str, float]: """ Extract features from recent events for real-time anomaly detection Similar to fingerprint but for shorter time window """ cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours_back) # Get employee employee = await models.Employee.get(employee_id) if not employee: # This might happen for new unknown IDs in events, fallback to default return get_default_fingerprint() events = await models.BehavioralEvent.find( models.BehavioralEvent.employee_id == employee.id, models.BehavioralEvent.timestamp >= cutoff_time.replace(tzinfo=None) ).to_list() if not events: return get_default_fingerprint() # Use same logic as fingerprint calculation but with shorter window events_df = pd.DataFrame([{ 'event_type': e.event_type, 'timestamp': e.timestamp, 'location': e.location, 'ip_address': e.ip_address, 'port': e.port, 'file_path': e.file_path, 'action': e.action, 'success': e.success, 'cpu_usage': getattr(e, 'cpu_usage', 0.0), 'memory_usage': getattr(e, 'memory_usage', 0.0) } for e in events]) features = {} # Calculate same features but normalize by hours instead of days login_events = events_df[events_df['event_type'] == 'login'] if len(login_events) > 0: login_hours = login_events['timestamp'].dt.hour features['avg_login_hour'] = float(login_hours.mean()) features['login_hour_std'] = float(login_hours.std()) if len(login_hours) > 1 else 0.0 else: features['avg_login_hour'] = 9.0 features['login_hour_std'] = 2.0 features['unique_locations_count'] = events_df['location'].dropna().nunique() if employee.baseline_location: location_events = events_df[events_df['location'].notna()] if len(location_events) > 0: different_locations = (location_events['location'] != employee.baseline_location).sum() features['avg_location_distance'] = float(different_locations / len(location_events)) else: features['avg_location_distance'] = 0.0 else: features['avg_location_distance'] = 0.0 port_events = events_df[events_df['port'].notna()] if len(port_events) > 0: features['unique_ports_count'] = int(port_events['port'].nunique()) features['avg_port_number'] = float(port_events['port'].mean()) else: features['unique_ports_count'] = 0 features['avg_port_number'] = 0.0 file_events = events_df[events_df['event_type'] == 'file_access'] features['file_access_rate'] = len(file_events) / max(hours_back / 24, 1) if len(file_events) > 0: sensitive_keywords = ['secret', 'password', 'credential', 'key', '/etc/', '/root/', 'config'] sensitive_files = file_events[ file_events['file_path'].str.contains('|'.join(sensitive_keywords), case=False, na=False) ] features['sensitive_file_access_rate'] = len(sensitive_files) / max(hours_back / 24, 1) else: features['sensitive_file_access_rate'] = 0.0 priv_events = events_df[events_df['event_type'] == 'privilege_escalation'] features['privilege_escalation_rate'] = len(priv_events) / max(hours_back / 24, 1) firewall_events = events_df[events_df['event_type'] == 'firewall'] features['firewall_change_rate'] = len(firewall_events) / max(hours_back / (24 * 7), 1) network_events = events_df[events_df['event_type'] == 'network'] features['network_activity_volume'] = len(network_events) / max(hours_back / 24, 1) failed_logins = login_events[login_events['success'] == False] if len(login_events) > 0 else pd.DataFrame() features['failed_login_rate'] = len(failed_logins) / max(hours_back / 24, 1) if len(events_df) > 0: events_df['day_of_week'] = events_df['timestamp'].dt.dayofweek events_df['hour'] = events_df['timestamp'].dt.hour weekday_events = events_df[events_df['day_of_week'] < 5] total_events = len(events_df) features['weekday_activity_ratio'] = len(weekday_events) / total_events if total_events > 0 else 0.7 night_events = events_df[(events_df['hour'] >= 22) | (events_df['hour'] < 6)] features['night_activity_ratio'] = len(night_events) / total_events if total_events > 0 else 0.0 else: features['weekday_activity_ratio'] = 0.7 features['night_activity_ratio'] = 0.0 if len(events_df) > 0: features['avg_cpu_usage'] = float(events_df['cpu_usage'].mean()) features['std_cpu_usage'] = float(events_df['cpu_usage'].std()) if len(events_df) > 1 else 0.0 features['avg_memory_usage'] = float(events_df['memory_usage'].mean()) features['std_memory_usage'] = float(events_df['memory_usage'].std()) if len(events_df) > 1 else 0.0 else: features['avg_cpu_usage'] = 0.0 features['std_cpu_usage'] = 0.0 features['avg_memory_usage'] = 0.0 features['std_memory_usage'] = 0.0 return features def get_feature_names() -> List[str]: """Return list of feature names in consistent order""" return [ 'avg_login_hour', 'login_hour_std', 'unique_locations_count', 'avg_location_distance', 'unique_ports_count', 'avg_port_number', 'file_access_rate', 'sensitive_file_access_rate', 'privilege_escalation_rate', 'firewall_change_rate', 'network_activity_volume', 'failed_login_rate', 'weekday_activity_ratio', 'night_activity_ratio', 'avg_cpu_usage', 'std_cpu_usage', 'avg_memory_usage', 'std_memory_usage' ] def features_to_array(features: Dict[str, float]) -> np.ndarray: """Convert feature dictionary to numpy array in consistent order""" feature_names = get_feature_names() return np.array([features.get(name, 0.0) for name in feature_names]).reshape(1, -1)