# Agentic-Reliability-Framework-API / agent_orchestrator.py
# Hosting-page header captured with the source (not part of the module):
#   uploaded by petter2025 -- commit e78c83f (verified), "Create agent_orchestrator.py"
#   file size 6.53 kB
import asyncio
from typing import Dict, List, Any
from dataclasses import dataclass
from monitoring_models import AgentSpecialization
from models import ReliabilityEvent, AnomalyResult
@dataclass
class AgentResult:
    """Outcome of a single agent's analysis of one reliability event."""

    # Specialization of the agent that produced this result.
    specialization: AgentSpecialization

    # Agent-reported confidence in its findings.
    confidence: float

    # Named findings; the set of keys is chosen by each agent.
    findings: Dict[str, Any]

    # Follow-up actions suggested by the agent.
    recommendations: List[str]

    # Difference between two event-loop clock readings taken around the
    # agent's analysis.
    processing_time: float
class BaseAgent:
    """Common base for specialised analysis agents.

    Stores the agent's specialization together with a dictionary of
    zero-initialised performance counters. Subclasses override ``analyze``.
    """

    def __init__(self, specialization: AgentSpecialization):
        self.specialization = specialization
        # All counters start at zero; nothing in this class updates them.
        self.performance_metrics = dict(
            processed_events=0,
            successful_analyses=0,
            average_confidence=0.0,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Analyse *event*; concrete agents must override this coroutine.

        Raises:
            NotImplementedError: always, when called on the base class.
        """
        raise NotImplementedError
class AnomalyDetectionAgent(BaseAgent):
    """Detective agent that scores how anomalous a reliability event looks.

    NOTE(review): ``analyze`` calls ``_detect_known_patterns``,
    ``_identify_affected_metrics``, ``_classify_severity`` and
    ``_generate_detection_recommendations``; none of them is defined in the
    visible part of this file -- confirm they exist before relying on it.
    """

    # Scoring parameters, hoisted from inline literals so that subclasses can
    # tune them. Values are unchanged.
    LATENCY_P99_THRESHOLD = 150   # presumably milliseconds -- TODO confirm
    LATENCY_P99_RANGE = 500
    LATENCY_WEIGHT = 0.4
    ERROR_RATE_THRESHOLD = 0.05
    ERROR_RATE_SATURATION = 0.3
    ERROR_RATE_WEIGHT = 0.3
    UTILIZATION_THRESHOLD = 0.8
    UTILIZATION_RANGE = 0.2
    UTILIZATION_WEIGHT = 0.15     # applied to CPU and to memory separately

    def __init__(self):
        super().__init__(AgentSpecialization.DETECTIVE)
        self.adaptive_thresholds = {}

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Score *event* and package the detection findings.

        Args:
            event: The reliability event to inspect.

        Returns:
            An ``AgentResult`` whose ``confidence`` is the anomaly score and
            whose ``findings`` hold the keys ``anomaly_score``,
            ``detected_patterns``, ``affected_metrics`` and ``severity_tier``.
        """
        # Inside a coroutine, get_running_loop() is the preferred way to
        # reach the loop (get_event_loop() has policy-dependent behaviour).
        clock = asyncio.get_running_loop().time
        started = clock()

        # Multi-dimensional anomaly scoring
        anomaly_score = self._calculate_anomaly_score(event)
        pattern_match = self._detect_known_patterns(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=anomaly_score,
            findings={
                'anomaly_score': anomaly_score,
                'detected_patterns': pattern_match,
                'affected_metrics': self._identify_affected_metrics(event),
                'severity_tier': self._classify_severity(anomaly_score)
            },
            recommendations=self._generate_detection_recommendations(event, anomaly_score),
            processing_time=clock() - started
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Return a combined anomaly score, capped at 1.0.

        Latency contributes up to 40%, error rate up to 30%, and CPU and
        memory utilisation up to 15% each. A component contributes only once
        its metric exceeds the corresponding threshold.

        Args:
            event: Object exposing ``latency_p99``, ``error_rate``,
                ``cpu_util`` and ``memory_util``; the two utilisation values
                may be ``None``.

        Returns:
            A float; ``0.0`` when no threshold is exceeded.
        """
        scores = []

        # Latency anomaly (weighted 40%)
        if event.latency_p99 > self.LATENCY_P99_THRESHOLD:
            latency_score = min(
                1.0,
                (event.latency_p99 - self.LATENCY_P99_THRESHOLD) / self.LATENCY_P99_RANGE,
            )
            scores.append(self.LATENCY_WEIGHT * latency_score)

        # Error rate anomaly (weighted 30%)
        if event.error_rate > self.ERROR_RATE_THRESHOLD:
            error_score = min(1.0, event.error_rate / self.ERROR_RATE_SATURATION)
            scores.append(self.ERROR_RATE_WEIGHT * error_score)

        # Resource anomaly (weighted 30%). Start from a float so the method
        # returns 0.0 rather than the int 0 when nothing is anomalous.
        resource_score = 0.0
        for utilization in (event.cpu_util, event.memory_util):
            if utilization is not None and utilization > self.UTILIZATION_THRESHOLD:
                resource_score += self.UTILIZATION_WEIGHT * min(
                    1.0,
                    (utilization - self.UTILIZATION_THRESHOLD) / self.UTILIZATION_RANGE,
                )
        scores.append(resource_score)

        return min(1.0, sum(scores))
class RootCauseAgent(BaseAgent):
    """Diagnostician agent that reports likely root causes for an event.

    NOTE(review): ``_load_causal_patterns``, ``_perform_causal_analysis``,
    ``_analyze_dependencies`` and ``_check_temporal_patterns`` are called
    here but are not defined in the visible part of this file -- confirm
    they exist; without ``_load_causal_patterns`` the constructor fails.
    """

    def __init__(self):
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Run the causal analysis for *event* and package its output.

        The mapping returned by ``_perform_causal_analysis`` is read for the
        keys ``confidence``, ``causes``, ``evidence`` and
        ``investigation_steps``.

        Args:
            event: The reliability event to diagnose.

        Returns:
            An ``AgentResult`` whose ``findings`` hold the keys
            ``likely_root_causes``, ``evidence_patterns``,
            ``dependency_analysis`` and ``timeline_correlation``.
        """
        # Inside a coroutine, get_running_loop() is the preferred way to
        # reach the loop (get_event_loop() has policy-dependent behaviour).
        clock = asyncio.get_running_loop().time
        started = clock()

        root_cause_analysis = self._perform_causal_analysis(event)

        return AgentResult(
            specialization=self.specialization,
            confidence=root_cause_analysis['confidence'],
            findings={
                'likely_root_causes': root_cause_analysis['causes'],
                'evidence_patterns': root_cause_analysis['evidence'],
                'dependency_analysis': self._analyze_dependencies(event),
                'timeline_correlation': self._check_temporal_patterns(event)
            },
            recommendations=root_cause_analysis['investigation_steps'],
            processing_time=clock() - started
        )
class OrchestrationManager:
    """Runs every registered agent against an event and merges the results.

    NOTE(review): ``_synthesize_agent_findings`` calls ``_prioritize_actions``
    and ``_add_business_context``, which are not defined in the visible part
    of this file -- confirm they exist before relying on it.
    """

    # Per-agent time budget in seconds, passed to asyncio.wait_for.
    AGENT_TIMEOUT_SECONDS = 10.0

    def __init__(self):
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
            # Add more agents as we build them
        }
        self.incident_history = []

    async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Coordinate all registered agents and synthesize their findings.

        Agents are started together and each gets its own timeout, so one
        slow agent no longer delays the others. An agent that times out is
        left out of the synthesis. Any other exception raised by an agent is
        re-raised to the caller once all agents have finished.

        Args:
            event: The reliability event handed to every agent.

        Returns:
            The dictionary built by ``_synthesize_agent_findings``.
        """
        specializations = list(self.agents)

        # return_exceptions=True lets every agent run to completion (or to
        # its timeout) instead of abandoning un-awaited coroutines when one
        # of them fails.
        outcomes = await asyncio.gather(
            *(
                asyncio.wait_for(
                    self.agents[spec].analyze(event),
                    timeout=self.AGENT_TIMEOUT_SECONDS,
                )
                for spec in specializations
            ),
            return_exceptions=True,
        )

        agent_results = {}
        for specialization, outcome in zip(specializations, outcomes):
            if isinstance(outcome, asyncio.TimeoutError):
                # Agent timeout - continue with others
                continue
            if isinstance(outcome, BaseException):
                # Unexpected agent failures still propagate to the caller.
                raise outcome
            agent_results[specialization.value] = outcome

        # Synthesize results
        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: ReliabilityEvent, agent_results: Dict) -> Dict[str, Any]:
        """Combine insights from all specialized agents.

        Args:
            event: The event that was analysed.
            agent_results: Agent results keyed by specialization value.

        Returns:
            ``{'error': ...}`` when there is no detective result. Otherwise a
            dictionary with the keys ``incident_summary``,
            ``root_cause_insights``, ``recommended_actions``,
            ``business_context`` and ``agent_metadata``.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        # The detective result is mandatory; the diagnostician is optional.
        if not detective_result:
            return {'error': 'No agent results available'}

        # Build comprehensive analysis
        synthesis = {
            'incident_summary': {
                'severity': detective_result.findings.get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result.confidence,
                'primary_metrics_affected': detective_result.findings.get('affected_metrics', [])
            },
            'root_cause_insights': diagnostician_result.findings if diagnostician_result else {},
            'recommended_actions': self._prioritize_actions(
                detective_result.recommendations,
                diagnostician_result.recommendations if diagnostician_result else []
            ),
            'business_context': self._add_business_context(event, detective_result.confidence),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'processing_times': {k: v.processing_time for k, v in agent_results.items()}
            }
        }

        return synthesis