Spaces:
Runtime error
Runtime error
| """ | |
| Analysis Agent | |
| Performs AI-powered root cause analysis using Ollama LLM. | |
| Third agent in the processing pipeline. | |
| Communication: Receives from CorrelationAgent → Sends to ResponseAgent | |
| Data Storage: PostgreSQL (analysis results), Redis (LLM cache) | |
| """ | |
| import logging | |
| import json | |
| from typing import Dict, Any, Optional, List | |
| from datetime import datetime | |
# Module-level logger, named after this module, used by AnalysisAgent.
logger = logging.getLogger(name=__name__)
class AnalysisAgent:
    """Performs AI-powered root cause analysis of correlated incidents.

    An incident (a dict with an ``alerts`` list) is turned into a prompt,
    sent to the injected LLM client (the module docs describe it as Ollama)
    and the reply is normalised into a fixed analysis schema. When no client
    is configured, or any step fails, a static fallback analysis with the
    same shape is returned instead.
    """

    # Alert categories that indicate each detected pattern.
    _RESOURCE_CATEGORIES = ('cpu', 'memory', 'disk', 'io')
    _CONNECTIVITY_CATEGORIES = ('connection', 'network', 'timeout')
    _APPLICATION_CATEGORIES = ('error', 'crash', 'exception')

    # Risk level -> handling priority (1 = most urgent).
    _PRIORITY_BY_RISK = {'critical': 1, 'high': 2, 'medium': 3, 'low': 5}

    def __init__(self, llm_client=None):
        """
        Args:
            llm_client: Object exposing an awaitable
                ``analyze_incident(alerts=..., context=..., prompt=...)``.
                A falsy value disables LLM analysis (fallback results only).
        """
        self.llm = llm_client
        # NOTE(review): not read anywhere in this class — confirm callers use it.
        self.cache_enabled = True

    async def analyze_incident(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """
        Main entry point: produce a root cause analysis for one incident.

        Steps: build context from the incident's alerts, build the prompt,
        query the LLM client, then parse its reply into a structured result.

        Args:
            incident: Correlated incident; ``id``, ``service``, ``severity``
                and ``alerts`` (list of alert dicts) are read.

        Returns:
            ``{"status", "incident_id", "analysis", "processing_time_ms"}`` on
            success, where ``processing_time_ms`` is the measured wall time of
            this call (LLM round trip included). If no LLM client is
            configured or any step raises, the ``_fallback_analysis`` result
            (``"mode": "fallback"``) is returned instead.
        """
        import time  # local import, like the ``re`` import used for parsing

        started = time.perf_counter()
        logger.info("[ANALYSIS_AGENT] Analyzing incident: %s", incident.get('id'))

        if not self.llm:
            return self._fallback_analysis(incident)

        try:
            context = self._prepare_analysis_context(incident)
            if logger.isEnabledFor(logging.DEBUG):
                # Serialise only when debug output is emitted; default=str keeps
                # a non-JSON value (e.g. a datetime) from aborting the analysis.
                logger.debug(
                    "[ANALYSIS_AGENT] Context prepared: %d chars",
                    len(json.dumps(context, default=str)),
                )

            prompt = self._build_analysis_prompt(context)
            logger.debug("[ANALYSIS_AGENT] Prompt built")

            analysis_result = await self.llm.analyze_incident(
                alerts=incident.get('alerts') or [],
                context=context,
                prompt=prompt,
            )
            logger.info("[ANALYSIS_AGENT] LLM analysis complete")

            # Synchronous parse step: nothing to await.
            structured_analysis = self._structure_analysis(analysis_result)

            return {
                "status": "analyzed",
                "incident_id": incident.get('id'),
                "analysis": structured_analysis,
                "processing_time_ms": round((time.perf_counter() - started) * 1000),
            }
        except Exception as e:
            logger.error(f"[ANALYSIS_AGENT] Error analyzing incident: {e}", exc_info=True)
            return self._fallback_analysis(incident, reason='analysis failed')

    def _prepare_analysis_context(self, incident: Dict[str, Any]) -> Dict[str, Any]:
        """
        Collect the incident facts the prompt is built from.

        Returns:
            dict with ``incident_id``, ``service``, ``severity``,
            ``alert_count``, ``time_range`` (earliest/latest alert
            ``created_at``; ``None`` when no alert has one), ``alerts_summary``
            (title/category/severity/timestamp per alert), merged ``metrics``
            and detected ``patterns``.
        """
        alerts = incident.get('alerts') or []
        return {
            'incident_id': incident.get('id'),
            'service': incident.get('service'),
            'severity': incident.get('severity'),
            'alert_count': len(alerts),
            'time_range': self._time_range(alerts),
            'alerts_summary': [
                {
                    'title': a.get('title'),
                    'category': a.get('category'),
                    'severity': a.get('severity'),
                    'timestamp': a.get('created_at'),
                }
                for a in alerts
            ],
            'metrics': self._aggregate_metrics(alerts),
            'patterns': self._detect_patterns(alerts),
        }

    @staticmethod
    def _time_range(alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the earliest/latest ``created_at`` among alerts that have one."""
        # Skip missing/empty timestamps: as '' they would always win min(), and
        # with no timestamps at all min()/max() would raise ValueError.
        stamps = [a.get('created_at') for a in alerts if a.get('created_at')]
        if not stamps:
            return {'start': None, 'end': None}
        return {'start': min(stamps), 'end': max(stamps)}

    def _aggregate_metrics(self, alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge the ``metrics`` dicts of all alerts; later alerts win on key clashes."""
        metrics = {}
        for alert in alerts:
            alert_metrics = alert.get('metrics', {})
            # Non-dict metrics (e.g. None) are ignored.
            if isinstance(alert_metrics, dict):
                metrics.update(alert_metrics)
        return metrics

    def _detect_patterns(self, alerts: List[Dict[str, Any]]) -> List[str]:
        """Return pattern labels derived from the alerts' ``category`` values."""
        categories = [a.get('category') for a in alerts]
        patterns = []
        # Two or more resource alerts (repeats included) suggest exhaustion.
        resource_hits = [c for c in categories if c in self._RESOURCE_CATEGORIES]
        if len(resource_hits) >= 2:
            patterns.append("resource_exhaustion")
        if any(c in self._CONNECTIVITY_CATEGORIES for c in categories):
            patterns.append("connectivity_issue")
        if any(c in self._APPLICATION_CATEGORIES for c in categories):
            patterns.append("application_failure")
        return patterns

    def _build_analysis_prompt(self, context: Dict[str, Any]) -> str:
        """Build the LLM prompt from a context produced by ``_prepare_analysis_context``.

        ``default=str`` renders non-JSON values (e.g. datetime timestamps) as
        text instead of raising TypeError while building the prompt.
        """
        patterns = ', '.join(context.get('patterns', []))
        alerts_json = json.dumps(context.get('alerts_summary', []), indent=2, default=str)
        metrics_json = json.dumps(context.get('metrics', {}), indent=2, default=str)
        return f"""
You are an expert infrastructure analyst. Analyze this incident and provide:
1. Root cause hypothesis
2. Confidence level (0-100)
3. Top 3 recommended actions
4. Severity assessment
Incident Context:
- Service: {context.get('service')}
- Severity: {context.get('severity')}
- Alert Count: {context.get('alert_count')}
- Patterns Detected: {patterns}
Alert Timeline:
{alerts_json}
Metrics:
{metrics_json}
Provide analysis in JSON format with fields:
- root_cause
- confidence (0-100)
- evidence (list of supporting facts)
- actions (list of 3 recommended actions)
- severity_assessment
"""

    def _structure_analysis(self, llm_response: Any) -> Dict[str, Any]:
        """Normalise an LLM reply into the analysis schema.

        Synchronous because it performs no I/O; ``analyze_incident`` calls it
        without ``await``.

        Returns:
            dict with ``root_cause``, ``confidence`` (int clamped to 0-100),
            ``evidence``, ``recommended_actions`` (from the reply's
            ``actions``), ``severity_assessment`` and ``timestamp`` (naive UTC
            ISO-8601). On an unsupported reply type or any other error, the
            static fallback analysis is returned.
        """
        try:
            analysis = self._parse_llm_payload(llm_response)
            return {
                'root_cause': analysis.get('root_cause') or 'Unknown',
                'confidence': self._coerce_confidence(analysis.get('confidence', 50)),
                'evidence': analysis.get('evidence', []),
                'recommended_actions': analysis.get('actions', []),
                'severity_assessment': analysis.get('severity_assessment') or 'Medium',
                'timestamp': datetime.utcnow().isoformat(),
            }
        except Exception as e:
            logger.error("[ANALYSIS_AGENT] Error parsing LLM response: %s", e)
            return self._fallback_analysis_result('LLM response could not be parsed')

    @staticmethod
    def _parse_llm_payload(llm_response: Any) -> Dict[str, Any]:
        """Extract the analysis dict from an LLM reply.

        A dict is used as-is. For a string, the span from the first ``{`` to
        the last ``}`` is parsed as JSON. If there is no such span, or it is
        not a valid JSON object, the whole text becomes the root cause with
        confidence 50.

        Raises:
            TypeError: if the reply is neither a dict nor a string.
        """
        import re

        if isinstance(llm_response, dict):
            return llm_response
        if not isinstance(llm_response, str):
            raise TypeError(f"unsupported LLM response type: {type(llm_response).__name__}")

        json_match = re.search(r'\{.*\}', llm_response, re.DOTALL)
        if json_match:
            try:
                parsed = json.loads(json_match.group())
            except json.JSONDecodeError:
                parsed = None  # malformed JSON: keep the raw text below
            if isinstance(parsed, dict):
                return parsed
        return {
            'root_cause': llm_response,
            'confidence': 50,
            'evidence': [],
            'actions': [],
        }

    @staticmethod
    def _coerce_confidence(value: Any, default: int = 50) -> int:
        """Convert a confidence value to an int clamped to 0-100.

        Accepts ints, floats and numeric strings with an optional trailing
        ``%``; anything unparseable (including None) yields ``default``.
        """
        try:
            confidence = int(float(str(value).strip().rstrip('%')))
        except (TypeError, ValueError, OverflowError):
            return default
        return max(0, min(100, confidence))

    def _fallback_analysis(self, incident: Dict[str, Any],
                           reason: str = 'LLM unavailable') -> Dict[str, Any]:
        """Wrap the static fallback analysis in the incident-level result shape.

        Args:
            incident: Incident being analysed; only ``id`` is read.
            reason: Why the fallback is used; appended to the root cause text.
        """
        logger.warning("[ANALYSIS_AGENT] Using fallback analysis (%s)", reason)
        return {
            "status": "analyzed",
            "incident_id": incident.get('id'),
            "analysis": self._fallback_analysis_result(reason),
            "mode": "fallback",
        }

    def _fallback_analysis_result(self, reason: str = 'LLM unavailable') -> Dict[str, Any]:
        """Return a generic analysis (independent of the alerts) for failure cases.

        Args:
            reason: Appended to the root cause text; the default yields
                ``'Unable to determine - LLM unavailable'``.
        """
        return {
            'root_cause': f'Unable to determine - {reason}',
            'confidence': 0,
            'evidence': [],
            'recommended_actions': [
                'Check application logs',
                'Review service metrics',
                'Check for recent deployments',
            ],
            'severity_assessment': 'Unknown',
            'timestamp': datetime.utcnow().isoformat(),
        }

    async def analyze_alerts(self, alerts: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyze individual alerts for patterns and anomalies (no LLM call).
        Used by AlertIngestionAgent for pre-processing.

        Returns:
            ``context`` (alert count plus distinct services, severities and
            categories in first-seen order), ``patterns_detected``,
            ``risk_level`` and ``recommended_priority``; on error
            ``{'error': <message>}``.
        """
        logger.info("[ANALYSIS_AGENT] Analyzing %d alerts", len(alerts))
        try:
            # dict.fromkeys de-duplicates while keeping a deterministic order.
            context = {
                'alert_count': len(alerts),
                'services': list(dict.fromkeys(a.get('service') for a in alerts)),
                'severities': list(dict.fromkeys(a.get('severity') for a in alerts)),
                'categories': list(dict.fromkeys(a.get('category') for a in alerts)),
            }
            return {
                'context': context,
                'patterns_detected': self._detect_patterns(alerts),
                'risk_level': self._calculate_risk_level(alerts),
                'recommended_priority': self._calculate_priority(alerts),
            }
        except Exception as e:
            logger.error("[ANALYSIS_AGENT] Error in alert analysis: %s", e)
            return {'error': str(e)}

    def _calculate_risk_level(self, alerts: List[Dict[str, Any]]) -> str:
        """Map alert severities to 'critical', 'high', 'medium' or 'low'."""
        if not alerts:
            return 'low'
        critical_count = sum(1 for a in alerts if a.get('severity') == 'critical')
        warning_count = sum(1 for a in alerts if a.get('severity') == 'warning')
        if critical_count >= 3:
            return 'critical'
        if critical_count >= 1 or warning_count >= 5:
            return 'high'
        if warning_count >= 2:
            return 'medium'
        return 'low'

    def _calculate_priority(self, alerts: List[Dict[str, Any]]) -> int:
        """Return handling priority (1 = most urgent) derived from the risk level."""
        # 4 is only reachable if _calculate_risk_level ever returns a new level.
        return self._PRIORITY_BY_RISK.get(self._calculate_risk_level(alerts), 4)