# IncidentMgmtSystem/agents/analysis_agent.py
# Debashis
# Add multi-agent architecture implementation with detailed documentation
# ffa310a
"""
Analysis Agent
Performs AI-powered root cause analysis using Ollama LLM.
Third agent in the processing pipeline.
Communication: Receives from CorrelationAgent → Sends to ResponseAgent
Data Storage: PostgreSQL (analysis results), Redis (LLM cache)
"""
import json
import logging
import time
from datetime import datetime
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)


class AnalysisAgent:
    """Analyze incidents through an injected LLM client.

    The agent condenses an incident's alerts into a context dict, renders a
    prompt from it, forwards both to ``llm_client.analyze_incident`` and
    normalises the reply into a fixed result schema. When no client is
    configured, or any step raises, a static fallback analysis is returned
    instead of propagating the error.
    """

    # Alert categories driving the heuristics in ``_detect_patterns``.
    _RESOURCE_CATEGORIES = ('cpu', 'memory', 'disk', 'io')
    _CONNECTIVITY_CATEGORIES = ('connection', 'network', 'timeout')
    _APPLICATION_CATEGORIES = ('error', 'crash', 'exception')

    # Confidence reported when the LLM reply carries no usable value.
    _DEFAULT_CONFIDENCE = 50

    def __init__(self, llm_client=None):
        """Create the agent.

        Args:
            llm_client: Object exposing an awaitable
                ``analyze_incident(alerts=..., context=..., prompt=...)``.
                When falsy, every incident gets the fallback analysis.
        """
        self.llm = llm_client
        # Not read anywhere in this class; kept as a public attribute.
        self.cache_enabled = True

    async def analyze_incident(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """Run the full analysis pipeline for one incident.

        Steps: build context, build prompt, call the LLM client, structure
        the reply.

        Args:
            incident: Mapping read via ``.get`` for ``id``, ``service``,
                ``severity`` and ``alerts`` (a list of alert dicts).

        Returns:
            ``{"status", "incident_id", "analysis", "processing_time_ms"}``
            on success, where ``processing_time_ms`` is the measured wall
            time of this call. If the client is missing or any step raises,
            the dict produced by ``_fallback_analysis`` (which carries
            ``"mode": "fallback"`` instead of a timing).
        """
        started = time.perf_counter()
        logger.info("[ANALYSIS_AGENT] Analyzing incident: %s", incident.get('id'))

        # Without a client there is nothing to prompt, so skip the
        # context/prompt work entirely.
        if not self.llm:
            return self._fallback_analysis(incident)

        try:
            context = self._prepare_analysis_context(incident)
            # Serialising the context only to measure it is wasted work
            # unless debug logging is actually on.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "[ANALYSIS_AGENT] Context prepared: %d chars",
                    len(json.dumps(context, default=str)),
                )

            prompt = self._build_analysis_prompt(context)
            logger.debug("[ANALYSIS_AGENT] Prompt built")

            llm_reply = await self.llm.analyze_incident(
                alerts=incident.get('alerts') or [],
                context=context,
                prompt=prompt,
            )
            logger.info("[ANALYSIS_AGENT] LLM analysis complete")

            # _structure_analysis is a coroutine function: it must be
            # awaited, otherwise "analysis" would hold a coroutine object.
            structured = await self._structure_analysis(llm_reply)

            return {
                "status": "analyzed",
                "incident_id": incident.get('id'),
                "analysis": structured,
                "processing_time_ms": int((time.perf_counter() - started) * 1000),
            }
        except Exception as e:
            # Boundary handler: callers always get a result dict.
            logger.error(
                "[ANALYSIS_AGENT] Error analyzing incident: %s", e, exc_info=True
            )
            return self._fallback_analysis(incident)

    def _prepare_analysis_context(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """Condense an incident into the context dict used for prompting.

        Args:
            incident: Incident mapping; a missing or ``None`` ``alerts``
                entry is treated as an empty list.

        Returns:
            Dict with ``incident_id``, ``service``, ``severity``,
            ``alert_count``, ``time_range`` (``start``/``end``),
            ``alerts_summary``, ``metrics`` and ``patterns``.
            ``time_range`` bounds are ``None`` when no alert carries a
            ``created_at`` value.
        """
        alerts = incident.get('alerts') or []
        # Alerts without a timestamp are ignored so they neither break the
        # comparison nor masquerade as the earliest event.
        timestamps = [a.get('created_at') for a in alerts if a.get('created_at')]

        return {
            'incident_id': incident.get('id'),
            'service': incident.get('service'),
            'severity': incident.get('severity'),
            'alert_count': len(alerts),
            'time_range': {
                'start': min(timestamps, default=None),
                'end': max(timestamps, default=None),
            },
            'alerts_summary': [
                {
                    'title': a.get('title'),
                    'category': a.get('category'),
                    'severity': a.get('severity'),
                    'timestamp': a.get('created_at'),
                }
                for a in alerts
            ],
            'metrics': self._aggregate_metrics(alerts),
            'patterns': self._detect_patterns(alerts),
        }

    def _aggregate_metrics(self, alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge the ``metrics`` dicts of all alerts into one dict.

        Non-dict ``metrics`` values are skipped. When several alerts report
        the same metric key, the value from the later alert wins.
        """
        merged: Dict[str, Any] = {}
        for alert in alerts:
            alert_metrics = alert.get('metrics', {})
            if isinstance(alert_metrics, dict):
                merged.update(alert_metrics)
        return merged

    def _detect_patterns(self, alerts: List[Dict[str, Any]]) -> List[str]:
        """Derive coarse pattern labels from the alerts' ``category`` values.

        Returns:
            Any of ``"resource_exhaustion"`` (two or more resource-category
            alerts), ``"connectivity_issue"`` and ``"application_failure"``
            (at least one matching alert), in that order.
        """
        categories = [a.get('category') for a in alerts]
        patterns: List[str] = []

        resource_hits = sum(1 for c in categories if c in self._RESOURCE_CATEGORIES)
        if resource_hits >= 2:
            patterns.append("resource_exhaustion")

        if any(c in self._CONNECTIVITY_CATEGORIES for c in categories):
            patterns.append("connectivity_issue")

        if any(c in self._APPLICATION_CATEGORIES for c in categories):
            patterns.append("application_failure")

        return patterns

    def _build_analysis_prompt(self, context: Dict[str, Any]) -> str:
        """Render the LLM prompt from a context dict.

        Args:
            context: Dict as produced by ``_prepare_analysis_context``.

        Returns:
            Prompt text asking for a JSON answer with ``root_cause``,
            ``confidence``, ``evidence``, ``actions`` and
            ``severity_assessment``.
        """
        # default=str keeps non-JSON values (e.g. datetime timestamps)
        # from aborting the analysis; the prompt is plain text anyway.
        timeline = json.dumps(context.get('alerts_summary', []), indent=2, default=str)
        metrics = json.dumps(context.get('metrics', {}), indent=2, default=str)
        patterns = ', '.join(context.get('patterns', []))

        lines = [
            "",
            "You are an expert infrastructure analyst. Analyze this incident and provide:",
            "1. Root cause hypothesis",
            "2. Confidence level (0-100)",
            "3. Top 3 recommended actions",
            "4. Severity assessment",
            "Incident Context:",
            f"- Service: {context.get('service')}",
            f"- Severity: {context.get('severity')}",
            f"- Alert Count: {context.get('alert_count')}",
            f"- Patterns Detected: {patterns}",
            "Alert Timeline:",
            timeline,
            "Metrics:",
            metrics,
            "Provide analysis in JSON format with fields:",
            "- root_cause",
            "- confidence (0-100)",
            "- evidence (list of supporting facts)",
            "- actions (list of 3 recommended actions)",
            "- severity_assessment",
            "",
        ]
        return "\n".join(lines)

    @staticmethod
    def _extract_json_object(text: str) -> Optional[Dict[str, Any]]:
        """Return the first JSON object embedded in ``text``, or ``None``.

        Each ``{`` is tried as the start of a JSON document, so prose or
        stray braces before or after the object do not prevent parsing.
        """
        decoder = json.JSONDecoder()
        start = text.find('{')
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict):
                return candidate
            start = text.find('{', start + 1)
        return None

    def _coerce_confidence(self, value: Any) -> int:
        """Convert an LLM-supplied confidence to an int clamped to 0-100.

        Accepts numbers and numeric strings, optionally suffixed with
        ``%``. Anything else yields the default confidence.
        """
        if isinstance(value, str):
            value = value.strip().rstrip('%').strip()
        try:
            number = int(float(value))
        except (TypeError, ValueError, OverflowError):
            return self._DEFAULT_CONFIDENCE
        return max(0, min(100, number))

    async def _structure_analysis(self, llm_response: Any) -> Dict[str, Any]:
        """Normalise an LLM reply into the analysis result schema.

        Args:
            llm_response: Reply text, expected to embed a JSON object, or
                an already-parsed dict. Text without a parsable JSON object
                is used verbatim as the root cause.

        Returns:
            Dict with ``root_cause``, ``confidence`` (int, 0-100),
            ``evidence``, ``recommended_actions``, ``severity_assessment``
            and ``timestamp``. On any unexpected error the result of
            ``_fallback_analysis_result`` is returned.
        """
        try:
            if isinstance(llm_response, dict):
                analysis = llm_response
            else:
                analysis = self._extract_json_object(llm_response)
                if analysis is None:
                    analysis = {
                        'root_cause': llm_response,
                        'confidence': self._DEFAULT_CONFIDENCE,
                        'evidence': [],
                        'actions': [],
                    }

            return {
                'root_cause': analysis.get('root_cause', 'Unknown'),
                'confidence': self._coerce_confidence(
                    analysis.get('confidence', self._DEFAULT_CONFIDENCE)
                ),
                'evidence': analysis.get('evidence', []),
                'recommended_actions': analysis.get('actions', []),
                'severity_assessment': analysis.get('severity_assessment', 'Medium'),
                'timestamp': datetime.utcnow().isoformat(),
            }
        except Exception as e:
            logger.error("[ANALYSIS_AGENT] Error parsing LLM response: %s", e)
            return self._fallback_analysis_result()

    def _fallback_analysis(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """Build the full response used when LLM analysis is not possible.

        Returns:
            Dict with ``status``, ``incident_id``, the static ``analysis``
            from ``_fallback_analysis_result`` and ``"mode": "fallback"``.
        """
        logger.warning("[ANALYSIS_AGENT] Using fallback analysis")
        return {
            "status": "analyzed",
            "incident_id": incident.get('id'),
            "analysis": self._fallback_analysis_result(),
            "mode": "fallback",
        }

    def _fallback_analysis_result(self) -> Dict[str, Any]:
        """Return a static, zero-confidence analysis with generic actions."""
        return {
            'root_cause': 'Unable to determine - LLM unavailable',
            'confidence': 0,
            'evidence': [],
            'recommended_actions': [
                'Check application logs',
                'Review service metrics',
                'Check for recent deployments',
            ],
            'severity_assessment': 'Unknown',
            'timestamp': datetime.utcnow().isoformat(),
        }

    @staticmethod
    def _distinct(values) -> List[Any]:
        """De-duplicate hashable values, keeping first-seen order."""
        return list(dict.fromkeys(values))

    async def analyze_alerts(self, alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise a batch of alerts without calling the LLM.

        Args:
            alerts: Alert dicts read via ``.get`` for ``service``,
                ``severity`` and ``category``.

        Returns:
            Dict with ``context`` (alert count plus distinct services,
            severities and categories in first-seen order),
            ``patterns_detected``, ``risk_level`` and
            ``recommended_priority``; or ``{"error": <message>}`` if the
            summary could not be computed.
        """
        logger.info("[ANALYSIS_AGENT] Analyzing %d alerts", len(alerts))
        try:
            context = {
                'alert_count': len(alerts),
                'services': self._distinct(a.get('service') for a in alerts),
                'severities': self._distinct(a.get('severity') for a in alerts),
                'categories': self._distinct(a.get('category') for a in alerts),
            }
            return {
                'context': context,
                'patterns_detected': self._detect_patterns(alerts),
                'risk_level': self._calculate_risk_level(alerts),
                'recommended_priority': self._calculate_priority(alerts),
            }
        except Exception as e:
            logger.error(
                "[ANALYSIS_AGENT] Error in alert analysis: %s", e, exc_info=True
            )
            return {'error': str(e)}

    def _calculate_risk_level(self, alerts: List[Dict[str, Any]]) -> str:
        """Map alert severities to ``'critical'``/``'high'``/``'medium'``/``'low'``.

        Rules, first match wins: 3+ critical -> critical; 1+ critical or
        5+ warning -> high; 2+ warning -> medium; otherwise low.
        """
        if not alerts:
            return 'low'

        critical_count = sum(1 for a in alerts if a.get('severity') == 'critical')
        warning_count = sum(1 for a in alerts if a.get('severity') == 'warning')

        if critical_count >= 3:
            return 'critical'
        if critical_count >= 1 or warning_count >= 5:
            return 'high'
        if warning_count >= 2:
            return 'medium'
        return 'low'

    def _calculate_priority(self, alerts: List[Dict[str, Any]]) -> int:
        """Translate the risk level into a handling priority (1 = most urgent).

        Returns:
            1 for critical, 2 for high, 3 for medium, 5 for low, and 4 for
            any risk level not in the table.
        """
        priority_by_risk = {
            'critical': 1,
            'high': 2,
            'medium': 3,
            'low': 5,
        }
        return priority_by_risk.get(self._calculate_risk_level(alerts), 4)