# zenith-backend/app/services/monitoring_collector.py
# Latest commit (teoat): fix(backend): fix port and health check robustness
# Commit d29a5a0 (verified)
"""
Simplified Monitoring Collector for Zenith Platform
This is a lightweight monitoring collector that doesn't depend on external services
during testing/development phases.
"""
import logging
import statistics
import threading
from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone
from typing import Any
class MonitoringCollector:
    """Simplified monitoring collector with in-memory storage.

    Samples are kept in bounded deques and per-endpoint counters on the
    instance; nothing is persisted. Every read or write of that state is
    done while holding ``self._lock``.

    Timestamps produced by this class are naive ``datetime`` objects
    expressed in UTC.
    """

    # Thresholds used by get_health_status().
    # Mean request_time above this marks the service "degraded". The original
    # inline comment reads "> 1 second", so request_time is presumably recorded
    # in milliseconds -- confirm against callers.
    RESPONSE_TIME_DEGRADED_THRESHOLD = 1000
    # Error percentage above this marks the service "unhealthy".
    ERROR_RATE_UNHEALTHY_PERCENT = 5
    # More than this many failed logins inside the window below marks the
    # service "degraded".
    FAILED_LOGIN_DEGRADED_COUNT = 10
    # Only failed logins newer than this many minutes affect health, so an old
    # burst cannot keep the status degraded forever.
    FAILED_LOGIN_WINDOW_MINUTES = 5

    def __init__(self):
        """Create an empty collector with bounded in-memory buffers."""
        # Performance metrics
        self.request_times = deque(maxlen=1000)
        self.request_counts = defaultdict(int)
        self.error_counts = defaultdict(int)
        self.response_sizes = deque(maxlen=500)

        # Security metrics
        self.failed_logins = deque(maxlen=100)
        self.security_events = deque(maxlen=200)
        # Nothing in this class adds to blocked_ips; only its size is reported.
        self.blocked_ips = set()

        # Resource metrics
        self.memory_usage = deque(maxlen=100)
        self.cpu_usage = deque(maxlen=100)

        # Lock for thread safety
        self._lock = threading.Lock()

        # Logged rather than printed so library users control the output.
        logging.getLogger(__name__).info("Simplified monitoring collector initialized")

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Equivalent to the deprecated ``datetime.utcnow()``; the tzinfo is
        stripped so stored timestamps and ISO strings keep their old format.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _percentile(sorted_values: list, fraction: float) -> float:
        """Return the element at ``int(len * fraction)`` of a sorted list.

        Returns 0.0 for an empty list. The index is clamped to the last
        element so a ``fraction`` of 1.0 cannot run off the end.
        """
        if not sorted_values:
            return 0.0
        index = min(int(len(sorted_values) * fraction), len(sorted_values) - 1)
        return sorted_values[index]

    def record_metric(self, metric_name: str, value: float, tags: "dict[str, Any] | None" = None):
        """Record a metric sample.

        Args:
            metric_name: One of ``"request_time"``, ``"error_rate"``,
                ``"response_size"``, ``"memory_usage"`` or ``"cpu_usage"``.
                Any other name is ignored.
            value: The sample value. Ignored for ``"error_rate"``, where each
                call counts as one error.
            tags: Optional tags. An ``"endpoint"`` tag is required for a
                ``"request_time"`` sample to count as a request and for an
                ``"error_rate"`` call to count as an error.
        """
        endpoint = tags.get("endpoint") if tags and "endpoint" in tags else None
        has_endpoint = bool(tags) and "endpoint" in tags

        with self._lock:
            if metric_name == "request_time":
                self.request_times.append(value)
                if has_endpoint:
                    self.request_counts[endpoint] += 1
            elif metric_name == "error_rate":
                if has_endpoint:
                    self.error_counts[endpoint] += 1
            elif metric_name == "response_size":
                self.response_sizes.append(value)
            elif metric_name == "memory_usage":
                self.memory_usage.append(value)
            elif metric_name == "cpu_usage":
                self.cpu_usage.append(value)

    def record_security_event(self, event_type: str, details: dict[str, Any]):
        """Record a security event stamped with the current UTC time.

        Events of type ``"failed_login"`` are additionally appended to
        ``failed_logins`` (the same dict object is stored in both deques).
        """
        event = {"timestamp": self._utcnow(), "type": event_type, "details": details}
        with self._lock:
            self.security_events.append(event)
            if event_type == "failed_login":
                self.failed_logins.append(event)

    def get_performance_metrics(self, minutes: int = 5) -> dict[str, Any]:
        """Summarise the performance and resource samples currently buffered.

        Args:
            minutes: Echoed back as ``period_minutes``. Samples carry no
                timestamps, so this value does NOT filter the data; the
                statistics cover whatever is in the bounded buffers.

        Returns:
            A dict with ``timestamp``, ``period_minutes``, a ``performance``
            section (mean / p95 / p99 response time, mean response size,
            request total, error percentage) and a ``resources`` section
            (mean memory and CPU usage). Empty buffers yield 0.0.
        """
        # Snapshot under the lock, then do the arithmetic without holding it.
        with self._lock:
            ordered_times = sorted(self.request_times)  # sorted once for both percentiles
            sizes = list(self.response_sizes)
            memory = list(self.memory_usage)
            cpu = list(self.cpu_usage)
            total_requests = sum(self.request_counts.values())
            total_errors = sum(self.error_counts.values())

        avg_response_time = statistics.mean(ordered_times) if ordered_times else 0.0
        p95_response_time = self._percentile(ordered_times, 0.95)
        p99_response_time = self._percentile(ordered_times, 0.99)
        avg_response_size = statistics.mean(sizes) if sizes else 0.0
        error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0.0
        avg_memory = statistics.mean(memory) if memory else 0.0
        avg_cpu = statistics.mean(cpu) if cpu else 0.0

        return {
            "timestamp": self._utcnow().isoformat(),
            "period_minutes": minutes,
            "performance": {
                "avg_response_time": round(avg_response_time, 3),
                "p95_response_time": round(p95_response_time, 3),
                "p99_response_time": round(p99_response_time, 3),
                "avg_response_size": round(avg_response_size, 2),
                "total_requests": total_requests,
                "error_rate": round(error_rate, 2),
            },
            "resources": {"avg_memory_usage": round(avg_memory, 2), "avg_cpu_usage": round(avg_cpu, 2)},
        }

    def get_security_metrics(self, minutes: int = 5) -> dict[str, Any]:
        """Get security metrics for the last ``minutes`` minutes.

        Only events whose timestamp is newer than ``now - minutes`` are
        counted. ``unique_attack_ips`` is the number of distinct
        ``details["ip_address"]`` values among recent failed logins;
        ``blocked_ips`` is the current size of the ``blocked_ips`` set.
        """
        now = self._utcnow()
        cutoff_time = now - timedelta(minutes=minutes)

        with self._lock:
            recent_events = [event for event in self.security_events if event["timestamp"] > cutoff_time]
            recent_failed_logins = [event for event in self.failed_logins if event["timestamp"] > cutoff_time]
            blocked_count = len(self.blocked_ips)

        # Count events by type
        event_types = defaultdict(int)
        for event in recent_events:
            event_types[event["type"]] += 1

        # Distinct source addresses behind the recent failed logins. Events
        # whose details are not a dict (e.g. None) are skipped, not fatal.
        unique_ips = {
            event["details"]["ip_address"]
            for event in recent_failed_logins
            if isinstance(event["details"], dict) and "ip_address" in event["details"]
        }

        return {
            "timestamp": now.isoformat(),
            "period_minutes": minutes,
            "security": {
                "total_events": len(recent_events),
                "failed_logins": len(recent_failed_logins),
                "unique_attack_ips": len(unique_ips),
                "blocked_ips": blocked_count,
                "event_types": dict(event_types),
            },
        }

    def get_business_metrics(self, minutes: int = 5) -> dict[str, Any]:
        """Return simulated business metrics.

        The figures are hard-coded placeholders; ``minutes`` is only echoed
        back as ``period_minutes``. No shared state is read, so the lock is
        not taken.
        """
        # Simulated business metrics
        # In a real implementation, these would come from actual business data
        return {
            "timestamp": self._utcnow().isoformat(),
            "period_minutes": minutes,
            "business": {
                "active_users": 45,
                "new_registrations": 3,
                "cases_processed": 12,
                "alerts_generated": 8,
                "case_resolution_rate": 85.5,
                "fraud_detection_rate": 92.1,
            },
        }

    def get_health_status(self) -> dict[str, Any]:
        """Get overall system health status.

        Status rules, applied in order:
          * mean request time above ``RESPONSE_TIME_DEGRADED_THRESHOLD``
            -> "degraded";
          * error percentage above ``ERROR_RATE_UNHEALTHY_PERCENT``
            -> "unhealthy";
          * more than ``FAILED_LOGIN_DEGRADED_COUNT`` failed logins within the
            last ``FAILED_LOGIN_WINDOW_MINUTES`` minutes -> "degraded"
            (never downgrades "unhealthy").

        Returns:
            A dict with ``timestamp``, ``status``, the list of ``issues``, a
            simulated ``uptime_percentage`` and per-service ``active_services``
            flags.
        """
        now = self._utcnow()
        login_cutoff = now - timedelta(minutes=self.FAILED_LOGIN_WINDOW_MINUTES)

        with self._lock:
            avg_response_time = statistics.mean(self.request_times) if self.request_times else 0
            total_requests = sum(self.request_counts.values())
            total_errors = sum(self.error_counts.values())
            # Count only recent failures: the deque never shrinks, so using its
            # raw length would keep the status degraded permanently.
            recent_failed_logins = sum(1 for event in self.failed_logins if event["timestamp"] > login_cutoff)

        error_rate = (total_errors / total_requests * 100) if total_requests > 0 else 0

        # Health determination
        status = "healthy"
        issues = []

        if avg_response_time > self.RESPONSE_TIME_DEGRADED_THRESHOLD:
            status = "degraded"
            issues.append("High response times")

        if error_rate > self.ERROR_RATE_UNHEALTHY_PERCENT:
            status = "unhealthy"
            issues.append("High error rate")

        if recent_failed_logins > self.FAILED_LOGIN_DEGRADED_COUNT:
            if status != "unhealthy":
                status = "degraded"
            issues.append("Security concerns")

        # Service probes run outside the lock so a probe that later needs the
        # (non-reentrant) lock, or blocks on I/O, cannot stall the collector.
        return {
            "timestamp": now.isoformat(),
            "status": status,
            "issues": issues,
            "uptime_percentage": 99.9,  # Simulated
            "active_services": {
                "database": self._check_database_health(),
                "api": self._check_api_health(),
                "monitoring": True,
            },
        }

    def _check_database_health(self) -> bool:
        """Check database health (simplified): always reports True."""
        # In a real implementation, this would check actual database connectivity
        return True

    def _check_api_health(self) -> bool:
        """Check API health (simplified): always reports True."""
        # In a real implementation, this would check API endpoints
        return True