| from __future__ import annotations |
|
|
| import json |
| import logging |
| import smtplib |
| from email.mime.text import MIMEText |
| from typing import Any |
|
|
| import requests |
|
|
| from app.db.trend_subscription_dao import get_channel |
|
|
| logger = logging.getLogger(__name__) |
|
|
| DEFAULT_TIMEOUT = 10 |
|
|
|
|
| class NotificationService: |
| """Dispatches push notifications through configured channels.""" |
|
|
| def send( |
| self, |
| channel_id: int, |
| title: str, |
| body: str, |
| url: str = "", |
| ) -> dict: |
| """Send a notification through a specific channel. Returns result dict.""" |
| channel = get_channel(channel_id) |
| if channel is None: |
| return {"success": False, "error": f"Channel {channel_id} not found"} |
| if not channel.enabled: |
| return {"success": False, "error": "Channel is disabled"} |
|
|
| config = json.loads(channel.config or "{}") |
|
|
| try: |
| if channel.type == "webhook": |
| return self._send_webhook(config, title, body, url) |
| elif channel.type == "bark": |
| return self._send_bark(config, title, body, url) |
| elif channel.type == "email": |
| return self._send_email(config, title, body) |
| else: |
| return {"success": False, "error": f"Unknown channel type: {channel.type}"} |
| except Exception as exc: |
| logger.exception(f"Notification failed for channel {channel_id}") |
| return {"success": False, "error": str(exc)} |
|
|
| def send_batch( |
| self, |
| channel_ids: list[int], |
| title: str, |
| body: str, |
| url: str = "", |
| ) -> list[dict]: |
| """Send to multiple channels. Returns list of per-channel results.""" |
| results: list[dict] = [] |
| for cid in channel_ids: |
| results.append(self.send(cid, title, body, url)) |
| return results |
|
|
| def send_test(self, channel_id: int) -> dict: |
| """Send a test notification to verify channel config.""" |
| return self.send( |
| channel_id=channel_id, |
| title="๐ฏ VideoMemo ๆต่ฏ้็ฅ", |
| body="ๅฆๆไฝ ๆถๅฐ่ฟๆกๆถๆฏ๏ผ่ฏดๆ้็ฅ้้้
็ฝฎๆๅ๏ผ\n\nIf you see this, the notification channel is working!", |
| url="", |
| ) |
|
|
| |
|
|
| def _send_webhook(self, config: dict, title: str, body: str, url: str) -> dict: |
| webhook_url = str(config.get("url") or "").strip() |
| if not webhook_url: |
| return {"success": False, "error": "Webhook URL is empty"} |
|
|
| payload: dict[str, Any] = { |
| "title": title, |
| "body": body, |
| } |
| if url: |
| payload["url"] = url |
|
|
| |
| template = config.get("template", "") |
| if template: |
| try: |
| payload = json.loads( |
| template.replace("{{title}}", json.dumps(title)) |
| .replace("{{body}}", json.dumps(body)) |
| .replace("{{url}}", json.dumps(url)) |
| ) |
| except json.JSONDecodeError: |
| pass |
|
|
| resp = requests.post( |
| webhook_url, |
| json=payload, |
| timeout=DEFAULT_TIMEOUT, |
| headers={"Content-Type": "application/json"}, |
| ) |
| resp.raise_for_status() |
| return {"success": True, "status_code": resp.status_code} |
|
|
| def _send_bark(self, config: dict, title: str, body: str, url: str) -> dict: |
| bark_url = str(config.get("url") or "https://api.day.app/push").strip() |
| device_key = str(config.get("device_key") or "").strip() |
| if not device_key: |
| return {"success": False, "error": "Bark device key is empty"} |
|
|
| full_url = f"{bark_url.rstrip('/')}/{device_key}" |
| params: dict[str, str] = { |
| "title": title, |
| "body": body, |
| } |
| if url: |
| params["url"] = url |
| if config.get("sound"): |
| params["sound"] = config["sound"] |
| if config.get("group"): |
| params["group"] = config["group"] |
|
|
| resp = requests.post(full_url, json=params, timeout=DEFAULT_TIMEOUT) |
| resp.raise_for_status() |
| return {"success": True, "status_code": resp.status_code} |
|
|
| def _send_email(self, config: dict, title: str, body: str) -> dict: |
| smtp_host = str(config.get("smtp_host") or "").strip() |
| smtp_port = int(config.get("smtp_port") or 587) |
| smtp_user = str(config.get("smtp_user") or "").strip() |
| smtp_password = str(config.get("smtp_password") or "").strip() |
| to_addr = str(config.get("to") or "").strip() |
|
|
| if not all([smtp_host, smtp_user, smtp_password, to_addr]): |
| return {"success": False, "error": "Email config incomplete"} |
|
|
| msg = MIMEText(body, "plain", "utf-8") |
| msg["Subject"] = title |
| msg["From"] = smtp_user |
| msg["To"] = to_addr |
|
|
| with smtplib.SMTP(smtp_host, smtp_port, timeout=DEFAULT_TIMEOUT) as server: |
| server.starttls() |
| server.login(smtp_user, smtp_password) |
| server.send_message(msg) |
|
|
| return {"success": True} |
|
|