Spaces:
Running
Running
File size: 7,959 Bytes
19f4fba 6383573 19f4fba 0e5174f 19f4fba 0bf1136 19f4fba 5383f21 19f4fba 6383573 19f4fba 5383f21 19f4fba 6383573 0e5174f 6383573 0e5174f 6383573 2d92257 6383573 5383f21 89a1d85 5383f21 6383573 5383f21 89a1d85 5383f21 6383573 19f4fba 5383f21 19f4fba 5383f21 19f4fba 5383f21 6383573 5383f21 6383573 5383f21 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | """
Email notification module.
Sends HTML-formatted detection reports via the manager's email API (HTTPS),
or via SMTP (e.g. Gmail) when EMAIL_API_URL is explicitly set to an empty value.
The API URL and the SMTP credentials are read from environment variables.
"""
import base64
import logging
import os
import re
import smtplib
import ssl
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Loose shape check for an address: a local part and a dotted domain, with no
# whitespace and no additional "@" anywhere.
EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")
# HTML report template, located two directory levels above this file under
# templates/ChangeDetection.html.
TEMPLATE_PATH = Path(__file__).resolve().parent.parent / "templates" / "ChangeDetection.html"
# Manager's email API (multipart/form-data). When set, used instead of SMTP.
# Read once at import time; because a default URL is supplied, SMTP is only
# used when the EMAIL_API_URL environment variable is set to an empty or
# whitespace-only value.
EMAIL_API_URL = os.environ.get(
"EMAIL_API_URL",
"https://emailservice.managemybusinessess.com/api/email/send",
)
def _smtp_settings():
return {
"host": os.environ.get("SMTP_HOST", "smtp.gmail.com"),
"port": int(os.environ.get("SMTP_PORT", "587")),
"user": os.environ.get("SMTP_USER", "vedangofficeserver@gmail.com"),
"password": os.environ.get("SMTP_PASS", ""),
}
def _valid_email(email: str) -> bool:
return bool(email and EMAIL_RE.match(email.strip()))
def _load_template() -> str:
    """Return the HTML email template text stored at TEMPLATE_PATH.

    When the file does not exist, the problem is logged and a one-paragraph
    placeholder document is returned instead.
    """
    if TEMPLATE_PATH.exists():
        return TEMPLATE_PATH.read_text(encoding="utf-8")
    logger.error("Email template not found at %s", TEMPLATE_PATH)
    return "<p>Change Detection report — template file missing.</p>"
def _build_region_rows(regions: list) -> str:
"""Render region rows as HTML <tr> elements for the email template."""
if not regions:
return ""
rows = []
for r in regions[:20]:
bg = "#f9f9fb" if r.get("id", 0) % 2 == 0 else "#ffffff"
sub = r.get("subType") or "—"
conf = f'{r.get("confidence", 0) * 100:.0f}%'
area = f'{r.get("area", 0):,}'
rows.append(
f'<tr style="background:{bg};">'
f'<td style="padding:6px 10px;">{r.get("id","")}</td>'
f'<td style="padding:6px 10px;">{r.get("objectType","")}</td>'
f'<td style="padding:6px 10px;">{sub}</td>'
f'<td style="padding:6px 10px;">{conf}</td>'
f'<td style="padding:6px 10px; text-align:right;">{area}</td>'
f"</tr>"
)
return "\n".join(rows)
def build_email_body(
    title: str,
    method: str,
    zone: str,
    village: str,
    change_pct: float,
    changed_px: int,
    total_px: int,
    regions: list,
) -> str:
    """Populate the HTML template with detection results.

    Args:
        title: Run title; ``"Untitled run"`` is shown when empty.
        method: Detection method name; an em dash is shown when empty.
        zone: Zone name, combined with *village* into the location line.
        village: Village name, combined with *zone* into the location line.
        change_pct: Change percentage, rendered with two decimals.
        changed_px: Changed pixel count, rendered with thousands separators.
        total_px: Total pixel count, rendered with thousands separators.
        regions: Region dicts for the table; ``None`` is treated as empty.

    Returns:
        The rendered HTML document. Free-text values (title, method,
        location) are HTML-escaped before insertion.
    """
    # Local import: the module does not import ``html`` at file level, and the
    # template text is deliberately not held in a variable named ``html``.
    from html import escape

    template = _load_template()
    regions = regions or []
    location = ", ".join(filter(None, [village, zone])) or "—"
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    replacements = {
        "{{title}}": escape(str(title or "Untitled run")),
        "{{method}}": escape(str(method or "—")),
        "{{location}}": escape(location),
        "{{change_pct}}": f"{change_pct:.2f}",
        "{{changed_px}}": f"{changed_px:,}",
        "{{total_px}}": f"{total_px:,}",
        "{{regions_count}}": str(len(regions)),
        # Already HTML: the row builder produces the markup itself.
        "{{region_rows}}": _build_region_rows(regions),
        "{{timestamp}}": now,
    }

    # Resolve the optional regions section on the pristine template, before
    # any caller-supplied text has been inserted.
    if regions:
        template = template.replace("{{#regions}}", "").replace("{{/regions}}", "")
    else:
        template = re.sub(
            r"\{\{#regions\}\}.*?\{\{/regions\}\}", "", template, flags=re.DOTALL
        )

    # Single-pass substitution: inserted values are never re-scanned, so a
    # title that happens to contain "{{method}}" stays literal.
    placeholder_re = re.compile("|".join(re.escape(key) for key in replacements))
    return placeholder_re.sub(lambda match: replacements[match.group(0)], template)
def _send_via_api(recipient: str, subject: str, html_body: str):
    """POST the message to the manager's email API as multipart/form-data.

    The HTML body is sent twice: as the ``Body`` field and, base64-encoded,
    as an attachment named ``ChangeDetection.html``.

    Returns:
        ``(True, None)`` on a 2xx response, otherwise ``(False, message)``
        where *message* describes the HTTP status or the raised exception.
    """
    if not (EMAIL_API_URL and EMAIL_API_URL.strip()):
        return False, "EMAIL_API_URL is not set."
    endpoint = EMAIL_API_URL.strip()
    try:
        import requests

        # The report doubles as an attachment so the API can deliver it as a file.
        encoded_report = base64.b64encode(html_body.encode("utf-8")).decode("ascii")
        # (None, value) tuples make requests emit plain form fields, not file uploads.
        form_fields = {
            "ToEmail": (None, recipient),
            "Subject": (None, subject),
            "Body": (None, html_body),
            "FileName": (None, "ChangeDetection.html"),
            "AttachmentBase64": (None, encoded_report),
        }
        response = requests.post(
            endpoint,
            files=form_fields,
            timeout=30,
            headers={"Accept": "*/*"},
        )
        if 200 <= response.status_code < 300:
            logger.info("Email API: sent to %s", recipient)
            return True, None

        failure = f"API returned {response.status_code}"
        try:
            detail = response.text or response.reason
            if detail:
                # Keep the error short: only the first 200 characters are reported.
                failure = f"{failure}: {detail[:200]}"
        except Exception:
            # Best effort only: the status code alone is still reported.
            pass
        logger.warning("Email API failed: %s", failure)
        return False, failure
    except Exception as exc:
        logger.exception("Email API request failed: %s", exc)
        return False, f"{type(exc).__name__}: {exc}"
def _send_via_smtp(recipient: str, subject: str, html_body: str):
    """Deliver the HTML message over SMTP, upgrading the connection with STARTTLS.

    Connection details and credentials come from ``_smtp_settings()``; the
    configured user is also used as the sender address.

    Returns:
        ``(True, None)`` once the message has been handed to the server,
        otherwise ``(False, message)`` describing missing credentials, an
        authentication failure, or any other raised exception.
    """
    cfg = _smtp_settings()
    sender, secret = cfg["user"], cfg["password"]
    if not (sender and secret):
        return False, "SMTP credentials are not configured on the server."

    message = MIMEMultipart("alternative")
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = recipient
    message.attach(MIMEText(html_body, "html", "utf-8"))

    try:
        tls_context = ssl.create_default_context()
        with smtplib.SMTP(cfg["host"], cfg["port"], timeout=20) as connection:
            connection.ehlo()
            connection.starttls(context=tls_context)
            # Second EHLO after the TLS upgrade, before logging in.
            connection.ehlo()
            connection.login(sender, secret)
            connection.sendmail(sender, recipient, message.as_string())
        logger.info("SMTP email sent to %s", recipient)
        return True, None
    except smtplib.SMTPAuthenticationError:
        logger.exception("SMTP authentication failed")
        return False, "SMTP authentication failed. Check the Gmail app password."
    except Exception as exc:
        error_name = type(exc).__name__
        logger.error(
            "Failed to send notification email to %s: %s: %s",
            recipient,
            error_name,
            exc,
        )
        return False, f"{error_name}: {exc}"
def _send_html_email(recipient: str, subject: str, html_body: str):
"""Send email: use manager's API if EMAIL_API_URL is set, else SMTP."""
if not _valid_email(recipient):
return False, "Enter a valid recipient email address."
if EMAIL_API_URL and EMAIL_API_URL.strip():
return _send_via_api(recipient, subject, html_body)
return _send_via_smtp(recipient, subject, html_body)
def send_notification(
    recipient: str,
    title: str,
    method: str,
    zone: str,
    village: str,
    change_pct: float,
    changed_px: int,
    total_px: int,
    regions: list,
):
    """Render the detection report and email it to *recipient*.

    The report body is produced by ``build_email_body`` from the detection
    arguments; the subject line carries the run title, or "Untitled run"
    when the title is empty.

    Returns:
        The ``(success, error_message)`` pair from ``_send_html_email``.
    """
    report_html = build_email_body(
        title=title,
        method=method,
        zone=zone,
        village=village,
        change_pct=change_pct,
        changed_px=changed_px,
        total_px=total_px,
        regions=regions,
    )
    shown_title = title or "Untitled run"
    subject_line = f"Change Detection Report — {shown_title}"
    return _send_html_email(recipient, subject_line, report_html)
def send_test_email(recipient: str):
    """Email a short fixed HTML message, stamped with the current UTC time, to *recipient*.

    Returns:
        The ``(success, error_message)`` pair from ``_send_html_email``.
    """
    sent_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    # The empty first and last entries give the document its leading and
    # trailing newline.
    document_lines = (
        "",
        "<html>",
        '<body style="font-family: Arial, sans-serif; color: #222;">',
        "<h2>AI Change Detection Email Test</h2>",
        "<p>This is a test email from the AI Change Detection application.</p>",
        "<p>If you received this, the email configuration is working.</p>",
        f"<p><strong>Timestamp:</strong> {sent_at}</p>",
        "</body>",
        "</html>",
        "",
    )
    return _send_html_email(
        recipient,
        "AI Change Detection — Test Email",
        "\n".join(document_lines),
    )
|