File size: 7,959 Bytes
19f4fba
 
6383573
 
 
19f4fba
0e5174f
19f4fba
 
0bf1136
19f4fba
 
 
 
 
 
 
 
 
5383f21
19f4fba
 
 
6383573
 
 
 
 
 
19f4fba
5383f21
 
 
 
 
 
 
 
 
 
 
 
 
19f4fba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6383573
 
 
 
 
 
 
0e5174f
 
6383573
 
 
 
0e5174f
 
6383573
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2d92257
6383573
 
 
5383f21
 
 
 
 
 
 
89a1d85
5383f21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6383573
5383f21
 
 
 
 
89a1d85
 
 
 
 
 
5383f21
 
 
6383573
 
 
 
 
 
 
 
 
 
19f4fba
 
 
 
 
 
 
 
 
 
5383f21
 
19f4fba
 
 
5383f21
 
19f4fba
 
5383f21
6383573
5383f21
 
 
 
 
 
6383573
5383f21
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
Email notification module.
Sends HTML-formatted detection reports via the manager's email API (HTTPS)
or SMTP (e.g. Gmail) when API URL is not set.
Credentials and API URL from environment variables.
"""
import base64
import logging
import os
import re
import smtplib
import ssl
from datetime import datetime, timezone
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from pathlib import Path

logger = logging.getLogger(__name__)

EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")

TEMPLATE_PATH = Path(__file__).resolve().parent.parent / "templates" / "ChangeDetection.html"

# Manager's email API (multipart/form-data). When set, used instead of SMTP.
EMAIL_API_URL = os.environ.get(
    "EMAIL_API_URL",
    "https://emailservice.managemybusinessess.com/api/email/send",
)


def _smtp_settings():
    return {
        "host": os.environ.get("SMTP_HOST", "smtp.gmail.com"),
        "port": int(os.environ.get("SMTP_PORT", "587")),
        "user": os.environ.get("SMTP_USER", "vedangofficeserver@gmail.com"),
        "password": os.environ.get("SMTP_PASS", ""),
    }


def _valid_email(email: str) -> bool:
    return bool(email and EMAIL_RE.match(email.strip()))


def _load_template() -> str:
    """Read the HTML email template from disk."""
    if not TEMPLATE_PATH.exists():
        logger.error("Email template not found at %s", TEMPLATE_PATH)
        return "<p>Change Detection report — template file missing.</p>"
    return TEMPLATE_PATH.read_text(encoding="utf-8")


def _build_region_rows(regions: list) -> str:
    """Render region rows as HTML <tr> elements for the email template."""
    if not regions:
        return ""
    rows = []
    for r in regions[:20]:
        bg = "#f9f9fb" if r.get("id", 0) % 2 == 0 else "#ffffff"
        sub = r.get("subType") or "—"
        conf = f'{r.get("confidence", 0) * 100:.0f}%'
        area = f'{r.get("area", 0):,}'
        rows.append(
            f'<tr style="background:{bg};">'
            f'<td style="padding:6px 10px;">{r.get("id","")}</td>'
            f'<td style="padding:6px 10px;">{r.get("objectType","")}</td>'
            f'<td style="padding:6px 10px;">{sub}</td>'
            f'<td style="padding:6px 10px;">{conf}</td>'
            f'<td style="padding:6px 10px; text-align:right;">{area}</td>'
            f"</tr>"
        )
    return "\n".join(rows)


def build_email_body(
    title: str,
    method: str,
    zone: str,
    village: str,
    change_pct: float,
    changed_px: int,
    total_px: int,
    regions: list,
) -> str:
    """Populate the HTML template with detection results."""
    html = _load_template()
    location = ", ".join(filter(None, [village, zone])) or "—"
    region_rows = _build_region_rows(regions)
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    replacements = {
        "{{title}}": title or "Untitled run",
        "{{method}}": method or "—",
        "{{location}}": location,
        "{{change_pct}}": f"{change_pct:.2f}",
        "{{changed_px}}": f"{changed_px:,}",
        "{{total_px}}": f"{total_px:,}",
        "{{regions_count}}": str(len(regions)),
        "{{region_rows}}": region_rows,
        "{{timestamp}}": now,
    }
    for key, val in replacements.items():
        html = html.replace(key, val)

    if regions:
        html = html.replace("{{#regions}}", "").replace("{{/regions}}", "")
    else:
        html = re.sub(r"\{\{#regions\}\}.*?\{\{/regions\}\}", "", html, flags=re.DOTALL)

    return html


def _send_via_api(recipient: str, subject: str, html_body: str):
    """Send email via manager's API (POST multipart/form-data)."""
    if not EMAIL_API_URL or not EMAIL_API_URL.strip():
        return False, "EMAIL_API_URL is not set."
    url = EMAIL_API_URL.strip()
    try:
        import requests
        # Attach the same HTML report as a file so the API can deliver it.
        attachment_b64 = base64.b64encode(html_body.encode("utf-8")).decode("ascii")
        files = {
            "ToEmail": (None, recipient),
            "Subject": (None, subject),
            "Body": (None, html_body),
            "FileName": (None, "ChangeDetection.html"),
            "AttachmentBase64": (None, attachment_b64),
        }
        resp = requests.post(
            url,
            files=files,
            timeout=30,
            headers={"Accept": "*/*"},
        )
        if resp.status_code >= 200 and resp.status_code < 300:
            logger.info("Email API: sent to %s", recipient)
            return True, None
        msg = f"API returned {resp.status_code}"
        try:
            body = resp.text or resp.reason
            if body:
                msg = f"{msg}: {body[:200]}"
        except Exception:
            pass
        logger.warning("Email API failed: %s", msg)
        return False, msg
    except Exception as exc:
        logger.exception("Email API request failed: %s", exc)
        return False, f"{type(exc).__name__}: {exc}"


def _send_via_smtp(recipient: str, subject: str, html_body: str):
    """Send email via SMTP (e.g. Gmail)."""
    settings = _smtp_settings()
    smtp_user = settings["user"]
    smtp_pass = settings["password"]
    smtp_host = settings["host"]
    smtp_port = settings["port"]

    if not smtp_user or not smtp_pass:
        return False, "SMTP credentials are not configured on the server."

    msg = MIMEMultipart("alternative")
    msg["Subject"] = subject
    msg["From"] = smtp_user
    msg["To"] = recipient
    msg.attach(MIMEText(html_body, "html", "utf-8"))

    try:
        context = ssl.create_default_context()
        with smtplib.SMTP(smtp_host, smtp_port, timeout=20) as server:
            server.ehlo()
            server.starttls(context=context)
            server.ehlo()
            server.login(smtp_user, smtp_pass)
            server.sendmail(smtp_user, recipient, msg.as_string())
        logger.info("SMTP email sent to %s", recipient)
        return True, None
    except smtplib.SMTPAuthenticationError:
        logger.exception("SMTP authentication failed")
        return False, "SMTP authentication failed. Check the Gmail app password."
    except Exception as exc:
        logger.error(
            "Failed to send notification email to %s: %s: %s",
            recipient,
            type(exc).__name__,
            exc,
        )
        return False, f"{type(exc).__name__}: {exc}"


def _send_html_email(recipient: str, subject: str, html_body: str):
    """Send email: use manager's API if EMAIL_API_URL is set, else SMTP."""
    if not _valid_email(recipient):
        return False, "Enter a valid recipient email address."

    if EMAIL_API_URL and EMAIL_API_URL.strip():
        return _send_via_api(recipient, subject, html_body)
    return _send_via_smtp(recipient, subject, html_body)


def send_notification(
    recipient: str,
    title: str,
    method: str,
    zone: str,
    village: str,
    change_pct: float,
    changed_px: int,
    total_px: int,
    regions: list,
):
    """Send a detection report email and return (success, error_message)."""
    html_body = build_email_body(
        title, method, zone, village, change_pct, changed_px, total_px, regions
    )
    subject = f"Change Detection Report — {title or 'Untitled run'}"
    return _send_html_email(recipient, subject, html_body)


def send_test_email(recipient: str):
    """Send a small test email to verify delivery."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    html_body = f"""
    <html>
      <body style="font-family: Arial, sans-serif; color: #222;">
        <h2>AI Change Detection Email Test</h2>
        <p>This is a test email from the AI Change Detection application.</p>
        <p>If you received this, the email configuration is working.</p>
        <p><strong>Timestamp:</strong> {now}</p>
      </body>
    </html>
    """
    return _send_html_email(recipient, "AI Change Detection — Test Email", html_body)