"""
Analysis Agent

Performs AI-powered root cause analysis using Ollama LLM.
Third agent in the processing pipeline.

Communication: Receives from CorrelationAgent → Sends to ResponseAgent
Data Storage: PostgreSQL (analysis results), Redis (LLM cache)
"""

import json
import logging
import re
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Greedy match from the first "{" to the last "}" so that nested objects in
# the LLM reply are captured as a single JSON document.
_JSON_OBJECT_RE = re.compile(r'\{.*\}', re.DOTALL)


class AnalysisAgent:
    """Performs AI-powered analysis of incidents using Ollama.

    The agent builds a context and prompt from a correlated incident, hands
    them to the injected LLM client and normalises the reply. Whenever the
    client is missing or any step raises, a static fallback analysis is
    returned instead of propagating the error.
    """

    # Alert categories that feed each detected pattern.
    _RESOURCE_CATEGORIES = ('cpu', 'memory', 'disk', 'io')
    _CONNECTIVITY_CATEGORIES = ('connection', 'network', 'timeout')
    _APPLICATION_CATEGORIES = ('error', 'crash', 'exception')

    # Handling priority (1 = most urgent) for each risk level.
    _PRIORITY_BY_RISK = {
        'critical': 1,
        'high': 2,
        'medium': 3,
        'low': 5,
    }

    # Confidence used when the LLM gives none, or one that cannot be parsed.
    _DEFAULT_CONFIDENCE = 50

    def __init__(self, llm_client: Optional[Any] = None):
        """Create the agent.

        Args:
            llm_client: Object exposing an awaitable
                ``analyze_incident(alerts=..., context=..., prompt=...)``.
                When falsy, ``analyze_incident`` always returns the fallback
                analysis.
        """
        self.llm = llm_client
        self.cache_enabled = True

    async def analyze_incident(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """
        Main entry point for analysis.

        Nominal timeline (design targets, not measured values):
            T+400ms: Receive incident from CorrelationAgent
            T+450ms: Prepare analysis context
            T+500ms: Send to Ollama
            T+500-1500ms: Ollama processes (1 second average)
            T+1500ms: Parse LLM response
            T+1550ms: Return analysis

        Args:
            incident: Correlated incident with alerts

        Returns:
            Analysis result with root cause and recommendations. On success
            the dict carries ``status``, ``incident_id``, ``analysis`` and
            ``processing_time_ms`` (measured wall-clock milliseconds spent in
            this call). When no LLM client is configured or any step raises,
            the fallback dict (``mode == "fallback"``) is returned instead.
        """
        started = time.perf_counter()
        logger.info("[ANALYSIS_AGENT] Analyzing incident: %s", incident.get('id'))

        try:
            # Step 1: Prepare context
            context = self._prepare_analysis_context(incident)
            # Serialising the context only to log its size is wasted work
            # unless DEBUG is actually enabled.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "[ANALYSIS_AGENT] Context prepared: %d chars",
                    len(json.dumps(context, default=str)),
                )

            # Step 2: Build prompt
            prompt = self._build_analysis_prompt(context)
            logger.debug("[ANALYSIS_AGENT] Prompt built")

            # Step 3: Send to Ollama
            if not self.llm:
                return self._fallback_analysis(incident)

            analysis_result = await self.llm.analyze_incident(
                alerts=incident.get('alerts', []),
                context=context,
                prompt=prompt,
            )
            logger.info("[ANALYSIS_AGENT] LLM analysis complete")

            # Step 4: Parse and structure response. The parser is a
            # coroutine function, so it must be awaited; otherwise the
            # result would contain a coroutine object instead of a dict.
            structured_analysis = await self._structure_analysis(analysis_result)

            # Step 5: Return result
            return {
                "status": "analyzed",
                "incident_id": incident.get('id'),
                "analysis": structured_analysis,
                "processing_time_ms": int((time.perf_counter() - started) * 1000),
            }

        except Exception as e:
            # Boundary handler: the pipeline always gets an analysis back.
            logger.error(
                "[ANALYSIS_AGENT] Error analyzing incident: %s", e, exc_info=True
            )
            return self._fallback_analysis(incident)

    def _prepare_analysis_context(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """
        Prepare comprehensive context for LLM analysis.

        Includes:
        - Alert timeline
        - Metrics
        - Detected patterns

        Args:
            incident: Incident dict; ``alerts`` may be missing, ``None`` or
                empty.

        Returns:
            Context dict. ``time_range`` holds the smallest and largest
            ``created_at`` among alerts that have one, or ``None`` for both
            when no alert carries a timestamp.
        """
        alerts = incident.get('alerts') or []

        # Alerts without a timestamp are skipped: min()/max() raise on an
        # empty sequence, and a '' placeholder would distort the start.
        timestamps = [a.get('created_at') for a in alerts if a.get('created_at')]

        return {
            'incident_id': incident.get('id'),
            'service': incident.get('service'),
            'severity': incident.get('severity'),
            'alert_count': len(alerts),
            'time_range': {
                'start': min(timestamps, default=None),
                'end': max(timestamps, default=None),
            },
            'alerts_summary': [
                {
                    'title': a.get('title'),
                    'category': a.get('category'),
                    'severity': a.get('severity'),
                    'timestamp': a.get('created_at'),
                }
                for a in alerts
            ],
            'metrics': self._aggregate_metrics(alerts),
            'patterns': self._detect_patterns(alerts),
        }

    def _aggregate_metrics(self, alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Aggregate metrics from all alerts in incident.

        Metrics are merged in alert order, so when two alerts report the
        same metric name the later alert's value wins. Non-dict ``metrics``
        values are ignored.
        """
        metrics: Dict[str, Any] = {}
        for alert in alerts:
            alert_metrics = alert.get('metrics', {})
            if isinstance(alert_metrics, dict):
                metrics.update(alert_metrics)
        return metrics

    def _detect_patterns(self, alerts: List[Dict[str, Any]]) -> List[str]:
        """Detect common patterns in alerts.

        Returns:
            Pattern names in a fixed order: ``resource_exhaustion`` (at
            least two resource-category alerts), ``connectivity_issue`` and
            ``application_failure`` (at least one matching alert each).
        """
        categories = [a.get('category') for a in alerts]
        patterns = []

        # Pattern: Multiple resource alerts (CPU, Memory, Disk)
        resource_hits = sum(1 for c in categories if c in self._RESOURCE_CATEGORIES)
        if resource_hits >= 2:
            patterns.append("resource_exhaustion")

        # Pattern: Connection/Network related
        if any(c in self._CONNECTIVITY_CATEGORIES for c in categories):
            patterns.append("connectivity_issue")

        # Pattern: Application errors
        if any(c in self._APPLICATION_CATEGORIES for c in categories):
            patterns.append("application_failure")

        return patterns

    def _build_analysis_prompt(self, context: Dict[str, Any]) -> str:
        """Build prompt for Ollama LLM.

        Args:
            context: Dict as produced by ``_prepare_analysis_context``.

        Returns:
            Prompt text asking for a JSON answer with ``root_cause``,
            ``confidence``, ``evidence``, ``actions`` and
            ``severity_assessment``.
        """
        # default=str keeps the prompt buildable when alerts carry values
        # json cannot encode natively (e.g. datetime timestamps).
        timeline = json.dumps(context.get('alerts_summary', []), indent=2, default=str)
        metrics = json.dumps(context.get('metrics', {}), indent=2, default=str)

        lines = [
            "You are an expert infrastructure analyst. Analyze this incident and provide:",
            "1. Root cause hypothesis",
            "2. Confidence level (0-100)",
            "3. Top 3 recommended actions",
            "4. Severity assessment",
            "",
            "Incident Context:",
            f"- Service: {context.get('service')}",
            f"- Severity: {context.get('severity')}",
            f"- Alert Count: {context.get('alert_count')}",
            f"- Patterns Detected: {', '.join(context.get('patterns', []))}",
            "",
            "Alert Timeline:",
            timeline,
            "",
            "Metrics:",
            metrics,
            "",
            "Provide analysis in JSON format with fields:",
            "- root_cause",
            "- confidence (0-100)",
            "- evidence (list of supporting facts)",
            "- actions (list of 3 recommended actions)",
            "- severity_assessment",
        ]
        return "\n" + "\n".join(lines) + "\n"

    @classmethod
    def _coerce_confidence(cls, value: Any) -> int:
        """Convert an LLM-supplied confidence into an int within 0-100.

        Accepts numbers and numeric strings, optionally suffixed with ``%``.
        Anything that cannot be interpreted falls back to the default
        confidence rather than invalidating the whole analysis.
        """
        if isinstance(value, str):
            value = value.strip().rstrip('%').strip()
        try:
            # OverflowError covers infinities, ValueError covers NaN/text.
            return max(0, min(100, int(float(value))))
        except (TypeError, ValueError, OverflowError):
            return cls._DEFAULT_CONFIDENCE

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()`` without
        relying on the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    async def _structure_analysis(self, llm_response: str) -> Dict[str, Any]:
        """Parse and structure LLM response.

        Kept as a coroutine function for interface compatibility even though
        it performs no awaiting itself; callers must ``await`` it.

        Args:
            llm_response: Raw LLM text, expected to contain a JSON object.
                A dict is also accepted and used as the parsed analysis.
                Text without any ``{...}`` span is treated as the root cause
                itself.

        Returns:
            Normalised analysis dict. If parsing raises (for example on
            malformed JSON), the fallback analysis result is returned.
        """
        try:
            if isinstance(llm_response, dict):
                analysis = llm_response
            else:
                # Extract JSON from LLM response
                json_match = _JSON_OBJECT_RE.search(llm_response)
                if json_match:
                    analysis = json.loads(json_match.group())
                else:
                    analysis = {
                        'root_cause': llm_response,
                        'confidence': self._DEFAULT_CONFIDENCE,
                        'evidence': [],
                        'actions': [],
                    }

            return {
                'root_cause': analysis.get('root_cause', 'Unknown'),
                'confidence': self._coerce_confidence(
                    analysis.get('confidence', self._DEFAULT_CONFIDENCE)
                ),
                'evidence': analysis.get('evidence', []),
                'recommended_actions': analysis.get('actions', []),
                'severity_assessment': analysis.get('severity_assessment', 'Medium'),
                'timestamp': self._utc_timestamp(),
            }
        except Exception as e:
            logger.error("[ANALYSIS_AGENT] Error parsing LLM response: %s", e)
            return self._fallback_analysis_result()

    def _fallback_analysis(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """Fallback analysis when LLM is unavailable.

        Returns:
            Result envelope marked with ``mode == "fallback"`` wrapping the
            static fallback analysis.
        """
        logger.warning("[ANALYSIS_AGENT] Using fallback analysis")
        return {
            "status": "analyzed",
            "incident_id": incident.get('id'),
            "analysis": self._fallback_analysis_result(),
            "mode": "fallback",
        }

    def _fallback_analysis_result(self) -> Dict[str, Any]:
        """Generate the static fallback analysis.

        The content does not depend on the incident: zero confidence and a
        fixed list of generic investigation steps.
        """
        return {
            'root_cause': 'Unable to determine - LLM unavailable',
            'confidence': 0,
            'evidence': [],
            'recommended_actions': [
                'Check application logs',
                'Review service metrics',
                'Check for recent deployments',
            ],
            'severity_assessment': 'Unknown',
            'timestamp': self._utc_timestamp(),
        }

    async def analyze_alerts(self, alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyze individual alerts for patterns and anomalies.
        Used by AlertIngestionAgent for pre-processing.

        Args:
            alerts: Alert dicts; ``service``, ``severity`` and ``category``
                are read from each.

        Returns:
            Dict with ``context``, ``patterns_detected``, ``risk_level`` and
            ``recommended_priority``; or ``{'error': <message>}`` if the
            analysis raises.
        """
        logger.info("[ANALYSIS_AGENT] Analyzing %d alerts", len(alerts))

        try:
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the output is deterministic (unlike list(set(...))).
            context = {
                'alert_count': len(alerts),
                'services': list(dict.fromkeys(a.get('service') for a in alerts)),
                'severities': list(dict.fromkeys(a.get('severity') for a in alerts)),
                'categories': list(dict.fromkeys(a.get('category') for a in alerts)),
            }

            return {
                'context': context,
                'patterns_detected': self._detect_patterns(alerts),
                'risk_level': self._calculate_risk_level(alerts),
                'recommended_priority': self._calculate_priority(alerts),
            }

        except Exception as e:
            logger.error(
                "[ANALYSIS_AGENT] Error in alert analysis: %s", e, exc_info=True
            )
            return {'error': str(e)}

    def _calculate_risk_level(self, alerts: List[Dict[str, Any]]) -> str:
        """Calculate overall risk level from alerts.

        Returns:
            ``'critical'`` for 3+ critical alerts; ``'high'`` for 1+ critical
            or 5+ warning alerts; ``'medium'`` for 2+ warning alerts;
            otherwise ``'low'`` (including for an empty list).
        """
        if not alerts:
            return 'low'

        critical_count = sum(1 for a in alerts if a.get('severity') == 'critical')
        warning_count = sum(1 for a in alerts if a.get('severity') == 'warning')

        if critical_count >= 3:
            return 'critical'
        if critical_count >= 1 or warning_count >= 5:
            return 'high'
        if warning_count >= 2:
            return 'medium'
        return 'low'

    def _calculate_priority(self, alerts: List[Dict[str, Any]]) -> int:
        """Calculate priority (1-5) for handling; 1 is the most urgent."""
        risk = self._calculate_risk_level(alerts)
        # 4 is the default for a risk level missing from the table.
        return self._PRIORITY_BY_RISK.get(risk, 4)