| import asyncio |
| from typing import Dict, List, Any |
| from dataclasses import dataclass |
| from monitoring_models import AgentSpecialization |
| from models import ReliabilityEvent, AnomalyResult |
|
|
@dataclass
class AgentResult:
    """Outcome of one specialized agent's analysis of a single reliability event."""
    specialization: AgentSpecialization  # which agent role produced this result
    confidence: float  # agent's confidence in its findings; detective uses the 0-1 anomaly score — confirm range for other agents
    findings: Dict[str, Any]  # agent-specific structured output (keys vary per agent)
    recommendations: List[str]  # human-readable suggested actions
    processing_time: float  # event-loop seconds spent inside analyze()
|
|
class BaseAgent:
    """Common scaffolding for the specialized reliability-analysis agents.

    Holds the agent's role and a small bag of self-reported performance
    counters; concrete agents override :meth:`analyze`.
    """

    def __init__(self, specialization: AgentSpecialization):
        self.specialization = specialization
        # Counters start at zero; nothing in this base class mutates them,
        # so updating them is left to subclasses or callers.
        self.performance_metrics = dict(
            processed_events=0,
            successful_analyses=0,
            average_confidence=0.0,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Analyze one reliability event; specialized agents must override."""
        raise NotImplementedError
|
|
class AnomalyDetectionAgent(BaseAgent):
    """Detective agent: scores how anomalous a reliability event looks.

    Combines latency, error-rate, and resource-utilization signals into a
    single weighted anomaly score in [0, 1].
    """

    def __init__(self):
        super().__init__(AgentSpecialization.DETECTIVE)
        # Per-metric thresholds intended to be learned at runtime; nothing
        # in this chunk populates or reads it yet.
        self.adaptive_thresholds = {}

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Enhanced anomaly detection with pattern recognition.

        Returns an AgentResult whose confidence equals the anomaly score.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # asyncio.get_event_loop() is deprecated in this context.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        anomaly_score = self._calculate_anomaly_score(event)
        # NOTE(review): _detect_known_patterns, _identify_affected_metrics,
        # _classify_severity and _generate_detection_recommendations are not
        # defined in this chunk — confirm they exist elsewhere in the class.
        pattern_match = self._detect_known_patterns(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=anomaly_score,
            findings={
                'anomaly_score': anomaly_score,
                'detected_patterns': pattern_match,
                'affected_metrics': self._identify_affected_metrics(event),
                'severity_tier': self._classify_severity(anomaly_score),
            },
            recommendations=self._generate_detection_recommendations(event, anomaly_score),
            processing_time=loop.time() - start_time,
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Calculate comprehensive anomaly score (0-1).

        Weighted sum of three components (weights total 1.0, so the final
        clamp is only defensive):
          * latency   (0.4): p99 above 150 (units presumably ms — confirm),
            saturating 500 above the threshold
          * errors    (0.3): error rate above 5%, saturating at 30%
          * resources (0.3): CPU and memory each contribute up to 0.15 once
            utilization exceeds 80%
        """
        scores = []

        # Latency component: kicks in above the 150 knee, saturates at 650.
        if event.latency_p99 > 150:
            latency_score = min(1.0, (event.latency_p99 - 150) / 500)
            scores.append(0.4 * latency_score)

        # Error-rate component: kicks in above 5%, saturates at 30%.
        if event.error_rate > 0.05:
            error_score = min(1.0, event.error_rate / 0.3)
            scores.append(0.3 * error_score)

        # Resource component: CPU and memory contribute independently.
        # Falsy readings (None / 0.0) are treated as "no signal".
        resource_score = 0
        if event.cpu_util and event.cpu_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.cpu_util - 0.8) / 0.2)
        if event.memory_util and event.memory_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.memory_util - 0.8) / 0.2)
        scores.append(resource_score)

        return min(1.0, sum(scores))
|
|
class RootCauseAgent(BaseAgent):
    """Diagnostician agent: proposes likely root causes for an event."""

    def __init__(self):
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        # NOTE(review): _load_causal_patterns is not defined in this chunk —
        # confirm it exists elsewhere in the class.
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """AI-powered root cause analysis.

        Delegates to _perform_causal_analysis, which (inferred from usage
        below) returns a dict with 'confidence', 'causes', 'evidence' and
        'investigation_steps' keys; the helper methods used here are not
        defined in this chunk — confirm they exist elsewhere in the class.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # asyncio.get_event_loop() is deprecated in this context.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        root_cause_analysis = self._perform_causal_analysis(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=root_cause_analysis['confidence'],
            findings={
                'likely_root_causes': root_cause_analysis['causes'],
                'evidence_patterns': root_cause_analysis['evidence'],
                'dependency_analysis': self._analyze_dependencies(event),
                'timeline_correlation': self._check_temporal_patterns(event),
            },
            recommendations=root_cause_analysis['investigation_steps'],
            processing_time=loop.time() - start_time,
        )
|
|
class OrchestrationManager:
    """Fans a reliability event out to the specialized agents and merges
    their individual results into one consolidated report."""

    def __init__(self):
        detective = AnomalyDetectionAgent()
        diagnostician = RootCauseAgent()
        # Keyed by each agent's own specialization enum member.
        self.agents = {
            detective.specialization: detective,
            diagnostician.specialization: diagnostician,
        }
        self.incident_history = []
| |
| async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]: |
| """Coordinate multiple agents for comprehensive analysis""" |
| agent_tasks = { |
| spec: agent.analyze(event) |
| for spec, agent in self.agents.items() |
| } |
| |
| |
| agent_results = {} |
| for specialization, task in agent_tasks.items(): |
| try: |
| result = await asyncio.wait_for(task, timeout=10.0) |
| agent_results[specialization.value] = result |
| except asyncio.TimeoutError: |
| |
| continue |
| |
| |
| return self._synthesize_agent_findings(event, agent_results) |
| |
| def _synthesize_agent_findings(self, event: ReliabilityEvent, agent_results: Dict) -> Dict[str, Any]: |
| """Combine insights from all specialized agents""" |
| detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value) |
| diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value) |
| |
| if not detective_result: |
| return {'error': 'No agent results available'} |
| |
| |
| synthesis = { |
| 'incident_summary': { |
| 'severity': detective_result.findings.get('severity_tier', 'UNKNOWN'), |
| 'anomaly_confidence': detective_result.confidence, |
| 'primary_metrics_affected': detective_result.findings.get('affected_metrics', []) |
| }, |
| 'root_cause_insights': diagnostician_result.findings if diagnostician_result else {}, |
| 'recommended_actions': self._prioritize_actions( |
| detective_result.recommendations, |
| diagnostician_result.recommendations if diagnostician_result else [] |
| ), |
| 'business_context': self._add_business_context(event, detective_result.confidence), |
| 'agent_metadata': { |
| 'participating_agents': list(agent_results.keys()), |
| 'processing_times': {k: v.processing_time for k, v in agent_results.items()} |
| } |
| } |
| |
| return synthesis |