Spaces:
Build error
Build error
import logging
import time
from functools import wraps
from typing import Callable, Any

from prometheus_client import Counter, Histogram, start_http_server
class MetricsManager:
    """Create the Prometheus collectors used by this module and start the exporter.

    NOTE(review): the collectors are created without an explicit registry, so
    presumably they land in prometheus_client's default registry and building
    a second instance in the same process would fail on duplicate metric
    names -- confirm before instantiating this more than once.
    """

    def __init__(self, port: int = 8000) -> None:
        """Build the collectors and start the Prometheus metrics server.

        Args:
            port: Port passed to ``start_http_server``. Defaults to 8000,
                the value that used to be hard-coded here, so existing
                ``MetricsManager()`` callers behave as before.
        """
        # Counter with one 'endpoint' label.
        self.requests_total = Counter(
            'requests_total',
            'Total requests processed',
            ['endpoint'],
        )
        # Histogram of call durations (seconds), one 'endpoint' label.
        self.request_duration = Histogram(
            'request_duration_seconds',
            'Request duration in seconds',
            ['endpoint'],
        )
        # Counter with one 'type' label (callers pass an exception class name).
        self.errors_total = Counter(
            'errors_total',
            'Total errors encountered',
            ['type'],
        )
        # Start Prometheus metrics server
        start_http_server(port)
def monitor_performance(endpoint: str) -> Callable:
    """Return a decorator that records metrics for every call of a function.

    For each call of the decorated function the wrapper:

    * increments ``metrics_manager.requests_total`` (label ``endpoint``)
      after the function returns without raising;
    * increments ``metrics_manager.errors_total`` (label ``type`` set to the
      exception class name) when an ``Exception`` is raised, then re-raises;
    * always observes the elapsed seconds in
      ``metrics_manager.request_duration`` (label ``endpoint``).

    Args:
        endpoint: Label value used for the request counter and the duration
            histogram.

    Returns:
        A decorator that wraps a callable and preserves its metadata.

    Note:
        ``func`` is called synchronously. A coroutine function would return
        its coroutine object immediately, so only the time to create it
        would be measured.
    """
    def decorator(func: Callable) -> Callable:
        # Keep __name__/__doc__/__wrapped__ of the original so frameworks and
        # debuggers that introspect the callable still see the real function.
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # perf_counter is monotonic; wall-clock time.time() can jump when
            # the system clock is adjusted and yield bogus durations.
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                metrics_manager.requests_total.labels(endpoint=endpoint).inc()
                return result
            except Exception as e:
                metrics_manager.errors_total.labels(type=type(e).__name__).inc()
                raise
            finally:
                # Runs on both the success and the error path.
                duration = time.perf_counter() - start_time
                metrics_manager.request_duration.labels(endpoint=endpoint).observe(duration)
        return wrapper
    return decorator
# Module-level instance that the wrappers built by monitor_performance look up
# by name each time a decorated function is called.
metrics_manager: MetricsManager = MetricsManager()