""" Advanced Security Hardening Suite Zenith Fraud Detection Platform - Enterprise-Grade Security """ import time import logging from datetime import datetime from typing import Any from dataclasses import dataclass import re from app.services.monitoring_collector import MonitoringCollector @dataclass class SecurityEvent: """Security event data structure""" timestamp: datetime event_type: str severity: str source_ip: str user_agent: str details: dict[str, Any] risk_score: int class AdvancedSecurityManager: """Advanced security management with AI-powered threat detection""" def __init__(self): self.monitoring_collector = MonitoringCollector() self.blocked_ips = set() self.suspicious_patterns = [] self.security_events = [] self.rate_limit_store = {} self.anomaly_detector = AnomalyDetector() self.logger = logging.getLogger(__name__) async def initialize(self): """Initialize advanced security features""" await self.load_security_policies() await self.setup_threat_intelligence() await self.configure_advanced_rules() self.logger.info("Advanced security manager initialized") async def analyze_request_security(self, request_data: dict[str, Any]) -> SecurityEvent: """Advanced request security analysis""" security_score = 0 security_issues = [] # IP-based analysis ip_analysis = await self.analyze_ip_risk(request_data.get("client_ip", "")) security_score += ip_analysis["risk_score"] security_issues.extend(ip_analysis["issues"]) # Request pattern analysis pattern_analysis = await self.analyze_request_patterns(request_data) security_score += pattern_analysis["risk_score"] security_issues.extend(pattern_analysis["issues"]) # Payload analysis for attacks payload_analysis = await self.analyze_payload_security(request_data) security_score += payload_analysis["risk_score"] security_issues.extend(payload_analysis["issues"]) # User behavior analysis behavior_analysis = await self.analyze_user_behavior(request_data) security_score += behavior_analysis["risk_score"] security_issues.extend(behavior_analysis["issues"]) # Create security event event = SecurityEvent( timestamp=datetime.utcnow(), event_type="REQUEST_SECURITY_ANALYSIS", severity=self._calculate_severity(security_score), source_ip=request_data.get("client_ip", ""), user_agent=request_data.get("user_agent", ""), details={ "security_score": security_score, "issues": security_issues, "endpoint": request_data.get("endpoint", ""), "method": request_data.get("method", ""), }, risk_score=security_score, ) # Log security event await self.log_security_event(event) # Auto-block if high risk if security_score > 80: await self.auto_block_ip(request_data.get("client_ip", ""), "High risk score detected") return event async def analyze_ip_risk(self, ip: str) -> dict[str, Any]: """Advanced IP risk analysis""" risk_score = 0 issues = [] # Check against blocked IPs if ip in self.blocked_ips: risk_score += 100 issues.append("IP already blocked") # Check for known malicious patterns if self._is_suspicious_ip(ip): risk_score += 50 issues.append("Suspicious IP pattern detected") # Geolocation risk analysis (simulated) if self._is_high_risk_country(ip): risk_score += 20 issues.append("High-risk geographical location") # Proxy/VPN detection (simulated) if self._is_proxy_or_vpn(ip): risk_score += 15 issues.append("Proxy/VPN usage detected") return {"risk_score": risk_score, "issues": issues} async def analyze_request_patterns(self, request_data: dict[str, Any]) -> dict[str, Any]: """Advanced request pattern analysis""" risk_score = 0 issues = [] endpoint = request_data.get("endpoint", "") method = request_data.get("method", "") headers = request_data.get("headers", {}) # Check for attack patterns in endpoint if self._contains_attack_patterns(endpoint): risk_score += 40 issues.append("Attack pattern detected in endpoint") # Check for suspicious headers suspicious_headers = self._detect_suspicious_headers(headers) if suspicious_headers: risk_score += 20 * len(suspicious_headers) issues.extend([f"Suspicious header: {h}" for h in suspicious_headers]) # Rate limiting analysis ip = request_data.get("client_ip", "") if self._is_rate_limited(ip): risk_score += 30 issues.append("Rate limit exceeded") # Check for unusual request methods if method not in ["GET", "POST", "PUT", "DELETE", "PATCH"]: risk_score += 25 issues.append("Unusual HTTP method") return {"risk_score": risk_score, "issues": issues} async def analyze_payload_security(self, request_data: dict[str, Any]) -> dict[str, Any]: """Advanced payload security analysis""" risk_score = 0 issues = [] payload = request_data.get("body", "") query_params = request_data.get("query_params", {}) # SQL injection detection (advanced) sql_injection_risks = self._detect_advanced_sql_injection(payload, query_params) if sql_injection_risks: risk_score += 60 issues.extend(sql_injection_risks) # XSS detection (advanced) xss_risks = self._detect_advanced_xss(payload) if xss_risks: risk_score += 50 issues.extend(xss_risks) # Command injection detection command_injection_risks = self._detect_command_injection(payload) if command_injection_risks: risk_score += 70 issues.extend(command_injection_risks) # File upload security if request_data.get("files"): upload_risks = await self._analyze_file_uploads(request_data["files"]) risk_score += upload_risks["risk_score"] issues.extend(upload_risks["issues"]) return {"risk_score": risk_score, "issues": issues} async def analyze_user_behavior(self, request_data: dict[str, Any]) -> dict[str, Any]: """AI-powered user behavior analysis""" risk_score = 0 issues = [] user_id = request_data.get("user_id") if not user_id: return {"risk_score": 0, "issues": []} # Anomalous behavior detection behavior_score = await self.anomaly_detector.detect_anomaly(user_id, request_data) risk_score += behavior_score if behavior_score > 30: issues.append("Anomalous user behavior detected") # Session security analysis session_risks = await self._analyze_session_security(user_id, request_data) risk_score += session_risks["risk_score"] issues.extend(session_risks["issues"]) return {"risk_score": risk_score, "issues": issues} def _is_suspicious_ip(self, ip: str) -> bool: """Check for suspicious IP patterns""" # Common bot/scanner patterns suspicious_patterns = [ r"^10\.", # Private ranges (shouldn't be external) r"^192\.168\.", # Private ranges r"^172\.1[6-9]\.", # Private ranges r"^172\.2[0-9]\.", # Private ranges r"^172\.3[0-1]\.", # Private ranges ] for pattern in suspicious_patterns: if re.match(pattern, ip): return True return False def _is_high_risk_country(self, ip: str) -> bool: """Simulated high-risk country detection""" # In production, use real geolocation database # high_risk_countries = ["CN", "RU", "KP", "IR"] # Simulated check return False def _is_proxy_or_vpn(self, ip: str) -> bool: """Simulated proxy/VPN detection""" # In production, use real proxy detection services return False def _contains_attack_patterns(self, endpoint: str) -> bool: """Check endpoint for attack patterns""" attack_patterns = [ "\.\./", # Directory traversal " list[str]: """Detect suspicious HTTP headers""" suspicious_headers = [] # Known malicious headers malicious_headers = [ "X-Forwarded-For: 127.0.0.1", "X-Real-IP: 127.0.0.1", "X-Originating-IP", "X-Cluster-Client-IP", ] for header, value in headers.items(): header_line = f"{header}: {value}" for malicious in malicious_headers: if malicious.lower() in header_line.lower(): suspicious_headers.append(header) return suspicious_headers def _is_rate_limited(self, ip: str) -> bool: """Advanced rate limiting check""" current_time = time.time() window_size = 300 # 5 minutes max_requests = 100 if ip not in self.rate_limit_store: self.rate_limit_store[ip] = [] # Remove old requests self.rate_limit_store[ip] = [ req_time for req_time in self.rate_limit_store[ip] if current_time - req_time < window_size ] # Add current request self.rate_limit_store[ip].append(current_time) return len(self.rate_limit_store[ip]) > max_requests def _detect_advanced_sql_injection(self, payload: str, query_params: dict[str, Any]) -> list[str]: """Advanced SQL injection detection""" risks = [] # Advanced SQL injection patterns advanced_patterns = [ r"(\%27|\')(\s*|%20)*(or|and)(\s*|%20).*\=.*\s*--", # Classic SQLi r"union(\s*|%20)*(all|%20)*(select|%20)", # Union-based r"select(\s*|%20).*from(\s*|%20).*information_schema", # Information schema r"(\%27|\').*or(\s*|%20).*\=.*\s*(or|%20).*\=", # Boolean-based r"waitfor(\s*|%20)delay", # Time-based r"sleep\(\d+\)", # Time-based MySQL r"benchmark\(", # Time-based MySQL r"pg_sleep\(", # Time-based PostgreSQL ] test_data = payload + str(query_params) for pattern in advanced_patterns: if re.search(pattern, test_data, re.IGNORECASE): risks.append(f"Advanced SQL injection pattern: {pattern}") return risks def _detect_advanced_xss(self, payload: str) -> list[str]: """Advanced XSS detection""" risks = [] # Advanced XSS patterns advanced_patterns = [ r"]*>.*?", # Script tags r"javascript:", # JavaScript protocol r"on\w+\s*=", # Event handlers r"]*src", # Iframe injection r"]*data", # Object injection r"]*src", # Embed injection r"]*href", # Link injection r"]*http-equiv", # Meta injection r"expression\(", # CSS expression r"@import", # CSS import r"]* list[str]: """Advanced command injection detection""" risks = [] # Command injection patterns command_patterns = [ r";\s*(rm|del|format|fdisk)", # Destructive commands r"(\|\||&)\s*(rm|del|format|fdisk)", # Piped destructive commands r"\$\(", # Command substitution r"`[^`]*`", # Backticks r"curl(\s*|%20).*\|\s*sh", # Download and execute r"wget(\s*|%20).*\|\s*sh", # Download and execute r"eval\s*\(", # PHP eval r"system\s*\(", # PHP system r"exec\s*\(", # PHP exec r"shell_exec\s*\(", # PHP shell_exec ] for pattern in command_patterns: if re.search(pattern, payload, re.IGNORECASE): risks.append(f"Command injection pattern: {pattern}") return risks async def _analyze_file_uploads(self, files: list[dict[str, Any]]) -> dict[str, Any]: """Advanced file upload security analysis""" risk_score = 0 issues = [] dangerous_extensions = [ ".php", ".phtml", ".php3", ".php4", ".php5", ".asp", ".aspx", ".jsp", ".exe", ".bat", ".cmd", ".sh", ".ps1", ".vbs", ".jar", ] for file_info in files: filename = file_info.get("filename", "") file_size = file_info.get("size", 0) # Check file extension file_ext = "." + filename.split(".")[-1].lower() if "." in filename else "" if file_ext in dangerous_extensions: risk_score += 40 issues.append(f"Dangerous file extension: {file_ext}") # Check file size if file_size > 50 * 1024 * 1024: # 50MB limit risk_score += 20 issues.append(f"Large file upload: {file_size} bytes") # Check filename for suspicious patterns if any(pattern in filename.lower() for pattern in ["webshell", "backdoor", "shell", "cmd"]): risk_score += 60 issues.append(f"Suspicious filename: {filename}") return {"risk_score": risk_score, "issues": issues} async def _analyze_session_security(self, user_id: str, request_data: dict[str, Any]) -> dict[str, Any]: """Advanced session security analysis""" risk_score = 0 issues = [] # Check for session hijacking indicators _ip = request_data.get("client_ip", "") user_agent = request_data.get("user_agent", "") # In production, check against stored session data # For now, simulate session checks # Multiple concurrent sessions session_count = await self._get_user_session_count(user_id) if session_count > 5: risk_score += 25 issues.append(f"Multiple concurrent sessions: {session_count}") # Suspicious user agent changes if await self._has_user_agent_changed(user_id, user_agent): risk_score += 30 issues.append("User agent changed during session") return {"risk_score": risk_score, "issues": issues} async def _get_user_session_count(self, user_id: str) -> int: """Get current session count for user""" # Simulated - in production, check session store return 1 async def _has_user_agent_changed(self, user_id: str, user_agent: str) -> bool: """Check if user agent changed during session""" # Simulated - in production, check against session store return False def _calculate_severity(self, risk_score: int) -> str: """Calculate security event severity""" if risk_score >= 80: return "CRITICAL" elif risk_score >= 60: return "HIGH" elif risk_score >= 40: return "MEDIUM" elif risk_score >= 20: return "LOW" else: return "INFO" async def log_security_event(self, event: SecurityEvent): """Log security event and update monitoring""" self.security_events.append(event) # Update monitoring collector self.monitoring_collector.record_security_event( event.event_type, { "severity": event.severity, "source_ip": event.source_ip, "risk_score": event.risk_score, "details": event.details, }, ) # Log to file self.logger.warning(f"Security Event: {event.event_type} - {event.severity} - Score: {event.risk_score}") async def auto_block_ip(self, ip: str, reason: str): """Automatically block suspicious IP""" self.blocked_ips.add(ip) # Log block event await self.monitoring_collector.record_security_event( "IP_BLOCKED", {"ip_address": ip, "reason": reason, "timestamp": datetime.utcnow().isoformat(), "auto_blocked": True}, ) self.logger.warning(f"IP auto-blocked: {ip} - Reason: {reason}") async def load_security_policies(self): """Load security policies and rules""" # Load from configuration or database self.suspicious_patterns = [ r"select.*from.*information_schema", r"union.*select.*from", r".*", r"javascript:", r"eval\s*\(", r"system\s*\(", ] async def setup_threat_intelligence(self): """Setup threat intelligence feeds""" # Connect to threat intelligence APIs # Load known malicious IPs, domains, patterns pass async def configure_advanced_rules(self): """Configure advanced security rules""" # Setup custom security rules # Configure alert thresholds pass class AnomalyDetector: """AI-powered anomaly detection for user behavior""" def __init__(self): self.user_profiles = {} self.behavior_patterns = {} async def detect_anomaly(self, user_id: str, request_data: dict[str, Any]) -> int: """Detect behavioral anomalies""" anomaly_score = 0 # Build user profile if not exists if user_id not in self.user_profiles: self.user_profiles[user_id] = await self._create_user_profile(user_id) # Analyze request patterns pattern_anomaly = await self._analyze_request_patterns(user_id, request_data) anomaly_score += pattern_anomaly # Analyze time patterns time_anomaly = await self._analyze_time_patterns(user_id, request_data) anomaly_score += time_anomaly # Analyze location patterns location_anomaly = await self._analyze_location_patterns(user_id, request_data) anomaly_score += location_anomaly return min(anomaly_score, 100) # Cap at 100 async def _create_user_profile(self, user_id: str) -> dict[str, Any]: """Create user behavior profile""" return {"request_patterns": {}, "time_patterns": [], "location_patterns": [], "created_at": datetime.utcnow()} async def _analyze_request_patterns(self, user_id: str, request_data: dict[str, Any]) -> int: """Analyze request pattern anomalies""" # Simulated pattern analysis return 15 async def _analyze_time_patterns(self, user_id: str, request_data: dict[str, Any]) -> int: """Analyze time-based anomalies""" # Simulated time pattern analysis return 10 async def _analyze_location_patterns(self, user_id: str, request_data: dict[str, Any]) -> int: """Analyze location-based anomalies""" # Simulated location analysis return 5 # Global security manager instance advanced_security_manager = AdvancedSecurityManager() async def get_advanced_security_manager() -> AdvancedSecurityManager: """Get advanced security manager instance""" if not hasattr(advanced_security_manager, "initialized"): await advanced_security_manager.initialize() advanced_security_manager.initialized = True return advanced_security_manager # Security middleware for FastAPI class AdvancedSecurityMiddleware: """Advanced security middleware for request filtering""" def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] == "http": # Extract request data request_data = await self._extract_request_data(scope, receive) # Get security manager security_manager = await get_advanced_security_manager() # Analyze request security security_event = await security_manager.analyze_request_security(request_data) # Block if high risk if security_event.risk_score > 80: await self._send_blocked_response(send) return await self.app(scope, receive, send) async def _extract_request_data(self, scope, receive) -> dict[str, Any]: """Extract request data for security analysis""" # Simplified request data extraction return { "client_ip": scope.get("client", ("", 0))[0], "method": scope["method"], "endpoint": scope["path"], "headers": dict(scope.get("headers", [])), "query_params": {}, # Extract from query string "body": "", # Extract from receive } async def _send_blocked_response(self, send): """Send blocked response to suspicious requests""" response = { "type": "http.response.start", "status": 403, "headers": [[b"content-type", b"application/json"]], } await send(response) body = json.dumps( { "error": "Access Denied", "message": "Security violation detected", "timestamp": datetime.utcnow().isoformat(), } ).encode() await send( { "type": "http.response.body", "body": body, } )