Agentic-Reliability-Framework-API / agent_orchestrator.py
petter2025's picture
Update agent_orchestrator.py
fc7752d verified
raw
history blame
20.1 kB
import asyncio
from typing import Dict, List, Any
from dataclasses import dataclass
from monitoring_models import AgentSpecialization
from models import ReliabilityEvent, AnomalyResult
@dataclass
class AgentResult:
    """Outcome of one agent's analysis of a single event.

    Attributes:
        specialization: Role of the agent that produced this result.
        confidence: Score the agent attaches to its own analysis.
        findings: Agent-specific structured details, keyed by name.
        recommendations: Human-readable follow-up actions.
        processing_time: Elapsed time of the analysis, taken as a
            difference of two event-loop clock readings.
    """

    specialization: AgentSpecialization
    confidence: float
    findings: Dict[str, Any]
    recommendations: List[str]
    processing_time: float
class BaseAgent:
    """Shared base for the specialised analysis agents.

    Stores the agent's specialization and a small metrics dictionary.
    Subclasses supply the actual analysis by overriding ``analyze``.
    """

    def __init__(self, specialization: AgentSpecialization):
        self.specialization = specialization
        # Bookkeeping counters, all starting at zero. Nothing in this
        # class updates them after construction.
        self.performance_metrics = dict(
            processed_events=0,
            successful_analyses=0,
            average_confidence=0.0,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Analyse ``event`` and return an ``AgentResult``.

        The base implementation is a placeholder that always raises
        ``NotImplementedError``; concrete agents must override it.
        """
        raise NotImplementedError
class AnomalyDetectionAgent(BaseAgent):
    """Detective agent: scores how far an event's metrics exceed fixed
    limits and reports patterns, affected metrics, a severity tier and
    recommendations.
    """

    def __init__(self):
        super().__init__(AgentSpecialization.DETECTIVE)
        # Per-metric limits; a reading strictly above its limit counts as
        # anomalous. Latency is compared in the same unit the
        # recommendation texts use (milliseconds).
        self.adaptive_thresholds = dict(
            latency_p99=150,
            error_rate=0.05,
            cpu_util=0.8,
            memory_util=0.8,
        )

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Run every detection step on ``event`` and package the outcome.

        The anomaly score doubles as the result's confidence.
        """
        loop = asyncio.get_running_loop()
        started_at = loop.time()

        score = self._calculate_anomaly_score(event)
        matched_patterns = self._detect_known_patterns(event)
        findings = {
            'anomaly_score': score,
            'detected_patterns': matched_patterns,
            'affected_metrics': self._identify_affected_metrics(event),
            'severity_tier': self._classify_severity(score),
        }
        advice = self._generate_detection_recommendations(event, score)

        return AgentResult(
            specialization=self.specialization,
            confidence=score,
            findings=findings,
            recommendations=advice,
            processing_time=loop.time() - started_at,
        )

    def _calculate_anomaly_score(self, event: ReliabilityEvent) -> float:
        """Return a weighted anomaly score capped at 1.0.

        Latency contributes up to 0.4, error rate up to 0.3, and CPU and
        memory up to 0.15 each. A metric at or below its limit adds
        nothing.
        """
        limits = self.adaptive_thresholds
        total = 0

        # Latency share: grows linearly and saturates 500 above the limit.
        if event.latency_p99 > limits['latency_p99']:
            overshoot = (event.latency_p99 - limits['latency_p99']) / 500
            total += 0.4 * min(1.0, overshoot)

        # Error-rate share: scaled against 0.3 (not against the limit).
        if event.error_rate > limits['error_rate']:
            total += 0.3 * min(1.0, event.error_rate / 0.3)

        # Resource share: each metric saturates 0.2 above its limit.
        # Missing (None) or zero readings are skipped.
        resource_part = 0
        for metric in ('cpu_util', 'memory_util'):
            reading = getattr(event, metric)
            if reading and reading > limits[metric]:
                resource_part += 0.15 * min(1.0, (reading - limits[metric]) / 0.2)
        total += resource_part

        return min(1.0, total)

    def _detect_known_patterns(self, event: ReliabilityEvent) -> List[str]:
        """Name every known failure pattern the event matches.

        Returns ``["unknown_pattern"]`` when nothing matches.
        """
        latency = event.latency_p99
        error_rate = event.error_rate
        cpu = event.cpu_util
        memory = event.memory_util

        checks = (
            ("database_timeout", latency > 500 and error_rate > 0.2),
            ("resource_exhaustion", cpu and cpu > 0.9 and memory and memory > 0.9),
            ("cascading_failure", error_rate > 0.15 and latency > 300),
            ("traffic_spike", latency > 200 and event.throughput > 2000),
            ("gradual_degradation", 150 < latency < 300 and 0.05 < error_rate < 0.15),
        )
        matched = [label for label, hit in checks if hit]
        return matched or ["unknown_pattern"]

    def _identify_affected_metrics(self, event: ReliabilityEvent) -> List[str]:
        """List the metrics that are out of range, or ``["none"]``.

        Throughput is flagged when it is below 500; the others when they
        exceed their entry in ``adaptive_thresholds``.
        """
        limits = self.adaptive_thresholds
        cpu = event.cpu_util
        memory = event.memory_util

        candidates = (
            ("latency", event.latency_p99 > limits['latency_p99']),
            ("error_rate", event.error_rate > limits['error_rate']),
            ("cpu", cpu and cpu > limits['cpu_util']),
            ("memory", memory and memory > limits['memory_util']),
            ("throughput", event.throughput < 500),
        )
        flagged = [label for label, out_of_range in candidates if out_of_range]
        return flagged or ["none"]

    def _classify_severity(self, anomaly_score: float) -> str:
        """Map an anomaly score to CRITICAL / HIGH / MEDIUM / LOW."""
        tiers = ((0.8, "CRITICAL"), (0.6, "HIGH"), (0.4, "MEDIUM"))
        for floor, tier in tiers:
            if anomaly_score > floor:
                return tier
        return "LOW"

    @staticmethod
    def _first_exceeded(reading, ladder) -> List[str]:
        """Return the message of the first ladder rung ``reading`` exceeds.

        ``ladder`` is ordered from the highest floor to the lowest; the
        result is a one-element list, or an empty list if no rung applies.
        """
        for floor, message in ladder:
            if reading > floor:
                return [message]
        return []

    def _generate_detection_recommendations(self, event: ReliabilityEvent, anomaly_score: float) -> List[str]:
        """Build at most five recommendations for the event.

        One message is picked per category (latency, errors, CPU, memory,
        overall score), each being the most severe rung that applies.
        """
        latency_ladder = (
            (500, "🚨 CRITICAL: Latency >500ms - Check database connections and external APIs immediately"),
            (300, "⚠️ HIGH: Latency >300ms - Investigate slow queries and service dependencies"),
            (150, "📈 Latency elevated - Monitor trends and consider optimization"),
        )
        error_ladder = (
            (0.3, "🚨 CRITICAL: Error rate >30% - Rollback recent deployments or enable circuit breaker"),
            (0.15, "⚠️ HIGH: Error rate >15% - Review application logs for exceptions"),
            (0.05, "📈 Errors increasing - Check for configuration issues"),
        )
        cpu_ladder = (
            (0.9, "🔥 CPU CRITICAL: >90% utilization - Scale horizontally or optimize hot paths"),
            (0.8, "⚡ CPU HIGH: >80% utilization - Consider adding capacity"),
        )
        memory_ladder = (
            (0.9, "💾 MEMORY CRITICAL: >90% utilization - Check for memory leaks"),
            (0.8, "💾 MEMORY HIGH: >80% utilization - Monitor for leaks"),
        )
        overall_ladder = (
            (0.8, "🎯 IMMEDIATE ACTION REQUIRED: Multiple critical metrics affected"),
            (0.6, "🎯 INVESTIGATE: Significant performance degradation detected"),
            (0.4, "📊 MONITOR: Early warning signs detected"),
        )

        advice = []
        advice += self._first_exceeded(event.latency_p99, latency_ladder)
        advice += self._first_exceeded(event.error_rate, error_ladder)

        # Resource readings may be absent (None) or zero; skip them then.
        cpu = event.cpu_util
        if cpu:
            advice += self._first_exceeded(cpu, cpu_ladder)
        memory = event.memory_util
        if memory:
            advice += self._first_exceeded(memory, memory_ladder)

        advice += self._first_exceeded(anomaly_score, overall_ladder)
        return advice[:5]
class RootCauseAgent(BaseAgent):
    """Diagnostician agent: matches an event against rule-based failure
    signatures and reports likely causes with investigation steps.
    """

    def __init__(self):
        super().__init__(AgentSpecialization.DIAGNOSTICIAN)
        # Descriptive catalogue of known signatures. The matching rules in
        # _perform_causal_analysis are written out separately and do not
        # read this table.
        self.causal_patterns = self._load_causal_patterns()

    async def analyze(self, event: ReliabilityEvent) -> AgentResult:
        """Diagnose ``event`` and return causes, evidence and next steps."""
        loop = asyncio.get_running_loop()
        started_at = loop.time()

        diagnosis = self._perform_causal_analysis(event)
        findings = {
            'likely_root_causes': diagnosis['causes'],
            'evidence_patterns': diagnosis['evidence'],
            'dependency_analysis': self._analyze_dependencies(event),
            'timeline_correlation': self._check_temporal_patterns(event),
        }

        return AgentResult(
            specialization=self.specialization,
            confidence=diagnosis['confidence'],
            findings=findings,
            recommendations=diagnosis['investigation_steps'],
            processing_time=loop.time() - started_at,
        )

    def _load_causal_patterns(self) -> Dict[str, Any]:
        """Return the static catalogue of known causal patterns.

        Each entry maps a pattern key to its textual conditions, the
        associated cause and a confidence value.
        """
        catalogue = (
            ('high_latency_high_errors',
             ['latency > 500', 'error_rate > 0.2'],
             'Database or external dependency failure',
             0.85),
            ('high_cpu_high_memory',
             ['cpu > 0.9', 'memory > 0.9'],
             'Resource exhaustion or memory leak',
             0.90),
            ('high_errors_normal_latency',
             ['error_rate > 0.3', 'latency < 200'],
             'Application bug or configuration issue',
             0.75),
            ('gradual_degradation',
             ['200 < latency < 400', '0.05 < error_rate < 0.15'],
             'Resource saturation or dependency degradation',
             0.65),
        )
        return {
            key: {'pattern': conditions, 'cause': cause, 'confidence': confidence}
            for key, conditions, cause, confidence in catalogue
        }

    def _perform_causal_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Match ``event`` against the known failure signatures.

        Returns a dict with the overall ``confidence`` (the highest
        confidence among matched causes, or 0.3 when nothing matched), the
        matched ``causes``, their ``evidence`` tags and up to three
        ``investigation_steps``.
        """
        causes = []
        evidence = []

        def record(tag, cause, confidence, detail, investigation):
            # Keep cause entries and evidence tags in matching order.
            causes.append({
                "cause": cause,
                "confidence": confidence,
                "evidence": detail,
                "investigation": investigation,
            })
            evidence.append(tag)

        latency = event.latency_p99
        error_rate = event.error_rate
        cpu = event.cpu_util
        memory = event.memory_util

        # Signature 1: very slow and failing together.
        if latency > 500 and error_rate > 0.2:
            record(
                "extreme_latency_with_errors",
                "Database/External Dependency Failure",
                0.85,
                f"Extreme latency ({latency:.0f}ms) with high errors ({error_rate * 100:.1f}%)",
                "Check database connection pool, external API health, network connectivity",
            )

        # Signature 2: CPU and memory both above 90%.
        if cpu and cpu > 0.9 and memory and memory > 0.9:
            record(
                "correlated_resource_exhaustion",
                "Resource Exhaustion",
                0.90,
                f"CPU ({cpu * 100:.1f}%) and Memory ({memory * 100:.1f}%) critically high",
                "Check for memory leaks, infinite loops, insufficient resource allocation",
            )

        # Signature 3: many errors while latency stays low.
        if error_rate > 0.3 and latency < 200:
            record(
                "errors_without_latency",
                "Application Bug / Configuration Issue",
                0.75,
                f"High error rate ({error_rate * 100:.1f}%) without latency impact",
                "Review recent deployments, configuration changes, application logs, and error traces",
            )

        # Signature 4: moderate latency and moderate errors (inclusive bounds).
        if 200 <= latency <= 400 and 0.05 <= error_rate <= 0.15:
            record(
                "gradual_degradation",
                "Gradual Performance Degradation",
                0.65,
                f"Moderate latency ({latency:.0f}ms) and errors ({error_rate * 100:.1f}%)",
                "Check resource trends, dependency performance, capacity planning, and scaling policies",
            )

        # Signature 5: elevated latency under heavy throughput.
        if latency > 200 and event.throughput > 2000:
            record(
                "traffic_spike",
                "Traffic Spike / Capacity Issue",
                0.70,
                f"Elevated latency ({latency:.0f}ms) with high throughput ({event.throughput:.0f} req/s)",
                "Check autoscaling configuration, rate limiting, and load balancer health",
            )

        # Fallback when no signature matched.
        if not causes:
            record(
                "unknown_pattern",
                "Unknown - Requires Investigation",
                0.3,
                "Pattern does not match known failure modes",
                "Complete system review needed - check logs, metrics, and recent changes",
            )

        # Overall confidence is that of the strongest matched cause.
        confidence = max(entry["confidence"] for entry in causes)
        investigation_steps = [entry['investigation'] for entry in causes[:3]]

        return {
            'confidence': confidence,
            'causes': causes,
            'evidence': evidence,
            'investigation_steps': investigation_steps,
        }

    def _analyze_dependencies(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Summarise upstream dependencies and estimate cascade risk.

        A cascade is considered possible when the error rate exceeds 0.2
        or latency exceeds 500; the risk score is the larger of the two
        individual estimates, each capped at 1.0.
        """
        upstream = event.upstream_deps
        has_upstream = len(upstream) > 0

        cascade_possible = False
        risk = 0.0

        if event.error_rate > 0.2:
            cascade_possible = True
            risk = min(1.0, event.error_rate * 2)

        if event.latency_p99 > 500:
            cascade_possible = True
            risk = max(risk, min(1.0, event.latency_p99 / 1000))

        return {
            'has_upstream_deps': has_upstream,
            'upstream_services': upstream,
            'potential_cascade': cascade_possible,
            'cascade_risk_score': risk,
        }

    def _check_temporal_patterns(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Report time-of-day flags for the moment of analysis.

        The flags come from the current local wall-clock time
        (``datetime.now()``), not from any timestamp carried by ``event``.
        """
        import datetime

        now = datetime.datetime.now()
        hour = now.hour
        in_peak_hours = 9 <= hour <= 17  # business hours

        return {
            # Correlated only when slow during peak hours.
            'time_of_day_correlation': bool(in_peak_hours and event.latency_p99 > 200),
            'is_peak_hours': in_peak_hours,
            'is_off_hours': hour < 6 or hour > 22,
            'deployment_window': 14 <= hour <= 16,  # typical deployment window
            'weekend': now.weekday() >= 5,
        }
class OrchestrationManager:
    """Runs every registered agent on an event and merges their results.

    Attributes:
        AGENT_TIMEOUT_SECONDS: Time budget granted to each agent.
        agents: Mapping from specialization to the agent handling it.
        incident_history: List reserved for past incidents; the code in
            this class never appends to it.
    """

    AGENT_TIMEOUT_SECONDS = 10.0

    def __init__(self):
        self.agents = {
            AgentSpecialization.DETECTIVE: AnomalyDetectionAgent(),
            AgentSpecialization.DIAGNOSTICIAN: RootCauseAgent(),
        }
        self.incident_history = []

    async def orchestrate_analysis(self, event: ReliabilityEvent) -> Dict[str, Any]:
        """Run all agents on ``event`` concurrently and synthesise a report.

        Each agent gets its own ``AGENT_TIMEOUT_SECONDS`` budget. An agent
        that times out or raises is reported on stdout and left out of the
        synthesis; the remaining agents are unaffected.

        Returns:
            The dictionary built by ``_synthesize_agent_findings``.
        """
        specializations = list(self.agents)

        # gather() schedules every agent at once, so total latency is that
        # of the slowest agent rather than the sum of all of them.
        # return_exceptions=True keeps one failure from cancelling the rest.
        outcomes = await asyncio.gather(
            *(
                asyncio.wait_for(
                    self.agents[spec].analyze(event),
                    timeout=self.AGENT_TIMEOUT_SECONDS,
                )
                for spec in specializations
            ),
            return_exceptions=True,
        )

        agent_results = {}
        for spec, outcome in zip(specializations, outcomes):
            if isinstance(outcome, asyncio.TimeoutError):
                # Agent timeout - continue with others
                print(f"Agent {spec.value} timed out")
            elif isinstance(outcome, Exception):
                # Agent error - log and continue
                print(f"Agent {spec.value} error: {outcome}")
            elif isinstance(outcome, BaseException):
                # Cancellation and similar signals must not be swallowed.
                raise outcome
            else:
                agent_results[spec.value] = outcome

        return self._synthesize_agent_findings(event, agent_results)

    def _synthesize_agent_findings(self, event: ReliabilityEvent, agent_results: Dict) -> Dict[str, Any]:
        """Combine the agents' results into a single analysis dictionary.

        Args:
            event: The event that was analysed.
            agent_results: Results keyed by specialization value.

        Returns:
            The synthesis, or ``{'error': ...}`` when the detective agent
            produced no result. The summary is built from the detective's
            findings, so it is required even if the diagnostician succeeded.
        """
        detective_result = agent_results.get(AgentSpecialization.DETECTIVE.value)
        diagnostician_result = agent_results.get(AgentSpecialization.DIAGNOSTICIAN.value)

        if not detective_result:
            return {'error': 'No agent results available'}

        # The diagnostician is optional; fall back to empty values.
        if diagnostician_result:
            root_cause_insights = diagnostician_result.findings
            diagnosis_actions = diagnostician_result.recommendations
        else:
            root_cause_insights = {}
            diagnosis_actions = []

        detective_findings = detective_result.findings
        return {
            'incident_summary': {
                'severity': detective_findings.get('severity_tier', 'UNKNOWN'),
                'anomaly_confidence': detective_result.confidence,
                'primary_metrics_affected': detective_findings.get('affected_metrics', []),
            },
            'root_cause_insights': root_cause_insights,
            'recommended_actions': self._prioritize_actions(
                detective_result.recommendations,
                diagnosis_actions,
            ),
            'business_context': self._add_business_context(event, detective_result.confidence),
            'agent_metadata': {
                'participating_agents': list(agent_results.keys()),
                'processing_times': {
                    name: result.processing_time
                    for name, result in agent_results.items()
                },
            },
        }

    def _prioritize_actions(self, detection_actions: List[str], diagnosis_actions: List[str]) -> List[str]:
        """Merge both action lists into at most five unique actions.

        Actions containing 🚨 come first, then those containing ⚠️, then
        the rest. Within a tier the first-seen order is kept, with
        detection actions ahead of diagnosis actions.
        """
        def tier(action: str) -> int:
            if '🚨' in action:
                return 0
            if '⚠️' in action:
                return 1
            return 2

        # dict.fromkeys drops duplicates while keeping first-seen order;
        # sorted() is stable, so that order survives inside each tier.
        unique_actions = list(dict.fromkeys(detection_actions + diagnosis_actions))
        return sorted(unique_actions, key=tier)[:5]  # Return top 5 actions

    def _add_business_context(self, event: ReliabilityEvent, confidence: float) -> Dict[str, Any]:
        """Translate the anomaly confidence into business-facing fields.

        Args:
            event: The analysed event (not consulted by this method).
            confidence: Anomaly confidence from the detective agent.

        Returns:
            A dict with the severity label, an impact sentence and whether
            escalation is recommended (confidence above 0.7).
        """
        if confidence > 0.8:
            business_severity = "CRITICAL"
        elif confidence > 0.6:
            business_severity = "HIGH"
        elif confidence > 0.4:
            business_severity = "MEDIUM"
        else:
            business_severity = "LOW"

        return {
            'business_severity': business_severity,
            'estimated_impact': f"{confidence * 100:.0f}% confidence of incident",
            'recommended_escalation': confidence > 0.7,
        }