zenith-backend / app /services /advanced_security.py
teoat's picture
fix(backend): fix port and health check robustness
d29a5a0 verified
"""
Advanced Security Hardening Suite
Zenith Fraud Detection Platform - Enterprise-Grade Security
"""
import inspect
import json
import logging
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from urllib.parse import parse_qs

from app.services.monitoring_collector import MonitoringCollector
@dataclass
class SecurityEvent:
    """Record of one security analysis result.

    Instances are built by ``AdvancedSecurityManager.analyze_request_security``
    and passed to ``AdvancedSecurityManager.log_security_event``.
    """

    # When the event was created (the manager uses ``datetime.utcnow()``).
    timestamp: datetime
    # Event category, e.g. "REQUEST_SECURITY_ANALYSIS".
    event_type: str
    # Severity label derived from the risk score by the manager
    # ("CRITICAL", "HIGH", "MEDIUM", "LOW" or "INFO").
    severity: str
    # Client IP taken from the analysed request data ("" when absent).
    source_ip: str
    # User-Agent taken from the analysed request data ("" when absent).
    user_agent: str
    # Free-form context; the manager stores the score, the issue list,
    # the endpoint and the HTTP method here.
    details: dict[str, Any]
    # Sum of the risk contributions of every analysis stage.
    risk_score: int
class AdvancedSecurityManager:
    """Score requests for security risk, record the result and auto-block IPs.

    All state lives in memory on the instance: the set of blocked IPs, the
    list of recorded events and the per-IP request timestamps used for rate
    limiting.  Geolocation, proxy/VPN and session checks are simulated stubs.
    """

    # RFC 1918 private ranges; such addresses are not expected as external clients.
    _PRIVATE_IP_RE = re.compile(r"^(?:10\.|192\.168\.|172\.(?:1[6-9]|2[0-9]|3[0-1])\.)")

    # Regexes searched (case-insensitively) in the request path.
    _ENDPOINT_ATTACK_PATTERNS = (
        r"\.\./",  # Directory / path traversal (dots escaped: literal "../")
        r"<script",  # XSS attempt
        r"union.*select",  # SQL injection
        r"cmd=",  # Command injection
        r"admin",  # Admin access attempt
        r"wp-admin",  # WordPress admin
        r"phpmyadmin",  # Database admin
    )

    _ALLOWED_METHODS = ("GET", "POST", "PUT", "DELETE", "PATCH")

    # Lower-cased fragments looked for inside "<header>: <value>".
    _MALICIOUS_HEADER_MARKERS = (
        "x-forwarded-for: 127.0.0.1",
        "x-real-ip: 127.0.0.1",
        "x-originating-ip",
        "x-cluster-client-ip",
    )

    # "(?:\s|%20)*" replaces the original "(\s*|%20)*": same language, but
    # without the nested quantifier that backtracks exponentially on long
    # whitespace runs supplied by an attacker.
    _SQLI_PATTERNS = (
        r"(\%27|\')(?:\s|%20)*(or|and)(\s*|%20).*\=.*\s*--",  # Classic SQLi
        r"union(?:\s|%20)*(all|%20)*(select|%20)",  # Union-based
        r"select(\s*|%20).*from(\s*|%20).*information_schema",  # Information schema
        r"(\%27|\').*or(\s*|%20).*\=.*\s*(or|%20).*\=",  # Boolean-based
        r"waitfor(\s*|%20)delay",  # Time-based
        r"sleep\(\d+\)",  # Time-based MySQL
        r"benchmark\(",  # Time-based MySQL
        r"pg_sleep\(",  # Time-based PostgreSQL
    )

    _XSS_PATTERNS = (
        r"<script[^>]*>.*?</script>",  # Script tags
        r"javascript:",  # JavaScript protocol
        r"on\w+\s*=",  # Event handlers
        r"<iframe[^>]*src",  # Iframe injection
        r"<object[^>]*data",  # Object injection
        r"<embed[^>]*src",  # Embed injection
        r"<link[^>]*href",  # Link injection
        r"<meta[^>]*http-equiv",  # Meta injection
        r"expression\(",  # CSS expression
        r"@import",  # CSS import
        r"<svg[^>]*<script",  # SVG injection
        r"data:text/html",  # Data URI
    )

    _COMMAND_PATTERNS = (
        r";\s*(rm|del|format|fdisk)",  # Destructive commands
        r"(\|\||&)\s*(rm|del|format|fdisk)",  # Piped destructive commands
        r"\$\(",  # Command substitution
        r"`[^`]*`",  # Backticks
        r"curl(\s*|%20).*\|\s*sh",  # Download and execute
        r"wget(\s*|%20).*\|\s*sh",  # Download and execute
        r"eval\s*\(",  # PHP eval
        r"system\s*\(",  # PHP system
        r"exec\s*\(",  # PHP exec
        r"shell_exec\s*\(",  # PHP shell_exec
    )

    _DANGEROUS_EXTENSIONS = frozenset(
        {
            ".php",
            ".phtml",
            ".php3",
            ".php4",
            ".php5",
            ".asp",
            ".aspx",
            ".jsp",
            ".exe",
            ".bat",
            ".cmd",
            ".sh",
            ".ps1",
            ".vbs",
            ".jar",
        }
    )

    _SUSPICIOUS_FILENAME_TOKENS = ("webshell", "backdoor", "shell", "cmd")

    # (minimum score, label), highest first; anything below 20 is "INFO".
    _SEVERITY_LEVELS = ((80, "CRITICAL"), (60, "HIGH"), (40, "MEDIUM"), (20, "LOW"))

    def __init__(self):
        self.monitoring_collector = MonitoringCollector()
        self.blocked_ips = set()  # IPs rejected outright (+100 risk)
        self.suspicious_patterns = []  # filled by load_security_policies()
        self.security_events = []  # every SecurityEvent logged so far
        self.rate_limit_store = {}  # ip -> list of request timestamps
        self.anomaly_detector = AnomalyDetector()
        self.logger = logging.getLogger(__name__)

    async def initialize(self):
        """Load policies, threat intelligence and rules, then log readiness."""
        await self.load_security_policies()
        await self.setup_threat_intelligence()
        await self.configure_advanced_rules()
        self.logger.info("Advanced security manager initialized")

    async def analyze_request_security(self, request_data: dict[str, Any]) -> "SecurityEvent":
        """Run every analysis stage on a request and return the resulting event.

        Args:
            request_data: Request description.  Keys read here and by the
                stages: ``client_ip``, ``user_agent``, ``endpoint``,
                ``method``, ``headers``, ``body``, ``query_params``,
                ``files`` and ``user_id``; all are optional.

        Returns:
            The logged ``SecurityEvent``; its ``risk_score`` is the sum of the
            scores of the IP, pattern, payload and behaviour stages.

        Side effects:
            The event is logged, and the client IP is auto-blocked when the
            total score is greater than 80.
        """
        client_ip = request_data.get("client_ip", "")

        # Stages run sequentially, in this order; issues keep the same order.
        stage_results = (
            await self.analyze_ip_risk(client_ip),
            await self.analyze_request_patterns(request_data),
            await self.analyze_payload_security(request_data),
            await self.analyze_user_behavior(request_data),
        )
        security_score = 0
        security_issues = []
        for stage in stage_results:
            security_score += stage["risk_score"]
            security_issues.extend(stage["issues"])

        event = SecurityEvent(
            timestamp=datetime.utcnow(),
            event_type="REQUEST_SECURITY_ANALYSIS",
            severity=self._calculate_severity(security_score),
            source_ip=client_ip,
            user_agent=request_data.get("user_agent", ""),
            details={
                "security_score": security_score,
                "issues": security_issues,
                "endpoint": request_data.get("endpoint", ""),
                "method": request_data.get("method", ""),
            },
            risk_score=security_score,
        )
        await self.log_security_event(event)

        # Auto-block if high risk
        if security_score > 80:
            await self.auto_block_ip(client_ip, "High risk score detected")
        return event

    async def analyze_ip_risk(self, ip: str) -> dict[str, Any]:
        """Score an IP: blocked (+100), private range (+50), geo (+20), proxy (+15).

        Returns:
            ``{"risk_score": int, "issues": list[str]}``.
        """
        checks = (
            (ip in self.blocked_ips, 100, "IP already blocked"),
            (self._is_suspicious_ip(ip), 50, "Suspicious IP pattern detected"),
            (self._is_high_risk_country(ip), 20, "High-risk geographical location"),
            (self._is_proxy_or_vpn(ip), 15, "Proxy/VPN usage detected"),
        )
        risk_score = 0
        issues = []
        for triggered, points, message in checks:
            if triggered:
                risk_score += points
                issues.append(message)
        return {"risk_score": risk_score, "issues": issues}

    async def analyze_request_patterns(self, request_data: dict[str, Any]) -> dict[str, Any]:
        """Score the endpoint, headers, request rate and HTTP method.

        Calling this records one request for the client IP in the rate
        limiter (see ``_is_rate_limited``).

        Returns:
            ``{"risk_score": int, "issues": list[str]}``.
        """
        risk_score = 0
        issues = []

        if self._contains_attack_patterns(request_data.get("endpoint", "")):
            risk_score += 40
            issues.append("Attack pattern detected in endpoint")

        # Each suspicious header hit adds 20 points.
        suspicious_headers = self._detect_suspicious_headers(request_data.get("headers", {}))
        if suspicious_headers:
            risk_score += 20 * len(suspicious_headers)
            issues.extend(f"Suspicious header: {h}" for h in suspicious_headers)

        if self._is_rate_limited(request_data.get("client_ip", "")):
            risk_score += 30
            issues.append("Rate limit exceeded")

        if request_data.get("method", "") not in self._ALLOWED_METHODS:
            risk_score += 25
            issues.append("Unusual HTTP method")

        return {"risk_score": risk_score, "issues": issues}

    async def analyze_payload_security(self, request_data: dict[str, Any]) -> dict[str, Any]:
        """Score the body, query parameters and uploads for injection attempts.

        A missing/``None`` body is treated as empty and a bytes body is
        decoded, so the regex checks never receive a non-string.

        Returns:
            ``{"risk_score": int, "issues": list[str]}``.
        """
        risk_score = 0
        issues = []
        payload = self._as_text(request_data.get("body", ""))
        query_params = request_data.get("query_params") or {}

        # Each family adds its points once, however many patterns matched.
        findings = (
            (self._detect_advanced_sql_injection(payload, query_params), 60),
            (self._detect_advanced_xss(payload), 50),
            (self._detect_command_injection(payload), 70),
        )
        for risks, points in findings:
            if risks:
                risk_score += points
                issues.extend(risks)

        if request_data.get("files"):
            upload_risks = await self._analyze_file_uploads(request_data["files"])
            risk_score += upload_risks["risk_score"]
            issues.extend(upload_risks["issues"])

        return {"risk_score": risk_score, "issues": issues}

    async def analyze_user_behavior(self, request_data: dict[str, Any]) -> dict[str, Any]:
        """Score behavioural anomalies and session risks for a known user.

        Returns a zero score immediately when ``request_data`` has no
        ``user_id``.
        """
        user_id = request_data.get("user_id")
        if not user_id:
            return {"risk_score": 0, "issues": []}

        issues = []
        behavior_score = await self.anomaly_detector.detect_anomaly(user_id, request_data)
        risk_score = behavior_score
        if behavior_score > 30:
            issues.append("Anomalous user behavior detected")

        session_risks = await self._analyze_session_security(user_id, request_data)
        risk_score += session_risks["risk_score"]
        issues.extend(session_risks["issues"])
        return {"risk_score": risk_score, "issues": issues}

    @staticmethod
    def _as_text(value: Any) -> str:
        """Coerce ``None``/bytes/other values to ``str`` for regex scanning."""
        if value is None:
            return ""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return value if isinstance(value, str) else str(value)

    @staticmethod
    def _match_patterns(patterns, text: str, label: str) -> list[str]:
        """Return ``"<label>: <pattern>"`` for each regex found in ``text``."""
        return [f"{label}: {pattern}" for pattern in patterns if re.search(pattern, text, re.IGNORECASE)]

    def _is_suspicious_ip(self, ip: str) -> bool:
        """Return True when ``ip`` starts with a private (RFC 1918) prefix."""
        return self._PRIVATE_IP_RE.match(ip) is not None

    def _is_high_risk_country(self, ip: str) -> bool:
        """Simulated high-risk country detection; always False."""
        # In production, use real geolocation database
        return False

    def _is_proxy_or_vpn(self, ip: str) -> bool:
        """Simulated proxy/VPN detection; always False."""
        # In production, use real proxy detection services
        return False

    def _contains_attack_patterns(self, endpoint: str) -> bool:
        """Return True when the endpoint contains a known attack fragment.

        The traversal pattern is escaped so that only a literal ``../``
        matches; unescaped, ``.`` matched any character and flagged ordinary
        paths such as ``/api/v1/users``.
        """
        return any(re.search(pattern, endpoint, re.IGNORECASE) for pattern in self._ENDPOINT_ATTACK_PATTERNS)

    def _detect_suspicious_headers(self, headers: dict[str, str]) -> list[str]:
        """Return the names of headers whose ``name: value`` line looks malicious.

        A header is listed once per marker it matches, so the caller's
        ``20 * len(...)`` scoring counts every hit.
        """
        hits = []
        for header, value in headers.items():
            header_line = f"{header}: {value}".lower()
            hits.extend(header for marker in self._MALICIOUS_HEADER_MARKERS if marker in header_line)
        return hits

    def _is_rate_limited(self, ip: str) -> bool:
        """Record a request for ``ip`` and report whether it exceeds the limit.

        The limit is 100 requests per sliding 5-minute window.  Calling this
        method counts as one request.
        """
        window_size = 300  # 5 minutes
        max_requests = 100
        now = time.time()
        # Drop timestamps that fell out of the window, then add this request.
        recent = [stamp for stamp in self.rate_limit_store.get(ip, []) if now - stamp < window_size]
        recent.append(now)
        self.rate_limit_store[ip] = recent
        return len(recent) > max_requests

    def _detect_advanced_sql_injection(self, payload: str, query_params: dict[str, Any]) -> list[str]:
        """Return one message per SQL-injection regex found in the input.

        The scanned text is the payload plus every query key and value, one
        per line.  The dict ``repr`` is deliberately not used: its own quote
        characters produced false positives (e.g. ``{"order": "a=b --"}``).
        List/tuple values, as produced by ``parse_qs``, are flattened.
        """
        parts = [self._as_text(payload)]
        if isinstance(query_params, dict):
            for key, value in query_params.items():
                parts.append(self._as_text(key))
                values = value if isinstance(value, (list, tuple)) else (value,)
                parts.extend(self._as_text(item) for item in values)
        elif query_params:
            parts.append(self._as_text(query_params))
        test_data = "\n".join(parts)
        return self._match_patterns(self._SQLI_PATTERNS, test_data, "Advanced SQL injection pattern")

    def _detect_advanced_xss(self, payload: str) -> list[str]:
        """Return one message per XSS regex found in ``payload``."""
        return self._match_patterns(self._XSS_PATTERNS, payload, "Advanced XSS pattern")

    def _detect_command_injection(self, payload: str) -> list[str]:
        """Return one message per command-injection regex found in ``payload``."""
        return self._match_patterns(self._COMMAND_PATTERNS, payload, "Command injection pattern")

    async def _analyze_file_uploads(self, files: list[dict[str, Any]]) -> dict[str, Any]:
        """Score uploads by extension (+40), size over 50MB (+20) and name (+60).

        Args:
            files: Dicts with optional ``filename`` and ``size`` keys.
        """
        risk_score = 0
        issues = []
        for file_info in files:
            filename = file_info.get("filename") or ""
            file_size = file_info.get("size") or 0

            # Only the last dot-separated component counts as the extension.
            file_ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
            if file_ext in self._DANGEROUS_EXTENSIONS:
                risk_score += 40
                issues.append(f"Dangerous file extension: {file_ext}")

            if file_size > 50 * 1024 * 1024:  # 50MB limit
                risk_score += 20
                issues.append(f"Large file upload: {file_size} bytes")

            lowered = filename.lower()
            if any(token in lowered for token in self._SUSPICIOUS_FILENAME_TOKENS):
                risk_score += 60
                issues.append(f"Suspicious filename: {filename}")
        return {"risk_score": risk_score, "issues": issues}

    async def _analyze_session_security(self, user_id: str, request_data: dict[str, Any]) -> dict[str, Any]:
        """Score session-hijacking indicators for ``user_id``.

        Both underlying checks are simulated, so with the current stubs this
        returns a zero score.
        """
        risk_score = 0
        issues = []
        user_agent = request_data.get("user_agent", "")

        session_count = await self._get_user_session_count(user_id)
        if session_count > 5:
            risk_score += 25
            issues.append(f"Multiple concurrent sessions: {session_count}")

        if await self._has_user_agent_changed(user_id, user_agent):
            risk_score += 30
            issues.append("User agent changed during session")
        return {"risk_score": risk_score, "issues": issues}

    async def _get_user_session_count(self, user_id: str) -> int:
        """Get current session count for user (simulated; always 1)."""
        # In production, check session store
        return 1

    async def _has_user_agent_changed(self, user_id: str, user_agent: str) -> bool:
        """Check if user agent changed during session (simulated; always False)."""
        # In production, check against session store
        return False

    def _calculate_severity(self, risk_score: int) -> str:
        """Map a score to CRITICAL (>=80), HIGH (>=60), MEDIUM (>=40), LOW (>=20) or INFO."""
        for threshold, label in self._SEVERITY_LEVELS:
            if risk_score >= threshold:
                return label
        return "INFO"

    async def _record_monitoring_event(self, event_type: str, payload: dict[str, Any]) -> None:
        """Forward an event to the monitoring collector.

        The original code awaited this call in one place and not in the
        other; awaiting only when the result is awaitable is correct for
        both a synchronous and an asynchronous collector.
        """
        result = self.monitoring_collector.record_security_event(event_type, payload)
        if inspect.isawaitable(result):
            await result

    async def log_security_event(self, event: "SecurityEvent"):
        """Store the event, forward it to monitoring and emit a warning log."""
        self.security_events.append(event)
        await self._record_monitoring_event(
            event.event_type,
            {
                "severity": event.severity,
                "source_ip": event.source_ip,
                "risk_score": event.risk_score,
                "details": event.details,
            },
        )
        self.logger.warning(
            "Security Event: %s - %s - Score: %s", event.event_type, event.severity, event.risk_score
        )

    async def auto_block_ip(self, ip: str, reason: str):
        """Add ``ip`` to the block list and report the block to monitoring.

        An empty ``ip`` is ignored: blocking ``""`` would make every later
        request without a known client address score +100 and be rejected.
        """
        if not ip:
            self.logger.warning("Skipping auto-block: no client IP available - Reason: %s", reason)
            return
        self.blocked_ips.add(ip)
        await self._record_monitoring_event(
            "IP_BLOCKED",
            {"ip_address": ip, "reason": reason, "timestamp": datetime.utcnow().isoformat(), "auto_blocked": True},
        )
        self.logger.warning("IP auto-blocked: %s - Reason: %s", ip, reason)

    async def load_security_policies(self):
        """Load security policies and rules into ``suspicious_patterns``."""
        # Load from configuration or database
        self.suspicious_patterns = [
            r"select.*from.*information_schema",
            r"union.*select.*from",
            r"<script.*>.*</script>",
            r"javascript:",
            r"eval\s*\(",
            r"system\s*\(",
        ]

    async def setup_threat_intelligence(self):
        """Setup threat intelligence feeds (placeholder; does nothing yet)."""
        # Connect to threat intelligence APIs
        # Load known malicious IPs, domains, patterns

    async def configure_advanced_rules(self):
        """Configure advanced security rules (placeholder; does nothing yet)."""
        # Setup custom security rules
        # Configure alert thresholds
class AnomalyDetector:
    """Per-user behavioural anomaly scoring.

    The three analysers are simulated and return fixed values, so every
    call currently scores 15 + 10 + 5.
    """

    def __init__(self):
        self.user_profiles = {}  # user_id -> profile dict
        self.behavior_patterns = {}

    async def detect_anomaly(self, user_id: str, request_data: dict[str, Any]) -> int:
        """Return the combined anomaly score for a request, at most 100.

        A profile is created for ``user_id`` the first time it is seen.
        """
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = await self._create_user_profile(user_id)

        # Request, time and location analysers run in that order.
        analyzers = (
            self._analyze_request_patterns,
            self._analyze_time_patterns,
            self._analyze_location_patterns,
        )
        total = 0
        for analyze in analyzers:
            total += await analyze(user_id, request_data)
        return min(total, 100)  # Cap at 100

    async def _create_user_profile(self, user_id: str) -> dict[str, Any]:
        """Build an empty behaviour profile stamped with the creation time."""
        profile = {
            "request_patterns": {},
            "time_patterns": [],
            "location_patterns": [],
            "created_at": datetime.utcnow(),
        }
        return profile

    async def _analyze_request_patterns(self, user_id: str, request_data: dict[str, Any]) -> int:
        """Score request-pattern anomalies (simulated: constant 15)."""
        return 15

    async def _analyze_time_patterns(self, user_id: str, request_data: dict[str, Any]) -> int:
        """Score time-of-day anomalies (simulated: constant 10)."""
        return 10

    async def _analyze_location_patterns(self, user_id: str, request_data: dict[str, Any]) -> int:
        """Score location anomalies (simulated: constant 5)."""
        return 5
# Module-wide security manager shared by every caller
advanced_security_manager = AdvancedSecurityManager()


async def get_advanced_security_manager() -> AdvancedSecurityManager:
    """Return the shared manager, initializing it on first use.

    Initialization is tracked by the presence of an ``initialized``
    attribute on the shared instance.
    """
    # Already set up on an earlier call: hand the instance straight back.
    if hasattr(advanced_security_manager, "initialized"):
        return advanced_security_manager

    await advanced_security_manager.initialize()
    advanced_security_manager.initialized = True
    return advanced_security_manager
# Security middleware for FastAPI
class AdvancedSecurityMiddleware:
    """ASGI middleware that scores each HTTP request and rejects risky ones.

    Requests whose risk score is greater than 80 receive a 403 JSON response;
    everything else, including non-HTTP scopes, is passed to the wrapped app.
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        """Analyse HTTP requests before delegating to the wrapped ASGI app."""
        if scope["type"] == "http":
            request_data = await self._extract_request_data(scope, receive)
            security_manager = await get_advanced_security_manager()
            security_event = await security_manager.analyze_request_security(request_data)

            # Block if high risk
            if security_event.risk_score > 80:
                await self._send_blocked_response(send)
                return

        await self.app(scope, receive, send)

    @staticmethod
    def _decode(raw) -> str:
        """Decode ASGI byte strings; latin-1 never fails on arbitrary bytes."""
        return raw.decode("latin-1") if isinstance(raw, (bytes, bytearray)) else str(raw)

    async def _extract_request_data(self, scope, receive) -> dict[str, Any]:
        """Build the request description used by the security analysis.

        Returns:
            Dict with ``client_ip``, ``method``, ``endpoint``, ``headers``
            (decoded ``str`` names and values), ``user_agent``,
            ``query_params`` (``parse_qs`` output: name -> list of values)
            and ``body``.

        The body is left empty: reading it here would consume ``receive``
        and the wrapped app would never see it.
        """
        # "client" may be absent or explicitly None in the ASGI scope.
        client = scope.get("client") or ("", 0)

        # ASGI headers are (bytes, bytes) pairs; decode them so string-based
        # header checks can actually match.  Later duplicates win.
        headers = {
            self._decode(name): self._decode(value)
            for name, value in scope.get("headers") or []
        }
        raw_query = self._decode(scope.get("query_string") or b"")

        return {
            "client_ip": client[0],
            "method": scope["method"],
            "endpoint": scope["path"],
            "headers": headers,
            "user_agent": headers.get("user-agent", ""),
            "query_params": parse_qs(raw_query, keep_blank_values=True),
            "body": "",
        }

    async def _send_blocked_response(self, send):
        """Send a 403 JSON response describing the rejection."""
        await send(
            {
                "type": "http.response.start",
                "status": 403,
                "headers": [[b"content-type", b"application/json"]],
            }
        )
        payload = {
            "error": "Access Denied",
            "message": "Security violation detected",
            "timestamp": datetime.utcnow().isoformat(),
        }
        await send(
            {
                "type": "http.response.body",
                "body": json.dumps(payload).encode(),
            }
        )