import asyncio
from typing import Dict, List, Any
from dataclasses import dataclass
from monitoring_models import AgentSpecialization
from models import ReliabilityEvent, AnomalyResult
@dataclass
class AgentResult:
    """Outcome of a single specialized agent's analysis of one reliability event."""
    specialization: AgentSpecialization  # which agent role produced this result
    confidence: float  # 0.0-1.0 confidence the agent assigns to its findings
    findings: Dict[str, Any]  # agent-specific structured observations
    recommendations: List[str]  # suggested follow-up / remediation steps
    processing_time: float  # seconds spent inside analyze() (event-loop clock)
class BaseAgent:
    """Common state and contract shared by every specialized monitoring agent."""

    def __init__(self, specialization: AgentSpecialization):
        self.specialization = specialization
        # Running tallies for self-evaluation of this agent's accuracy.
        self.performance_metrics = dict(
            processed_events=0,
            successful_analyses=0,
            average_confidence=0.0,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Analyze one event; each specialized subclass provides the implementation."""
        raise NotImplementedError
class AnomalyDetectionAgent(BaseAgent):
    """Detective agent: scores events for anomalies and matches known patterns.

    NOTE(review): _detect_known_patterns, _identify_affected_metrics,
    _classify_severity and _generate_detection_recommendations are referenced
    here but defined elsewhere in this file — confirm they exist.
    """

    def __init__(self):
        super().__init__(AgentSpecialization.DETECTIVE)
        # Per-metric learned thresholds; currently not consulted by
        # _calculate_anomaly_score, which uses fixed cutoffs — TODO wire in.
        self.adaptive_thresholds = {}

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Enhanced anomaly detection with pattern recognition.

        Returns an AgentResult whose confidence equals the composite anomaly
        score and whose findings carry the score, matched patterns, affected
        metrics and a severity tier.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # asyncio.get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Multi-dimensional anomaly scoring
        anomaly_score = self._calculate_anomaly_score(event)
        pattern_match = self._detect_known_patterns(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=anomaly_score,
            findings={
                'anomaly_score': anomaly_score,
                'detected_patterns': pattern_match,
                'affected_metrics': self._identify_affected_metrics(event),
                'severity_tier': self._classify_severity(anomaly_score)
            },
            recommendations=self._generate_detection_recommendations(event, anomaly_score),
            processing_time=loop.time() - start_time
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Calculate comprehensive anomaly score in [0, 1].

        Weighted mix: latency 40%, error rate 30%, CPU+memory 30%.
        The final sum is clamped to 1.0.
        """
        scores = []
        # Latency anomaly (weighted 40%): ramps linearly from p99 of 150
        # up to saturation at 650 — units presumably milliseconds, confirm.
        if event.latency_p99 > 150:
            latency_score = min(1.0, (event.latency_p99 - 150) / 500)
            scores.append(0.4 * latency_score)
        # Error rate anomaly (weighted 30%): saturates at a 30% error rate.
        if event.error_rate > 0.05:
            error_score = min(1.0, event.error_rate / 0.3)
            scores.append(0.3 * error_score)
        # Resource anomaly (weighted 30%, split evenly between CPU and
        # memory): each ramps from 80% to 100% utilization.  The truthiness
        # check also skips None/0 readings.
        resource_score = 0
        if event.cpu_util and event.cpu_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.cpu_util - 0.8) / 0.2)
        if event.memory_util and event.memory_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.memory_util - 0.8) / 0.2)
        scores.append(resource_score)
        return min(1.0, sum(scores))
class RootCauseAgent(BaseAgent):
    """Diagnostician agent: infers likely root causes for a reliability event.

    NOTE(review): _load_causal_patterns, _perform_causal_analysis,
    _analyze_dependencies and _check_temporal_patterns are referenced here
    but defined elsewhere in this file — confirm they exist.
    """

    def __init__(self):
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """AI-powered root cause analysis.

        Confidence is taken from the causal analysis itself; findings carry
        the candidate causes, supporting evidence, dependency analysis and
        temporal correlation.
        """
        # get_running_loop() is the supported call inside a coroutine;
        # asyncio.get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        root_cause_analysis = self._perform_causal_analysis(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=root_cause_analysis['confidence'],
            findings={
                'likely_root_causes': root_cause_analysis['causes'],
                'evidence_patterns': root_cause_analysis['evidence'],
                'dependency_analysis': self._analyze_dependencies(event),
                'timeline_correlation': self._check_temporal_patterns(event)
            },
            recommendations=root_cause_analysis['investigation_steps'],
            processing_time=loop.time() - start_time
        )
class OrchestrationManager:
    """Fans one event out to every specialized agent and merges their findings.

    NOTE(review): _prioritize_actions and _add_business_context are referenced
    here but defined elsewhere in this file — confirm they exist.
    """

    def __init__(self):
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
            # Add more agents as we build them
        }
        self.incident_history = []

    async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Coordinate multiple agents for comprehensive analysis.

        Agents run concurrently; an agent that exceeds its 10s budget is
        dropped and synthesis proceeds with the remaining results.
        """
        # Wrap each coroutine in a Task so the agents genuinely run in
        # parallel.  Bare coroutines awaited one-by-one (the previous form)
        # execute sequentially, and their per-agent timeouts compound.
        agent_tasks = {
            spec: asyncio.create_task(agent.analyze(event))
            for spec, agent in self.agents.items()
        }
        # Collect results, tolerating individual agent timeouts.
        agent_results = {}
        for specialization, task in agent_tasks.items():
            try:
                result = await asyncio.wait_for(task, timeout=10.0)
                agent_results[specialization.value] = result
            except asyncio.TimeoutError:
                # Agent timeout - continue with others
                continue
        # Synthesize results
        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: ReliabilityEvent, agent_results: Dict) -> Dict[str, Any]:
        """Combine insights from all specialized agents.

        Requires at least a detective (anomaly) result; the diagnostician's
        contribution is optional and degrades gracefully to empty values.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        # Without the detective there is no anomaly baseline to report on.
        if not detective_result:
            return {'error': 'No agent results available'}

        # Build comprehensive analysis
        synthesis = {
            'incident_summary': {
                'severity': detective_result.findings.get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result.confidence,
                'primary_metrics_affected': detective_result.findings.get('affected_metrics', [])
            },
            'root_cause_insights': diagnostician_result.findings if diagnostician_result else {},
            'recommended_actions': self._prioritize_actions(
                detective_result.recommendations,
                diagnostician_result.recommendations if diagnostician_result else []
            ),
            'business_context': self._add_business_context(event, detective_result.confidence),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'processing_times': {k: v.processing_time for k, v in agent_results.items()}
            }
        }
        return synthesis