""" Mitigation strategy engine Generates actionable mitigation recommendations based on detected anomalies """ from typing import List, Dict class MitigationEngine: """ Generates mitigation strategies for detected threats """ def __init__(self): # Define mitigation templates by anomaly type self.mitigation_templates = { 'unusual_login_time': [ { 'priority': 1, 'category': 'immediate', 'action': 'Verify employee activity', 'description': 'Contact employee to confirm the login was legitimate' }, { 'priority': 2, 'category': 'immediate', 'action': 'Review access logs', 'description': 'Check all activities performed during the unusual login session' }, { 'priority': 3, 'category': 'short_term', 'action': 'Enable MFA alerts', 'description': 'Configure alerts for logins outside normal hours' } ], 'location_variance': [ { 'priority': 1, 'category': 'immediate', 'action': 'Verify location', 'description': 'Confirm employee is traveling or working from new location' }, { 'priority': 2, 'category': 'immediate', 'action': 'Check for VPN usage', 'description': 'Verify if location change is due to VPN or proxy' }, { 'priority': 3, 'category': 'short_term', 'action': 'Implement geo-fencing', 'description': 'Set up alerts for logins from unexpected geographic locations' } ], 'unusual_port': [ { 'priority': 1, 'category': 'immediate', 'action': 'Block suspicious port', 'description': 'Temporarily block the unusual port pending investigation' }, { 'priority': 2, 'category': 'immediate', 'action': 'Analyze network traffic', 'description': 'Review all traffic on the unusual port for malicious activity' }, { 'priority': 3, 'category': 'short_term', 'action': 'Update firewall rules', 'description': 'Restrict port access to authorized users only' } ], 'sensitive_file_access': [ { 'priority': 1, 'category': 'immediate', 'action': 'Review file access', 'description': 'Audit which sensitive files were accessed and why' }, { 'priority': 1, 'category': 'immediate', 'action': 'Check for data exfiltration', 'description': 'Monitor for unusual data transfers or downloads' }, { 'priority': 2, 'category': 'short_term', 'action': 'Restrict file permissions', 'description': 'Review and tighten access controls on sensitive files' }, { 'priority': 3, 'category': 'long_term', 'action': 'Implement DLP', 'description': 'Deploy Data Loss Prevention tools to monitor sensitive data' } ], 'privilege_escalation': [ { 'priority': 1, 'category': 'immediate', 'action': 'Suspend elevated privileges', 'description': 'Temporarily revoke sudo/admin access pending investigation' }, { 'priority': 1, 'category': 'immediate', 'action': 'Review privilege usage', 'description': 'Audit all commands executed with elevated privileges' }, { 'priority': 2, 'category': 'short_term', 'action': 'Implement privilege monitoring', 'description': 'Set up real-time alerts for privilege escalation attempts' }, { 'priority': 3, 'category': 'long_term', 'action': 'Apply least privilege principle', 'description': 'Review and minimize privilege assignments across organization' } ], 'firewall_change': [ { 'priority': 1, 'category': 'immediate', 'action': 'Revert firewall changes', 'description': 'Roll back unauthorized firewall rule modifications' }, { 'priority': 1, 'category': 'immediate', 'action': 'Investigate change reason', 'description': 'Determine why firewall rules were modified' }, { 'priority': 2, 'category': 'short_term', 'action': 'Restrict firewall access', 'description': 'Limit firewall configuration access to security team only' }, { 'priority': 3, 'category': 'long_term', 'action': 'Implement change management', 'description': 'Require approval workflow for all firewall changes' } ], 'failed_login': [ { 'priority': 1, 'category': 'immediate', 'action': 'Lock account temporarily', 'description': 'Prevent further login attempts to protect account' }, { 'priority': 2, 'category': 'immediate', 'action': 'Contact employee', 'description': 'Verify if employee is having login issues or if account is compromised' }, { 'priority': 2, 'category': 'short_term', 'action': 'Force password reset', 'description': 'Require employee to reset password with strong requirements' }, { 'priority': 3, 'category': 'short_term', 'action': 'Enable account monitoring', 'description': 'Set up enhanced monitoring for this account' } ], 'network_activity': [ { 'priority': 1, 'category': 'immediate', 'action': 'Analyze traffic patterns', 'description': 'Review network logs for signs of data exfiltration' }, { 'priority': 2, 'category': 'immediate', 'action': 'Check for malware', 'description': 'Scan employee workstation for malware or backdoors' }, { 'priority': 3, 'category': 'short_term', 'action': 'Implement bandwidth limits', 'description': 'Set reasonable bandwidth limits for user accounts' } ], 'night_activity': [ { 'priority': 1, 'category': 'immediate', 'action': 'Verify employee activity', 'description': 'Confirm if employee was working late or if account is compromised' }, { 'priority': 2, 'category': 'short_term', 'action': 'Review activities performed', 'description': 'Audit all actions taken during off-hours' }, { 'priority': 3, 'category': 'short_term', 'action': 'Set up off-hours alerts', 'description': 'Configure notifications for activity outside business hours' } ], 'high_cpu_usage': [ { 'priority': 1, 'category': 'immediate', 'action': 'Investigate running processes', 'description': 'Identify processes consuming high CPU (potential crypto miner)' }, { 'priority': 2, 'category': 'immediate', 'action': 'Scan for malware', 'description': 'Run deep system scan for resource hijacking malware' }, { 'priority': 3, 'category': 'short_term', 'action': 'Kill suspicious process', 'description': 'Terminate any unauthorized high-resource processes' } ], 'high_memory_usage': [ { 'priority': 1, 'category': 'immediate', 'action': 'Check for memory leaks/bloat', 'description': 'Identify applications using excessive memory' }, { 'priority': 2, 'category': 'immediate', 'action': 'Scan for memory-resident malware', 'description': 'Check for malware injecting into legitimate processes' } ] } # Default mitigation for unknown anomaly types self.default_mitigations = [ { 'priority': 1, 'category': 'immediate', 'action': 'Investigate anomaly', 'description': 'Review the detected anomaly and gather more context' }, { 'priority': 2, 'category': 'immediate', 'action': 'Contact employee', 'description': 'Verify the unusual behavior with the employee' }, { 'priority': 3, 'category': 'short_term', 'action': 'Monitor account', 'description': 'Enable enhanced monitoring for this employee account' } ] def generate_strategies( self, anomaly_type: str, risk_level: str, mitre_techniques: List[Dict] = None ) -> List[Dict]: """ Generate mitigation strategies for an anomaly Args: anomaly_type: Type of anomaly detected risk_level: Risk level (low, medium, high, critical) mitre_techniques: List of mapped MITRE techniques Returns: List of mitigation strategies """ # Get base strategies for anomaly type strategies = self.mitigation_templates.get(anomaly_type, self.default_mitigations.copy()) # Make a copy to avoid modifying templates strategies = [s.copy() for s in strategies] # Adjust priorities based on risk level if risk_level == 'critical': # Add urgent response for critical threats strategies.insert(0, { 'priority': 1, 'category': 'immediate', 'action': 'Escalate to security team', 'description': 'CRITICAL: Immediately notify security operations center' }) elif risk_level == 'high': strategies.insert(0, { 'priority': 1, 'category': 'immediate', 'action': 'Alert security team', 'description': 'HIGH RISK: Notify security team for immediate review' }) # Add MITRE-specific mitigations if available if mitre_techniques: for technique in mitre_techniques[:2]: # Top 2 techniques mitre_strategy = self._get_mitre_mitigation(technique) if mitre_strategy: strategies.append(mitre_strategy) # Re-sort by priority strategies.sort(key=lambda x: x['priority']) return strategies def _get_mitre_mitigation(self, technique: Dict) -> Dict: """ Get mitigation strategy specific to MITRE technique Args: technique: MITRE technique dictionary Returns: Mitigation strategy or None """ technique_id = technique.get('technique_id') mitre_mitigations = { 'T1078': { 'priority': 2, 'category': 'short_term', 'action': 'Implement MFA', 'description': 'Enable multi-factor authentication to prevent credential abuse' }, 'T1021': { 'priority': 2, 'category': 'short_term', 'action': 'Restrict remote access', 'description': 'Limit remote service access to authorized users and IPs' }, 'T1068': { 'priority': 1, 'category': 'immediate', 'action': 'Patch vulnerabilities', 'description': 'Apply security patches to prevent privilege escalation exploits' }, 'T1048': { 'priority': 1, 'category': 'immediate', 'action': 'Monitor data transfers', 'description': 'Implement network monitoring to detect data exfiltration' }, 'T1562': { 'priority': 1, 'category': 'immediate', 'action': 'Restore security controls', 'description': 'Re-enable any disabled security mechanisms' }, 'T1530': { 'priority': 2, 'category': 'short_term', 'action': 'Audit cloud access', 'description': 'Review and restrict cloud storage access permissions' }, 'T1496': { 'priority': 1, 'category': 'immediate', 'action': 'Isolate and Clean', 'description': 'Disconnect from network and remove crypto-mining malware' } } return mitre_mitigations.get(technique_id) def get_compliance_recommendations(self, anomaly_type: str) -> List[str]: """ Get compliance-related recommendations Args: anomaly_type: Type of anomaly Returns: List of compliance recommendations """ compliance_map = { 'sensitive_file_access': [ 'Document incident per GDPR Article 33 (breach notification)', 'Review compliance with SOC 2 access controls', 'Ensure HIPAA audit trail requirements are met' ], 'privilege_escalation': [ 'Review against PCI DSS requirement 7 (access control)', 'Document for SOC 2 CC6.1 (logical access controls)', 'Verify compliance with ISO 27001 A.9.2.3' ], 'failed_login': [ 'Check NIST 800-53 AC-7 (unsuccessful login attempts)', 'Review against CIS Controls 16.11', 'Document per SOC 2 CC6.1' ] } return compliance_map.get(anomaly_type, [ 'Document incident in security log', 'Review against organizational security policies' ])