Spaces:
Paused
Paused
"""
Simplified Monitoring Collector for Zenith Platform

This is a lightweight monitoring collector that doesn't depend on external services
during testing/development phases.
"""
import statistics
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone
from typing import Any
| class MonitoringCollector: | |
| """Simplified monitoring collector with in-memory storage""" | |
| def __init__(self): | |
| # Performance metrics | |
| self.request_times = deque(maxlen=1000) | |
| self.request_counts = defaultdict(int) | |
| self.error_counts = defaultdict(int) | |
| self.response_sizes = deque(maxlen=500) | |
| # Security metrics | |
| self.failed_logins = deque(maxlen=100) | |
| self.security_events = deque(maxlen=200) | |
| self.blocked_ips = set() | |
| # Resource metrics | |
| self.memory_usage = deque(maxlen=100) | |
| self.cpu_usage = deque(maxlen=100) | |
| # Lock for thread safety | |
| self._lock = threading.Lock() | |
| print("Simplified monitoring collector initialized") | |
| def record_metric(self, metric_name: str, value: float, tags: dict[str, Any] = None): | |
| """Record a metric""" | |
| with self._lock: | |
| if metric_name == "request_time": | |
| self.request_times.append(value) | |
| if tags and "endpoint" in tags: | |
| self.request_counts[tags["endpoint"]] += 1 | |
| elif metric_name == "error_rate": | |
| if tags and "endpoint" in tags: | |
| self.error_counts[tags["endpoint"]] += 1 | |
| elif metric_name == "response_size": | |
| self.response_sizes.append(value) | |
| elif metric_name == "memory_usage": | |
| self.memory_usage.append(value) | |
| elif metric_name == "cpu_usage": | |
| self.cpu_usage.append(value) | |
| def record_security_event(self, event_type: str, details: dict[str, Any]): | |
| """Record a security event""" | |
| with self._lock: | |
| event = {"timestamp": datetime.utcnow(), "type": event_type, "details": details} | |
| self.security_events.append(event) | |
| if event_type == "failed_login": | |
| self.failed_logins.append(event) | |
| def get_performance_metrics(self, minutes: int = 5) -> dict[str, Any]: | |
| """Get performance metrics for the last N minutes""" | |
| with self._lock: | |
| # Calculate actual metrics from collected data | |
| avg_response_time = statistics.mean(self.request_times) if self.request_times else 0.0 | |
| p95_response_time = ( | |
| sorted(self.request_times)[int(len(self.request_times) * 0.95)] if self.request_times else 0.0 | |
| ) | |
| p99_response_time = ( | |
| sorted(self.request_times)[int(len(self.request_times) * 0.99)] if self.request_times else 0.0 | |
| ) | |
| avg_response_size = statistics.mean(self.response_sizes) if self.response_sizes else 0.0 | |
| total_requests = sum(self.request_counts.values()) | |
| total_errors = sum(self.error_counts.values()) | |
| error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0.0 | |
| avg_memory = statistics.mean(self.memory_usage) if self.memory_usage else 0.0 | |
| avg_cpu = statistics.mean(self.cpu_usage) if self.cpu_usage else 0.0 | |
| return { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "period_minutes": minutes, | |
| "performance": { | |
| "avg_response_time": round(avg_response_time, 3), | |
| "p95_response_time": round(p95_response_time, 3), | |
| "p99_response_time": round(p99_response_time, 3), | |
| "avg_response_size": round(avg_response_size, 2), | |
| "total_requests": total_requests, | |
| "error_rate": round(error_rate, 2), | |
| }, | |
| "resources": {"avg_memory_usage": round(avg_memory, 2), "avg_cpu_usage": round(avg_cpu, 2)}, | |
| } | |
| def get_security_metrics(self, minutes: int = 5) -> dict[str, Any]: | |
| """Get security metrics for the last N minutes""" | |
| with self._lock: | |
| cutoff_time = datetime.utcnow() - timedelta(minutes=minutes) | |
| # Count recent security events | |
| recent_events = [event for event in self.security_events if event["timestamp"] > cutoff_time] | |
| recent_failed_logins = [event for event in self.failed_logins if event["timestamp"] > cutoff_time] | |
| # Count events by type | |
| event_types = defaultdict(int) | |
| for event in recent_events: | |
| event_types[event["type"]] += 1 | |
| # Calculate attack patterns | |
| unique_ips = set() | |
| for event in recent_failed_logins: | |
| if "ip_address" in event["details"]: | |
| unique_ips.add(event["details"]["ip_address"]) | |
| return { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "period_minutes": minutes, | |
| "security": { | |
| "total_events": len(recent_events), | |
| "failed_logins": len(recent_failed_logins), | |
| "unique_attack_ips": len(unique_ips), | |
| "blocked_ips": len(self.blocked_ips), | |
| "event_types": dict(event_types), | |
| }, | |
| } | |
| def get_business_metrics(self, minutes: int = 5) -> dict[str, Any]: | |
| """Get business metrics for the last N minutes""" | |
| with self._lock: | |
| # Simulated business metrics | |
| # In a real implementation, these would come from actual business data | |
| return { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "period_minutes": minutes, | |
| "business": { | |
| "active_users": 45, | |
| "new_registrations": 3, | |
| "cases_processed": 12, | |
| "alerts_generated": 8, | |
| "case_resolution_rate": 85.5, | |
| "fraud_detection_rate": 92.1, | |
| }, | |
| } | |
| def get_health_status(self) -> dict[str, Any]: | |
| """Get overall system health status""" | |
| with self._lock: | |
| # Determine health based on metrics | |
| avg_response_time = statistics.mean(self.request_times) if self.request_times else 0 | |
| total_requests = sum(self.request_counts.values()) | |
| total_errors = sum(self.error_counts.values()) | |
| error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0 | |
| # Health determination | |
| status = "healthy" | |
| issues = [] | |
| if avg_response_time > 1000: # > 1 second | |
| status = "degraded" | |
| issues.append("High response times") | |
| if error_rate > 5: # > 5% error rate | |
| status = "unhealthy" | |
| issues.append("High error rate") | |
| if len(self.failed_logins) > 10: # Many failed logins | |
| if status != "unhealthy": | |
| status = "degraded" | |
| issues.append("Security concerns") | |
| return { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "status": status, | |
| "issues": issues, | |
| "uptime_percentage": 99.9, # Simulated | |
| "active_services": { | |
| "database": self._check_database_health(), | |
| "api": self._check_api_health(), | |
| "monitoring": True, | |
| }, | |
| } | |
| def _check_database_health(self) -> bool: | |
| """Check database health (simplified)""" | |
| # In a real implementation, this would check actual database connectivity | |
| return True | |
| def _check_api_health(self) -> bool: | |
| """Check API health (simplified)""" | |
| # In a real implementation, this would check API endpoints | |
| return True | |