teoat's picture
Upload folder using huggingface_hub
4ae946d verified
"""
Production Monitoring Configuration
Sets up monitoring for security events and failed authentication attempts
"""
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
class SecurityMonitor:
"""Monitor security events and trigger alerts"""
def __init__(self):
self.failed_auth_attempts = defaultdict(list)
self.admin_operations = []
self.critical_events = []
def log_failed_auth(self, user_id: str, ip_address: str, reason: str):
"""Log failed authentication attempt"""
event = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"ip_address": ip_address,
"reason": reason,
"event_type": "FAILED_AUTH",
}
self.failed_auth_attempts[ip_address].append(event)
# Alert if more than 5 failed attempts from same IP in 5 minutes
recent_failures = [
e
for e in self.failed_auth_attempts[ip_address]
if datetime.fromisoformat(e["timestamp"])
> datetime.now() - timedelta(minutes=5)
]
if len(recent_failures) >= 5:
self.trigger_alert(
"BRUTE_FORCE_DETECTED",
{
"ip_address": ip_address,
"attempts": len(recent_failures),
"timeframe": "5_minutes",
},
)
def log_admin_operation(
self, user_id: str, operation: str, details: dict[str, Any]
):
"""Log admin operation for monitoring"""
event = {
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"operation": operation,
"details": details,
"event_type": "ADMIN_OPERATION",
}
self.admin_operations.append(event)
logger.warning(f"Admin operation: {operation} by {user_id}")
def log_critical_event(self, event_type: str, details: dict[str, Any]):
"""Log critical security event"""
event = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
"details": details,
"severity": "CRITICAL",
}
self.critical_events.append(event)
logger.critical(f"Critical security event: {event_type}")
# Always trigger alert for critical events
self.trigger_alert(event_type, details)
def trigger_alert(self, alert_type: str, details: dict[str, Any]):
"""Trigger security alert"""
{
"timestamp": datetime.now().isoformat(),
"alert_type": alert_type,
"details": details,
"requires_action": True,
}
logger.critical(f"SECURITY ALERT: {alert_type} - {details}")
# In production, this would:
# - Send email to security team
# - Post to Slack/Teams channel
# - Create PagerDuty incident
# - Write to SIEM system
def get_security_summary(self) -> dict[str, Any]:
"""Get summary of recent security events"""
now = datetime.now()
last_24h = now - timedelta(hours=24)
# Count recent events
recent_failed_auths = sum(
len(
[
e
for e in attempts
if datetime.fromisoformat(e["timestamp"]) > last_24h
]
)
for attempts in self.failed_auth_attempts.values()
)
recent_admin_ops = len(
[
e
for e in self.admin_operations
if datetime.fromisoformat(e["timestamp"]) > last_24h
]
)
recent_critical = len(
[
e
for e in self.critical_events
if datetime.fromisoformat(e["timestamp"]) > last_24h
]
)
return {
"period": "last_24_hours",
"failed_auth_attempts": recent_failed_auths,
"admin_operations": recent_admin_ops,
"critical_events": recent_critical,
"high_risk_ips": self._get_high_risk_ips(),
"status": "healthy" if recent_critical == 0 else "alert",
}
def _get_high_risk_ips(self) -> list[str]:
"""Get IPs with suspicious activity"""
high_risk = []
now = datetime.now()
last_hour = now - timedelta(hours=1)
for ip, attempts in self.failed_auth_attempts.items():
recent = [
e
for e in attempts
if datetime.fromisoformat(e["timestamp"]) > last_hour
]
if len(recent) >= 3: # 3+ failures in last hour
high_risk.append(ip)
return high_risk
# Global monitoring instance
security_monitor = SecurityMonitor()