Spaces:
Paused
Paused
"""
Application Performance Monitoring (APM) Integration
Task 5.2: Monitoring & Alerting System

Provides:
- Request/Response tracking
- Error tracking and aggregation
- Performance metrics
- Real-time alerting
- Health checks
"""
| import logging | |
| import time | |
| import traceback | |
| from collections.abc import Callable | |
| from datetime import datetime | |
| from typing import Any | |
| from fastapi import Request, Response | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.types import ASGIApp | |
| logger = logging.getLogger(__name__) | |
class MonitoringMetrics:
    """Central in-memory collection of request and error metrics.

    Keeps global counters, per-endpoint aggregates keyed by
    ``"<METHOD> <path>"``, and a bounded list of recent error records.
    """

    def __init__(self):
        self.request_count = 0
        self.error_count = 0
        self.total_response_time = 0.0
        self.endpoint_metrics: dict[str, dict[str, Any]] = {}
        self.errors: list = []
        # Upper bound on how many error records are retained in ``errors``.
        self.max_errors_stored = 100

    def record_request(
        self, endpoint: str, method: str, duration: float, status_code: int
    ):
        """Record one completed request.

        Args:
            endpoint: Request path used (with ``method``) as aggregation key.
            method: HTTP method.
            duration: Elapsed time in seconds.
            status_code: Response status; values >= 400 count as errors.
        """
        self.request_count += 1
        self.total_response_time += duration

        key = f"{method} {endpoint}"
        if key not in self.endpoint_metrics:
            self.endpoint_metrics[key] = {
                "count": 0,
                "total_time": 0.0,
                "errors": 0,
                "min_time": float("inf"),
                "max_time": 0.0,
                "status_codes": {},
            }

        metrics = self.endpoint_metrics[key]
        metrics["count"] += 1
        metrics["total_time"] += duration
        metrics["min_time"] = min(metrics["min_time"], duration)
        metrics["max_time"] = max(metrics["max_time"], duration)

        # Status codes are tallied under their string form.
        status_key = str(status_code)
        metrics["status_codes"][status_key] = (
            metrics["status_codes"].get(status_key, 0) + 1
        )

        if status_code >= 400:
            metrics["errors"] += 1
            self.error_count += 1

    def record_error(self, error: Exception, endpoint: str, context: dict[str, Any]):
        """Store a record describing ``error``.

        Args:
            error: The exception to describe.
            endpoint: Path of the request that failed.
            context: Extra caller-supplied details stored verbatim.
        """
        # Format the traceback from the exception object itself so the record
        # is correct even when this method is called outside an ``except``
        # block (``traceback.format_exc()`` would yield "NoneType: None").
        formatted_traceback = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        self.errors.append(
            {
                "timestamp": datetime.now().isoformat(),
                "type": type(error).__name__,
                "message": str(error),
                "endpoint": endpoint,
                "traceback": formatted_traceback,
                "context": context,
            }
        )

        # Drop the oldest records beyond the cap. Computing the excess
        # explicitly also handles a cap of 0, where ``errors[-0:]`` would
        # keep everything.
        excess = len(self.errors) - self.max_errors_stored
        if excess > 0:
            del self.errors[:excess]

    def get_stats(self) -> dict[str, Any]:
        """Return aggregated statistics.

        Returns:
            A dict with overall totals, the overall error rate (percent),
            the average response time (ms), up to 20 endpoints ordered by
            request count (descending), and the 10 most recent errors.
        """
        avg_response_time = (
            self.total_response_time / self.request_count
            if self.request_count > 0
            else 0
        )

        endpoint_stats = []
        for endpoint, metrics in self.endpoint_metrics.items():
            count = metrics["count"]
            # Guard both ratios the same way so an empty entry cannot raise
            # ZeroDivisionError.
            avg_time = metrics["total_time"] / count if count > 0 else 0
            error_rate = metrics["errors"] / count if count > 0 else 0
            endpoint_stats.append(
                {
                    "endpoint": endpoint,
                    "requests": count,
                    "avg_response_time_ms": round(avg_time * 1000, 2),
                    "min_response_time_ms": round(metrics["min_time"] * 1000, 2),
                    "max_response_time_ms": round(metrics["max_time"] * 1000, 2),
                    "error_rate": round(error_rate * 100, 2),
                    # Copy so callers cannot mutate the internal tally.
                    "status_codes": dict(metrics["status_codes"]),
                }
            )

        # Busiest endpoints first.
        endpoint_stats.sort(key=lambda x: x["requests"], reverse=True)

        return {
            "total_requests": self.request_count,
            "total_errors": self.error_count,
            "error_rate": (
                round(self.error_count / self.request_count * 100, 2)
                if self.request_count > 0
                else 0
            ),
            "avg_response_time_ms": round(avg_response_time * 1000, 2),
            "endpoints": endpoint_stats[:20],  # Top 20 endpoints
            "recent_errors": self.errors[-10:],  # Last 10 errors
        }
class PerformanceMonitoringMiddleware(BaseHTTPMiddleware):
    """Middleware that times every request and feeds a MonitoringMetrics."""

    def __init__(self, app: ASGIApp, metrics: MonitoringMetrics):
        """Wrap ``app`` and remember the ``metrics`` collector to report to."""
        super().__init__(app)
        self.metrics = metrics

    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        """Time the downstream call and record its outcome.

        On success the request is recorded with the response status code and
        an ``X-Response-Time`` header is added. If the downstream call raises,
        the error and a failed request are recorded, the error is logged, and
        the exception is re-raised unchanged.
        """
        # perf_counter is monotonic, so durations cannot be skewed (or go
        # negative) when the wall clock is adjusted mid-request.
        start_time = time.perf_counter()
        try:
            response = await call_next(request)
        except Exception as error:
            duration = time.perf_counter() - start_time
            self.metrics.record_error(
                error=error,
                endpoint=request.url.path,
                context={
                    "method": request.method,
                    "duration": duration,
                    "query_params": dict(request.query_params),
                    "path_params": dict(request.path_params),
                },
            )
            # Count the crash as a request too; otherwise unhandled
            # exceptions never show up in request totals or the error rate.
            # NOTE(review): 500 assumes the server answers an unhandled
            # exception with "Internal Server Error" - confirm for this app.
            self.metrics.record_request(
                endpoint=request.url.path,
                method=request.method,
                duration=duration,
                status_code=500,
            )
            logger.exception(
                "Error in %s %s: %s", request.method, request.url.path, error
            )
            raise

        duration = time.perf_counter() - start_time
        self.metrics.record_request(
            endpoint=request.url.path,
            method=request.method,
            duration=duration,
            status_code=response.status_code,
        )
        # Expose the measured latency to the client in milliseconds.
        response.headers["X-Response-Time"] = f"{duration * 1000:.2f}ms"
        return response
class HealthCheck:
    """Registry of named health probes that can be run together."""

    def __init__(self):
        self.checks: dict[str, Callable] = {}

    def register(self, name: str, check_func: Callable):
        """Register ``check_func`` under ``name``, replacing any previous one.

        ``check_func`` takes no arguments and returns a truthy value when
        healthy. It may be a plain function or an ``async`` function.
        """
        self.checks[name] = check_func

    async def run_checks(self) -> dict[str, Any]:
        """Run every registered check and summarise the outcome.

        Returns:
            A dict with an overall ``status`` ("healthy" when every check
            passed, otherwise "degraded"), per-check results under
            ``checks`` ("healthy", "unhealthy" or "error" with the exception
            text), and a ``timestamp``.
        """
        # Function-scope import keeps this block self-contained.
        import inspect

        results = {}
        all_healthy = True

        for name, check_func in self.checks.items():
            try:
                outcome = check_func()
                # Only await when the check actually produced an awaitable;
                # awaiting the bool returned by a synchronous check raises
                # TypeError and would misreport it as an error.
                if inspect.isawaitable(outcome):
                    outcome = await outcome
                is_healthy = bool(outcome)
                results[name] = {
                    "status": "healthy" if is_healthy else "unhealthy",
                    "timestamp": datetime.now().isoformat(),
                }
                if not is_healthy:
                    all_healthy = False
            except Exception as e:
                # A probe that raises is reported rather than propagated so
                # the remaining checks still run.
                results[name] = {
                    "status": "error",
                    "error": str(e),
                    "timestamp": datetime.now().isoformat(),
                }
                all_healthy = False

        return {
            "status": "healthy" if all_healthy else "degraded",
            "checks": results,
            "timestamp": datetime.now().isoformat(),
        }
class AlertManager:
    """Compare collected metrics against thresholds and keep alert history."""

    def __init__(self, metrics: MonitoringMetrics):
        """Bind to ``metrics`` and install the default thresholds."""
        self.metrics = metrics
        self.thresholds = {
            "error_rate": 5.0,  # %
            "avg_response_time": 1000,  # ms
            "error_count": 100,  # total
        }
        self.alerts: list = []
        # Cap on retained history. Without it every call made while a
        # threshold stays exceeded grows ``alerts`` forever.
        self.max_alerts_stored = 1000

    @staticmethod
    def _build_alert(kind, severity, message, value, threshold) -> dict:
        """Assemble one alert record in the shape callers receive."""
        return {
            "type": kind,
            "severity": severity,
            "message": message,
            "value": value,
            "threshold": threshold,
        }

    def check_thresholds(self) -> list:
        """Evaluate current stats against every threshold.

        Returns:
            The alerts triggered by this call (possibly empty). They are
            also appended to ``alerts``, which is trimmed to the newest
            ``max_alerts_stored`` entries.
        """
        stats = self.metrics.get_stats()
        triggered_alerts = []

        # Check error rate
        error_rate_limit = self.thresholds["error_rate"]
        if stats["error_rate"] > error_rate_limit:
            triggered_alerts.append(
                self._build_alert(
                    "error_rate",
                    "high",
                    f"Error rate {stats['error_rate']}% exceeds threshold {error_rate_limit}%",
                    stats["error_rate"],
                    error_rate_limit,
                )
            )

        # Check response time
        response_time_limit = self.thresholds["avg_response_time"]
        if stats["avg_response_time_ms"] > response_time_limit:
            triggered_alerts.append(
                self._build_alert(
                    "response_time",
                    "medium",
                    (
                        f"Average response time {stats['avg_response_time_ms']}ms exceeds "
                        f"threshold {response_time_limit}ms"
                    ),
                    stats["avg_response_time_ms"],
                    response_time_limit,
                )
            )

        # Check total errors
        error_count_limit = self.thresholds["error_count"]
        if stats["total_errors"] > error_count_limit:
            triggered_alerts.append(
                self._build_alert(
                    "error_count",
                    "high",
                    f"Total errors {stats['total_errors']} exceeds threshold {error_count_limit}",
                    stats["total_errors"],
                    error_count_limit,
                )
            )

        if triggered_alerts:
            self.alerts.extend(triggered_alerts)
            # Discard the oldest alerts beyond the cap.
            overflow = len(self.alerts) - self.max_alerts_stored
            if overflow > 0:
                del self.alerts[:overflow]

        return triggered_alerts

    def get_active_alerts(self) -> list:
        """Return the 50 most recently recorded alerts."""
        return self.alerts[-50:]  # Last 50 alerts
# Global instances
# Module-level singletons used by setup_monitoring(): the metrics collector,
# the health-check registry, and the alert manager. The alert manager reads
# from the same collector, so `metrics` must be created first.
metrics = MonitoringMetrics()
health_check = HealthCheck()
alert_manager = AlertManager(metrics)
# Integration example for FastAPI
def setup_monitoring(app, route_prefix=None):
    """Attach monitoring to a FastAPI application.

    Adds the performance middleware and registers the placeholder
    ``database`` and ``redis`` health checks on the module-level registry.

    Args:
        app: The FastAPI application to instrument.
        route_prefix: Optional URL prefix (e.g. ``"/monitoring"``). When
            given, GET routes ``<prefix>/metrics``, ``<prefix>/health`` and
            ``<prefix>/alerts`` are added to ``app``. When ``None`` (the
            default) no routes are added, matching the previous behaviour,
            in which the handlers were defined but never attached.

    Returns:
        The same ``app`` instance, for call chaining.
    """
    # Add middleware
    app.add_middleware(PerformanceMonitoringMiddleware, metrics=metrics)

    # Register health checks
    async def check_database():
        # Placeholder: replace with a real database probe.
        return True

    async def check_redis():
        # Placeholder: replace with a real Redis probe.
        return True

    health_check.register("database", check_database)
    health_check.register("redis", check_redis)

    # Monitoring endpoint handlers
    async def get_metrics():
        """Get application metrics"""
        return metrics.get_stats()

    async def get_health():
        """Get health check status"""
        return await health_check.run_checks()

    async def get_alerts():
        """Get active alerts"""
        return {
            "active_alerts": alert_manager.get_active_alerts(),
            "thresholds": alert_manager.thresholds,
        }

    # Routes are opt-in so existing callers do not suddenly gain (or have
    # shadowed) paths they did not ask for.
    if route_prefix is not None:
        base = route_prefix.rstrip("/")
        handlers = (
            ("/metrics", get_metrics),
            ("/health", get_health),
            ("/alerts", get_alerts),
        )
        for suffix, handler in handlers:
            app.add_api_route(f"{base}{suffix}", handler, methods=["GET"])

    return app
if __name__ == "__main__":
    # When run as a script, print a summary of the components this module
    # offers followed by a usage hint. One print call per entry keeps the
    # output identical to printing each line individually.
    summary_lines = (
        "Monitoring System Components:",
        " ✓ Performance tracking",
        " ✓ Error aggregation",
        " ✓ Health checks",
        " ✓ Alert management",
        "\n_integration:",
        " from app.monitoring import setup_monitoring",
        " app = setup_monitoring(app)",
    )
    for summary_line in summary_lines:
        print(summary_line)