"""
Correlation Agent

Groups related alerts into incidents by detecting patterns and similarities.
Second agent in the processing pipeline.

Communication: Receives from AlertIngestionAgent → Sends to AnalysisAgent
Data Storage: PostgreSQL (incidents, correlations), Redis (pattern cache)
"""

import json
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class CorrelationAgent:
    """Correlates related alerts into incidents.

    Args:
        db_session: Database session, stored on ``self.db``. The queries in
            this class are still placeholders and do not use it yet.
        redis_client: Redis client, stored on ``self.redis``. Not used by the
            code in this class yet.
        correlation_window: Look-back window in seconds for related alerts
            (default 600 = 10 minutes).
        similarity_threshold: Minimum correlation score (0.0-1.0) at which an
            alert is attached to an existing incident instead of opening a
            new one (default 0.7).
    """

    # Ordinal ranking used for the severity-escalation component of the
    # score; severities not listed here rank as 'info' (1).
    SEVERITY_LEVELS = {'info': 1, 'warning': 2, 'critical': 3}

    def __init__(self, db_session=None, redis_client=None,
                 correlation_window: int = 600,
                 similarity_threshold: float = 0.7):
        self.db = db_session
        self.redis = redis_client
        self.correlation_window = correlation_window  # seconds
        self.similarity_threshold = similarity_threshold

    async def correlate_alerts(self, alert: Dict[str, Any]) -> Dict[str, Any]:
        """
        Main entry point for correlation.

        Target latency budget (informational, not enforced):
            T+200ms: Receive alert from AlertIngestionAgent
            T+250ms: Query for similar alerts in time window
            T+300ms: Calculate similarity scores
            T+350ms: Create new incident or update existing
            T+400ms: Return correlation result

        Args:
            alert: Normalized alert from AlertIngestionAgent.

        Returns:
            Dict with ``status`` ("correlated"), ``incident_id``, ``action``
            ("create" or "update"), ``correlation_score``,
            ``similar_alerts_count`` and ``processing_time_ms`` (wall-clock
            time measured inside this call, in whole milliseconds).

        Raises:
            Exception: any error from lookup, scoring or persistence is
                logged and re-raised unchanged.
        """
        started = time.perf_counter()
        logger.info("[CORRELATION_AGENT] Processing alert: %s", alert.get('title'))

        try:
            # Step 1: Query similar alerts
            similar_alerts = await self._find_similar_alerts(alert)
            logger.info("[CORRELATION_AGENT] Found %d similar alerts", len(similar_alerts))

            # Step 2: Calculate similarity
            correlation_score = self._calculate_correlation_score(alert, similar_alerts)
            logger.debug("[CORRELATION_AGENT] Correlation score: %s", correlation_score)

            # Step 3: Attach to an existing incident only when the score is
            # high enough AND there is something to attach to.
            if correlation_score >= self.similarity_threshold and similar_alerts:
                incident_id = await self._update_incident(alert, similar_alerts)
                action = "update"
                logger.info("[CORRELATION_AGENT] Updated incident: %s", incident_id)
            else:
                incident_id = await self._create_incident(alert)
                action = "create"
                logger.info("[CORRELATION_AGENT] Created new incident: %s", incident_id)

            # Step 4: Return result with the actually measured duration
            # (previously a hard-coded 200).
            elapsed_ms = int((time.perf_counter() - started) * 1000)
            return {
                "status": "correlated",
                "incident_id": incident_id,
                "action": action,
                "correlation_score": correlation_score,
                "similar_alerts_count": len(similar_alerts),
                "processing_time_ms": elapsed_ms,
            }

        except Exception as e:
            logger.error("[CORRELATION_AGENT] Error correlating alert: %s", e, exc_info=True)
            raise

    async def _find_similar_alerts(self, alert: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Find alerts similar to the current one within the correlation window.

        Intended similarity criteria:
        - Same service
        - Same category (or related)
        - Created within ``self.correlation_window`` seconds
        - Same severity level or escalating

        NOTE: placeholder — no query is executed yet and an empty list is
        always returned, so every alert currently opens a new incident.
        """
        service = alert.get('service', 'unknown')
        category = alert.get('category', 'unknown')

        # Real implementation should query PostgreSQL with bound parameters
        # (never string-interpolated values), e.g.:
        #   SELECT * FROM alerts WHERE
        #     service = %(service)s AND
        #     created_at > NOW() - make_interval(secs => %(window)s) AND
        #     (category = %(category)s OR category = ANY(%(related)s))
        similar: List[Dict[str, Any]] = []

        logger.debug(
            "[CORRELATION_AGENT] Querying for alerts: service=%s, category=%s",
            service, category,
        )
        return similar

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Return ``value`` as a timezone-aware UTC datetime, or None.

        Accepts ``datetime`` objects or ISO-8601 strings (a trailing ``Z`` is
        accepted on all Python versions). Naive values are assumed to be UTC.
        Missing or unparseable values yield None, so one malformed alert
        cannot abort scoring.
        """
        if isinstance(value, datetime):
            parsed = value
        elif isinstance(value, str) and value:
            text = value[:-1] + '+00:00' if value.endswith('Z') else value
            try:
                parsed = datetime.fromisoformat(text)
            except ValueError:
                return None
        else:
            return None

        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    def _calculate_correlation_score(self, alert: Dict[str, Any], similar_alerts: List) -> float:
        """
        Calculate how strongly this alert correlates with existing ones.

        Score 0.0-1.0 where 1.0 = definitely same incident.

        Scoring factors:
        - Service match (40%): fraction of similar alerts on the same service
        - Category match (30%): fraction of similar alerts in the same category
        - Time proximity (20%): full weight if the newest similar alert is
          under 1 minute old, half weight if under 5 minutes, else 0; skipped
          when no similar alert has a parseable ``created_at``
        - Severity escalation (10%): awarded when the alert's severity is at
          least the average severity of the similar alerts
        """
        if not similar_alerts:
            return 0.0

        total = len(similar_alerts)
        alert_service = alert.get('service', 'unknown')
        alert_category = alert.get('category', 'unknown')
        alert_severity = alert.get('severity', 'warning')

        service_matches = sum(1 for a in similar_alerts if a.get('service') == alert_service)
        category_matches = sum(1 for a in similar_alerts if a.get('category') == alert_category)

        # Service match (40%)
        score = (service_matches / total) * 0.4
        # Category match (30%)
        score += (category_matches / total) * 0.3

        # Time proximity (20%). total_seconds() is required: timedelta.seconds
        # ignores whole days (a 2-day-old alert would look seconds old) and
        # wraps negative deltas to ~86399.
        timestamps = [
            ts for ts in (self._parse_timestamp(a.get('created_at')) for a in similar_alerts)
            if ts is not None
        ]
        if timestamps:
            age = (datetime.now(timezone.utc) - max(timestamps)).total_seconds()
            age = max(age, 0.0)  # future timestamps (clock skew) count as "just now"
            if age < 60:  # Within 1 minute
                score += 0.2
            elif age < 300:  # Within 5 minutes
                score += 0.1

        # Severity escalation (10%)
        levels = self.SEVERITY_LEVELS
        current_level = levels.get(alert_severity, 1)
        avg_level = sum(levels.get(a.get('severity', 'info'), 1) for a in similar_alerts) / total
        if current_level >= avg_level:
            score += 0.1

        return min(score, 1.0)

    async def _create_incident(self, alert: Dict[str, Any]) -> str:
        """
        Create a new incident from an alert and return its id.

        Incident structure:
        - title: "Incident: <alert title>"
        - service / severity: copied from the alert
        - status: OPEN
        - alerts: [alert]

        NOTE: placeholder — nothing is persisted yet and the returned id is
        always 'incident_placeholder'.
        """
        logger.info("[CORRELATION_AGENT] Creating incident from alert")

        # Real implementation (bound parameters):
        #   INSERT INTO incidents (title, service, severity, status, created_at)
        #   VALUES (%s, %s, %s, 'OPEN', NOW()) RETURNING id
        incident = {
            'id': 'incident_placeholder',
            'title': f"Incident: {alert.get('title')}",
            'service': alert.get('service'),
            'severity': alert.get('severity'),
            'status': 'OPEN',
            'alerts': [alert],
            'created_at': datetime.now(timezone.utc).isoformat(),
        }

        logger.info("[CORRELATION_AGENT] Incident created: %s", incident['id'])
        return incident['id']

    async def _update_incident(self, alert: Dict[str, Any], similar_alerts: List) -> str:
        """
        Attach the alert to the incident of a correlated alert and return its id.

        The incident id is taken from the first similar alert that carries a
        truthy ``incident_id``. If none does (or ``similar_alerts`` is empty),
        a new incident is created instead of returning a bogus id.

        Intended updates (not yet persisted — placeholder):
        - Add alert to the incident's alerts
        - Raise severity if escalated
        - Bump updated_at / alert_count
        """
        incident_id = next(
            (a.get('incident_id') for a in similar_alerts if a.get('incident_id')),
            None,
        )
        if incident_id is None:
            if similar_alerts:
                logger.warning(
                    "[CORRELATION_AGENT] Similar alerts carry no incident_id; creating new incident"
                )
            return await self._create_incident(alert)

        logger.info("[CORRELATION_AGENT] Updating incident with new alert")

        # Real implementation:
        #   UPDATE incidents SET
        #     severity = GREATEST(severity, %(new_severity)s),
        #     updated_at = NOW(),
        #     alert_count = alert_count + 1
        #   WHERE id = %(incident_id)s

        logger.info("[CORRELATION_AGENT] Incident updated: %s", incident_id)
        return incident_id

    async def detect_cascading_failure(self, incident: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Detect whether an incident looks like a cascading failure.

        Current heuristic: at least 3 alerts spanning more than one distinct
        (non-empty) service. Temporal correlation and dependency-chain checks
        (e.g. DB → Cache → API) are not implemented yet; ``confidence`` is a
        fixed 0.85.

        Returns:
            A pattern dict with ``affected_services`` sorted alphabetically,
            or None when the heuristic does not match.
        """
        logger.info("[CORRELATION_AGENT] Analyzing for cascading failure pattern")

        alerts = incident.get('alerts') or []
        if len(alerts) < 3:
            return None

        # Alerts without a service must not count as an extra "affected" one.
        services = {a.get('service') for a in alerts if a.get('service')}
        logger.info("[CORRELATION_AGENT] Services affected: %s", services)

        if len(services) > 1:
            return {
                "pattern": "cascading_failure",
                "confidence": 0.85,
                "affected_services": sorted(services),
                "recommendation": "Check dependency chain - start with database/cache layer",
            }

        return None