Spaces:
Paused
Paused
| """ | |
| Production Monitoring Configuration | |
| Sets up monitoring for security events and failed authentication attempts | |
| """ | |
| import logging | |
| from collections import defaultdict | |
| from datetime import datetime, timedelta | |
| from typing import Any | |
| logger = logging.getLogger(__name__) | |
| class SecurityMonitor: | |
| """Monitor security events and trigger alerts""" | |
| def __init__(self): | |
| self.failed_auth_attempts = defaultdict(list) | |
| self.admin_operations = [] | |
| self.critical_events = [] | |
| def log_failed_auth(self, user_id: str, ip_address: str, reason: str): | |
| """Log failed authentication attempt""" | |
| event = { | |
| "timestamp": datetime.now().isoformat(), | |
| "user_id": user_id, | |
| "ip_address": ip_address, | |
| "reason": reason, | |
| "event_type": "FAILED_AUTH", | |
| } | |
| self.failed_auth_attempts[ip_address].append(event) | |
| # Alert if more than 5 failed attempts from same IP in 5 minutes | |
| recent_failures = [ | |
| e | |
| for e in self.failed_auth_attempts[ip_address] | |
| if datetime.fromisoformat(e["timestamp"]) | |
| > datetime.now() - timedelta(minutes=5) | |
| ] | |
| if len(recent_failures) >= 5: | |
| self.trigger_alert( | |
| "BRUTE_FORCE_DETECTED", | |
| { | |
| "ip_address": ip_address, | |
| "attempts": len(recent_failures), | |
| "timeframe": "5_minutes", | |
| }, | |
| ) | |
| def log_admin_operation( | |
| self, user_id: str, operation: str, details: dict[str, Any] | |
| ): | |
| """Log admin operation for monitoring""" | |
| event = { | |
| "timestamp": datetime.now().isoformat(), | |
| "user_id": user_id, | |
| "operation": operation, | |
| "details": details, | |
| "event_type": "ADMIN_OPERATION", | |
| } | |
| self.admin_operations.append(event) | |
| logger.warning(f"Admin operation: {operation} by {user_id}") | |
| def log_critical_event(self, event_type: str, details: dict[str, Any]): | |
| """Log critical security event""" | |
| event = { | |
| "timestamp": datetime.now().isoformat(), | |
| "event_type": event_type, | |
| "details": details, | |
| "severity": "CRITICAL", | |
| } | |
| self.critical_events.append(event) | |
| logger.critical(f"Critical security event: {event_type}") | |
| # Always trigger alert for critical events | |
| self.trigger_alert(event_type, details) | |
| def trigger_alert(self, alert_type: str, details: dict[str, Any]): | |
| """Trigger security alert""" | |
| { | |
| "timestamp": datetime.now().isoformat(), | |
| "alert_type": alert_type, | |
| "details": details, | |
| "requires_action": True, | |
| } | |
| logger.critical(f"SECURITY ALERT: {alert_type} - {details}") | |
| # In production, this would: | |
| # - Send email to security team | |
| # - Post to Slack/Teams channel | |
| # - Create PagerDuty incident | |
| # - Write to SIEM system | |
| def get_security_summary(self) -> dict[str, Any]: | |
| """Get summary of recent security events""" | |
| now = datetime.now() | |
| last_24h = now - timedelta(hours=24) | |
| # Count recent events | |
| recent_failed_auths = sum( | |
| len( | |
| [ | |
| e | |
| for e in attempts | |
| if datetime.fromisoformat(e["timestamp"]) > last_24h | |
| ] | |
| ) | |
| for attempts in self.failed_auth_attempts.values() | |
| ) | |
| recent_admin_ops = len( | |
| [ | |
| e | |
| for e in self.admin_operations | |
| if datetime.fromisoformat(e["timestamp"]) > last_24h | |
| ] | |
| ) | |
| recent_critical = len( | |
| [ | |
| e | |
| for e in self.critical_events | |
| if datetime.fromisoformat(e["timestamp"]) > last_24h | |
| ] | |
| ) | |
| return { | |
| "period": "last_24_hours", | |
| "failed_auth_attempts": recent_failed_auths, | |
| "admin_operations": recent_admin_ops, | |
| "critical_events": recent_critical, | |
| "high_risk_ips": self._get_high_risk_ips(), | |
| "status": "healthy" if recent_critical == 0 else "alert", | |
| } | |
| def _get_high_risk_ips(self) -> list[str]: | |
| """Get IPs with suspicious activity""" | |
| high_risk = [] | |
| now = datetime.now() | |
| last_hour = now - timedelta(hours=1) | |
| for ip, attempts in self.failed_auth_attempts.items(): | |
| recent = [ | |
| e | |
| for e in attempts | |
| if datetime.fromisoformat(e["timestamp"]) > last_hour | |
| ] | |
| if len(recent) >= 3: # 3+ failures in last hour | |
| high_risk.append(ip) | |
| return high_risk | |
| # Global monitoring instance | |
| security_monitor = SecurityMonitor() | |