Spaces:
Sleeping
Sleeping
| """ | |
| app/agents/escalation_agent.py | |
| ------------------------------- | |
| EscalationAgent — Autonomy Layer, Step 5. | |
| Triggers when priority is "high"/"critical" OR risk is "high". | |
| Logs structured alerts and simulates an external notification. | |
| In production, replace _notify() with a real webhook/SMS/email call. | |
| Output written to context["alerts"]: | |
| [ | |
| { | |
| "level": "warning" | "critical", | |
| "trigger": str, # what caused this alert | |
| "message": str, # human-readable description | |
| "timestamp": str, # ISO-8601 | |
| "notified": bool, # True if notification was "sent" | |
| }, | |
| ... | |
| ] | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from app.core.base_agent import BaseAgent | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Priority labels that count as escalation-worthy.
_ESCALATION_PRIORITIES = {"high", "critical"}

# Risk levels that make EscalationAgent.execute() raise a risk alert.
_ESCALATION_RISKS = {"high"}
def _now_iso() -> str:
    """Return the current time in UTC as a timezone-aware ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()
class EscalationAgent(BaseAgent):
    """Detect high-priority or high-risk situations and raise alerts.

    Reads ``priority``, ``priority_score``, ``risk``, ``errors``, ``intent``,
    ``data`` and ``metadata`` from the context and writes a list of alert
    dicts (see ``_build_alert``) to ``context["alerts"]``.
    """

    name = "EscalationAgent"
    input_keys = ["priority", "risk", "errors", "intent", "data"]
    output_keys = ["alerts"]
    action = "Raise alerts and simulate notifications for critical situations"

    # Score (out of 100) at or above which a critical alert is raised.
    _CRITICAL_SCORE = 80
    # Number of "[Monitor]" errors needed to raise a monitor-issue alert.
    _MIN_MONITOR_ERRORS = 2

    def execute(self, context: dict[str, Any]) -> dict[str, Any]:
        """Evaluate the escalation triggers and store the resulting alerts.

        Args:
            context: Shared pipeline state. Missing or ``None`` values fall
                back to neutral defaults, so a sparse context never raises.

        Returns:
            The same ``context`` object, with ``context["alerts"]`` set to
            the (possibly empty) list of alerts raised by this run.
        """
        # ``or`` fallbacks (rather than ``.get`` defaults) also cover keys
        # that are present but explicitly set to None.
        priority = str(context.get("priority") or "low").lower()
        intent = str(context.get("intent") or "other").lower()
        score = context.get("priority_score") or 0
        errors = context.get("errors") or []
        data = context.get("data") or {}
        metadata = context.get("metadata") or {}
        risk = context.get("risk") or {}
        if isinstance(risk, str):
            # Tolerate a bare level string such as "high".
            risk = {"level": risk.lower()}

        alerts: list[dict[str, Any]] = []

        # Trigger 1: high priority score.
        if score >= self._CRITICAL_SCORE:
            alert = self._build_alert(
                level="critical",
                trigger=f"priority_score={score}",
                message=(
                    f"CRITICAL priority score {score}/100. "
                    f"Intent: {intent}. "
                    f"Customer: {data.get('customer', 'unknown')}."
                ),
            )
            alerts.append(alert)
            self._notify(alert)
        # Trigger 2: escalation-worthy priority label. Only checked when the
        # score did not already escalate, so one run yields at most one
        # priority alert. A "critical" label raises a critical-level alert.
        elif priority in _ESCALATION_PRIORITIES:
            alert = self._build_alert(
                level="critical" if priority == "critical" else "warning",
                trigger=f"priority={priority} score={score}",
                message=(
                    f"{priority.capitalize()} priority transaction flagged. "
                    f"Intent: {intent}. "
                    f"Score: {score}/100. "
                    f"Reason: {metadata.get('urgency_reason', 'n/a')}."
                ),
            )
            alerts.append(alert)
            self._notify(alert)

        # Trigger 3: high risk (independent of the priority triggers).
        if risk.get("level") in _ESCALATION_RISKS:
            risk_reasons = "; ".join(str(r) for r in risk.get("reasons") or [])
            alert = self._build_alert(
                level="warning",
                trigger="risk=high",
                message=f"High risk transaction. Reasons: {risk_reasons}",
            )
            alerts.append(alert)
            self._notify(alert)

        # Trigger 4: several monitor errors recorded by the pipeline.
        critical_errors = [e for e in errors if "[Monitor]" in str(e)]
        if len(critical_errors) >= self._MIN_MONITOR_ERRORS:
            alert = self._build_alert(
                level="warning",
                trigger="monitor_issues",
                message=(
                    f"{len(critical_errors)} monitor issue(s) detected: "
                    f"{critical_errors[0]}"
                ),
            )
            # NOTE(review): this alert is recorded but never passed to
            # _notify(), so its "notified" flag stays False. Confirm that
            # this is intentional.
            alerts.append(alert)

        context["alerts"] = alerts

        if alerts:
            logger.warning(
                "[EscalationAgent] %d alert(s) raised (priority=%s risk=%s)",
                len(alerts), priority, risk.get("level", "unknown")
            )
        else:
            logger.info("[EscalationAgent] no escalation needed")
        return context

    @staticmethod
    def _build_alert(level: str, trigger: str, message: str) -> dict[str, Any]:
        """Build an alert record that has not been notified yet.

        Args:
            level: Alert severity, e.g. "warning" or "critical".
            trigger: Short description of what caused the alert.
            message: Human-readable description of the alert.

        Returns:
            A dict with keys ``level``, ``trigger``, ``message``,
            ``timestamp`` (current UTC time, ISO-8601) and ``notified``
            (initially ``False``).
        """
        return {
            "level": level,
            "trigger": trigger,
            "message": message,
            "timestamp": _now_iso(),
            "notified": False,  # updated by _notify()
        }

    @staticmethod
    def _notify(alert: dict[str, Any]) -> None:
        """Simulate sending an external notification for ``alert``.

        Marks the alert as notified (mutating it in place) and logs it at
        WARNING level. Replace this method with a real webhook/SMS/email
        integration.
        """
        alert["notified"] = True
        logger.warning(
            "[EscalationAgent] 🚨 ALERT [%s] %s",
            alert["level"].upper(), alert["message"]
        )