import asyncio
import datetime
import logging
from dataclasses import dataclass
from typing import Any, Dict, List

from monitoring_models import AgentSpecialization
from models import AnomalyResult, ReliabilityEvent

logger = logging.getLogger(__name__)


@dataclass
class AgentResult:
    """Result produced by one specialized agent for a single event."""

    specialization: AgentSpecialization  # which agent produced this result
    confidence: float                    # 0-1 confidence in the findings
    findings: Dict[str, Any]             # agent-specific structured findings
    recommendations: List[str]           # actionable next steps, ordered
    processing_time: float               # loop-clock seconds spent analyzing


class BaseAgent:
    """Common plumbing shared by all specialized reliability agents."""

    def __init__(self, specialization: AgentSpecialization):
        self.specialization = specialization
        # Self-monitoring counters; reserved for future use (never updated here).
        self.performance_metrics = {
            'processed_events': 0,
            'successful_analyses': 0,
            'average_confidence': 0.0
        }

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Base analysis method to be implemented by specialized agents"""
        raise NotImplementedError


class AnomalyDetectionAgent(BaseAgent):
    """Scores events against thresholds and matches known failure patterns."""

    def __init__(self):
        super().__init__(AgentSpecialization.DETECTIVE)
        # Baseline alerting thresholds. NOTE(review): "adaptive" in name only —
        # these are static constants; nothing in this file updates them.
        self.adaptive_thresholds = {
            'latency_p99': 150,   # milliseconds
            'error_rate': 0.05,   # fraction of requests
            'cpu_util': 0.8,      # fraction of capacity
            'memory_util': 0.8    # fraction of capacity
        }

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Enhanced anomaly detection with pattern recognition.

        Returns an AgentResult whose confidence equals the composite
        anomaly score (0-1) computed from the event's metrics.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated there since Python 3.10.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Multi-dimensional anomaly scoring plus known-pattern matching.
        anomaly_score = self._calculate_anomaly_score(event)
        pattern_match = self._detect_known_patterns(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=anomaly_score,
            findings={
                'anomaly_score': anomaly_score,
                'detected_patterns': pattern_match,
                'affected_metrics': self._identify_affected_metrics(event),
                'severity_tier': self._classify_severity(anomaly_score)
            },
            recommendations=self._generate_detection_recommendations(event, anomaly_score),
            processing_time=loop.time() - start_time
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Calculate comprehensive anomaly score (0-1).

        Weighting: latency up to 0.4, error rate up to 0.3, CPU and
        memory up to 0.15 each — a fully degraded event sums to 1.0.
        """
        scores = []

        # Latency anomaly (weighted 40%): overshoot scaled over 500 ms.
        if event.latency_p99 > self.adaptive_thresholds['latency_p99']:
            latency_score = min(
                1.0,
                (event.latency_p99 - self.adaptive_thresholds['latency_p99']) / 500
            )
            scores.append(0.4 * latency_score)

        # Error rate anomaly (weighted 30%): saturates at a 30% error rate.
        if event.error_rate > self.adaptive_thresholds['error_rate']:
            error_score = min(1.0, event.error_rate / 0.3)
            scores.append(0.3 * error_score)

        # Resource anomaly (weighted 30% combined). cpu_util/memory_util may
        # be None, so check explicitly rather than relying on truthiness.
        resource_score = 0
        if event.cpu_util is not None and event.cpu_util > self.adaptive_thresholds['cpu_util']:
            resource_score += 0.15 * min(
                1.0, (event.cpu_util - self.adaptive_thresholds['cpu_util']) / 0.2
            )
        if event.memory_util is not None and event.memory_util > self.adaptive_thresholds['memory_util']:
            resource_score += 0.15 * min(
                1.0, (event.memory_util - self.adaptive_thresholds['memory_util']) / 0.2
            )
        scores.append(resource_score)

        return min(1.0, sum(scores))

    def _detect_known_patterns(self, event: ReliabilityEvent) -> List[str]:
        """Detect known failure patterns; returns ["unknown_pattern"] if none match."""
        patterns = []

        # Database timeout pattern
        if event.latency_p99 > 500 and event.error_rate > 0.2:
            patterns.append("database_timeout")

        # Resource exhaustion pattern (both utilizations must be present and >0.9)
        if (event.cpu_util is not None and event.cpu_util > 0.9
                and event.memory_util is not None and event.memory_util > 0.9):
            patterns.append("resource_exhaustion")

        # Cascading failure pattern
        if event.error_rate > 0.15 and event.latency_p99 > 300:
            patterns.append("cascading_failure")

        # Traffic spike pattern
        if event.latency_p99 > 200 and event.throughput > 2000:
            patterns.append("traffic_spike")

        # Gradual degradation (moderate latency and error rate together)
        if 150 < event.latency_p99 < 300 and 0.05 < event.error_rate < 0.15:
            patterns.append("gradual_degradation")

        return patterns if patterns else ["unknown_pattern"]

    def _identify_affected_metrics(self, event: ReliabilityEvent) -> List[str]:
        """Identify which metrics are outside normal range; ["none"] if all healthy."""
        affected = []
        if event.latency_p99 > self.adaptive_thresholds['latency_p99']:
            affected.append("latency")
        if event.error_rate > self.adaptive_thresholds['error_rate']:
            affected.append("error_rate")
        if event.cpu_util is not None and event.cpu_util > self.adaptive_thresholds['cpu_util']:
            affected.append("cpu")
        if event.memory_util is not None and event.memory_util > self.adaptive_thresholds['memory_util']:
            affected.append("memory")
        if event.throughput < 500:  # Low throughput threshold
            affected.append("throughput")
        return affected if affected else ["none"]

    def _classify_severity(self, anomaly_score: float) -> str:
        """Classify severity based on anomaly score."""
        if anomaly_score > 0.8:
            return "CRITICAL"
        elif anomaly_score > 0.6:
            return "HIGH"
        elif anomaly_score > 0.4:
            return "MEDIUM"
        return "LOW"

    def _generate_detection_recommendations(self, event: ReliabilityEvent,
                                            anomaly_score: float) -> List[str]:
        """Generate actionable recommendations based on detected anomalies.

        At most five recommendations are returned, in the order they are
        appended (latency, errors, resources, then overall severity).
        """
        recommendations = []

        # Latency recommendations
        if event.latency_p99 > 500:
            recommendations.append("🚨 CRITICAL: Latency >500ms - Check database connections and external APIs immediately")
        elif event.latency_p99 > 300:
            recommendations.append("⚠️ HIGH: Latency >300ms - Investigate slow queries and service dependencies")
        elif event.latency_p99 > 150:
            recommendations.append("📈 Latency elevated - Monitor trends and consider optimization")

        # Error rate recommendations
        if event.error_rate > 0.3:
            recommendations.append("🚨 CRITICAL: Error rate >30% - Rollback recent deployments or enable circuit breaker")
        elif event.error_rate > 0.15:
            recommendations.append("⚠️ HIGH: Error rate >15% - Review application logs for exceptions")
        elif event.error_rate > 0.05:
            recommendations.append("📈 Errors increasing - Check for configuration issues")

        # Resource recommendations
        if event.cpu_util is not None and event.cpu_util > 0.9:
            recommendations.append("🔥 CPU CRITICAL: >90% utilization - Scale horizontally or optimize hot paths")
        elif event.cpu_util is not None and event.cpu_util > 0.8:
            recommendations.append("⚡ CPU HIGH: >80% utilization - Consider adding capacity")
        if event.memory_util is not None and event.memory_util > 0.9:
            recommendations.append("💾 MEMORY CRITICAL: >90% utilization - Check for memory leaks")
        elif event.memory_util is not None and event.memory_util > 0.8:
            recommendations.append("💾 MEMORY HIGH: >80% utilization - Monitor for leaks")

        # Overall severity recommendations
        if anomaly_score > 0.8:
            recommendations.append("🎯 IMMEDIATE ACTION REQUIRED: Multiple critical metrics affected")
        elif anomaly_score > 0.6:
            recommendations.append("🎯 INVESTIGATE: Significant performance degradation detected")
        elif anomaly_score > 0.4:
            recommendations.append("📊 MONITOR: Early warning signs detected")

        return recommendations[:5]  # Return top 5 recommendations


class RootCauseAgent(BaseAgent):
    """Maps metric patterns to likely root causes with investigation steps."""

    def __init__(self):
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """AI-powered root cause analysis."""
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        root_cause_analysis = self._perform_causal_analysis(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=root_cause_analysis['confidence'],
            findings={
                'likely_root_causes': root_cause_analysis['causes'],
                'evidence_patterns': root_cause_analysis['evidence'],
                'dependency_analysis': self._analyze_dependencies(event),
                'timeline_correlation': self._check_temporal_patterns(event)
            },
            recommendations=root_cause_analysis['investigation_steps'],
            processing_time=loop.time() - start_time
        )

    def _load_causal_patterns(self) -> Dict[str, Any]:
        """Load known causal patterns for root cause analysis.

        NOTE(review): _perform_causal_analysis re-encodes these rules
        inline rather than reading this table — kept for reference.
        """
        return {
            'high_latency_high_errors': {
                'pattern': ['latency > 500', 'error_rate > 0.2'],
                'cause': 'Database or external dependency failure',
                'confidence': 0.85
            },
            'high_cpu_high_memory': {
                'pattern': ['cpu > 0.9', 'memory > 0.9'],
                'cause': 'Resource exhaustion or memory leak',
                'confidence': 0.90
            },
            'high_errors_normal_latency': {
                'pattern': ['error_rate > 0.3', 'latency < 200'],
                'cause': 'Application bug or configuration issue',
                'confidence': 0.75
            },
            'gradual_degradation': {
                'pattern': ['200 < latency < 400', '0.05 < error_rate < 0.15'],
                'cause': 'Resource saturation or dependency degradation',
                'confidence': 0.65
            }
        }

    def _perform_causal_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Analyze likely root causes based on event patterns.

        Returns a dict with overall 'confidence' (max of matched patterns),
        the matched 'causes', 'evidence' tags, and 'investigation_steps'
        (one per cause, up to three).
        """
        causes = []
        evidence = []
        confidence = 0.5

        # Pattern 1: Database/External Dependency Failure
        if event.latency_p99 > 500 and event.error_rate > 0.2:
            causes.append({
                "cause": "Database/External Dependency Failure",
                "confidence": 0.85,
                "evidence": f"Extreme latency ({event.latency_p99:.0f}ms) with high errors ({event.error_rate*100:.1f}%)",
                "investigation": "Check database connection pool, external API health, network connectivity"
            })
            evidence.append("extreme_latency_with_errors")
            confidence = 0.85

        # Pattern 2: Resource Exhaustion
        if (event.cpu_util is not None and event.cpu_util > 0.9
                and event.memory_util is not None and event.memory_util > 0.9):
            causes.append({
                "cause": "Resource Exhaustion",
                "confidence": 0.90,
                "evidence": f"CPU ({event.cpu_util*100:.1f}%) and Memory ({event.memory_util*100:.1f}%) critically high",
                "investigation": "Check for memory leaks, infinite loops, insufficient resource allocation"
            })
            evidence.append("correlated_resource_exhaustion")
            confidence = max(confidence, 0.90)

        # Pattern 3: Application Bug / Configuration Issue
        if event.error_rate > 0.3 and event.latency_p99 < 200:
            causes.append({
                "cause": "Application Bug / Configuration Issue",
                "confidence": 0.75,
                "evidence": f"High error rate ({event.error_rate*100:.1f}%) without latency impact",
                "investigation": "Review recent deployments, configuration changes, application logs, and error traces"
            })
            evidence.append("errors_without_latency")
            confidence = max(confidence, 0.75)

        # Pattern 4: Gradual Performance Degradation
        if 200 <= event.latency_p99 <= 400 and 0.05 <= event.error_rate <= 0.15:
            causes.append({
                "cause": "Gradual Performance Degradation",
                "confidence": 0.65,
                "evidence": f"Moderate latency ({event.latency_p99:.0f}ms) and errors ({event.error_rate*100:.1f}%)",
                "investigation": "Check resource trends, dependency performance, capacity planning, and scaling policies"
            })
            evidence.append("gradual_degradation")
            confidence = max(confidence, 0.65)

        # Pattern 5: Traffic Spike
        if event.latency_p99 > 200 and event.throughput > 2000:
            causes.append({
                "cause": "Traffic Spike / Capacity Issue",
                "confidence": 0.70,
                "evidence": f"Elevated latency ({event.latency_p99:.0f}ms) with high throughput ({event.throughput:.0f} req/s)",
                "investigation": "Check autoscaling configuration, rate limiting, and load balancer health"
            })
            evidence.append("traffic_spike")
            confidence = max(confidence, 0.70)

        # Default: Unknown pattern
        if not causes:
            causes.append({
                "cause": "Unknown - Requires Investigation",
                "confidence": 0.3,
                "evidence": "Pattern does not match known failure modes",
                "investigation": "Complete system review needed - check logs, metrics, and recent changes"
            })
            evidence.append("unknown_pattern")
            confidence = 0.3

        # Generate investigation steps (top 3 causes only)
        investigation_steps = [cause['investigation'] for cause in causes[:3]]

        return {
            'confidence': confidence,
            'causes': causes,
            'evidence': evidence,
            'investigation_steps': investigation_steps
        }

    def _analyze_dependencies(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Analyze dependency health and potential cascade effects."""
        analysis = {
            'has_upstream_deps': len(event.upstream_deps) > 0,
            'upstream_services': event.upstream_deps,
            'potential_cascade': False,
            'cascade_risk_score': 0.0
        }

        # Cascade risk from error rate: doubles the rate, capped at 1.0.
        if event.error_rate > 0.2:
            analysis['potential_cascade'] = True
            analysis['cascade_risk_score'] = min(1.0, event.error_rate * 2)
        # Cascade risk from latency: latency/1000ms, capped; keep the larger.
        if event.latency_p99 > 500:
            analysis['potential_cascade'] = True
            analysis['cascade_risk_score'] = max(
                analysis['cascade_risk_score'],
                min(1.0, event.latency_p99 / 1000)
            )

        return analysis

    def _check_temporal_patterns(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Check for time-based correlations.

        Uses local wall-clock time. NOTE(review): naive local time — if
        events originate in other timezones this should move to UTC.
        """
        current_time = datetime.datetime.now()
        hour = current_time.hour

        patterns = {
            'time_of_day_correlation': False,
            'is_peak_hours': 9 <= hour <= 17,       # Business hours
            'is_off_hours': hour < 6 or hour > 22,
            'deployment_window': 14 <= hour <= 16,  # Typical deployment window
            'weekend': current_time.weekday() >= 5
        }

        # Flag elevated latency during business hours as a possible correlation.
        if patterns['is_peak_hours'] and event.latency_p99 > 200:
            patterns['time_of_day_correlation'] = True

        return patterns


class OrchestrationManager:
    """Fans an event out to all agents and synthesizes their findings."""

    def __init__(self):
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
        }
        self.incident_history = []

    async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Coordinate multiple agents for comprehensive analysis.

        Agents run concurrently; a timeout or failure in one agent is
        logged and the remaining results are still synthesized.
        """
        # create_task() schedules each coroutine immediately so the agents
        # actually run in parallel; bare coroutines awaited one at a time
        # would execute sequentially.
        agent_tasks = {
            spec: asyncio.create_task(agent.analyze(event))
            for spec, agent in self.agents.items()
        }

        agent_results = {}
        for specialization, task in agent_tasks.items():
            try:
                result = await asyncio.wait_for(task, timeout=10.0)
            except asyncio.TimeoutError:
                # Agent timeout - continue with the others.
                logger.warning("Agent %s timed out", specialization.value)
            except Exception:
                # Agent error - log with traceback and continue.
                logger.exception("Agent %s failed", specialization.value)
            else:
                agent_results[specialization.value] = result

        # Synthesize whatever results we collected.
        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: ReliabilityEvent,
                                   agent_results: Dict) -> Dict[str, Any]:
        """Combine insights from all specialized agents.

        The detective result is mandatory; the diagnostician result is
        optional and degrades gracefully to empty findings.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        if not detective_result:
            return {'error': 'No agent results available'}

        synthesis = {
            'incident_summary': {
                'severity': detective_result.findings.get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result.confidence,
                'primary_metrics_affected': detective_result.findings.get('affected_metrics', [])
            },
            'root_cause_insights': diagnostician_result.findings if diagnostician_result else {},
            'recommended_actions': self._prioritize_actions(
                detective_result.recommendations,
                diagnostician_result.recommendations if diagnostician_result else []
            ),
            'business_context': self._add_business_context(event, detective_result.confidence),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'processing_times': {k: v.processing_time for k, v in agent_results.items()}
            }
        }

        return synthesis

    def _prioritize_actions(self, detection_actions: List[str],
                            diagnosis_actions: List[str]) -> List[str]:
        """Combine and prioritize actions from multiple agents.

        Critical (🚨) first, then high (⚠️), then the rest; duplicates
        removed preserving order; capped at five actions.
        """
        all_actions = []

        # Add critical actions first (those with 🚨)
        critical = [a for a in detection_actions + diagnosis_actions if '🚨' in a]
        all_actions.extend(critical)

        # Add high priority actions (those with ⚠️)
        high = [a for a in detection_actions + diagnosis_actions
                if '⚠️' in a and a not in all_actions]
        all_actions.extend(high)

        # Add remaining actions
        remaining = [a for a in detection_actions + diagnosis_actions
                     if a not in all_actions]
        all_actions.extend(remaining)

        # Remove duplicates while preserving order
        seen = set()
        unique_actions = []
        for action in all_actions:
            if action not in seen:
                seen.add(action)
                unique_actions.append(action)

        return unique_actions[:5]  # Return top 5 actions

    def _add_business_context(self, event: ReliabilityEvent,
                              confidence: float) -> Dict[str, Any]:
        """Add business impact context to the analysis."""
        # Map detection confidence onto a business severity tier.
        if confidence > 0.8:
            business_severity = "CRITICAL"
        elif confidence > 0.6:
            business_severity = "HIGH"
        elif confidence > 0.4:
            business_severity = "MEDIUM"
        else:
            business_severity = "LOW"

        return {
            'business_severity': business_severity,
            'estimated_impact': f"{confidence * 100:.0f}% confidence of incident",
            'recommended_escalation': confidence > 0.7
        }