File size: 4,332 Bytes
698965e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from typing import Literal
import mimetypes
import os
import smtplib
from email.message import EmailMessage

import requests

from ..config import NotificationCenterConfig as NC


Channel = Literal["email", "slack"]


class EmailNotifier:
    def __init__(self):
        self.enabled = NC.ENABLE_EMAIL_ALERTS
        self.smtp_host = NC.SMTP_HOST
        self.smtp_port = NC.SMTP_PORT
        self.smtp_user = NC.SMTP_USER
        self.smtp_password = NC.SMTP_PASSWORD
        self.smtp_use_tls = NC.SMTP_USE_TLS
        self.from_email = NC.FROM_EMAIL
        self.to_emails = self._parse_recipients(NC.TO_EMAIL)

        if self.enabled:
            self._validate()

    @staticmethod
    def _parse_recipients(value: str | None) -> list[str]:
        if not value:
            return []
        return [email.strip() for email in value.split(",") if email.strip()]

    def _validate(self) -> None:
        missing = []

        if not self.smtp_host:
            missing.append("NOTIFY_SMTP_HOST")
        if not self.smtp_user:
            missing.append("NOTIFY_SMTP_USER")
        if not self.smtp_password:
            missing.append("NOTIFY_SMTP_PASSWORD")
        if not self.from_email:
            missing.append("NOTIFY_FROM_EMAIL")
        if not self.to_emails:
            missing.append("NOTIFY_TO_EMAIL")

        if missing:
            raise ValueError(f"Missing notification email config: {', '.join(missing)}")

    def send(
        self,
        subject: str,
        body: str,
        attachments: str | list[str] | None = None,
    ) -> None:
        if not self.enabled:
            return

        if isinstance(attachments, str):
            attachments = [attachments]

        msg = EmailMessage()
        msg["Subject"] = subject
        msg["From"] = self.from_email
        msg["To"] = ", ".join(self.to_emails)
        msg.set_content(body)

        if attachments:
            for file_path in attachments:
                if not file_path or not os.path.isfile(file_path):
                    continue

                mime_type, _ = mimetypes.guess_type(file_path)
                mime_type = mime_type or "application/octet-stream"
                maintype, subtype = mime_type.split("/", 1)

                with open(file_path, "rb") as f:
                    msg.add_attachment(
                        f.read(),
                        maintype=maintype,
                        subtype=subtype,
                        filename=os.path.basename(file_path),
                    )

        with smtplib.SMTP(self.smtp_host, self.smtp_port, timeout=20) as server:
            if self.smtp_use_tls:
                server.starttls()
            server.login(self.smtp_user, self.smtp_password)
            server.send_message(msg)


class SlackNotifier:
    def __init__(self):
        self.enabled = NC.ENABLE_SLACK_ALERTS
        self.webhook_url = NC.SLACK_WEBHOOK_URL

        if self.enabled:
            self._validate()

    def _validate(self) -> None:
        if not self.webhook_url:
            raise ValueError("Missing notification slack config: NOTIFY_SLACK_WEBHOOK_URL")

    def send(self, subject: str, body: str) -> None:
        if not self.enabled:
            return

        text = f"*{subject}*\n{body}"

        response = requests.post(
            self.webhook_url,
            json={"text": text},
            timeout=10,
        )

        response.raise_for_status()

        if response.status_code != 200:
            raise RuntimeError(
                f"Slack notification failed: {response.status_code} {response.text}"
            )


class NotificationCenter:
    def __init__(self):
        self.email = EmailNotifier()
        self.slack = SlackNotifier()

    def send_notification(
        self,
        subject: str,
        body: str,
        channel: Channel = "email",
        attachments: str | list[str] | None = None,
    ) -> None:

        match channel:
            case "all":
                self.email.send(subject, body, attachments)
                self.slack.send(subject, body)
            case "email":
                self.email.send(subject, body, attachments)
            case "slack":
                self.slack.send(subject, body)
            case _:
                raise ValueError(f"Unknown notification channel: {channel}")