Spaces:
Running
Running
File size: 3,250 Bytes
8b3905d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | """Alerting agent — Slack, Discord, email, Teams, webhooks."""
from __future__ import annotations
import os
from typing import Any
import httpx
from models.schemas import AlertPayload, Severity
async def send_alert(payload: AlertPayload) -> dict[str, Any]:
    """Deliver *payload* through the channel named in ``payload.channel``.

    The channel name is compared case-insensitively. ``slack``, ``discord``,
    ``teams`` and ``webhook`` are forwarded to their sender coroutine;
    ``email`` only returns a placeholder result; every other name is reported
    as ignored.

    Args:
        payload: Alert to deliver. ``payload.channel`` selects the route.

    Returns:
        The selected sender's result dict, the fixed ``"queued"`` dict for
        email, or ``{"status": "ignored", "channel": <lower-cased name>}``.
    """
    target = payload.channel.lower()

    # Outcomes that need no network call are answered immediately.
    if target == "email":
        return {"status": "queued", "detail": "Configure SMTP — payload logged server-side"}

    # Pick the sender coroutine; it is awaited once, at the end.
    if target == "slack":
        pending = _slack(payload)
    elif target == "discord":
        pending = _discord(payload)
    elif target == "teams":
        pending = _teams(payload)
    elif target == "webhook":
        pending = _webhook(payload)
    else:
        return {"status": "ignored", "channel": target}

    return await pending
async def _slack(payload: AlertPayload) -> dict[str, Any]:
    """Post the alert as JSON to the URL held in ``SLACK_WEBHOOK_URL``.

    The message text carries the title, severity and body. A single
    attachment is coloured via ``_color`` and holds the stringified metadata.

    Args:
        payload: Alert to deliver; ``title``, ``severity``, ``body`` and
            ``metadata`` are read.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the environment variable
        is unset or empty, otherwise ``{"status": <HTTP status code>,
        "channel": "slack"}``. Exceptions raised by httpx are not caught here.
    """
    url = os.getenv("SLACK_WEBHOOK_URL")
    if not url:
        return {"status": "skipped", "reason": "SLACK_WEBHOOK_URL not set"}

    # Interpolating the enum member itself renders differently depending on
    # the Python version and enum flavour (member name vs. value); use the
    # value so the label matches what `_webhook` sends.
    severity_label = payload.severity.value
    body = {
        "text": f"*{payload.title}* [{severity_label}]\n{payload.body}",
        "attachments": [
            {
                "color": _color(payload.severity),
                "fields": [{"title": "meta", "value": str(payload.metadata)}],
            }
        ],
    }
    async with httpx.AsyncClient(timeout=10.0) as client:
        r = await client.post(url, json=body)
    return {"status": r.status_code, "channel": "slack"}
async def _discord(payload: AlertPayload) -> dict[str, Any]:
    """Post the alert as JSON to the URL held in ``DISCORD_WEBHOOK_URL``.

    The request body is a single ``content`` string with the title in bold,
    the severity in parentheses, and the alert body on the next line.

    Args:
        payload: Alert to deliver; ``title``, ``severity`` and ``body`` are
            read.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the environment variable
        is unset or empty, otherwise ``{"status": <HTTP status code>,
        "channel": "discord"}``. Exceptions raised by httpx are not caught
        here.
    """
    url = os.getenv("DISCORD_WEBHOOK_URL")
    if not url:
        return {"status": "skipped", "reason": "DISCORD_WEBHOOK_URL not set"}

    # Interpolating the enum member itself renders differently depending on
    # the Python version and enum flavour (member name vs. value); use the
    # value so the label matches what `_webhook` sends.
    severity_label = payload.severity.value
    content = f"**{payload.title}** ({severity_label})\n{payload.body}"
    async with httpx.AsyncClient(timeout=10.0) as client:
        r = await client.post(url, json={"content": content})
    return {"status": r.status_code, "channel": "discord"}
async def _teams(payload: AlertPayload) -> dict[str, Any]:
    """Post the alert as a ``MessageCard`` to the URL in ``TEAMS_WEBHOOK_URL``.

    The card reuses the alert title for both ``summary`` and ``title``, puts
    the alert body in one section, and always uses the theme colour
    ``D83B01``.

    Args:
        payload: Alert to deliver; ``title`` and ``body`` are read.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the environment variable
        is unset or empty, otherwise ``{"status": <HTTP status code>,
        "channel": "teams"}``.
    """
    endpoint = os.getenv("TEAMS_WEBHOOK_URL")
    if endpoint:
        message_card = {
            "@type": "MessageCard",
            "@context": "https://schema.org/extensions",
            "summary": payload.title,
            "themeColor": "D83B01",
            "title": payload.title,
            "sections": [{"text": payload.body}],
        }
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.post(endpoint, json=message_card)
        return {"status": reply.status_code, "channel": "teams"}

    # No webhook configured: report it instead of attempting delivery.
    return {"status": "skipped", "reason": "TEAMS_WEBHOOK_URL not set"}
async def _webhook(payload: AlertPayload) -> dict[str, Any]:
    """Post the alert as JSON to the URL held in ``GENERIC_ALERT_WEBHOOK``.

    The request body has the keys ``title``, ``body``, ``severity`` (the
    enum's ``.value``) and ``metadata``.

    Args:
        payload: Alert to deliver; ``title``, ``body``, ``severity`` and
            ``metadata`` are read.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when the environment variable
        is unset or empty, otherwise ``{"status": <HTTP status code>,
        "channel": "webhook"}``.
    """
    endpoint = os.getenv("GENERIC_ALERT_WEBHOOK")
    if not endpoint:
        return {"status": "skipped", "reason": "GENERIC_ALERT_WEBHOOK not set"}

    document = {
        "title": payload.title,
        "body": payload.body,
        "severity": payload.severity.value,
        "metadata": payload.metadata,
    }
    async with httpx.AsyncClient(timeout=10.0) as http:
        reply = await http.post(endpoint, json=document)
    return {"status": reply.status_code, "channel": "webhook"}
def _color(severity: Severity) -> str:
    """Return the hex colour string associated with *severity*.

    Args:
        severity: Severity level to look up.

    Returns:
        The ``#rrggbb`` string mapped to the level, or ``"#455a64"`` (the
        same colour as ``Severity.INFO``) when the level has no entry.
    """
    fallback = "#455a64"
    palette = {
        Severity.CRITICAL: "#b00020",
        Severity.HIGH: "#ff6f00",
        Severity.MEDIUM: "#fbc02d",
        Severity.LOW: "#1976d2",
        Severity.INFO: "#455a64",
    }
    if severity in palette:
        return palette[severity]
    return fallback
|