# zenith-backend/core/monitoring/prometheus_metrics.py
# teoat's picture
# Upload folder using huggingface_hub
# 4ae946d verified
"""
Prometheus Metrics Integration
Enterprise-grade metrics collection and exposure
"""
import time
from fastapi.responses import PlainTextResponse
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, generate_latest
from core.logging.advanced_logging import structured_logger
class PrometheusMetricsCollector:
    """Holds a private ``CollectorRegistry`` and every metric registered on it.

    All metrics are created once in ``__init__`` and bound to
    ``self.registry``. The ``record_*`` and ``update_*`` methods push values
    into those metrics; ``get_metrics`` renders the registry as text.
    """

    def __init__(self):
        self.logger = structured_logger
        self.registry = CollectorRegistry()
        # Short alias; every metric below is attached to this one registry.
        registry = self.registry

        # --- HTTP requests ---------------------------------------------
        self.http_requests_total = Counter(
            'http_requests_total',
            'Total number of HTTP requests',
            labelnames=['method', 'endpoint', 'status_code'],
            registry=registry,
        )
        self.http_request_duration_seconds = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration in seconds',
            labelnames=['method', 'endpoint'],
            registry=registry,
        )

        # --- Database --------------------------------------------------
        self.db_connections_active = Gauge(
            'db_connections_active',
            'Number of active database connections',
            registry=registry,
        )
        self.db_query_duration_seconds = Histogram(
            'db_query_duration_seconds',
            'Database query duration in seconds',
            labelnames=['query_type'],
            registry=registry,
        )

        # --- Cache -----------------------------------------------------
        self.cache_hits_total = Counter(
            'cache_hits_total',
            'Total number of cache hits',
            labelnames=['cache_type'],
            registry=registry,
        )
        self.cache_misses_total = Counter(
            'cache_misses_total',
            'Total number of cache misses',
            labelnames=['cache_type'],
            registry=registry,
        )

        # --- Business --------------------------------------------------
        self.cases_created_total = Counter(
            'cases_created_total',
            'Total number of cases created',
            labelnames=['priority', 'status'],
            registry=registry,
        )
        self.active_users = Gauge(
            'active_users',
            'Number of currently active users',
            registry=registry,
        )

        # --- System resources ------------------------------------------
        self.memory_usage_bytes = Gauge(
            'memory_usage_bytes',
            'Current memory usage in bytes',
            registry=registry,
        )
        self.cpu_usage_percent = Gauge(
            'cpu_usage_percent',
            'Current CPU usage percentage',
            registry=registry,
        )

        # --- Custom business gauges ------------------------------------
        self.fraud_detection_accuracy = Gauge(
            'fraud_detection_accuracy',
            'Fraud detection model accuracy (0-1)',
            registry=registry,
        )
        self.api_response_time_p95 = Gauge(
            'api_response_time_p95',
            '95th percentile API response time in seconds',
            registry=registry,
        )

    def record_http_request(self, method: str, endpoint: str, status_code: int, duration: float):
        """Count one HTTP request and observe how long it took.

        Args:
            method: HTTP method, used as the ``method`` label.
            endpoint: Used as the ``endpoint`` label.
            status_code: Response status; stringified for the label value.
            duration: Value observed on the duration histogram.
        """
        request_counter = self.http_requests_total.labels(
            method=method,
            endpoint=endpoint,
            status_code=str(status_code),
        )
        request_counter.inc()

        latency_histogram = self.http_request_duration_seconds.labels(
            method=method,
            endpoint=endpoint,
        )
        latency_histogram.observe(duration)

    def record_database_metrics(self, connections_active: int, query_duration: float, query_type: str = 'select'):
        """Set the active-connection gauge and observe one query duration.

        Args:
            connections_active: New value for the connections gauge.
            query_duration: Value observed on the query-duration histogram.
            query_type: ``query_type`` label value; defaults to ``'select'``.
        """
        self.db_connections_active.set(connections_active)
        query_histogram = self.db_query_duration_seconds.labels(query_type=query_type)
        query_histogram.observe(query_duration)

    def record_cache_metrics(self, cache_type: str, hit: bool):
        """Increment the hit or the miss counter for ``cache_type``.

        Args:
            cache_type: ``cache_type`` label value.
            hit: Truthy selects the hit counter, falsy the miss counter.
        """
        outcome_counter = self.cache_hits_total if hit else self.cache_misses_total
        outcome_counter.labels(cache_type=cache_type).inc()

    def record_case_created(self, priority: str, status: str):
        """Increment the created-cases counter for a priority/status pair."""
        case_counter = self.cases_created_total.labels(priority=priority, status=status)
        case_counter.inc()

    def update_system_metrics(self, memory_bytes: int, cpu_percent: float):
        """Overwrite the memory and CPU gauges with the given readings."""
        self.memory_usage_bytes.set(memory_bytes)
        self.cpu_usage_percent.set(cpu_percent)

    def update_business_metrics(self, active_users: int, fraud_accuracy: float, api_p95: float):
        """Overwrite the active-users, fraud-accuracy and API p95 gauges."""
        self.active_users.set(active_users)
        self.fraud_detection_accuracy.set(fraud_accuracy)
        self.api_response_time_p95.set(api_p95)

    def get_metrics(self) -> str:
        """Render everything on ``self.registry`` and return it as text."""
        exposition = generate_latest(self.registry)
        return exposition.decode('utf-8')
class MetricsMiddleware:
    """ASGI middleware that records one metrics sample per HTTP request.

    For every ``http`` scope whose path does not start with ``/metrics`` the
    middleware times the wrapped application, captures the status code from
    the ``http.response.start`` message, and forwards both to
    ``metrics_collector.record_http_request``.
    """

    def __init__(self, app, metrics_collector: "PrometheusMetricsCollector"):
        """Store the wrapped ASGI app and the collector that receives samples.

        Args:
            app: The downstream ASGI application.
            metrics_collector: Object exposing
                ``record_http_request(method, endpoint, status_code, duration)``.
        """
        self.app = app
        self.metrics = metrics_collector

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, recording metrics for HTTP requests.

        Non-HTTP scopes and requests to ``/metrics*`` are passed straight
        through without being measured.

        Raises:
            Whatever the wrapped application raises; the exception is
            re-raised unchanged after the request has been recorded.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        method = scope["method"]
        # NOTE(review): the raw request path becomes the `endpoint` label, so
        # paths with embedded IDs create one time series per distinct path.
        # Left unchanged because existing dashboards may rely on these values.
        path = scope["path"]

        # Requests to the metrics endpoint itself are not counted.
        if path.startswith("/metrics"):
            await self.app(scope, receive, send)
            return

        status_code = 200  # Reported if the app never sends a response start
        response_started = False

        async def capture_response(message):
            nonlocal status_code, response_started
            if message["type"] == "http.response.start":
                status_code = message["status"]
                response_started = True
            await send(message)

        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        start_time = time.perf_counter()
        try:
            await self.app(scope, receive, capture_response)
        except Exception:
            # Nothing was sent to the client yet, so report the failure as a
            # 500 rather than the optimistic default. If headers already went
            # out, the captured status is what the client actually saw.
            if not response_started:
                status_code = 500
            raise
        finally:
            # Record in `finally` so failing requests are measured too; this
            # also runs when the request task is cancelled.
            duration = time.perf_counter() - start_time
            self.metrics.record_http_request(method, path, status_code, duration)
# Global metrics collector: a single module-level instance, created when this
# module is imported, that the handler built by get_metrics_endpoint() updates
# and renders.
metrics_collector = PrometheusMetricsCollector()
def get_metrics_endpoint():
    """Build the async handler that exposes Prometheus metrics.

    Returns:
        An async callable taking no arguments. On success it returns a
        ``PlainTextResponse`` holding the rendered metrics; on any exception
        it logs the failure and returns a 500 ``PlainTextResponse`` whose
        body describes the error.
    """
    # Imported locally, like psutil below. This is the content type that
    # prometheus_client publishes for the output of generate_latest().
    from prometheus_client import CONTENT_TYPE_LATEST

    async def metrics():
        """Refresh the gauges, then render the global collector's registry."""
        try:
            # Update system metrics
            import psutil

            memory_bytes = psutil.virtual_memory().used
            # NOTE(review): called without an interval; confirm the sampling
            # window psutil uses in that mode is the intended one.
            cpu_percent = psutil.cpu_percent()
            metrics_collector.update_system_metrics(memory_bytes, cpu_percent)

            # Update business metrics (placeholder - would be calculated from actual data)
            # NOTE(review): these constants are exported as if they were live
            # measurements until real calculations replace them.
            metrics_collector.update_business_metrics(
                active_users=42,  # Would be calculated from active sessions
                fraud_accuracy=0.87,  # Would be calculated from model performance
                api_p95=0.15,  # Would be calculated from recent request data
            )

            return PlainTextResponse(
                metrics_collector.get_metrics(),
                media_type=CONTENT_TYPE_LATEST,
            )
        except Exception as e:
            # Top-level boundary: report the failure instead of propagating it.
            structured_logger.error(f"Metrics collection failed: {e}")
            return PlainTextResponse(
                f"# Error collecting metrics: {e}",
                media_type="text/plain; charset=utf-8",
                status_code=500,
            )

    return metrics