Spaces:
Running
Running
| """Alerting agent β Slack, Discord, email, Teams, webhooks.""" | |
| from __future__ import annotations | |
| import os | |
| from typing import Any | |
| import httpx | |
| from models.schemas import AlertPayload, Severity | |
| async def send_alert(payload: AlertPayload) -> dict[str, Any]: | |
| channel = payload.channel.lower() | |
| if channel == "slack": | |
| return await _slack(payload) | |
| if channel == "discord": | |
| return await _discord(payload) | |
| if channel == "teams": | |
| return await _teams(payload) | |
| if channel == "email": | |
| return {"status": "queued", "detail": "Configure SMTP β payload logged server-side"} | |
| if channel == "webhook": | |
| return await _webhook(payload) | |
| return {"status": "ignored", "channel": channel} | |
| async def _slack(payload: AlertPayload) -> dict[str, Any]: | |
| url = os.getenv("SLACK_WEBHOOK_URL") | |
| if not url: | |
| return {"status": "skipped", "reason": "SLACK_WEBHOOK_URL not set"} | |
| body = { | |
| "text": f"*{payload.title}* [{payload.severity}]\n{payload.body}", | |
| "attachments": [{"color": _color(payload.severity), "fields": [{"title": "meta", "value": str(payload.metadata)}]}], | |
| } | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.post(url, json=body) | |
| return {"status": r.status_code, "channel": "slack"} | |
| async def _discord(payload: AlertPayload) -> dict[str, Any]: | |
| url = os.getenv("DISCORD_WEBHOOK_URL") | |
| if not url: | |
| return {"status": "skipped", "reason": "DISCORD_WEBHOOK_URL not set"} | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.post( | |
| url, | |
| json={"content": f"**{payload.title}** ({payload.severity})\n{payload.body}"}, | |
| ) | |
| return {"status": r.status_code, "channel": "discord"} | |
| async def _teams(payload: AlertPayload) -> dict[str, Any]: | |
| url = os.getenv("TEAMS_WEBHOOK_URL") | |
| if not url: | |
| return {"status": "skipped", "reason": "TEAMS_WEBHOOK_URL not set"} | |
| card = { | |
| "@type": "MessageCard", | |
| "@context": "https://schema.org/extensions", | |
| "summary": payload.title, | |
| "themeColor": "D83B01", | |
| "title": payload.title, | |
| "sections": [{"text": payload.body}], | |
| } | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.post(url, json=card) | |
| return {"status": r.status_code, "channel": "teams"} | |
| async def _webhook(payload: AlertPayload) -> dict[str, Any]: | |
| url = os.getenv("GENERIC_ALERT_WEBHOOK") | |
| if not url: | |
| return {"status": "skipped", "reason": "GENERIC_ALERT_WEBHOOK not set"} | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.post( | |
| url, | |
| json={ | |
| "title": payload.title, | |
| "body": payload.body, | |
| "severity": payload.severity.value, | |
| "metadata": payload.metadata, | |
| }, | |
| ) | |
| return {"status": r.status_code, "channel": "webhook"} | |
| def _color(severity: Severity) -> str: | |
| return { | |
| Severity.CRITICAL: "#b00020", | |
| Severity.HIGH: "#ff6f00", | |
| Severity.MEDIUM: "#fbc02d", | |
| Severity.LOW: "#1976d2", | |
| Severity.INFO: "#455a64", | |
| }.get(severity, "#455a64") | |