# RobotPai / src/core/monitoring.py
# Uploaded by atr0p05 ("Upload 291 files"), commit 8a682b5.
"""
Comprehensive monitoring with Prometheus metrics
"""
import time
import asyncio
from typing import Dict, Any, Optional
from collections import defaultdict
from datetime import datetime
import psutil
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge, Info
from src.utils.logging import get_logger
# Module-level logger from the project's logging helper; MetricsCollector uses
# it to report server startup and collection errors.
logger = get_logger(__name__)
class MetricsCollector:
    """Collect and expose metrics for monitoring.

    Registers Prometheus counters, histograms and gauges for requests, tool
    usage, errors, circuit-breaker state and host resource usage. It also
    keeps an in-process store of ad-hoc "custom" metrics for display via
    ``get_all_metrics()``.

    NOTE: the Prometheus metrics are created without an explicit registry, so
    they go into prometheus_client's default registry. Constructing a second
    instance in the same process will therefore fail with a duplicate-metric
    error.
    """

    # Default port for the Prometheus HTTP exporter.
    _DEFAULT_PORT = 8000
    # Seconds to wait between background system-metric samples.
    _SYSTEM_METRICS_INTERVAL_SECONDS = 10

    def __init__(self):
        # Request metrics
        self.request_counter = Counter(
            'ai_agent_requests_total',
            'Total number of requests',
            ['endpoint', 'status']
        )
        self.request_duration = Histogram(
            'ai_agent_request_duration_seconds',
            'Request duration in seconds',
            ['endpoint']
        )
        # Tool metrics
        self.tool_usage = Counter(
            'ai_agent_tool_usage_total',
            'Tool usage count',
            ['tool_name', 'status']
        )
        self.tool_duration = Histogram(
            'ai_agent_tool_duration_seconds',
            'Tool execution duration',
            ['tool_name']
        )
        # System metrics
        self.cpu_usage = Gauge(
            'ai_agent_cpu_usage_percent',
            'CPU usage percentage'
        )
        self.memory_usage = Gauge(
            'ai_agent_memory_usage_bytes',
            'Memory usage in bytes'
        )
        self.active_connections = Gauge(
            'ai_agent_active_connections',
            'Number of active connections',
            ['connection_type']
        )
        # Error metrics
        self.error_counter = Counter(
            'ai_agent_errors_total',
            'Total number of errors',
            ['error_type', 'component']
        )
        # Circuit breaker metrics
        self.circuit_breaker_state = Gauge(
            'ai_agent_circuit_breaker_state',
            'Circuit breaker state (0=closed, 1=open, 2=half-open)',
            ['component']
        )
        # Custom metrics storage: custom_metrics[name][key] -> latest value.
        self.custom_metrics = defaultdict(lambda: defaultdict(float))
        # Background tasks
        self._tasks = []
        self._running = False
        # Port of the Prometheus HTTP endpoint, and whether it has been bound.
        # The exporter cannot be stopped, so it is started at most once.
        self._port = self._DEFAULT_PORT
        self._server_started = False

    async def start(self, port: int = _DEFAULT_PORT):
        """Start metrics collection.

        Starts the Prometheus HTTP exporter (once per instance) and the
        background system-metrics task. Calling ``start()`` while already
        running is a no-op.

        Args:
            port: Port for the Prometheus HTTP server. It is only used the
                first time the server is started for this instance.

        Raises:
            Whatever ``prometheus_client.start_http_server`` raises (e.g. an
            ``OSError`` when the port is already in use). The instance is
            then left not running, so ``start()`` can be retried.
        """
        if self._running:
            return
        if not self._server_started:
            self._port = port
            prometheus_client.start_http_server(self._port)
            self._server_started = True
            logger.info(f"Prometheus metrics server started on port {self._port}")
        # Only mark as running once the exporter is up, so a failed bind
        # doesn't leave the collector stuck in a "running" state.
        self._running = True
        self._tasks.append(
            asyncio.create_task(self._collect_system_metrics())
        )

    async def stop(self):
        """Stop metrics collection.

        Cancels the background tasks and waits for them to finish. The
        Prometheus HTTP server started by ``start()`` is left running.
        """
        self._running = False
        for task in self._tasks:
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
        # Drop finished tasks so repeated start()/stop() cycles don't pile up.
        self._tasks.clear()

    async def _collect_system_metrics(self):
        """Sample CPU and memory usage periodically until stopped."""
        loop = asyncio.get_running_loop()
        while self._running:
            try:
                # cpu_percent(1) blocks for one second while it samples; run it
                # in the default executor so the event loop stays responsive.
                cpu = await loop.run_in_executor(None, psutil.cpu_percent, 1)
                self.cpu_usage.set(cpu)
                memory = psutil.virtual_memory()
                self.memory_usage.set(memory.used)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error collecting system metrics: {e}")
            # Sleep outside the try: a failing sample must not turn this into
            # a busy loop that spins and floods the log.
            await asyncio.sleep(self._SYSTEM_METRICS_INTERVAL_SECONDS)

    def track_request(self, endpoint: str, status: str = "processing"):
        """Increment the request counter for ``endpoint`` with ``status``."""
        self.request_counter.labels(endpoint=endpoint, status=status).inc()

    def track_request_duration(self, endpoint: str, duration: float):
        """Record a request duration (seconds) for ``endpoint``."""
        self.request_duration.labels(endpoint=endpoint).observe(duration)

    def track_success(self, endpoint: str):
        """Increment the request counter for ``endpoint`` with status "success"."""
        self.request_counter.labels(endpoint=endpoint, status="success").inc()

    def track_error(self, component: str, error_type: str):
        """Increment the error counter for ``component`` / ``error_type``."""
        self.error_counter.labels(
            error_type=error_type,
            component=component
        ).inc()

    def track_tool_usage(self, tool_name: str, status: str, duration: float):
        """Count a tool invocation; record its duration only on success."""
        self.tool_usage.labels(tool_name=tool_name, status=status).inc()
        if status == "success":
            self.tool_duration.labels(tool_name=tool_name).observe(duration)

    def update_circuit_breaker(self, component: str, state: str):
        """Set the circuit-breaker gauge for ``component``.

        ``state`` maps "closed" -> 0, "open" -> 1, "half-open" -> 2. Any other
        value is recorded as -1 and logged as a warning.
        """
        state_map = {"closed": 0, "open": 1, "half-open": 2}
        value = state_map.get(state, -1)
        if value == -1:
            logger.warning(
                f"Unknown circuit breaker state {state!r} for {component}"
            )
        self.circuit_breaker_state.labels(component=component).set(value)

    def track_metric(self, name: str, value: float, labels: Optional[Dict] = None):
        """Store the latest value of a custom metric.

        The value is stored under ``custom_metrics[name][key]``. ``key`` is
        ``name`` when no labels are given, otherwise ``"name:{labels}"``.
        Labels are sorted by key first, so the same label set always maps
        to the same entry regardless of dict insertion order.
        """
        if labels:
            ordered = dict(sorted(labels.items(), key=lambda item: str(item[0])))
            key = f"{name}:{ordered}"
        else:
            key = name
        self.custom_metrics[name][key] = value

    def get_all_metrics(self) -> Dict[str, Any]:
        """Return a snapshot of system and custom metrics for display.

        Returns:
            A dict with current CPU/memory/disk usage percentages, a
            plain-dict copy of the custom metrics, and the Prometheus
            endpoint URL.
        """
        return {
            "system": {
                "cpu_usage": psutil.cpu_percent(),
                "memory_usage": psutil.virtual_memory().percent,
                "disk_usage": psutil.disk_usage('/').percent
            },
            # Copy into plain dicts so callers can't mutate internal state.
            "custom": {
                metric: dict(values)
                for metric, values in self.custom_metrics.items()
            },
            "prometheus_endpoint": f"http://localhost:{self._port}/metrics"
        }