"""
Real-Time Observability Engine - Performance monitoring and anomaly detection
"""
import logging
import statistics
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class ExecutionEvent:
"""An execution event for monitoring"""
event_type: str
data: Dict[str, Any]
timestamp: float = field(default_factory=time.time)
@dataclass
class ToolMetrics:
"""Metrics for a specific tool"""
tool_name: str
execution_count: int = 0
success_count: int = 0
failure_count: int = 0
durations: List[float] = field(default_factory=list)
total_cost: float = 0.0
total_tokens: int = 0
@property
def success_rate(self) -> float:
if self.execution_count == 0:
return 0.0
return self.success_count / self.execution_count
@property
def p50_duration(self) -> float:
if not self.durations:
return 0.0
return statistics.median(self.durations)
@property
def p95_duration(self) -> float:
if not self.durations:
return 0.0
        # Nearest-rank approximation of the 95th percentile
        sorted_durations = sorted(self.durations)
        idx = int(len(sorted_durations) * 0.95)
        return sorted_durations[min(idx, len(sorted_durations) - 1)]
@property
def avg_duration(self) -> float:
if not self.durations:
return 0.0
return statistics.mean(self.durations)
@dataclass
class Anomaly:
"""Detected anomaly"""
tool_name: str
anomaly_type: str
duration: float
expected: float
deviation_percent: float
timestamp: float
recommendations: List[str]
@dataclass
class PredictiveWarning:
"""Predictive warning about future issues"""
predicted_issue: str
confidence: float
recommended_action: str
eta_seconds: float
class RealTimeObservabilityEngine:
"""
Monitors and optimizes in real-time using:
- Latency tracking per tool
- Token usage monitoring
- Cost tracking
- Model performance metrics
- Anomaly detection
- Predictive issue detection
"""
def __init__(self, anomaly_threshold_multiplier: float = 2.5):
self.anomaly_threshold_multiplier = anomaly_threshold_multiplier
self.tool_metrics: Dict[str, ToolMetrics] = defaultdict(
lambda: ToolMetrics(tool_name="unknown")
)
self.anomalies: List[Anomaly] = []
self.execution_history: List[ExecutionEvent] = []
self.max_history_size = 10000
def track_execution(self, event: ExecutionEvent) -> Optional[Anomaly]:
"""Track and analyze execution in real-time"""
# Add to history
self.execution_history.append(event)
if len(self.execution_history) > self.max_history_size:
self.execution_history = self.execution_history[-self.max_history_size:]
# Track tool execution
if event.event_type == "tool_execution_complete":
tool_name = event.data.get("tool", "unknown")
            duration = event.data.get("duration", 0.0)
success = event.data.get("success", False)
cost = event.data.get("cost", 0.0)
tokens = event.data.get("tokens", 0)
metrics = self.tool_metrics[tool_name]
metrics.tool_name = tool_name
metrics.execution_count += 1
metrics.total_cost += cost
metrics.total_tokens += tokens
if success:
metrics.success_count += 1
else:
metrics.failure_count += 1
metrics.durations.append(duration)
# Keep only last 100 durations for memory efficiency
if len(metrics.durations) > 100:
metrics.durations = metrics.durations[-100:]
# Check for anomalies
anomaly = self._check_anomaly(tool_name, duration)
if anomaly:
self.anomalies.append(anomaly)
logger.warning(
f"Anomaly detected: {tool_name} took {duration:.2f}s "
f"(expected {anomaly.expected:.2f}s, "
f"+{anomaly.deviation_percent:.1f}%)"
)
return anomaly
return None
def _check_anomaly(self, tool_name: str, duration: float) -> Optional[Anomaly]:
"""Check if execution is anomalous"""
        metrics = self.tool_metrics[tool_name]
        if metrics.execution_count < 5:
            # Not enough data to establish a baseline
            return None
        p50 = metrics.p50_duration
        if p50 <= 0:
            # A zero median would make the deviation calculation divide by zero
            return None
        if duration > p50 * self.anomaly_threshold_multiplier:
            deviation = (duration / p50 - 1) * 100
recommendations = [
"Check tool service health",
"Reduce concurrent executions",
"Consider switching to fallback tool"
]
return Anomaly(
tool_name=tool_name,
anomaly_type="slow_execution",
duration=duration,
expected=p50,
deviation_percent=deviation,
timestamp=time.time(),
recommendations=recommendations
)
return None
def predict_failure(self) -> Optional[PredictiveWarning]:
"""Predict future issues based on trends"""
# Simple prediction based on recent failure rate
recent_events = [
e for e in self.execution_history
if time.time() - e.timestamp < 300 # Last 5 minutes
]
if len(recent_events) < 10:
return None
tool_failures = defaultdict(lambda: {"total": 0, "failures": 0})
for event in recent_events:
if event.event_type == "tool_execution_complete":
tool = event.data.get("tool", "unknown")
tool_failures[tool]["total"] += 1
if not event.data.get("success", False):
tool_failures[tool]["failures"] += 1
# Check for tools with high failure rates
for tool, counts in tool_failures.items():
if counts["total"] >= 5:
failure_rate = counts["failures"] / counts["total"]
if failure_rate > 0.5: # More than 50% failures
return PredictiveWarning(
predicted_issue=f"{tool} showing high failure rate ({failure_rate*100:.1f}%)",
confidence=min(failure_rate * 100, 95.0),
recommended_action=f"Consider disabling {tool} or investigating root cause",
eta_seconds=60.0
)
return None
def get_tool_metrics(self, tool_name: Optional[str] = None) -> Dict[str, Any]:
"""Get metrics for a tool or all tools"""
if tool_name:
metrics = self.tool_metrics.get(tool_name)
if not metrics:
return {}
return {
"tool": metrics.tool_name,
"execution_count": metrics.execution_count,
"success_count": metrics.success_count,
"failure_count": metrics.failure_count,
"success_rate": metrics.success_rate,
"avg_duration": metrics.avg_duration,
"p50_duration": metrics.p50_duration,
"p95_duration": metrics.p95_duration,
"total_cost": metrics.total_cost,
"total_tokens": metrics.total_tokens
}
return {
name: {
"tool": m.tool_name,
"execution_count": m.execution_count,
"success_rate": m.success_rate,
"avg_duration": m.avg_duration,
"p50_duration": m.p50_duration,
"total_cost": m.total_cost
}
for name, m in self.tool_metrics.items()
}
def get_anomalies(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent anomalies"""
return [
{
"tool_name": a.tool_name,
"type": a.anomaly_type,
"duration": a.duration,
"expected": a.expected,
"deviation_percent": a.deviation_percent,
"timestamp": a.timestamp,
"recommendations": a.recommendations
}
for a in self.anomalies[-limit:]
]
def get_summary(self) -> Dict[str, Any]:
"""Get overall observability summary"""
total_executions = sum(m.execution_count for m in self.tool_metrics.values())
total_successes = sum(m.success_count for m in self.tool_metrics.values())
total_cost = sum(m.total_cost for m in self.tool_metrics.values())
return {
"total_executions": total_executions,
"total_successes": total_successes,
"overall_success_rate": total_successes / total_executions if total_executions > 0 else 0,
"total_cost": total_cost,
"total_anomalies": len(self.anomalies),
"tools_monitored": len(self.tool_metrics),
"recent_anomalies": self.get_anomalies(5)
}
# Global observability engine
observability = RealTimeObservabilityEngine()
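

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the engine): feed
# synthetic "tool_execution_complete" events into the global engine, then
# inspect metrics, anomalies, and predictions. The tool name "web_search"
# and all timing/cost values below are made-up example data.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Build a baseline of fast, successful executions
    for _ in range(10):
        observability.track_execution(ExecutionEvent(
            event_type="tool_execution_complete",
            data={"tool": "web_search", "duration": 0.4, "success": True,
                  "cost": 0.001, "tokens": 120},
        ))

    # A much slower run exceeds p50 * anomaly_threshold_multiplier and is flagged
    anomaly = observability.track_execution(ExecutionEvent(
        event_type="tool_execution_complete",
        data={"tool": "web_search", "duration": 2.5, "success": False,
              "cost": 0.002, "tokens": 150},
    ))
    if anomaly:
        print(f"Anomaly: {anomaly.anomaly_type} (+{anomaly.deviation_percent:.1f}%)")

    # predict_failure() only fires when a tool fails >50% of the time recently,
    # so with a single failure it returns None here
    warning = observability.predict_failure()
    if warning:
        print(f"Warning: {warning.predicted_issue}")

    print(observability.get_tool_metrics("web_search"))
    print(observability.get_summary())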