""" Comprehensive monitoring with Prometheus metrics """ import time import asyncio from typing import Dict, Any, Optional from collections import defaultdict from datetime import datetime import psutil import prometheus_client from prometheus_client import Counter, Histogram, Gauge, Info from src.utils.logging import get_logger logger = get_logger(__name__) class MetricsCollector: """Collect and expose metrics for monitoring""" def __init__(self): # Request metrics self.request_counter = Counter( 'ai_agent_requests_total', 'Total number of requests', ['endpoint', 'status'] ) self.request_duration = Histogram( 'ai_agent_request_duration_seconds', 'Request duration in seconds', ['endpoint'] ) # Tool metrics self.tool_usage = Counter( 'ai_agent_tool_usage_total', 'Tool usage count', ['tool_name', 'status'] ) self.tool_duration = Histogram( 'ai_agent_tool_duration_seconds', 'Tool execution duration', ['tool_name'] ) # System metrics self.cpu_usage = Gauge( 'ai_agent_cpu_usage_percent', 'CPU usage percentage' ) self.memory_usage = Gauge( 'ai_agent_memory_usage_bytes', 'Memory usage in bytes' ) self.active_connections = Gauge( 'ai_agent_active_connections', 'Number of active connections', ['connection_type'] ) # Error metrics self.error_counter = Counter( 'ai_agent_errors_total', 'Total number of errors', ['error_type', 'component'] ) # Circuit breaker metrics self.circuit_breaker_state = Gauge( 'ai_agent_circuit_breaker_state', 'Circuit breaker state (0=closed, 1=open, 2=half-open)', ['component'] ) # Custom metrics storage self.custom_metrics = defaultdict(lambda: defaultdict(float)) # Background tasks self._tasks = [] self._running = False async def start(self): """Start metrics collection""" self._running = True # Start Prometheus HTTP server prometheus_client.start_http_server(8000) logger.info("Prometheus metrics server started on port 8000") # Start background tasks self._tasks.append( asyncio.create_task(self._collect_system_metrics()) ) async def stop(self): """Stop metrics collection""" self._running = False # Cancel background tasks for task in self._tasks: task.cancel() await asyncio.gather(*self._tasks, return_exceptions=True) async def _collect_system_metrics(self): """Collect system metrics periodically""" while self._running: try: # CPU usage self.cpu_usage.set(psutil.cpu_percent(interval=1)) # Memory usage memory = psutil.virtual_memory() self.memory_usage.set(memory.used) # Wait before next collection await asyncio.sleep(10) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error collecting system metrics: {e}") def track_request(self, endpoint: str, status: str = "processing"): """Track an incoming request""" self.request_counter.labels(endpoint=endpoint, status=status).inc() def track_request_duration(self, endpoint: str, duration: float): """Track request duration""" self.request_duration.labels(endpoint=endpoint).observe(duration) def track_success(self, endpoint: str): """Track successful request""" self.request_counter.labels(endpoint=endpoint, status="success").inc() def track_error(self, component: str, error_type: str): """Track an error""" self.error_counter.labels( error_type=error_type, component=component ).inc() def track_tool_usage(self, tool_name: str, status: str, duration: float): """Track tool usage""" self.tool_usage.labels(tool_name=tool_name, status=status).inc() if status == "success": self.tool_duration.labels(tool_name=tool_name).observe(duration) def update_circuit_breaker(self, component: str, state: str): """Update circuit breaker state""" state_map = {"closed": 0, "open": 1, "half-open": 2} self.circuit_breaker_state.labels(component=component).set( state_map.get(state, -1) ) def track_metric(self, name: str, value: float, labels: Optional[Dict] = None): """Track a custom metric""" key = f"{name}:{str(labels)}" if labels else name self.custom_metrics[name][key] = value def get_all_metrics(self) -> Dict[str, Any]: """Get all metrics for display""" return { "system": { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent }, "custom": dict(self.custom_metrics), "prometheus_endpoint": "http://localhost:8000/metrics" }