# IncidentMgmtSystem/agents/correlation_agent.py
# Debashis
# Add multi-agent architecture implementation with detailed documentation
# ffa310a
"""
Correlation Agent
Groups related alerts into incidents by detecting patterns and similarities.
Second agent in the processing pipeline.
Communication: Receives from AlertIngestionAgent → Sends to AnalysisAgent
Data Storage: PostgreSQL (incidents, correlations), Redis (pattern cache)
"""
import json
import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional
logger = logging.getLogger(__name__)


class CorrelationAgent:
    """Correlate related alerts into incidents.

    The agent looks up alerts similar to an incoming one, scores how strongly
    the new alert correlates with them, and then either attaches the alert to
    an existing incident or opens a new one.

    Attributes:
        db: Database session passed to the constructor (unused by the
            placeholder persistence methods in this class).
        redis: Redis client passed to the constructor (unused by the
            placeholder persistence methods in this class).
        correlation_window: Correlation look-back window in seconds.
        similarity_threshold: Minimum correlation score (0.0-1.0) required to
            attach an alert to an existing incident.
    """

    # Ordinal rank per severity label; unknown labels are ranked like 'info'.
    _SEVERITY_LEVELS = {'info': 1, 'warning': 2, 'critical': 3}

    def __init__(self, db_session=None, redis_client=None):
        """Store the storage handles and set the default correlation tuning."""
        self.db = db_session
        self.redis = redis_client
        self.correlation_window = 600  # 10 minutes
        self.similarity_threshold = 0.7

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        Naive UTC is kept (rather than an aware value) so that ISO strings
        produced from it have the same shape as the ones this class has
        always emitted for ``created_at``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Convert *value* to a naive UTC datetime, or None if unusable.

        Accepts ``datetime`` objects and ISO-8601 strings. Timezone-aware
        values are shifted to UTC; naive values are assumed to already be UTC.
        """
        if isinstance(value, datetime):
            parsed = value
        elif isinstance(value, str) and value:
            # fromisoformat() only accepts a trailing 'Z' on Python 3.11+.
            if value.endswith(('Z', 'z')):
                value = value[:-1] + '+00:00'
            try:
                parsed = datetime.fromisoformat(value)
            except ValueError:
                return None
        else:
            return None
        offset = parsed.utcoffset()
        if offset is not None:
            parsed = parsed - offset
        return parsed.replace(tzinfo=None)

    async def correlate_alerts(self, alert: Dict[str, Any]) -> Dict[str, Any]:
        """Correlate one alert and attach it to an incident.

        Steps:
            1. Query for similar alerts.
            2. Calculate the correlation score.
            3. Update the matching incident, or create a new one when the
               score is below ``similarity_threshold`` or nothing matched.
            4. Return the correlation result.

        Args:
            alert: Normalized alert dictionary.

        Returns:
            Dict with ``status``, ``incident_id``, ``action`` (``"update"`` or
            ``"create"``), ``correlation_score``, ``similar_alerts_count`` and
            ``processing_time_ms`` (measured wall time of this call).

        Raises:
            Exception: Any error from the steps above is logged and re-raised.
        """
        started = time.perf_counter()
        logger.info("[CORRELATION_AGENT] Processing alert: %s", alert.get('title'))
        try:
            # Step 1: Query similar alerts
            similar_alerts = await self._find_similar_alerts(alert)
            logger.info("[CORRELATION_AGENT] Found %d similar alerts", len(similar_alerts))

            # Step 2: Calculate similarity
            correlation_score = self._calculate_correlation_score(alert, similar_alerts)
            logger.debug("[CORRELATION_AGENT] Correlation score: %s", correlation_score)

            # Step 3: Determine action
            if correlation_score >= self.similarity_threshold and similar_alerts:
                incident_id = await self._update_incident(alert, similar_alerts)
                action = "update"
                logger.info("[CORRELATION_AGENT] Updated incident: %s", incident_id)
            else:
                incident_id = await self._create_incident(alert)
                action = "create"
                logger.info("[CORRELATION_AGENT] Created new incident: %s", incident_id)

            # Step 4: Return result
            return {
                "status": "correlated",
                "incident_id": incident_id,
                "action": action,
                "correlation_score": correlation_score,
                "similar_alerts_count": len(similar_alerts),
                "processing_time_ms": int((time.perf_counter() - started) * 1000),
            }
        except Exception as e:
            logger.error("[CORRELATION_AGENT] Error correlating alert: %s", e, exc_info=True)
            raise

    async def _find_similar_alerts(self, alert: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Find alerts similar to *alert* within the correlation window.

        Intended similarity criteria (not yet implemented):
            - Same service
            - Same category (or related)
            - Within last 10 minutes
            - Same severity level or escalating

        Returns:
            Currently always an empty list: the database query is a
            placeholder.
        """
        service = alert.get('service', 'unknown')
        category = alert.get('category', 'unknown')
        # In real implementation, query from PostgreSQL:
        # SELECT * FROM alerts WHERE
        #   service = service AND
        #   created_at > NOW() - INTERVAL '10 minutes' AND
        #   (category = category OR category IN related_categories)
        similar: List[Dict[str, Any]] = []
        # Placeholder for database query
        logger.debug(
            "[CORRELATION_AGENT] Querying for alerts: service=%s, category=%s",
            service, category,
        )
        return similar

    def _calculate_correlation_score(self, alert: Dict[str, Any], similar_alerts: List) -> float:
        """Score how strongly *alert* correlates with *similar_alerts*.

        The score is in the range 0.0-1.0, where 1.0 means "definitely the
        same incident".

        Scoring factors:
            - Service match (40%): fraction of similar alerts on the same
              service.
            - Category match (30%): fraction of similar alerts in the same
              category.
            - Time proximity (20%): full weight if the most recent similar
              alert is under 1 minute old, half weight if under 5 minutes.
            - Severity escalation (10%): awarded when this alert is at least
              as severe as the average similar alert.

        Args:
            alert: The incoming alert.
            similar_alerts: Alert dictionaries to compare against.

        Returns:
            The correlation score; 0.0 when *similar_alerts* is empty.
        """
        if not similar_alerts:
            return 0.0

        total = len(similar_alerts)
        alert_service = alert.get('service', 'unknown')
        alert_category = alert.get('category', 'unknown')
        alert_severity = alert.get('severity', 'warning')

        service_matches = sum(1 for a in similar_alerts if a.get('service') == alert_service)
        category_matches = sum(1 for a in similar_alerts if a.get('category') == alert_category)

        # Service match (40%) + category match (30%)
        score = (service_matches / total) * 0.4 + (category_matches / total) * 0.3

        # Time proximity (20%). Alerts with a missing or unparseable
        # created_at are skipped instead of aborting the whole correlation.
        timestamps = [
            ts
            for ts in (self._parse_timestamp(a.get('created_at')) for a in similar_alerts)
            if ts is not None
        ]
        if timestamps:
            # total_seconds() keeps the days component that .seconds drops;
            # abs() tolerates small clock skew between alert producers.
            age_seconds = abs((self._utc_now() - max(timestamps)).total_seconds())
            if age_seconds < 60:  # Within 1 minute
                score += 0.2
            elif age_seconds < 300:  # Within 5 minutes
                score += 0.1

        # Severity escalation (10%)
        levels = self._SEVERITY_LEVELS
        current_level = levels.get(alert_severity, 1)
        avg_level = sum(levels.get(a.get('severity', 'info'), 1) for a in similar_alerts) / total
        if current_level >= avg_level:
            score += 0.1

        return min(score, 1.0)

    async def _create_incident(self, alert: Dict[str, Any]) -> str:
        """Create a new incident from *alert* and return its ID.

        Incident structure:
            - id: placeholder value until persistence is implemented
            - title: Generated from the alert title
            - service: From alert
            - severity: From alert
            - status: OPEN
            - alerts: [alert]
            - created_at: current UTC time in ISO format

        Returns:
            The incident ID (currently always ``'incident_placeholder'``).
        """
        logger.info("[CORRELATION_AGENT] Creating incident from alert")
        # In real implementation:
        # INSERT INTO incidents (title, service, severity, status, created_at)
        # VALUES (title, service, severity, 'OPEN', NOW())
        incident = {
            'id': 'incident_placeholder',
            'title': f"Incident: {alert.get('title')}",
            'service': alert.get('service'),
            'severity': alert.get('severity'),
            'status': 'OPEN',
            'alerts': [alert],
            'created_at': self._utc_now().isoformat(),
        }
        logger.info("[CORRELATION_AGENT] Incident created: %s", incident['id'])
        return incident['id']

    async def _update_incident(self, alert: Dict[str, Any], similar_alerts: List) -> str:
        """Attach *alert* to the incident that *similar_alerts* belong to.

        Intended updates (persistence not yet implemented):
            - Add alert to incident.alerts list
            - Update severity if escalated
            - Update updated_at timestamp
            - Mark incident as active

        Returns:
            The ID of the updated incident. Falls back to creating a new
            incident when *similar_alerts* is empty, and to ``'unknown'`` when
            none of the similar alerts carries an ``incident_id``.
        """
        if not similar_alerts:
            return await self._create_incident(alert)

        logger.info("[CORRELATION_AGENT] Updating incident with new alert")
        # Take the ID from the first similar alert that actually has one,
        # rather than assuming the first element is already linked.
        incident_id = next(
            (a.get('incident_id') for a in similar_alerts if a.get('incident_id')),
            'unknown',
        )
        if incident_id == 'unknown':
            logger.warning("[CORRELATION_AGENT] No similar alert carries an incident_id")
        # In real implementation:
        # UPDATE incidents SET
        #   severity = MAX(current_severity, new_alert_severity),
        #   updated_at = NOW(),
        #   alert_count = alert_count + 1
        # WHERE id = incident_id
        logger.info("[CORRELATION_AGENT] Incident updated: %s", incident_id)
        return incident_id

    async def detect_cascading_failure(self, incident: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Detect whether *incident* looks like a cascading failure.

        The current heuristic reports a cascade when the incident has at
        least three alerts spanning more than one service. Alerts without a
        ``service`` value are ignored when counting services.

        Args:
            incident: Incident dictionary with an ``alerts`` list.

        Returns:
            A dict with ``pattern``, ``confidence``, ``affected_services``
            (sorted) and ``recommendation``, or None when no cascade is
            detected.
        """
        logger.info("[CORRELATION_AGENT] Analyzing for cascading failure pattern")
        alerts = incident.get('alerts', [])
        if len(alerts) < 3:
            return None

        # A missing service must not count as a second, distinct service.
        services = {a.get('service') for a in alerts} - {None}
        logger.info("[CORRELATION_AGENT] Services affected: %s", services)
        if len(services) <= 1:
            return None

        return {
            "pattern": "cascading_failure",
            "confidence": 0.85,
            # Sorted so the result is deterministic across runs.
            "affected_services": sorted(services, key=str),
            "recommendation": "Check dependency chain - start with database/cache layer",
        }