petter2025 committed on
Commit
e78c83f
·
verified ·
1 Parent(s): cf788e2

Create agent_orchestrator.py

Browse files
Files changed (1) hide show
  1. agent_orchestrator.py +158 -0
agent_orchestrator.py ADDED
@@ -0,0 +1,158 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from typing import Dict, List, Any
3
+ from dataclasses import dataclass
4
+ from monitoring_models import AgentSpecialization
5
+ from models import ReliabilityEvent, AnomalyResult
6
+
7
@dataclass
class AgentResult:
    """Outcome produced by a single agent's ``analyze`` call.

    Field order is part of the positional constructor signature and must
    not be rearranged.
    """

    # Which agent produced this result.
    specialization: AgentSpecialization
    # Confidence value reported by the agent.
    confidence: float
    # Agent-specific findings, keyed by finding name.
    findings: Dict[str, Any]
    # Follow-up actions suggested by the agent.
    recommendations: List[str]
    # Elapsed event-loop time spent inside the agent's analysis.
    processing_time: float
14
+
15
class BaseAgent:
    """Common base for specialized agents.

    Stores the agent's specialization and a per-instance dictionary of
    performance counters. Subclasses are expected to override ``analyze``.
    """

    def __init__(self, specialization: AgentSpecialization):
        """Record the specialization and start all counters at zero."""
        self.specialization = specialization
        # A fresh dict per instance, so agents never share counters.
        self.performance_metrics = dict(
            processed_events=0,
            successful_analyses=0,
            average_confidence=0.0,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Analyze ``event``; the base class always raises.

        Raises:
            NotImplementedError: always, until a subclass overrides this.
        """
        raise NotImplementedError
27
+
28
class AnomalyDetectionAgent(BaseAgent):
    """Agent that scores how anomalous a reliability event looks.

    NOTE(review): ``analyze`` calls ``_detect_known_patterns``,
    ``_identify_affected_metrics``, ``_classify_severity`` and
    ``_generate_detection_recommendations``. None of these is defined in
    the visible source, so ``analyze`` raises ``AttributeError`` until
    they are provided -- confirm where they are meant to live.
    """

    def __init__(self):
        """Register as the DETECTIVE agent with no thresholds stored yet."""
        super().__init__(AgentSpecialization.DETECTIVE)
        self.adaptive_thresholds = {}

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Score ``event`` and package the findings into an ``AgentResult``.

        The anomaly score doubles as the result's confidence value.
        """
        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine; get_event_loop() is discouraged there.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        # Multi-dimensional anomaly scoring
        anomaly_score = self._calculate_anomaly_score(event)
        pattern_match = self._detect_known_patterns(event)

        findings = {
            'anomaly_score': anomaly_score,
            'detected_patterns': pattern_match,
            'affected_metrics': self._identify_affected_metrics(event),
            'severity_tier': self._classify_severity(anomaly_score),
        }
        recommendations = self._generate_detection_recommendations(
            event, anomaly_score
        )

        return AgentResult(
            specialization=self.specialization,
            confidence=anomaly_score,
            findings=findings,
            recommendations=recommendations,
            processing_time=loop.time() - start_time,
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Return a weighted anomaly score clamped to at most 1.0.

        Latency contributes up to 0.4, error rate up to 0.3, and CPU and
        memory utilization up to 0.15 each. A metric contributes nothing
        unless it exceeds its threshold.
        """
        scores = []

        # Latency anomaly (weighted 40%): ramps from 150 up to 650.
        if event.latency_p99 > 150:
            latency_score = min(1.0, (event.latency_p99 - 150) / 500)
            scores.append(0.4 * latency_score)

        # Error rate anomaly (weighted 30%): saturates at a rate of 0.3.
        if event.error_rate > 0.05:
            error_score = min(1.0, event.error_rate / 0.3)
            scores.append(0.3 * error_score)

        # Resource anomaly (weighted 30%, split evenly between CPU and
        # memory). The truthiness check skips values that are None or 0.
        resource_score = 0
        if event.cpu_util and event.cpu_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.cpu_util - 0.8) / 0.2)
        if event.memory_util and event.memory_util > 0.8:
            resource_score += 0.15 * min(1.0, (event.memory_util - 0.8) / 0.2)
        scores.append(resource_score)

        return min(1.0, sum(scores))
77
+
78
class RootCauseAgent(BaseAgent):
    """Agent that proposes likely root causes for a reliability event.

    NOTE(review): ``_load_causal_patterns``, ``_perform_causal_analysis``,
    ``_analyze_dependencies`` and ``_check_temporal_patterns`` are not
    defined in the visible source. As written, constructing this agent
    raises ``AttributeError`` -- confirm where they are meant to live.
    """

    def __init__(self):
        """Register as the DIAGNOSTICIAN agent and load causal patterns."""
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Run causal analysis on ``event`` and wrap it in an ``AgentResult``.

        The causal-analysis result is read as a mapping with the keys
        ``'confidence'``, ``'causes'``, ``'evidence'`` and
        ``'investigation_steps'``.
        """
        # get_running_loop() is the documented way to reach the loop from
        # inside a coroutine; get_event_loop() is discouraged there.
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        root_cause_analysis = self._perform_causal_analysis(event)

        findings = {
            'likely_root_causes': root_cause_analysis['causes'],
            'evidence_patterns': root_cause_analysis['evidence'],
            'dependency_analysis': self._analyze_dependencies(event),
            'timeline_correlation': self._check_temporal_patterns(event),
        }

        return AgentResult(
            specialization=self.specialization,
            confidence=root_cause_analysis['confidence'],
            findings=findings,
            recommendations=root_cause_analysis['investigation_steps'],
            processing_time=loop.time() - start_time,
        )
101
+
102
class OrchestrationManager:
    """Runs every registered agent on an event and merges their results.

    NOTE(review): ``_prioritize_actions`` and ``_add_business_context``
    are called by ``_synthesize_agent_findings`` but are not defined in
    the visible source -- confirm where they are meant to live.
    """

    # Per-agent time budget in seconds. Kept as a class attribute so that
    # instances created without calling __init__ still have a value.
    agent_timeout: float = 10.0

    def __init__(self, agent_timeout: float = 10.0):
        """Create the agent registry.

        Args:
            agent_timeout: Seconds each agent may run before its result is
                dropped. Defaults to the previously hard-coded 10 seconds.
        """
        self.agent_timeout = agent_timeout
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
            # Add more agents as we build them
        }
        self.incident_history = []

    async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Run all agents concurrently and synthesize what they return.

        Each agent gets its own ``agent_timeout`` budget. An agent that
        times out is left out of the synthesis. Any other exception raised
        by an agent is re-raised to the caller, but only after every agent
        has finished, so no coroutine is left un-awaited.

        Returns:
            The dictionary built by ``_synthesize_agent_findings``.
        """
        specializations = list(self.agents)

        # gather() schedules every agent at once, so the timeouts overlap
        # instead of adding up as they would with one await per agent.
        outcomes = await asyncio.gather(
            *(
                asyncio.wait_for(
                    self.agents[spec].analyze(event),
                    timeout=self.agent_timeout,
                )
                for spec in specializations
            ),
            return_exceptions=True,
        )

        agent_results = {}
        for specialization, outcome in zip(specializations, outcomes):
            if isinstance(outcome, asyncio.TimeoutError):
                # Agent timeout - continue with others
                continue
            if isinstance(outcome, BaseException):
                # Keep the original contract: agent errors reach the caller.
                raise outcome
            agent_results[specialization.value] = outcome

        # Synthesize results
        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: ReliabilityEvent, agent_results: Dict) -> Dict[str, Any]:
        """Combine insights from all specialized agents.

        Args:
            event: The event that was analyzed.
            agent_results: Agent results keyed by specialization value.

        Returns:
            The synthesized analysis, or ``{'error': ...}`` when the
            detective result is missing. The diagnostician result is
            optional.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        # The summary is built from the detective's findings, so nothing
        # can be synthesized without them.
        if not detective_result:
            return {'error': 'No agent results available'}

        if diagnostician_result:
            root_cause_insights = diagnostician_result.findings
            diagnostic_recommendations = diagnostician_result.recommendations
        else:
            root_cause_insights = {}
            diagnostic_recommendations = []

        # Build comprehensive analysis
        synthesis = {
            'incident_summary': {
                'severity': detective_result.findings.get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result.confidence,
                'primary_metrics_affected': detective_result.findings.get('affected_metrics', []),
            },
            'root_cause_insights': root_cause_insights,
            'recommended_actions': self._prioritize_actions(
                detective_result.recommendations,
                diagnostic_recommendations,
            ),
            'business_context': self._add_business_context(event, detective_result.confidence),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'processing_times': {k: v.processing_time for k, v in agent_results.items()},
            },
        }

        return synthesis