| | """ |
| | Real-Time Observability Engine - Performance monitoring and anomaly detection |
| | """ |
| |
|
| | import logging |
| | import statistics |
| | import time |
| | from collections import defaultdict |
| | from dataclasses import dataclass, field |
| | from typing import Any, Dict, List, Optional |
| |
|
# Module-level logger named after this module, so its level and handlers can be
# configured independently of other modules.
logger: logging.Logger = logging.getLogger(__name__)
| |
|
| |
|
@dataclass(init=True, repr=True, eq=True)
class ExecutionEvent:
    """A single event submitted to the observability engine.

    Attributes:
        event_type: Label describing what kind of event this is.
        data: Arbitrary event payload.
        timestamp: Seconds since the epoch; defaults to ``time.time()`` at
            construction when not supplied.
    """

    event_type: str
    data: Dict[str, Any]
    # Stamped lazily per instance via the factory, not once at class creation.
    timestamp: float = field(default_factory=time.time)
| |
|
| |
|
@dataclass
class ToolMetrics:
    """Aggregated execution statistics for a single tool.

    The counters (``execution_count``, ``success_count``, ``failure_count``,
    ``total_cost``, ``total_tokens``) are plain cumulative totals. The
    duration statistics are computed only over the samples currently held in
    ``durations``; callers may keep that list bounded to a recent window.
    """

    tool_name: str
    execution_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    durations: List[float] = field(default_factory=list)
    total_cost: float = 0.0
    total_tokens: int = 0

    @property
    def success_rate(self) -> float:
        """Fraction of executions that succeeded; 0.0 before any execution."""
        if self.execution_count == 0:
            return 0.0
        return self.success_count / self.execution_count

    def duration_percentile(self, percent: int) -> float:
        """Return the nearest-rank ``percent``-th percentile of ``durations``.

        Args:
            percent: Integer percentile in ``[0, 100]``. ``0`` yields the
                minimum and ``100`` the maximum sample.

        Returns:
            The selected sample, or 0.0 when ``durations`` is empty.

        Raises:
            ValueError: If ``percent`` lies outside ``[0, 100]``.
        """
        if not 0 <= percent <= 100:
            raise ValueError(f"percent must be within [0, 100], got {percent}")
        if not self.durations:
            return 0.0
        ordered = sorted(self.durations)
        # Nearest-rank: rank = ceil(percent / 100 * n) (1-based). Integer
        # ceil-division avoids float rounding (e.g. 0.95 * 20 != 19 exactly).
        rank = -(-len(ordered) * percent // 100)
        return ordered[max(rank, 1) - 1]

    @property
    def p50_duration(self) -> float:
        """Median duration (mean of the two middle samples for even counts)."""
        if not self.durations:
            return 0.0
        return statistics.median(self.durations)

    @property
    def p95_duration(self) -> float:
        """Nearest-rank 95th-percentile duration; 0.0 when there are no samples."""
        # The previous int(n * 0.95) index was one past the nearest-rank
        # position (e.g. it returned the maximum for n == 20).
        return self.duration_percentile(95)

    @property
    def avg_duration(self) -> float:
        """Arithmetic mean duration; 0.0 when there are no samples."""
        if not self.durations:
            return 0.0
        return statistics.mean(self.durations)
| |
|
| |
|
@dataclass(init=True, repr=True, eq=True)
class Anomaly:
    """A single detected anomaly for one tool execution.

    Fields are positional in the order declared below; do not reorder them.
    """

    # Tool whose execution was flagged.
    tool_name: str
    # Short label for the kind of anomaly.
    anomaly_type: str
    # Observed duration of the flagged execution.
    duration: float
    # Baseline the observed duration was compared against.
    expected: float
    # How far the observation exceeded the baseline, expressed in percent.
    deviation_percent: float
    # When the anomaly was recorded.
    timestamp: float
    # Human-readable remediation hints.
    recommendations: List[str]
| |
|
| |
|
@dataclass(init=True, repr=True, eq=True)
class PredictiveWarning:
    """A forward-looking warning about an issue expected to occur.

    Fields are positional in the order declared below; do not reorder them.
    """

    # Description of the anticipated problem.
    predicted_issue: str
    # Confidence score; presumably a percentage — confirm with producers.
    confidence: float
    # Suggested mitigation.
    recommended_action: str
    # Estimated seconds until the issue materializes.
    eta_seconds: float
| |
|
| |
|
class RealTimeObservabilityEngine:
    """
    Monitors tool executions in real time using:
    - Latency tracking per tool (median / p95 / mean over a recent window)
    - Success / failure counting
    - Token usage and cost totals
    - Slow-execution anomaly detection
    - Failure-rate based predictive warnings

    Args:
        anomaly_threshold_multiplier: An execution is flagged as slow when its
            duration exceeds ``p50 * anomaly_threshold_multiplier``.
        max_history_size: Maximum number of events retained in
            ``execution_history``; the oldest are dropped first. Must be >= 1.
        duration_window: Number of most recent durations kept per tool for the
            latency statistics. Must be >= 1.
        min_samples_for_anomaly: Minimum number of recorded executions for a
            tool before anomaly detection is applied to it.

    Raises:
        ValueError: If ``max_history_size`` or ``duration_window`` is < 1.
    """

    def __init__(
        self,
        anomaly_threshold_multiplier: float = 2.5,
        max_history_size: int = 10000,
        duration_window: int = 100,
        min_samples_for_anomaly: int = 5,
    ):
        if max_history_size < 1:
            raise ValueError("max_history_size must be >= 1")
        if duration_window < 1:
            raise ValueError("duration_window must be >= 1")
        self.anomaly_threshold_multiplier = anomaly_threshold_multiplier
        # defaultdict so indexing an unseen tool creates an empty record; the
        # factory cannot know the key, so track_execution stamps the real name.
        self.tool_metrics: Dict[str, ToolMetrics] = defaultdict(
            lambda: ToolMetrics(tool_name="unknown")
        )
        self.anomalies: List[Anomaly] = []
        self.execution_history: List[ExecutionEvent] = []
        self.max_history_size = max_history_size
        self.duration_window = duration_window
        self.min_samples_for_anomaly = min_samples_for_anomaly

    def track_execution(self, event: ExecutionEvent) -> Optional[Anomaly]:
        """Record *event* and update per-tool metrics for completed executions.

        Every event is appended to ``execution_history`` (bounded by
        ``max_history_size``). Only events with ``event_type ==
        "tool_execution_complete"`` update metrics; their ``data`` is read for
        the keys ``tool``, ``duration``, ``success``, ``cost`` and ``tokens``,
        with defaults used for missing keys.

        Returns:
            The detected :class:`Anomaly` (also stored in ``self.anomalies``
            and logged as a warning), or ``None``.
        """
        self.execution_history.append(event)
        overflow = len(self.execution_history) - self.max_history_size
        if overflow > 0:
            # Drop the oldest events so the history stays within its cap.
            del self.execution_history[:overflow]

        if event.event_type != "tool_execution_complete":
            return None

        data = event.data
        tool_name = data.get("tool", "unknown")
        duration = data.get("duration", 0)

        metrics = self.tool_metrics[tool_name]
        metrics.tool_name = tool_name
        metrics.execution_count += 1
        metrics.total_cost += data.get("cost", 0.0)
        metrics.total_tokens += data.get("tokens", 0)
        if data.get("success", False):
            metrics.success_count += 1
        else:
            metrics.failure_count += 1

        metrics.durations.append(duration)
        # Keep only the most recent samples so statistics reflect current behaviour.
        excess = len(metrics.durations) - self.duration_window
        if excess > 0:
            del metrics.durations[:excess]

        anomaly = self._check_anomaly(tool_name, duration)
        if anomaly is None:
            return None

        self.anomalies.append(anomaly)
        logger.warning(
            "Anomaly detected: %s took %.2fs (expected %.2fs, +%.1f%%)",
            tool_name,
            duration,
            anomaly.expected,
            anomaly.deviation_percent,
        )
        return anomaly

    def _check_anomaly(self, tool_name: str, duration: float) -> Optional[Anomaly]:
        """Return a slow-execution :class:`Anomaly` for *duration*, or ``None``.

        The baseline is the median of the tool's current duration window,
        which already includes *duration* when called from track_execution.
        """
        metrics = self.tool_metrics[tool_name]

        # Too few samples for a meaningful baseline.
        if metrics.execution_count < self.min_samples_for_anomaly:
            return None

        p50 = metrics.p50_duration
        if p50 <= 0:
            # No positive baseline: the deviation would divide by zero.
            return None

        if duration <= p50 * self.anomaly_threshold_multiplier:
            return None

        return Anomaly(
            tool_name=tool_name,
            anomaly_type="slow_execution",
            duration=duration,
            expected=p50,
            deviation_percent=(duration / p50 - 1) * 100,
            timestamp=time.time(),
            recommendations=[
                "Check tool service health",
                "Reduce concurrent executions",
                "Consider switching to fallback tool",
            ],
        )

    def predict_failure(self, window_seconds: float = 300.0) -> Optional[PredictiveWarning]:
        """Warn about the first tool with a high recent failure rate.

        Args:
            window_seconds: Only events newer than this many seconds are
                considered.

        Returns:
            A :class:`PredictiveWarning` for the first tool (in insertion
            order) with at least 5 completed executions in the window and a
            failure rate above 50%. Returns ``None`` otherwise, or when the
            window holds fewer than 10 events of any type.
        """
        # One reference time so every event is judged against the same instant.
        now = time.time()
        recent_events = [
            e for e in self.execution_history if now - e.timestamp < window_seconds
        ]

        if len(recent_events) < 10:
            return None

        tool_failures: Dict[str, Dict[str, int]] = defaultdict(
            lambda: {"total": 0, "failures": 0}
        )
        for event in recent_events:
            if event.event_type != "tool_execution_complete":
                continue
            counts = tool_failures[event.data.get("tool", "unknown")]
            counts["total"] += 1
            if not event.data.get("success", False):
                counts["failures"] += 1

        for tool, counts in tool_failures.items():
            if counts["total"] < 5:
                continue
            failure_rate = counts["failures"] / counts["total"]
            if failure_rate > 0.5:
                return PredictiveWarning(
                    predicted_issue=f"{tool} showing high failure rate ({failure_rate*100:.1f}%)",
                    confidence=min(failure_rate * 100, 95.0),
                    recommended_action=f"Consider disabling {tool} or investigating root cause",
                    eta_seconds=60.0,
                )

        return None

    def get_tool_metrics(self, tool_name: Optional[str] = None) -> Dict[str, Any]:
        """Return a metrics dict for *tool_name*, or a summary of every tool.

        A falsy *tool_name* returns ``{name: summary}`` for all tools. An
        unknown tool name returns ``{}``.
        """
        if tool_name:
            # .get avoids the defaultdict creating an entry for unknown tools.
            metrics = self.tool_metrics.get(tool_name)
            if not metrics:
                return {}
            return {
                "tool": metrics.tool_name,
                "execution_count": metrics.execution_count,
                "success_count": metrics.success_count,
                "failure_count": metrics.failure_count,
                "success_rate": metrics.success_rate,
                "avg_duration": metrics.avg_duration,
                "p50_duration": metrics.p50_duration,
                "p95_duration": metrics.p95_duration,
                "total_cost": metrics.total_cost,
                "total_tokens": metrics.total_tokens,
            }

        return {
            name: {
                "tool": m.tool_name,
                "execution_count": m.execution_count,
                "success_rate": m.success_rate,
                "avg_duration": m.avg_duration,
                "p50_duration": m.p50_duration,
                "total_cost": m.total_cost,
            }
            for name, m in self.tool_metrics.items()
        }

    def get_anomalies(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* most recent anomalies as dicts, oldest first.

        A non-positive *limit* returns an empty list.
        """
        # Guard explicitly: self.anomalies[-0:] would return the whole list.
        if limit <= 0:
            return []
        return [
            {
                "tool_name": a.tool_name,
                "type": a.anomaly_type,
                "duration": a.duration,
                "expected": a.expected,
                "deviation_percent": a.deviation_percent,
                "timestamp": a.timestamp,
                "recommendations": a.recommendations,
            }
            for a in self.anomalies[-limit:]
        ]

    def get_summary(self) -> Dict[str, Any]:
        """Return aggregate totals across all tools plus the 5 latest anomalies."""
        all_metrics = list(self.tool_metrics.values())
        total_executions = sum(m.execution_count for m in all_metrics)
        total_successes = sum(m.success_count for m in all_metrics)
        total_cost = sum(m.total_cost for m in all_metrics)

        return {
            "total_executions": total_executions,
            "total_successes": total_successes,
            "overall_success_rate": (
                total_successes / total_executions if total_executions > 0 else 0.0
            ),
            "total_cost": total_cost,
            "total_anomalies": len(self.anomalies),
            "tools_monitored": len(self.tool_metrics),
            "recent_anomalies": self.get_anomalies(5),
        }
| |
|
| |
|
| | |
# Shared module-level engine, built with default settings when the module is imported.
observability: RealTimeObservabilityEngine = RealTimeObservabilityEngine()
| |
|