Spaces:
Runtime error
Runtime error
| """ | |
| Correlation Agent | |
| Groups related alerts into incidents by detecting patterns and similarities. | |
| Second agent in the processing pipeline. | |
| Communication: Receives from AlertIngestionAgent → Sends to AnalysisAgent | |
| Data Storage: PostgreSQL (incidents, correlations), Redis (pattern cache) | |
| """ | |
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module (``__name__``).
logger = logging.getLogger(name=__name__)
class CorrelationAgent:
    """Correlate related alerts into incidents.

    Second stage of the processing pipeline: receives a normalized alert
    from AlertIngestionAgent, looks for similar recent alerts, scores how
    strongly the new alert correlates with them, and either attaches it to
    an existing incident or opens a new one.

    The storage layer is still a stub: ``_find_similar_alerts`` always
    returns an empty list, and ``_create_incident`` / ``_update_incident``
    only describe (in comments) the SQL they are meant to run.

    Attributes:
        db: Database session supplied by the caller (stored; unused by the
            stub code).
        redis: Redis client supplied by the caller (stored; unused by the
            stub code).
        correlation_window: Look-back window in seconds for similar alerts.
        similarity_threshold: Minimum score in [0.0, 1.0] at which an alert
            joins an existing incident instead of creating a new one.
    """

    # Numeric rank per severity name; unknown names rank like 'info' (1).
    _SEVERITY_LEVELS = {'info': 1, 'warning': 2, 'critical': 3}

    def __init__(self, db_session=None, redis_client=None, *,
                 correlation_window: int = 600,
                 similarity_threshold: float = 0.7):
        """Store collaborators and correlation tuning knobs.

        Args:
            db_session: Optional database session.
            redis_client: Optional Redis client.
            correlation_window: Look-back window in seconds (default 600,
                i.e. 10 minutes).
            similarity_threshold: Score at or above which an alert is merged
                into an existing incident (default 0.7).
        """
        self.db = db_session
        self.redis = redis_client
        self.correlation_window = correlation_window
        self.similarity_threshold = similarity_threshold

    async def correlate_alerts(self, alert: Dict[str, Any]) -> Dict[str, Any]:
        """Correlate one alert and create or update the matching incident.

        Steps:
            1. Query similar alerts in the correlation window.
            2. Score how strongly the alert correlates with them.
            3. Update an existing incident if the score reaches
               ``similarity_threshold`` (and similar alerts exist), otherwise
               create a new incident.
            4. Return the correlation result.

        Args:
            alert: Normalized alert from AlertIngestionAgent.

        Returns:
            Dict with keys ``status``, ``incident_id``, ``action``
            (``"update"`` or ``"create"``), ``correlation_score``,
            ``similar_alerts_count`` and ``processing_time_ms`` (measured
            wall-clock time of this call, in whole milliseconds).

        Raises:
            Exception: Any error from the lookup/scoring/persistence steps is
                logged with its traceback and re-raised unchanged.
        """
        started = time.perf_counter()
        logger.info("[CORRELATION_AGENT] Processing alert: %s", alert.get('title'))
        try:
            # Step 1: query similar alerts.
            similar_alerts = await self._find_similar_alerts(alert)
            logger.info("[CORRELATION_AGENT] Found %d similar alerts", len(similar_alerts))

            # Step 2: calculate similarity.
            correlation_score = self._calculate_correlation_score(alert, similar_alerts)
            logger.debug("[CORRELATION_AGENT] Correlation score: %s", correlation_score)

            # Step 3: decide between updating an incident and creating one.
            if correlation_score >= self.similarity_threshold and similar_alerts:
                incident_id = await self._update_incident(alert, similar_alerts)
                action = "update"
                logger.info("[CORRELATION_AGENT] Updated incident: %s", incident_id)
            else:
                incident_id = await self._create_incident(alert)
                action = "create"
                logger.info("[CORRELATION_AGENT] Created new incident: %s", incident_id)
        except Exception as e:
            logger.exception("[CORRELATION_AGENT] Error correlating alert: %s", e)
            raise

        # Step 4: report the result with the real elapsed time.
        return {
            "status": "correlated",
            "incident_id": incident_id,
            "action": action,
            "correlation_score": correlation_score,
            "similar_alerts_count": len(similar_alerts),
            "processing_time_ms": round((time.perf_counter() - started) * 1000),
        }

    async def _find_similar_alerts(self, alert: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Find alerts similar to ``alert`` within the correlation window.

        Intended similarity criteria: same service, same (or related)
        category, created within the last ``self.correlation_window``
        seconds. The database query is not implemented yet, so this always
        returns an empty list.
        """
        service = alert.get('service', 'unknown')
        category = alert.get('category', 'unknown')
        # TODO: replace with a parameterized PostgreSQL query, e.g.
        #   SELECT * FROM alerts
        #   WHERE service = %(service)s
        #     AND created_at > NOW() - make_interval(secs => %(window)s)
        #     AND (category = %(category)s OR category = ANY(%(related)s))
        logger.debug(
            "[CORRELATION_AGENT] Querying for alerts: service=%s, category=%s",
            service, category,
        )
        return []

    @staticmethod
    def _to_utc(value: Any) -> Optional[datetime]:
        """Coerce an ISO-8601 string or ``datetime`` to an aware UTC datetime.

        Naive values are treated as UTC. Missing, empty, unparseable or
        non-datetime values yield ``None``.
        """
        if isinstance(value, str):
            try:
                value = datetime.fromisoformat(value)
            except ValueError:
                return None
        if not isinstance(value, datetime):
            return None
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)

    def _calculate_correlation_score(self, alert: Dict[str, Any],
                                     similar_alerts: List[Dict[str, Any]]) -> float:
        """Score how strongly ``alert`` correlates with ``similar_alerts``.

        Returns a value in [0.0, 1.0], where 1.0 means "definitely the same
        incident" and an empty ``similar_alerts`` gives 0.0.

        Scoring factors (weights sum to 1.0):
            - Service match (40%): fraction of similar alerts on the same service.
            - Category match (30%): fraction with the same category.
            - Time proximity (20%): full weight if the newest parseable
              ``created_at`` is within 1 minute of now, half weight within
              5 minutes, nothing otherwise or if no timestamp parses.
            - Severity escalation (10%): the alert's severity rank is at
              least the mean rank of the similar alerts.
        """
        if not similar_alerts:
            return 0.0

        count = len(similar_alerts)
        alert_service = alert.get('service', 'unknown')
        alert_category = alert.get('category', 'unknown')
        alert_severity = alert.get('severity', 'warning')

        service_matches = sum(1 for a in similar_alerts if a.get('service') == alert_service)
        category_matches = sum(1 for a in similar_alerts if a.get('category') == alert_category)
        score = (service_matches / count) * 0.4
        score += (category_matches / count) * 0.3

        # Time proximity: use total_seconds(), which, unlike .seconds,
        # includes whole days, so stale alerts do not look recent.
        timestamps = [
            ts for ts in (self._to_utc(a.get('created_at')) for a in similar_alerts)
            if ts is not None
        ]
        if timestamps:
            age_seconds = abs((datetime.now(timezone.utc) - max(timestamps)).total_seconds())
            if age_seconds < 60:  # Within 1 minute
                score += 0.2
            elif age_seconds < 300:  # Within 5 minutes
                score += 0.1

        # Severity escalation.
        levels = self._SEVERITY_LEVELS
        current_level = levels.get(alert_severity, 1)
        avg_level = sum(levels.get(a.get('severity', 'info'), 1) for a in similar_alerts) / count
        if current_level >= avg_level:
            score += 0.1

        return min(score, 1.0)

    async def _create_incident(self, alert: Dict[str, Any]) -> str:
        """Create a new incident from ``alert`` and return its id.

        The incident record carries the alert's title, service and severity,
        status ``'OPEN'``, the alert itself and a UTC creation timestamp.
        Persistence is not implemented yet.

        NOTE(review): the id is the fixed placeholder ``'incident_placeholder'``,
        so every created incident currently shares one id.
        """
        logger.info("[CORRELATION_AGENT] Creating incident from alert")
        # TODO: persist, e.g.
        #   INSERT INTO incidents (title, service, severity, status, created_at)
        #   VALUES (%s, %s, %s, 'OPEN', NOW()) RETURNING id
        incident = {
            'id': 'incident_placeholder',
            'title': f"Incident: {alert.get('title')}",
            'service': alert.get('service'),
            'severity': alert.get('severity'),
            'status': 'OPEN',
            'alerts': [alert],
            'created_at': datetime.now(timezone.utc).isoformat(),
        }
        logger.info("[CORRELATION_AGENT] Incident created: %s", incident['id'])
        return incident['id']

    async def _update_incident(self, alert: Dict[str, Any],
                               similar_alerts: List[Dict[str, Any]]) -> str:
        """Attach ``alert`` to the incident of the first linked similar alert.

        The target is the first entry in ``similar_alerts`` that has a truthy
        ``incident_id``. If there are no similar alerts, or none is linked to
        an incident, a new incident is created instead and its id is returned.
        The database update itself is not implemented yet.
        """
        if not similar_alerts:
            return await self._create_incident(alert)

        logger.info("[CORRELATION_AGENT] Updating incident with new alert")
        incident_id = next(
            (a['incident_id'] for a in similar_alerts if a.get('incident_id')),
            None,
        )
        if incident_id is None:
            # Nothing to attach to; never hand back a fake 'unknown' id.
            return await self._create_incident(alert)

        # TODO: persist, e.g.
        #   UPDATE incidents SET
        #     severity = GREATEST(severity, %(new_severity)s),
        #     updated_at = NOW(),
        #     alert_count = alert_count + 1
        #   WHERE id = %(incident_id)s
        logger.info("[CORRELATION_AGENT] Incident updated: %s", incident_id)
        return incident_id

    async def detect_cascading_failure(self, incident: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Heuristically flag an incident as a possible cascading failure.

        Current heuristic: the incident has at least 3 alerts and they span
        more than one distinct service (alerts without a ``service`` are
        ignored). Temporal and dependency-chain analysis are not implemented.

        Returns:
            A pattern dict (``pattern``, ``confidence``, sorted
            ``affected_services``, ``recommendation``) or ``None``.
        """
        logger.info("[CORRELATION_AGENT] Analyzing for cascading failure pattern")
        alerts = incident.get('alerts') or []
        if len(alerts) < 3:
            return None

        services = {a.get('service') for a in alerts}
        services.discard(None)  # a missing service is not a distinct affected service
        logger.info("[CORRELATION_AGENT] Services affected: %s", services)
        if len(services) <= 1:
            return None

        return {
            "pattern": "cascading_failure",
            "confidence": 0.85,
            "affected_services": sorted(services, key=str),
            "recommendation": "Check dependency chain - start with database/cache layer",
        }