| import asyncio |
| from typing import Dict, List, Any |
| from dataclasses import dataclass |
| from monitoring_models import AgentSpecialization |
| from models import ReliabilityEvent, AnomalyResult |
|
|
@dataclass
class AgentResult:
    """Outcome reported by a single agent after analysing one event.

    Instances are plain records: the agents in this module fill every
    field when their ``analyze`` coroutine returns.
    """

    # Role of the agent that produced this result.
    specialization: AgentSpecialization
    # The agent's confidence in its own findings.
    confidence: float
    # Analysis payload keyed by finding name; the shape is agent-specific.
    findings: Dict[str, Any]
    # Follow-up actions as human-readable strings.
    recommendations: List[str]
    # Elapsed time the agent measured around its own analysis.
    processing_time: float
|
|
class BaseAgent:
    """Common base for the specialised analysis agents.

    Stores the agent's specialization and a dictionary of performance
    counters. Subclasses supply the actual analysis by overriding
    :meth:`analyze`.
    """

    def __init__(self, specialization: AgentSpecialization):
        """Remember *specialization* and start the counters at zero."""
        self.specialization = specialization
        # Counters are only initialised here; this class never updates them.
        self.performance_metrics = dict(
            processed_events=0,
            successful_analyses=0,
            average_confidence=0.0,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Analyse *event*; must be overridden by each specialised agent.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
|
|
class AnomalyDetectionAgent(BaseAgent):
    """Agent that scores an event against metric thresholds.

    It compares the event's latency, error rate, CPU and memory readings
    with ``adaptive_thresholds``, labels known failure patterns and emits
    human-readable recommendations.
    """

    def __init__(self):
        """Register as the DETECTIVE agent and set the initial thresholds."""
        super().__init__(AgentSpecialization.DETECTIVE)
        # A reading above its limit contributes to the anomaly score.
        self.adaptive_thresholds = dict(
            latency_p99=150,
            error_rate=0.05,
            cpu_util=0.8,
            memory_util=0.8,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Score *event* and package the detection findings.

        The anomaly score doubles as the result's confidence value.
        """
        loop = asyncio.get_event_loop()
        started_at = loop.time()

        score = self._calculate_anomaly_score(event)
        matched_patterns = self._detect_known_patterns(event)

        findings = {
            'anomaly_score': score,
            'detected_patterns': matched_patterns,
            'affected_metrics': self._identify_affected_metrics(event),
            'severity_tier': self._classify_severity(score),
        }
        advice = self._generate_detection_recommendations(event, score)

        return AgentResult(
            specialization=self.specialization,
            confidence=score,
            findings=findings,
            recommendations=advice,
            processing_time=loop.time() - started_at,
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Return a weighted anomaly score capped at 1.

        Latency contributes up to 0.4, error rate up to 0.3 and CPU and
        memory up to 0.15 each; only readings above their threshold count.
        """
        limits = self.adaptive_thresholds
        contributions = []

        # Latency: overshoot past the limit, saturating at +500.
        if event.latency_p99 > limits['latency_p99']:
            overshoot = (event.latency_p99 - limits['latency_p99']) / 500
            contributions.append(0.4 * min(1.0, overshoot))

        # Error rate: scaled by the absolute rate, saturating at 0.3.
        if event.error_rate > limits['error_rate']:
            contributions.append(0.3 * min(1.0, event.error_rate / 0.3))

        # Resources: a missing (falsy) reading contributes nothing.
        resource_part = 0
        for metric in ('cpu_util', 'memory_util'):
            reading = getattr(event, metric)
            if reading and reading > limits[metric]:
                resource_part += 0.15 * min(1.0, (reading - limits[metric]) / 0.2)
        contributions.append(resource_part)

        return min(1.0, sum(contributions))

    def _detect_known_patterns(self, event: ReliabilityEvent) -> List[str]:
        """Return labels of every known failure pattern the event matches.

        Falls back to ``["unknown_pattern"]`` when nothing matches.
        """
        checks = (
            ("database_timeout",
             event.latency_p99 > 500 and event.error_rate > 0.2),
            ("resource_exhaustion",
             event.cpu_util and event.cpu_util > 0.9
             and event.memory_util and event.memory_util > 0.9),
            ("cascading_failure",
             event.error_rate > 0.15 and event.latency_p99 > 300),
            ("traffic_spike",
             event.latency_p99 > 200 and event.throughput > 2000),
            ("gradual_degradation",
             150 < event.latency_p99 < 300 and 0.05 < event.error_rate < 0.15),
        )
        matched = [label for label, hit in checks if hit]
        return matched or ["unknown_pattern"]

    def _identify_affected_metrics(self, event: ReliabilityEvent) -> List[str]:
        """List the metrics that are outside their normal range.

        Returns ``["none"]`` when every metric is within range.
        """
        limits = self.adaptive_thresholds
        checks = (
            ("latency", event.latency_p99 > limits['latency_p99']),
            ("error_rate", event.error_rate > limits['error_rate']),
            ("cpu", event.cpu_util and event.cpu_util > limits['cpu_util']),
            ("memory",
             event.memory_util and event.memory_util > limits['memory_util']),
            # Throughput is flagged when it is low rather than high.
            ("throughput", event.throughput < 500),
        )
        flagged = [name for name, out_of_range in checks if out_of_range]
        return flagged or ["none"]

    def _classify_severity(self, anomaly_score: float) -> str:
        """Map an anomaly score to CRITICAL / HIGH / MEDIUM / LOW."""
        tiers = ((0.8, "CRITICAL"), (0.6, "HIGH"), (0.4, "MEDIUM"))
        for floor, tier in tiers:
            if anomaly_score > floor:
                return tier
        return "LOW"

    def _generate_detection_recommendations(self, event: ReliabilityEvent, anomaly_score: float) -> List[str]:
        """Build at most five recommendations for the detected anomalies.

        One message is taken per category (latency, errors, CPU, memory,
        overall score), choosing the most severe tier the value exceeds.
        """

        def first_exceeded(value, ladder):
            # Ladders are ordered most severe first; take the first hit only.
            for bound, message in ladder:
                if value > bound:
                    return [message]
            return []

        advice = []

        advice += first_exceeded(event.latency_p99, (
            (500, "🚨 CRITICAL: Latency >500ms - Check database connections and external APIs immediately"),
            (300, "⚠️ HIGH: Latency >300ms - Investigate slow queries and service dependencies"),
            (150, "📈 Latency elevated - Monitor trends and consider optimization"),
        ))

        advice += first_exceeded(event.error_rate, (
            (0.3, "🚨 CRITICAL: Error rate >30% - Rollback recent deployments or enable circuit breaker"),
            (0.15, "⚠️ HIGH: Error rate >15% - Review application logs for exceptions"),
            (0.05, "📈 Errors increasing - Check for configuration issues"),
        ))

        # Resource readings may be absent; skip them when falsy.
        if event.cpu_util:
            advice += first_exceeded(event.cpu_util, (
                (0.9, "🔥 CPU CRITICAL: >90% utilization - Scale horizontally or optimize hot paths"),
                (0.8, "⚡ CPU HIGH: >80% utilization - Consider adding capacity"),
            ))

        if event.memory_util:
            advice += first_exceeded(event.memory_util, (
                (0.9, "💾 MEMORY CRITICAL: >90% utilization - Check for memory leaks"),
                (0.8, "💾 MEMORY HIGH: >80% utilization - Monitor for leaks"),
            ))

        advice += first_exceeded(anomaly_score, (
            (0.8, "🎯 IMMEDIATE ACTION REQUIRED: Multiple critical metrics affected"),
            (0.6, "🎯 INVESTIGATE: Significant performance degradation detected"),
            (0.4, "📊 MONITOR: Early warning signs detected"),
        ))

        return advice[:5]
|
|
class RootCauseAgent(BaseAgent):
    """Agent that proposes likely root causes for an event.

    Causes are chosen by rule-based checks on the event's latency, error
    rate, resource utilisation and throughput.
    """

    def __init__(self):
        """Register as the DIAGNOSTICIAN agent and load the pattern table."""
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Run the causal analysis on *event* and package the findings."""
        loop = asyncio.get_event_loop()
        started_at = loop.time()

        diagnosis = self._perform_causal_analysis(event)

        findings = {
            'likely_root_causes': diagnosis['causes'],
            'evidence_patterns': diagnosis['evidence'],
            'dependency_analysis': self._analyze_dependencies(event),
            'timeline_correlation': self._check_temporal_patterns(event),
        }

        return AgentResult(
            specialization=self.specialization,
            confidence=diagnosis['confidence'],
            findings=findings,
            recommendations=diagnosis['investigation_steps'],
            processing_time=loop.time() - started_at,
        )

    def _load_causal_patterns(self) -> Dict[str, Any]:
        """Return the static table of known causal patterns.

        Each entry holds textual conditions, a cause and a confidence.
        """

        def entry(conditions, cause, confidence):
            return {'pattern': conditions, 'cause': cause, 'confidence': confidence}

        return {
            'high_latency_high_errors': entry(
                ['latency > 500', 'error_rate > 0.2'],
                'Database or external dependency failure',
                0.85,
            ),
            'high_cpu_high_memory': entry(
                ['cpu > 0.9', 'memory > 0.9'],
                'Resource exhaustion or memory leak',
                0.90,
            ),
            'high_errors_normal_latency': entry(
                ['error_rate > 0.3', 'latency < 200'],
                'Application bug or configuration issue',
                0.75,
            ),
            'gradual_degradation': entry(
                ['200 < latency < 400', '0.05 < error_rate < 0.15'],
                'Resource saturation or dependency degradation',
                0.65,
            ),
        }

    def _perform_causal_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Match *event* against the failure rules and collect likely causes.

        Returns a dict with the overall ``confidence`` (the highest
        matching rule's confidence, or 0.3 when nothing matches), the
        ``causes`` and ``evidence`` lists, and ``investigation_steps``
        taken from the first three causes.
        """
        causes = []
        evidence = []

        def record(tag, cause, score, observed, next_step):
            # Keep the cause entry and its evidence tag in step.
            causes.append({
                "cause": cause,
                "confidence": score,
                "evidence": observed,
                "investigation": next_step,
            })
            evidence.append(tag)

        latency = event.latency_p99
        error_rate = event.error_rate

        # Evidence text is formatted inside each branch because it needs
        # readings that may be missing when the rule does not match.
        if latency > 500 and error_rate > 0.2:
            record(
                "extreme_latency_with_errors",
                "Database/External Dependency Failure",
                0.85,
                f"Extreme latency ({event.latency_p99:.0f}ms) with high errors ({event.error_rate*100:.1f}%)",
                "Check database connection pool, external API health, network connectivity",
            )

        if event.cpu_util and event.cpu_util > 0.9 and event.memory_util and event.memory_util > 0.9:
            record(
                "correlated_resource_exhaustion",
                "Resource Exhaustion",
                0.90,
                f"CPU ({event.cpu_util*100:.1f}%) and Memory ({event.memory_util*100:.1f}%) critically high",
                "Check for memory leaks, infinite loops, insufficient resource allocation",
            )

        if error_rate > 0.3 and latency < 200:
            record(
                "errors_without_latency",
                "Application Bug / Configuration Issue",
                0.75,
                f"High error rate ({event.error_rate*100:.1f}%) without latency impact",
                "Review recent deployments, configuration changes, application logs, and error traces",
            )

        if 200 <= latency <= 400 and 0.05 <= error_rate <= 0.15:
            record(
                "gradual_degradation",
                "Gradual Performance Degradation",
                0.65,
                f"Moderate latency ({event.latency_p99:.0f}ms) and errors ({event.error_rate*100:.1f}%)",
                "Check resource trends, dependency performance, capacity planning, and scaling policies",
            )

        if latency > 200 and event.throughput > 2000:
            record(
                "traffic_spike",
                "Traffic Spike / Capacity Issue",
                0.70,
                f"Elevated latency ({event.latency_p99:.0f}ms) with high throughput ({event.throughput:.0f} req/s)",
                "Check autoscaling configuration, rate limiting, and load balancer health",
            )

        if causes:
            # Overall confidence is that of the strongest matching rule.
            confidence = max(0.5, *(item["confidence"] for item in causes))
        else:
            record(
                "unknown_pattern",
                "Unknown - Requires Investigation",
                0.3,
                "Pattern does not match known failure modes",
                "Complete system review needed - check logs, metrics, and recent changes",
            )
            confidence = 0.3

        return {
            'confidence': confidence,
            'causes': causes,
            'evidence': evidence,
            'investigation_steps': [item['investigation'] for item in causes[:3]],
        }

    def _analyze_dependencies(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Summarise upstream dependencies and estimate cascade risk.

        A cascade is flagged when the error rate exceeds 0.2 or latency
        exceeds 500; the risk score is the larger of the two estimates.
        """
        report = {
            'has_upstream_deps': len(event.upstream_deps) > 0,
            'upstream_services': event.upstream_deps,
            'potential_cascade': False,
            'cascade_risk_score': 0.0,
        }

        if event.error_rate > 0.2:
            report['potential_cascade'] = True
            report['cascade_risk_score'] = min(1.0, event.error_rate * 2)

        if event.latency_p99 > 500:
            latency_risk = min(1.0, event.latency_p99 / 1000)
            report['potential_cascade'] = True
            report['cascade_risk_score'] = max(report['cascade_risk_score'], latency_risk)

        return report

    def _check_temporal_patterns(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Report time-of-day flags for the current local wall-clock time.

        The flags come from ``datetime.now()`` at call time, not from
        *event*; only the latency reading is taken from the event.
        """
        import datetime

        now = datetime.datetime.now()
        hour = now.hour
        in_peak_hours = 9 <= hour <= 17

        return {
            # Correlated only when elevated latency falls within peak hours.
            'time_of_day_correlation': bool(in_peak_hours and event.latency_p99 > 200),
            'is_peak_hours': in_peak_hours,
            'is_off_hours': hour < 6 or hour > 22,
            'deployment_window': 14 <= hour <= 16,
            'weekend': now.weekday() >= 5,
        }
|
|
class OrchestrationManager:
    """Run every registered agent on an event and merge their results."""

    # Seconds each agent gets before its analysis is abandoned.
    AGENT_TIMEOUT_SECONDS = 10.0

    def __init__(self):
        """Create one detection and one root-cause agent."""
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
        }
        self.incident_history = []

    async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Run all agents concurrently on *event* and synthesise the results.

        Each agent runs under its own timeout. An agent that times out or
        raises is reported on stdout and left out of the synthesis, so one
        failing agent never blocks or discards the others.

        Returns:
            The dictionary built by ``_synthesize_agent_findings``.
        """
        failed = object()  # sentinel, so a legitimate None result is kept

        async def run_agent(specialization, agent):
            try:
                return await asyncio.wait_for(
                    agent.analyze(event), timeout=self.AGENT_TIMEOUT_SECONDS
                )
            except asyncio.TimeoutError:
                print(f"Agent {specialization.value} timed out")
            except Exception as e:
                print(f"Agent {specialization.value} error: {e}")
            return failed

        specializations = list(self.agents)
        # gather() schedules every agent at once; awaiting them one by one
        # would serialise the work and stack the timeouts.
        outcomes = await asyncio.gather(
            *(run_agent(spec, self.agents[spec]) for spec in specializations)
        )

        # gather() preserves input order, so results keep registration order.
        agent_results = {
            spec.value: outcome
            for spec, outcome in zip(specializations, outcomes)
            if outcome is not failed
        }

        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: ReliabilityEvent, agent_results: Dict) -> Dict[str, Any]:
        """Combine the agents' results into one incident report.

        The detective result is mandatory: without it only an ``error``
        entry is returned. The diagnostician result is optional and is
        replaced by empty values when absent.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        if not detective_result:
            return {'error': 'No agent results available'}

        if diagnostician_result:
            root_cause_insights = diagnostician_result.findings
            diagnosis_actions = diagnostician_result.recommendations
        else:
            root_cause_insights = {}
            diagnosis_actions = []

        detection = detective_result.findings
        return {
            'incident_summary': {
                'severity': detection.get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result.confidence,
                'primary_metrics_affected': detection.get('affected_metrics', []),
            },
            'root_cause_insights': root_cause_insights,
            'recommended_actions': self._prioritize_actions(
                detective_result.recommendations,
                diagnosis_actions,
            ),
            'business_context': self._add_business_context(event, detective_result.confidence),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'processing_times': {
                    name: result.processing_time
                    for name, result in agent_results.items()
                },
            },
        }

    def _prioritize_actions(self, detection_actions: List[str], diagnosis_actions: List[str]) -> List[str]:
        """Merge both action lists into at most five unique, ranked actions.

        Actions marked 🚨 come first, then those marked ⚠️, then the rest.
        Within each tier the original order (detection before diagnosis)
        is kept.
        """

        def rank(action):
            if '🚨' in action:
                return 0
            if '⚠️' in action:
                return 1
            return 2

        # dict.fromkeys de-duplicates while keeping first-seen order, and
        # sorted() is stable, so ordering within a tier is preserved.
        unique_actions = dict.fromkeys(detection_actions + diagnosis_actions)
        return sorted(unique_actions, key=rank)[:5]

    def _add_business_context(self, event: ReliabilityEvent, confidence: float) -> Dict[str, Any]:
        """Translate the detection confidence into business-facing fields.

        *event* is accepted for interface compatibility but is not read.
        Escalation is recommended when confidence exceeds 0.7.
        """
        if confidence > 0.8:
            business_severity = "CRITICAL"
        elif confidence > 0.6:
            business_severity = "HIGH"
        elif confidence > 0.4:
            business_severity = "MEDIUM"
        else:
            business_severity = "LOW"

        return {
            'business_severity': business_severity,
            'estimated_impact': f"{confidence * 100:.0f}% confidence of incident",
            'recommended_escalation': confidence > 0.7,
        }