File size: 5,348 Bytes
6cfe55f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | from __future__ import annotations
import json
import logging
import smtplib
from email.mime.text import MIMEText
from typing import Any
import requests
from app.db.trend_subscription_dao import get_channel
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Timeout handed to the outbound HTTP (requests.post) and SMTP calls below.
DEFAULT_TIMEOUT = 10 # seconds
class NotificationService:
    """Dispatches push notifications through configured channels.

    A channel is looked up by id via ``get_channel`` and must expose
    ``enabled``, ``type`` and ``config`` attributes. Supported ``type``
    values are ``"webhook"``, ``"bark"`` and ``"email"``. Every public
    method reports its outcome as a dict with a boolean ``"success"`` key
    and, on failure, an ``"error"`` message.
    """

    def send(
        self,
        channel_id: int,
        title: str,
        body: str,
        url: str = "",
    ) -> dict:
        """Send a notification through a specific channel.

        Args:
            channel_id: Id of the channel to look up with ``get_channel``.
            title: Notification title.
            body: Notification body text.
            url: Optional link attached to the notification (ignored by
                the email channel).

        Returns:
            ``{"success": True, ...}`` on success, otherwise
            ``{"success": False, "error": <message>}``. Errors raised while
            parsing the channel config or delivering the message are
            logged and converted into the failure dict.
        """
        channel = get_channel(channel_id)
        if channel is None:
            return {"success": False, "error": f"Channel {channel_id} not found"}
        if not channel.enabled:
            return {"success": False, "error": "Channel is disabled"}

        try:
            # Parsed inside the try so a malformed stored config yields an
            # error result instead of an unhandled exception.
            config = self._parse_config(channel.config)
            if channel.type == "webhook":
                return self._send_webhook(config, title, body, url)
            if channel.type == "bark":
                return self._send_bark(config, title, body, url)
            if channel.type == "email":
                return self._send_email(config, title, body)
            return {"success": False, "error": f"Unknown channel type: {channel.type}"}
        except Exception as exc:
            logger.exception("Notification failed for channel %s", channel_id)
            return {"success": False, "error": str(exc)}

    def send_batch(
        self,
        channel_ids: list[int],
        title: str,
        body: str,
        url: str = "",
    ) -> list[dict]:
        """Send to multiple channels.

        Returns:
            One result dict per entry of ``channel_ids``, in the same order.
        """
        return [self.send(cid, title, body, url) for cid in channel_ids]

    def send_test(self, channel_id: int) -> dict:
        """Send a test notification to verify channel config."""
        # The user-facing text is bilingual (Chinese + English) on purpose.
        return self.send(
            channel_id=channel_id,
            title="🎯 VideoMemo 测试通知",
            body=(
                "如果你收到这条消息，说明通知通道配置成功！"
                "\n\nIf you see this, the notification channel is working!"
            ),
            url="",
        )

    # ─── Channel implementations ─────────────────────────────────────────

    @staticmethod
    def _parse_config(raw: Any) -> dict:
        """Decode a stored channel config into a dict.

        Empty values (``None``, ``""``) decode to ``{}``; a value that is
        already a dict is returned as-is.

        Raises:
            ValueError: If ``raw`` is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or does not decode to a JSON object.
        """
        if isinstance(raw, dict):
            return raw
        config = json.loads(raw or "{}")
        if not isinstance(config, dict):
            raise ValueError("Channel config must be a JSON object")
        return config

    @staticmethod
    def _render_webhook_template(template: str, title: str, body: str, url: str) -> Any:
        """Fill a custom webhook payload template and parse it as JSON.

        ``{{title}}``, ``{{body}}`` and ``{{url}}`` are replaced with the
        JSON-encoded (already quoted) value, so a template is written as
        ``{"text": {{title}}}`` without extra quotes.

        Raises:
            json.JSONDecodeError: If the filled template is not valid JSON.
        """
        import re

        encoded = {
            "title": json.dumps(title),
            "body": json.dumps(body),
            "url": json.dumps(url),
        }
        # Single pass: placeholder-looking text inside a substituted value is
        # never expanded again. A callable replacement is required because the
        # JSON text may contain backslashes.
        rendered = re.sub(
            r"\{\{(title|body|url)\}\}",
            lambda match: encoded[match.group(1)],
            template,
        )
        return json.loads(rendered)

    @staticmethod
    def _build_bark_request(
        config: dict,
        device_key: str,
        title: str,
        body: str,
        url: str,
    ) -> tuple[str, dict[str, Any]]:
        """Compute the Bark endpoint and JSON payload for one notification.

        When the configured base URL ends in ``/push`` (the default), the
        device key is sent in the JSON body. For any other base URL the key
        is appended to the path, as before.

        Returns:
            ``(endpoint_url, payload)``.
        """
        base_url = str(config.get("url") or "https://api.day.app/push").strip().rstrip("/")
        payload: dict[str, Any] = {"title": title, "body": body}
        if url:
            payload["url"] = url
        for option in ("sound", "group"):
            if config.get(option):
                payload[option] = config[option]
        if base_url.endswith("/push"):
            payload["device_key"] = device_key
            return base_url, payload
        return f"{base_url}/{device_key}", payload

    def _send_webhook(self, config: dict, title: str, body: str, url: str) -> dict:
        """POST the notification as JSON to ``config["url"]``.

        The default payload is ``{"title", "body"}`` plus ``"url"`` when one
        is given. If ``config["template"]`` is set, the rendered template
        replaces the default payload; a template that does not render to
        valid JSON is logged and the default payload is sent instead.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status.
        """
        webhook_url = str(config.get("url") or "").strip()
        if not webhook_url:
            return {"success": False, "error": "Webhook URL is empty"}

        payload: Any = {"title": title, "body": body}
        if url:
            payload["url"] = url

        # Support custom payload template
        template = config.get("template", "")
        if template:
            try:
                payload = self._render_webhook_template(template, title, body, url)
            except json.JSONDecodeError:
                # Best effort: keep delivering, but make the bad template visible.
                logger.warning(
                    "Webhook template did not render to valid JSON; "
                    "falling back to the default payload"
                )

        resp = requests.post(
            webhook_url,
            json=payload,
            timeout=DEFAULT_TIMEOUT,
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
        return {"success": True, "status_code": resp.status_code}

    def _send_bark(self, config: dict, title: str, body: str, url: str) -> dict:
        """Push the notification to a Bark server.

        Reads ``url`` (optional base URL), ``device_key`` (required) and the
        optional ``sound`` / ``group`` settings from ``config``.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        device_key = str(config.get("device_key") or "").strip()
        if not device_key:
            return {"success": False, "error": "Bark device key is empty"}

        endpoint, payload = self._build_bark_request(config, device_key, title, body, url)
        resp = requests.post(endpoint, json=payload, timeout=DEFAULT_TIMEOUT)
        resp.raise_for_status()
        return {"success": True, "status_code": resp.status_code}

    def _send_email(self, config: dict, title: str, body: str) -> dict:
        """Send the notification as a plain-text UTF-8 email over SMTP.

        Reads ``smtp_host``, ``smtp_port`` (default 587), ``smtp_user``,
        ``smtp_password`` and ``to`` from ``config``; the sender address is
        ``smtp_user``. Port 465 uses implicit TLS (``SMTP_SSL``); every other
        port connects in plain text and upgrades with STARTTLS.

        Raises:
            ValueError: If ``smtp_port`` is not an integer.
            smtplib.SMTPException, OSError: On connection, TLS, login or
                delivery failures.
        """
        smtp_host = str(config.get("smtp_host") or "").strip()
        smtp_port = int(config.get("smtp_port") or 587)
        smtp_user = str(config.get("smtp_user") or "").strip()
        smtp_password = str(config.get("smtp_password") or "").strip()
        to_addr = str(config.get("to") or "").strip()
        if not all([smtp_host, smtp_user, smtp_password, to_addr]):
            return {"success": False, "error": "Email config incomplete"}

        msg = MIMEText(body, "plain", "utf-8")
        # Header values must be a single line; collapse any line breaks.
        msg["Subject"] = " ".join(title.splitlines())
        msg["From"] = smtp_user
        msg["To"] = to_addr

        implicit_tls = smtp_port == 465
        smtp_factory = smtplib.SMTP_SSL if implicit_tls else smtplib.SMTP
        with smtp_factory(smtp_host, smtp_port, timeout=DEFAULT_TIMEOUT) as server:
            if not implicit_tls:
                server.starttls()
            server.login(smtp_user, smtp_password)
            server.send_message(msg)
        return {"success": True}
|