Spaces:
Paused
Paused
| """ | |
| Prometheus Metrics Integration | |
| Enterprise-grade metrics collection and exposure | |
| """ | |
| import time | |
| from fastapi.responses import PlainTextResponse | |
| from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, generate_latest | |
| from core.logging.advanced_logging import structured_logger | |
class PrometheusMetricsCollector:
    """Holds the application's Prometheus metrics on a dedicated registry.

    Each metric object is created in ``__init__`` and registered on
    ``self.registry`` (a fresh ``CollectorRegistry``). The ``record_*`` and
    ``update_*`` methods feed values into those metrics, and ``get_metrics``
    serialises the registry to text.
    """

    def __init__(self):
        self.logger = structured_logger
        self.registry = CollectorRegistry()
        registry = self.registry  # local alias; every metric below registers here

        # --- HTTP requests ---
        self.http_requests_total = Counter(
            "http_requests_total",
            "Total number of HTTP requests",
            labelnames=["method", "endpoint", "status_code"],
            registry=registry,
        )
        self.http_request_duration_seconds = Histogram(
            "http_request_duration_seconds",
            "HTTP request duration in seconds",
            labelnames=["method", "endpoint"],
            registry=registry,
        )

        # --- Database ---
        self.db_connections_active = Gauge(
            "db_connections_active",
            "Number of active database connections",
            registry=registry,
        )
        self.db_query_duration_seconds = Histogram(
            "db_query_duration_seconds",
            "Database query duration in seconds",
            labelnames=["query_type"],
            registry=registry,
        )

        # --- Cache ---
        self.cache_hits_total = Counter(
            "cache_hits_total",
            "Total number of cache hits",
            labelnames=["cache_type"],
            registry=registry,
        )
        self.cache_misses_total = Counter(
            "cache_misses_total",
            "Total number of cache misses",
            labelnames=["cache_type"],
            registry=registry,
        )

        # --- Business ---
        self.cases_created_total = Counter(
            "cases_created_total",
            "Total number of cases created",
            labelnames=["priority", "status"],
            registry=registry,
        )
        self.active_users = Gauge(
            "active_users",
            "Number of currently active users",
            registry=registry,
        )

        # --- System resources ---
        self.memory_usage_bytes = Gauge(
            "memory_usage_bytes",
            "Current memory usage in bytes",
            registry=registry,
        )
        self.cpu_usage_percent = Gauge(
            "cpu_usage_percent",
            "Current CPU usage percentage",
            registry=registry,
        )

        # --- Custom business gauges ---
        self.fraud_detection_accuracy = Gauge(
            "fraud_detection_accuracy",
            "Fraud detection model accuracy (0-1)",
            registry=registry,
        )
        self.api_response_time_p95 = Gauge(
            "api_response_time_p95",
            "95th percentile API response time in seconds",
            registry=registry,
        )

    def record_http_request(self, method: str, endpoint: str, status_code: int, duration: float):
        """Count one HTTP request and observe how long it took.

        The status code is converted to ``str`` before being used as a label
        value; ``duration`` is passed to the histogram unchanged.
        """
        request_counter = self.http_requests_total.labels(
            method=method,
            endpoint=endpoint,
            status_code=str(status_code),
        )
        request_counter.inc()

        latency_histogram = self.http_request_duration_seconds.labels(
            method=method,
            endpoint=endpoint,
        )
        latency_histogram.observe(duration)

    def record_database_metrics(self, connections_active: int, query_duration: float, query_type: str = 'select'):
        """Set the active-connection gauge and observe one query duration."""
        self.db_connections_active.set(connections_active)
        query_histogram = self.db_query_duration_seconds.labels(query_type=query_type)
        query_histogram.observe(query_duration)

    def record_cache_metrics(self, cache_type: str, hit: bool):
        """Increment the hit counter when ``hit`` is truthy, else the miss counter."""
        counter = self.cache_hits_total if hit else self.cache_misses_total
        counter.labels(cache_type=cache_type).inc()

    def record_case_created(self, priority: str, status: str):
        """Increment the created-cases counter for the given priority/status pair."""
        case_counter = self.cases_created_total.labels(priority=priority, status=status)
        case_counter.inc()

    def update_system_metrics(self, memory_bytes: int, cpu_percent: float):
        """Overwrite the memory and CPU gauges with the supplied readings."""
        self.memory_usage_bytes.set(memory_bytes)
        self.cpu_usage_percent.set(cpu_percent)

    def update_business_metrics(self, active_users: int, fraud_accuracy: float, api_p95: float):
        """Overwrite the active-users, fraud-accuracy and p95 latency gauges."""
        self.active_users.set(active_users)
        self.fraud_detection_accuracy.set(fraud_accuracy)
        self.api_response_time_p95.set(api_p95)

    def get_metrics(self) -> str:
        """Serialise everything on ``self.registry`` and return it as UTF-8 text."""
        exposition = generate_latest(self.registry)
        return exposition.decode('utf-8')
class MetricsMiddleware:
    """ASGI middleware that reports request count and latency to a collector.

    For every HTTP request whose path does not start with ``/metrics`` the
    middleware calls ``metrics_collector.record_http_request(method, path,
    status, duration)`` exactly once when the wrapped application finishes,
    whether it returned normally or raised.
    """

    def __init__(self, app, metrics_collector: "PrometheusMetricsCollector"):
        """Store the wrapped ASGI app and the collector that receives samples.

        Args:
            app: The next ASGI application in the chain.
            metrics_collector: Object exposing ``record_http_request``.
        """
        self.app = app
        self.metrics = metrics_collector

    async def __call__(self, scope, receive, send):
        """Forward the request and record its outcome.

        Non-HTTP scopes and paths starting with ``/metrics`` are passed
        through without being measured. Exceptions raised by the wrapped
        application are re-raised unchanged after the sample is recorded.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        method = scope["method"]
        # NOTE(review): the raw path is used as a label value; paths that
        # embed IDs would create one time series per distinct value.
        path = scope["path"]

        # Do not measure the scrape endpoint itself.
        if path.startswith("/metrics"):
            await self.app(scope, receive, send)
            return

        status_code = 200  # reported when the app returns without starting a response
        response_started = False

        async def capture_response(message):
            """Remember the status of the response start message, then forward it."""
            nonlocal status_code, response_started
            if message["type"] == "http.response.start":
                status_code = message["status"]
                response_started = True
            await send(message)

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        start_time = time.perf_counter()
        try:
            await self.app(scope, receive, capture_response)
        except Exception:
            # The app crashed before any status was sent: count it as a
            # server error instead of silently dropping the request.
            if not response_started:
                status_code = 500
            raise
        finally:
            duration = time.perf_counter() - start_time
            self.metrics.record_http_request(method, path, status_code, duration)
# Module-level collector instance, created once at import time; the metrics
# endpoint defined in this module reads and updates it.
metrics_collector: PrometheusMetricsCollector = PrometheusMetricsCollector()
def get_metrics_endpoint():
    """Build and return the async handler that serves the metrics text.

    The returned coroutine function refreshes the system and business gauges
    on the module-level ``metrics_collector`` and responds with the
    serialised registry. Any exception raised while doing so is logged and
    turned into a plain-text 500 response.
    """

    async def metrics():
        """Prometheus metrics endpoint"""
        try:
            # Imported here so a failure to import psutil is handled by the
            # except clause below.
            import psutil

            # Refresh the system gauges with a fresh reading.
            metrics_collector.update_system_metrics(
                psutil.virtual_memory().used,
                psutil.cpu_percent(),
            )

            # Placeholder business values: fixed numbers, not measurements.
            placeholder_values = {
                "active_users": 42,      # would come from active sessions
                "fraud_accuracy": 0.87,  # would come from model performance
                "api_p95": 0.15,         # would come from recent request data
            }
            metrics_collector.update_business_metrics(**placeholder_values)

            payload = metrics_collector.get_metrics()
            return PlainTextResponse(payload, media_type="text/plain; charset=utf-8")
        except Exception as e:
            structured_logger.error(f"Metrics collection failed: {e}")
            error_body = f"# Error collecting metrics: {e}"
            return PlainTextResponse(
                error_body,
                media_type="text/plain; charset=utf-8",
                status_code=500,
            )

    return metrics