""" Prometheus Metrics Integration Enterprise-grade metrics collection and exposure """ import time from fastapi.responses import PlainTextResponse from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, generate_latest from core.logging.advanced_logging import structured_logger class PrometheusMetricsCollector: """Custom Prometheus metrics collector""" def __init__(self): self.logger = structured_logger self.registry = CollectorRegistry() # HTTP Request Metrics self.http_requests_total = Counter( 'http_requests_total', 'Total number of HTTP requests', ['method', 'endpoint', 'status_code'], registry=self.registry ) self.http_request_duration_seconds = Histogram( 'http_request_duration_seconds', 'HTTP request duration in seconds', ['method', 'endpoint'], registry=self.registry ) # Database Metrics self.db_connections_active = Gauge( 'db_connections_active', 'Number of active database connections', registry=self.registry ) self.db_query_duration_seconds = Histogram( 'db_query_duration_seconds', 'Database query duration in seconds', ['query_type'], registry=self.registry ) # Cache Metrics self.cache_hits_total = Counter( 'cache_hits_total', 'Total number of cache hits', ['cache_type'], registry=self.registry ) self.cache_misses_total = Counter( 'cache_misses_total', 'Total number of cache misses', ['cache_type'], registry=self.registry ) # Business Metrics self.cases_created_total = Counter( 'cases_created_total', 'Total number of cases created', ['priority', 'status'], registry=self.registry ) self.active_users = Gauge( 'active_users', 'Number of currently active users', registry=self.registry ) # System Metrics self.memory_usage_bytes = Gauge( 'memory_usage_bytes', 'Current memory usage in bytes', registry=self.registry ) self.cpu_usage_percent = Gauge( 'cpu_usage_percent', 'Current CPU usage percentage', registry=self.registry ) # Custom Business Metrics self.fraud_detection_accuracy = Gauge( 'fraud_detection_accuracy', 'Fraud detection model accuracy (0-1)', registry=self.registry ) self.api_response_time_p95 = Gauge( 'api_response_time_p95', '95th percentile API response time in seconds', registry=self.registry ) def record_http_request(self, method: str, endpoint: str, status_code: int, duration: float): """Record HTTP request metrics""" self.http_requests_total.labels( method=method, endpoint=endpoint, status_code=str(status_code) ).inc() self.http_request_duration_seconds.labels( method=method, endpoint=endpoint ).observe(duration) def record_database_metrics(self, connections_active: int, query_duration: float, query_type: str = 'select'): """Record database performance metrics""" self.db_connections_active.set(connections_active) self.db_query_duration_seconds.labels(query_type=query_type).observe(query_duration) def record_cache_metrics(self, cache_type: str, hit: bool): """Record cache hit/miss metrics""" if hit: self.cache_hits_total.labels(cache_type=cache_type).inc() else: self.cache_misses_total.labels(cache_type=cache_type).inc() def record_case_created(self, priority: str, status: str): """Record case creation metrics""" self.cases_created_total.labels(priority=priority, status=status).inc() def update_system_metrics(self, memory_bytes: int, cpu_percent: float): """Update system resource metrics""" self.memory_usage_bytes.set(memory_bytes) self.cpu_usage_percent.set(cpu_percent) def update_business_metrics(self, active_users: int, fraud_accuracy: float, api_p95: float): """Update business-specific metrics""" self.active_users.set(active_users) self.fraud_detection_accuracy.set(fraud_accuracy) self.api_response_time_p95.set(api_p95) def get_metrics(self) -> str: """Get all metrics in Prometheus format""" return generate_latest(self.registry).decode('utf-8') class MetricsMiddleware: """FastAPI middleware for collecting Prometheus metrics""" def __init__(self, app, metrics_collector: PrometheusMetricsCollector): self.app = app self.metrics = metrics_collector async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return # Extract request info method = scope["method"] path = scope["path"] # Skip metrics endpoint to avoid recursion if path.startswith("/metrics"): await self.app(scope, receive, send) return start_time = time.time() # Create response capture response_status = [200] # Default async def capture_response(message): if message["type"] == "http.response.start": response_status[0] = message["status"] await send(message) await self.app(scope, receive, capture_response) # Record metrics duration = time.time() - start_time self.metrics.record_http_request(method, path, response_status[0], duration) # Global metrics collector metrics_collector = PrometheusMetricsCollector() def get_metrics_endpoint(): """FastAPI endpoint for exposing Prometheus metrics""" async def metrics(): """Prometheus metrics endpoint""" try: # Update system metrics import psutil memory_bytes = psutil.virtual_memory().used cpu_percent = psutil.cpu_percent() metrics_collector.update_system_metrics(memory_bytes, cpu_percent) # Update business metrics (placeholder - would be calculated from actual data) metrics_collector.update_business_metrics( active_users=42, # Would be calculated from active sessions fraud_accuracy=0.87, # Would be calculated from model performance api_p95=0.15 # Would be calculated from recent request data ) return PlainTextResponse( metrics_collector.get_metrics(), media_type="text/plain; charset=utf-8" ) except Exception as e: structured_logger.error(f"Metrics collection failed: {e}") return PlainTextResponse( f"# Error collecting metrics: {e}", media_type="text/plain; charset=utf-8", status_code=500 ) return metrics