"""
Real-Time Observability Engine - Performance monitoring and anomaly detection
"""

import logging
import statistics
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class ExecutionEvent:
    """An execution event for monitoring.

    ``event_type`` selects how the engine treats the event; only
    ``"tool_execution_complete"`` events update per-tool metrics. ``data`` is
    a free-form payload; for tool events the engine reads the keys ``tool``,
    ``duration``, ``success``, ``cost`` and ``tokens``. ``timestamp`` defaults
    to the creation time (``time.time()``).
    """

    event_type: str
    data: Dict[str, Any]
    timestamp: float = field(default_factory=time.time)


@dataclass
class ToolMetrics:
    """Running metrics for a specific tool."""

    tool_name: str
    execution_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    durations: List[float] = field(default_factory=list)
    total_cost: float = 0.0
    total_tokens: int = 0

    @property
    def success_rate(self) -> float:
        """Fraction of executions that succeeded (0.0 when none recorded)."""
        if self.execution_count == 0:
            return 0.0
        return self.success_count / self.execution_count

    @property
    def p50_duration(self) -> float:
        """Median of the retained durations (0.0 when none recorded)."""
        if not self.durations:
            return 0.0
        return statistics.median(self.durations)

    @property
    def p95_duration(self) -> float:
        """Approximate 95th percentile of the retained durations.

        Picks the sorted sample at index ``int(n * 0.95)``, clamped to the
        last element; returns 0.0 when no durations are recorded.
        """
        if not self.durations:
            return 0.0
        sorted_durations = sorted(self.durations)
        idx = int(len(sorted_durations) * 0.95)
        return sorted_durations[min(idx, len(sorted_durations) - 1)]

    @property
    def avg_duration(self) -> float:
        """Arithmetic mean of the retained durations (0.0 when none recorded)."""
        if not self.durations:
            return 0.0
        return statistics.mean(self.durations)


@dataclass
class Anomaly:
    """Detected anomaly."""

    tool_name: str
    anomaly_type: str
    duration: float
    expected: float
    deviation_percent: float
    timestamp: float
    recommendations: List[str]


@dataclass
class PredictiveWarning:
    """Predictive warning about future issues."""

    predicted_issue: str
    confidence: float
    recommended_action: str
    eta_seconds: float


class RealTimeObservabilityEngine:
    """
    Monitors and optimizes in real-time using:
    - Latency tracking per tool
    - Token usage monitoring
    - Cost tracking
    - Model performance metrics
    - Anomaly detection
    - Predictive issue detection
    """

    # Number of most recent durations retained per tool for the statistics.
    _MAX_DURATIONS_PER_TOOL = 100

    def __init__(self, anomaly_threshold_multiplier: float = 2.5):
        """Create an engine.

        Args:
            anomaly_threshold_multiplier: An execution is flagged as slow when
                its duration exceeds the tool's median duration multiplied by
                this factor.
        """
        self.anomaly_threshold_multiplier = anomaly_threshold_multiplier
        self.tool_metrics: Dict[str, ToolMetrics] = defaultdict(
            lambda: ToolMetrics(tool_name="unknown")
        )
        self.anomalies: List[Anomaly] = []
        self.execution_history: List[ExecutionEvent] = []
        self.max_history_size = 10000

    def track_execution(self, event: ExecutionEvent) -> Optional[Anomaly]:
        """Track and analyze execution in real-time.

        Every event is appended to the bounded history. Events of type
        ``"tool_execution_complete"`` additionally update the per-tool
        metrics and are checked for slow-execution anomalies.

        Args:
            event: The event to record.

        Returns:
            The detected ``Anomaly`` (also stored in ``self.anomalies``), or
            ``None`` when the event is not a tool completion or is not
            anomalous.
        """
        self.execution_history.append(event)
        # Drop the oldest entries in place. Computing the overflow explicitly
        # (instead of slicing with ``[-max:]``) also behaves correctly when
        # ``max_history_size`` is 0, where ``[-0:]`` would keep everything.
        overflow = len(self.execution_history) - self.max_history_size
        if overflow > 0:
            del self.execution_history[:overflow]

        if event.event_type != "tool_execution_complete":
            return None

        data = event.data
        tool_name = data.get("tool", "unknown")
        duration = data.get("duration", 0)

        metrics = self.tool_metrics[tool_name]
        # The defaultdict factory names new entries "unknown"; fix it up.
        metrics.tool_name = tool_name
        metrics.execution_count += 1
        metrics.total_cost += data.get("cost", 0.0)
        metrics.total_tokens += data.get("tokens", 0)

        if data.get("success", False):
            metrics.success_count += 1
        else:
            metrics.failure_count += 1

        metrics.durations.append(duration)
        # Keep only the most recent durations for memory efficiency.
        if len(metrics.durations) > self._MAX_DURATIONS_PER_TOOL:
            metrics.durations = metrics.durations[-self._MAX_DURATIONS_PER_TOOL:]

        # NOTE: the baseline used by the check includes the sample just added.
        anomaly = self._check_anomaly(tool_name, duration)
        if anomaly is None:
            return None

        self.anomalies.append(anomaly)
        logger.warning(
            "Anomaly detected: %s took %.2fs (expected %.2fs, +%.1f%%)",
            tool_name,
            duration,
            anomaly.expected,
            anomaly.deviation_percent,
        )
        return anomaly

    def _check_anomaly(self, tool_name: str, duration: float) -> Optional[Anomaly]:
        """Check if execution is anomalous.

        Args:
            tool_name: Tool whose recorded metrics form the baseline.
            duration: Duration of the execution being checked.

        Returns:
            An ``Anomaly`` of type ``"slow_execution"`` when ``duration``
            exceeds the tool's median duration times the configured
            multiplier; ``None`` when the tool is unknown, has fewer than five
            recorded executions, has no positive median to compare against,
            or the duration is within the threshold.
        """
        # Use .get so that probing an unseen tool does not create a phantom
        # "unknown" entry in the defaultdict.
        metrics = self.tool_metrics.get(tool_name)
        if metrics is None or metrics.execution_count < 5:
            # Not enough data
            return None

        p50 = metrics.p50_duration
        if p50 <= 0:
            # A zero median (e.g. events recorded without a duration) gives
            # no usable baseline and would make the relative deviation below
            # divide by zero.
            return None

        if duration <= p50 * self.anomaly_threshold_multiplier:
            return None

        deviation = (duration / p50 - 1) * 100
        recommendations = [
            "Check tool service health",
            "Reduce concurrent executions",
            "Consider switching to fallback tool",
        ]
        return Anomaly(
            tool_name=tool_name,
            anomaly_type="slow_execution",
            duration=duration,
            expected=p50,
            deviation_percent=deviation,
            timestamp=time.time(),
            recommendations=recommendations,
        )

    def predict_failure(self) -> Optional[PredictiveWarning]:
        """Predict future issues based on trends.

        Looks at events from the last five minutes. When at least ten recent
        events exist, returns a warning for the first tool that has at least
        five recent completions and a failure rate above 50%.

        Returns:
            A ``PredictiveWarning`` for the first such tool, otherwise
            ``None``.
        """
        # Read the clock once so every event is compared to the same instant.
        now = time.time()
        recent_events = [
            e for e in self.execution_history
            if now - e.timestamp < 300  # Last 5 minutes
        ]

        if len(recent_events) < 10:
            return None

        tool_failures = defaultdict(lambda: {"total": 0, "failures": 0})
        for event in recent_events:
            if event.event_type != "tool_execution_complete":
                continue
            counts = tool_failures[event.data.get("tool", "unknown")]
            counts["total"] += 1
            if not event.data.get("success", False):
                counts["failures"] += 1

        # Check for tools with high failure rates
        for tool, counts in tool_failures.items():
            if counts["total"] < 5:
                continue
            failure_rate = counts["failures"] / counts["total"]
            if failure_rate > 0.5:  # More than 50% failures
                return PredictiveWarning(
                    predicted_issue=f"{tool} showing high failure rate ({failure_rate*100:.1f}%)",
                    confidence=min(failure_rate * 100, 95.0),
                    recommended_action=f"Consider disabling {tool} or investigating root cause",
                    eta_seconds=60.0,
                )

        return None

    def get_tool_metrics(self, tool_name: Optional[str] = None) -> Dict[str, Any]:
        """Get metrics for a tool or all tools.

        Args:
            tool_name: Name of a single tool. When omitted (or empty), a
                condensed summary of every tracked tool is returned instead.

        Returns:
            For a single tool, a dict of its metrics, or ``{}`` if the tool
            has never been tracked. For all tools, a dict mapping each tool
            name to a condensed metrics dict.
        """
        if tool_name:
            # .get avoids creating an entry for an untracked tool.
            metrics = self.tool_metrics.get(tool_name)
            if metrics is None:
                return {}
            return {
                "tool": metrics.tool_name,
                "execution_count": metrics.execution_count,
                "success_count": metrics.success_count,
                "failure_count": metrics.failure_count,
                "success_rate": metrics.success_rate,
                "avg_duration": metrics.avg_duration,
                "p50_duration": metrics.p50_duration,
                "p95_duration": metrics.p95_duration,
                "total_cost": metrics.total_cost,
                "total_tokens": metrics.total_tokens,
            }

        return {
            name: {
                "tool": m.tool_name,
                "execution_count": m.execution_count,
                "success_rate": m.success_rate,
                "avg_duration": m.avg_duration,
                "p50_duration": m.p50_duration,
                "total_cost": m.total_cost,
            }
            for name, m in self.tool_metrics.items()
        }

    def get_anomalies(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent anomalies.

        Args:
            limit: Maximum number of most recent anomalies to return. A value
                of zero or less yields an empty list.

        Returns:
            Up to ``limit`` anomalies as plain dicts, oldest first.
        """
        if limit <= 0:
            # ``self.anomalies[-0:]`` would return every anomaly rather than
            # none, so handle non-positive limits explicitly.
            return []
        return [
            {
                "tool_name": a.tool_name,
                "type": a.anomaly_type,
                "duration": a.duration,
                "expected": a.expected,
                "deviation_percent": a.deviation_percent,
                "timestamp": a.timestamp,
                "recommendations": a.recommendations,
            }
            for a in self.anomalies[-limit:]
        ]

    def get_summary(self) -> Dict[str, Any]:
        """Get overall observability summary.

        Returns:
            A dict with totals across all tools (executions, successes, cost),
            the overall success rate (0 when nothing has been executed), the
            number of anomalies and tools tracked, and the five most recent
            anomalies.
        """
        all_metrics = list(self.tool_metrics.values())
        total_executions = sum(m.execution_count for m in all_metrics)
        total_successes = sum(m.success_count for m in all_metrics)
        total_cost = sum(m.total_cost for m in all_metrics)

        return {
            "total_executions": total_executions,
            "total_successes": total_successes,
            "overall_success_rate": total_successes / total_executions if total_executions > 0 else 0,
            "total_cost": total_cost,
            "total_anomalies": len(self.anomalies),
            "tools_monitored": len(self.tool_metrics),
            "recent_anomalies": self.get_anomalies(5),
        }


# Global observability engine
observability = RealTimeObservabilityEngine()