| """Communication tool wrappers — Slack updates + postmortem generation.""" |
|
|
| import json |
| import logging |
| import os |
| import time |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any |
|
|
| import requests |
| from jinja2 import Template |
|
|
# Module logger used by every delivery helper in this file.
log = logging.getLogger("atlasops.comms")

# Webhook endpoints are read once, at import time. An empty string means the
# corresponding channel is not configured.
SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL", "")
DISCORD_WEBHOOK = os.environ.get("DISCORD_WEBHOOK_URL", "")

# Directory that receives rendered postmortems when no explicit path is given.
POSTMORTEM_DIR = Path(os.environ.get("POSTMORTEM_DIR", "docs/postmortems"))

# Severity label -> RRGGBB hex colour (no leading '#').
_SEV_COLOR_HEX = dict(P0="ff0000", P1="ff8800", P2="ffcc00")

# Append-only JSONL file that records every posted update payload.
_LOG_PATH = Path("data") / "slack_posts.jsonl"
|
|
|
|
def _discord_webhook_post_with_retry(url: str, json_body: dict[str, Any], *, context: str) -> tuple[bool, str | None]:
    """POST to a Discord Incoming Webhook; retry on 429 (burst limit) and transient failures.

    Incoming webhooks are easy to saturate: one scenario can emit approval embed + closure embed
    + every-run ping within seconds → Discord returns 429 with Retry-After.

    Retried: HTTP 429 (honouring ``Retry-After``, clamped to 0.5–60s), HTTP 5xx,
    and connection-level ``requests.RequestException``s. Any other HTTP error
    status (400, 401, 404, ...) is treated as permanent and fails immediately,
    because repeating the same request cannot succeed.

    Args:
        url: Webhook URL to POST to.
        json_body: JSON-serialisable webhook payload.
        context: Short label included in log messages to identify the caller.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` where
        ``message`` describes the last failure seen.
    """
    last_err: str | None = None
    max_attempts = 8
    for attempt in range(max_attempts):
        is_last = attempt == max_attempts - 1
        try:
            r = requests.post(url, json=json_body, timeout=20)
        except requests.RequestException as e:
            # Connection/timeout problems are assumed transient: back off and retry.
            last_err = str(e)
            if not is_last:
                time.sleep(1.0 + attempt * 0.75)
            continue

        if r.status_code == 429:
            # Record the failure so an exhausted retry budget still reports a reason.
            last_err = "HTTP 429 Too Many Requests (rate limited)"
            if is_last:
                # No retry will follow, so sleeping would only delay the caller.
                break
            ra_raw = r.headers.get("Retry-After")
            try:
                wait = float(ra_raw) if ra_raw is not None else 2.0
            except (TypeError, ValueError):
                wait = 2.0
            wait = min(max(wait, 0.5), 60.0)
            log.warning(
                "Discord 429 (%s); sleeping %.1fs then retry (%d/%d)",
                context, wait, attempt + 1, max_attempts,
            )
            time.sleep(wait)
            continue

        if 500 <= r.status_code < 600 and not is_last:
            last_err = f"HTTP {r.status_code} server error"
            time.sleep(1.0 + attempt * 0.5)
            continue

        try:
            r.raise_for_status()
        except requests.RequestException as e:
            # Either a permanent 4xx or the final 5xx: further attempts are pointless.
            last_err = str(e)
            break
        return True, None

    log.warning("Discord webhook failed after retries (%s): %s", context, last_err)
    return False, last_err
|
|
|
|
| def discord_scenario_run_ping( |
| incident_id: str, |
| alert: dict[str, Any], |
| *, |
| resolved: bool, |
| triage_title: str, |
| triage_severity: str, |
| error: str | None = None, |
| ) -> dict[str, Any]: |
| """Always send **one** short Discord embed per incident run when `DISCORD_WEBHOOK_URL` is set. |
| |
| Separate from agent `slack_post_update` calls — survives LLM skips and shows up every scenario. |
| Set `ATLASOPS_DISCORD_EVERY_RUN_PING=0` to disable. |
| |
| Returns a small dict for logging/tests; failures are swallowed after logging except in strict flows. |
| """ |
| url = os.getenv("DISCORD_WEBHOOK_URL", "").strip() |
| disabled = os.getenv("ATLASOPS_DISCORD_EVERY_RUN_PING", "1").strip().lower() in ("0", "false", "no", "off") |
| if not url or disabled: |
| return {"ok": False, "skipped": True, "reason": "no webhook or disabled"} |
|
|
| scenario_id = _trunc(str(alert.get("scenario_id") or alert.get("commonLabels", {}).get("scenario_id") or ""), 200) |
| alertname = _trunc(str(alert.get("commonLabels", {}).get("alertname") or "live-alert"), 200) |
| triage_title = _trunc(str(triage_title or ""), 200) or "(no title yet)" |
| sev_disp = _trunc(str(triage_severity or "—"), 8) |
|
|
| if error: |
| color = int("ED4245", 16) |
| footer = "AtlasOps · scenario run ended with exception" |
| status_line = "**Run ended with error** (see coordinator logs)." |
| elif resolved: |
| color = int("57F287", 16) |
| footer = "AtlasOps · scenario run finished" |
| status_line = "**Pipeline completed** (remediation outcome: resolved)." |
| else: |
| color = int("FEE75C", 16) |
| footer = "AtlasOps · scenario run finished" |
| status_line = "**Pipeline finished** — not flagged resolved (manual / partial / escalation)." |
|
|
| scenario_display = scenario_id if scenario_id.strip() else "—" |
| desc_lines = [ |
| f"**{status_line}**", |
| f"**Incident** `{incident_id}`", |
| f"**Alert** {alertname}", |
| f"**Scenario / inject** `{scenario_display}`", |
| f"**Triage severity** `{sev_disp}` — **topic** {_trunc(triage_title, 300)}", |
| ] |
| if error: |
| desc_lines.append("") |
| desc_lines.append("```") |
| desc_lines.append(_trunc(error, 900)) |
| desc_lines.append("```") |
|
|
| body = { |
| "username": _trunc(os.getenv("DISCORD_BOT_USERNAME", "atlasops-bot"), 80), |
| "embeds": [{ |
| "title": _trunc(f"Scenario run complete · [{sev_disp}]", 256), |
| "description": _trunc("\n".join(desc_lines), 3900), |
| "color": color, |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "footer": {"text": _trunc(footer, 2048)}, |
| }], |
| } |
|
|
| ok, err = _discord_webhook_post_with_retry(url, body, context=f"every-run ping {incident_id}") |
| if ok: |
| log.info("Discord every-run ping sent for incident %s", incident_id) |
| return {"ok": True, "sent": True, "mode": "discord_ping"} |
| return {"ok": False, "error": err or "unknown"} |
|
|
|
|
def _build_slack_payload(channel: str, severity: str, title: str,
                         summary: str, action_items: list[str]) -> dict:
    """Assemble the Slack webhook message for a single incident update.

    The message carries one attachment whose colour comes from
    ``_SEV_COLOR_HEX`` (grey ``888888`` for unknown severities). Action items,
    when present, are rendered as one bulleted "Action Items" field.
    """
    is_urgent = severity in ("P0", "P1")

    fields = []
    if action_items:
        bullets = "\n".join(f"• {item}" for item in action_items)
        fields.append({"title": "Action Items", "value": bullets})

    attachment = {
        "color": "#" + _SEV_COLOR_HEX.get(severity, "888888"),
        "title": f"[{severity}] {title}",
        "text": summary,
        "fields": fields,
        # Epoch seconds (UTC) at the moment the payload is built.
        "ts": int(datetime.now(timezone.utc).timestamp()),
    }
    return {
        "channel": channel,
        "username": "atlasops-bot",
        "icon_emoji": ":rotating_light:" if is_urgent else ":warning:",
        "attachments": [attachment],
    }
|
|
|
|
def _trunc(s: str, max_len: int) -> str:
    """Strip *s* and shorten it to at most *max_len* characters.

    ``None`` (or any falsy value) is treated as the empty string. When the
    stripped text is too long it is cut to ``max_len - 1`` characters and an
    ellipsis ("…") is appended, so the result is exactly ``max_len`` long.
    A non-positive ``max_len`` yields ``""``.
    """
    s = (s or "").strip()
    if len(s) <= max_len:
        return s
    if max_len <= 0:
        # Without this guard s[:max_len - 1] slices from the *end* of the
        # string and the result would be longer than requested.
        return ""
    return s[: max_len - 1] + "…"
|
|
|
|
def _post_to_discord(slack_payload: dict) -> None:
    """Translate a Slack-style payload into a Discord embed and deliver it.

    Only the first attachment is used. The severity is parsed from a leading
    ``[SEV]`` tag in the attachment title and mapped to an embed colour.

    Raises:
        requests.RequestException: when the webhook POST ultimately fails.
    """
    attachment = slack_payload["attachments"][0]
    headline = attachment.get("title") or "[P3] Incident"

    # Severity is whatever sits before the first ']' once the '[' is removed.
    if "]" in headline:
        severity = headline.partition("]")[0].lstrip("[").strip()
    else:
        severity = "P3"

    try:
        embed_color = int(_SEV_COLOR_HEX.get(severity, "888888"), 16)
    except ValueError:
        embed_color = int("888888", 16)

    # Fields without a value are dropped.
    embed_fields = [
        {
            "name": _trunc(str(entry.get("title", "")), 256),
            "value": _trunc(str(entry["value"]), 1024),
            "inline": False,
        }
        for entry in (attachment.get("fields", []) or [])
        if entry.get("value")
    ]

    embed = {
        "title": _trunc(headline, 256),
        "description": _trunc(attachment.get("text", ""), 4000),
        "color": embed_color,
        "fields": embed_fields[:25],
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "footer": {"text": "AtlasOps · AMD MI300X"},
    }
    discord_payload = {
        "username": _trunc(slack_payload.get("username", "atlasops-bot"), 80),
        "embeds": [embed],
    }

    delivered, failure = _discord_webhook_post_with_retry(
        DISCORD_WEBHOOK, discord_payload, context="slack-style embed"
    )
    if delivered:
        return
    log.warning("Discord webhook (embed) failed: %s", failure)
    raise requests.RequestException(failure or "Discord webhook failed")
|
|
|
|
def slack_post_update(channel: str, severity: str, title: str, summary: str,
                      action_items: list[str] | None = None) -> dict[str, Any]:
    """Record an incident update locally and fan it out to configured webhooks.

    The payload is always appended to the local JSONL log (powers the UI feed).
    It is additionally sent to Slack when SLACK_WEBHOOK_URL is set and to
    Discord when DISCORD_WEBHOOK_URL is set. Webhook failures do not raise;
    they are collected under the ``"errors"`` key of the result.

    Returns:
        ``{"success": True, "mode": "<channels joined by '+'>"}`` plus
        ``"errors"`` when at least one delivery failed.
    """
    message = _build_slack_payload(channel, severity, title, summary, action_items or [])

    # Local log first, so the update is kept even if every webhook fails.
    _LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with _LOG_PATH.open("a", encoding="utf-8") as feed:
        feed.write(json.dumps(message) + "\n")

    delivered: list[str] = ["logged_locally"]
    failures: list[str] = []

    if SLACK_WEBHOOK:
        try:
            response = requests.post(SLACK_WEBHOOK, json=message, timeout=10)
            response.raise_for_status()
        except requests.RequestException as exc:
            failures.append(f"slack: {exc}")
        else:
            delivered.append("slack")

    if DISCORD_WEBHOOK:
        try:
            _post_to_discord(message)
        except requests.RequestException as exc:
            problem = f"discord: {exc}"
            failures.append(problem)
            log.warning(problem)
        else:
            delivered.append("discord")
            log.info("Discord embed delivered for [%s]", (message["attachments"][0].get("title") or "")[:80])

    result: dict[str, Any] = {"success": True, "mode": "+".join(delivered)}
    if failures:
        result["errors"] = failures
    return result
|
|
|
|
# Jinja2 source for the Markdown postmortem rendered by postmortem_draft.
# Expected variables: title, date, severity, duration, authors, summary, impact,
# timeline (items with .time/.event), root_cause, detection, resolution,
# went_well, went_wrong, action_items (items with .action/.owner/.priority/.due).
# The `-%}` on each `{% for %}` tag trims the whitespace after the tag, so list
# items and table rows are emitted on consecutive lines.
POSTMORTEM_TEMPLATE = """# Postmortem: {{ title }}

**Date:** {{ date }}
**Severity:** {{ severity }}
**Duration:** {{ duration }}
**Authors:** {{ authors }}

## Summary
{{ summary }}

## Impact
{{ impact }}

## Timeline (UTC)
{% for entry in timeline -%}
- **{{ entry.time }}** — {{ entry.event }}
{% endfor %}

## Root Cause
{{ root_cause }}

## Detection
{{ detection }}

## Resolution
{{ resolution }}

## What Went Well
{% for item in went_well -%}
- {{ item }}
{% endfor %}

## What Went Wrong
{% for item in went_wrong -%}
- {{ item }}
{% endfor %}

## Action Items
| # | Action | Owner | Priority | Due |
|---|---|---|---|---|
{% for ai in action_items -%}
| {{ loop.index }} | {{ ai.action }} | {{ ai.owner }} | {{ ai.priority }} | {{ ai.due }} |
{% endfor %}
"""
|
|
|
|
def postmortem_draft(incident: dict[str, Any], output_path: str = "") -> dict[str, Any]:
    """Generate a Cloudflare-blog quality postmortem.

    incident dict shape (all optional — auto-filled from available data):
      title, severity, duration, authors, summary, impact,
      timeline: [{time, event}], root_cause, detection, resolution,
      went_well: [str], went_wrong: [str],
      action_items: [{action, owner, priority, due}]

    Missing top-level fields fall back to the nested ``triage``, ``diagnosis``
    and ``remediation`` dicts, then to built-in placeholder text. The
    ``incident`` dict and its lists are never modified.

    Args:
        incident: Incident data as described above.
        output_path: Destination file. When empty, the file is written to
            ``POSTMORTEM_DIR`` as ``<date>-<slug-of-title>.md``. Missing parent
            directories of the destination are created.

    Returns:
        ``{"success": True, "path": ..., "postmortem_path": ..., "bytes": ...}``
        where ``bytes`` is the UTF-8 encoded size of the written document.
    """
    now = datetime.now(timezone.utc)

    def _get(*keys, default=""):
        # First truthy value among the given top-level keys, else the default.
        for key in keys:
            if incident.get(key):
                return incident[key]
        return default

    triage = incident.get("triage", {}) or {}
    diagnosis = incident.get("diagnosis", {}) or {}
    remediation = incident.get("remediation", {}) or {}
    # The key may be present but null; treat that the same as absent.
    blast_radius = triage.get("blast_radius") or {}

    # str() because the title is later lower-cased for the file name.
    title = str(_get("title") or triage.get("title") or "Incident")
    severity = _get("severity") or triage.get("severity") or "Unknown"
    root_cause_raw = _get("root_cause") or diagnosis.get("root_cause") or diagnosis.get("specific") or "Under investigation"
    root_cause = root_cause_raw if isinstance(root_cause_raw, str) else json.dumps(root_cause_raw)
    resolution_raw = _get("resolution") or remediation.get("outcome") or "Resolved by on-call team"
    resolution = resolution_raw if isinstance(resolution_raw, str) else json.dumps(resolution_raw)
    actions_taken = remediation.get("actions_taken") or []
    if isinstance(actions_taken, str):
        # A bare string would otherwise be indexed character by character.
        actions_taken = [actions_taken]

    stamp = now.strftime("%H:%M UTC")
    timeline = incident.get("timeline") or [
        {"time": stamp, "event": f"Alert fired: {title}"},
        {"time": stamp, "event": "Triage agent acknowledged"},
        {"time": stamp, "event": f"Root cause identified: {root_cause[:80]}"},
        {"time": stamp, "event": "Remediation applied"},
    ]
    went_well = incident.get("went_well") or ["Automated detection by Prometheus/Alertmanager", "AtlasOps multi-agent response < 5 min"]
    went_wrong = incident.get("went_wrong") or ["Alert was not suppressed during maintenance window"]
    # Copy so the insert below cannot mutate the caller's list.
    action_items = list(incident.get("action_items") or [
        {"action": f"Add runbook for {title}", "owner": "@sre-team", "priority": "P2", "due": "2026-06-01"},
        {"action": "Review alert thresholds", "owner": "@observability", "priority": "P3", "due": "2026-06-15"},
    ])
    if actions_taken:
        action_items.insert(0, {
            "action": f"Verify fix stability: {str(actions_taken[0])[:80]}",
            "owner": "@sre-oncall", "priority": "P1",
            "due": now.strftime("%Y-%m-%d"),
        })

    data = {
        "title": title, "severity": severity,
        "duration": _get("duration", default="< 10 min"),
        "authors": _get("authors", default="AtlasOps automated response"),
        "summary": incident.get("summary") or f"{severity} incident: {title}. Root cause: {root_cause[:120]}. Resolution: {resolution[:120]}.",
        "impact": incident.get("impact") or f"Services affected: {blast_radius.get('services', ['unknown'])}. User impact: {blast_radius.get('user_impact_pct', 0)}%.",
        "timeline": timeline,
        "root_cause": root_cause,
        "detection": incident.get("detection") or "Prometheus alert fired → Alertmanager forwarded to AtlasOps webhook.",
        "resolution": resolution,
        "went_well": went_well,
        "went_wrong": went_wrong,
        "action_items": action_items,
    }

    template = Template(POSTMORTEM_TEMPLATE)
    rendered = template.render(date=now.date().isoformat(), **data)

    if not output_path:
        # Keep letters, digits and "-_."; everything else (spaces, path
        # separators, colons, ...) becomes "-" so the slug is one safe file
        # name component.
        slug = "".join(
            ch if (ch.isalnum() or ch in "-_.") else "-"
            for ch in title.lower()
        )[:60] or "incident"
        output_path = str(POSTMORTEM_DIR / f"{now.date()}-{slug}.md")

    target = Path(output_path)
    # Create the directory the file actually goes into (POSTMORTEM_DIR for the
    # default path, the caller's directory for an explicit one).
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(rendered, encoding="utf-8")
    return {
        "success": True,
        "path": output_path,
        "postmortem_path": output_path,
        # Encoded size: the document contains non-ASCII characters, so the
        # character count would under-report.
        "bytes": len(rendered.encode("utf-8")),
    }
|
|