File size: 3,250 Bytes
8b3905d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
"""Alerting agent — Slack, Discord, email, Teams, webhooks."""

from __future__ import annotations

import os
from typing import Any

import httpx

from models.schemas import AlertPayload, Severity


async def send_alert(payload: AlertPayload) -> dict[str, Any]:
    channel = payload.channel.lower()
    if channel == "slack":
        return await _slack(payload)
    if channel == "discord":
        return await _discord(payload)
    if channel == "teams":
        return await _teams(payload)
    if channel == "email":
        return {"status": "queued", "detail": "Configure SMTP — payload logged server-side"}
    if channel == "webhook":
        return await _webhook(payload)
    return {"status": "ignored", "channel": channel}


async def _slack(payload: AlertPayload) -> dict[str, Any]:
    url = os.getenv("SLACK_WEBHOOK_URL")
    if not url:
        return {"status": "skipped", "reason": "SLACK_WEBHOOK_URL not set"}
    body = {
        "text": f"*{payload.title}* [{payload.severity}]\n{payload.body}",
        "attachments": [{"color": _color(payload.severity), "fields": [{"title": "meta", "value": str(payload.metadata)}]}],
    }
    async with httpx.AsyncClient(timeout=10.0) as client:
        r = await client.post(url, json=body)
        return {"status": r.status_code, "channel": "slack"}


async def _discord(payload: AlertPayload) -> dict[str, Any]:
    url = os.getenv("DISCORD_WEBHOOK_URL")
    if not url:
        return {"status": "skipped", "reason": "DISCORD_WEBHOOK_URL not set"}
    async with httpx.AsyncClient(timeout=10.0) as client:
        r = await client.post(
            url,
            json={"content": f"**{payload.title}** ({payload.severity})\n{payload.body}"},
        )
        return {"status": r.status_code, "channel": "discord"}


async def _teams(payload: AlertPayload) -> dict[str, Any]:
    url = os.getenv("TEAMS_WEBHOOK_URL")
    if not url:
        return {"status": "skipped", "reason": "TEAMS_WEBHOOK_URL not set"}
    card = {
        "@type": "MessageCard",
        "@context": "https://schema.org/extensions",
        "summary": payload.title,
        "themeColor": "D83B01",
        "title": payload.title,
        "sections": [{"text": payload.body}],
    }
    async with httpx.AsyncClient(timeout=10.0) as client:
        r = await client.post(url, json=card)
        return {"status": r.status_code, "channel": "teams"}


async def _webhook(payload: AlertPayload) -> dict[str, Any]:
    url = os.getenv("GENERIC_ALERT_WEBHOOK")
    if not url:
        return {"status": "skipped", "reason": "GENERIC_ALERT_WEBHOOK not set"}
    async with httpx.AsyncClient(timeout=10.0) as client:
        r = await client.post(
            url,
            json={
                "title": payload.title,
                "body": payload.body,
                "severity": payload.severity.value,
                "metadata": payload.metadata,
            },
        )
        return {"status": r.status_code, "channel": "webhook"}


def _color(severity: Severity) -> str:
    return {
        Severity.CRITICAL: "#b00020",
        Severity.HIGH: "#ff6f00",
        Severity.MEDIUM: "#fbc02d",
        Severity.LOW: "#1976d2",
        Severity.INFO: "#455a64",
    }.get(severity, "#455a64")