from __future__ import annotations import email import imaplib from email.message import Message from email.utils import parseaddr from typing import List, Optional from .schemas import EmailMessage from .utils import decode_mime_words, get_logger, strip_html log = get_logger(__name__) def _decode_payload(part: Message) -> str: payload = part.get_payload(decode=True) if payload is None: return "" charset = part.get_content_charset() or "utf-8" try: return payload.decode(charset, errors="replace") except Exception: return payload.decode("utf-8", errors="replace") def _get_body(msg: Message) -> str: """ Extract best-effort email body. Prefers text/plain; falls back to text/html (stripped). """ if msg.is_multipart(): text_parts: List[str] = [] html_parts: List[str] = [] for part in msg.walk(): content_type = (part.get_content_type() or "").lower() disposition = (part.get("Content-Disposition") or "").lower() if "attachment" in disposition: continue if content_type == "text/plain": text_parts.append(_decode_payload(part)) elif content_type == "text/html": html_parts.append(_decode_payload(part)) text = "\n".join(p.strip() for p in text_parts if p.strip()).strip() if text: return text html = "\n".join(p.strip() for p in html_parts if p.strip()).strip() return strip_html(html) if html else "" content_type = (msg.get_content_type() or "").lower() decoded = _decode_payload(msg) return strip_html(decoded) if content_type == "text/html" else decoded.strip() def fetch_unread_emails( *, imap_server: str, imap_port: int, username: str, password: str, folder: str = "INBOX", limit: int = 20, ) -> List[EmailMessage]: """ Connect via IMAP, find UNSEEN emails, fetch and parse them. Does not modify message flags (still unread). """ emails_out: List[EmailMessage] = [] mail: Optional[imaplib.IMAP4_SSL] = None try: mail = imaplib.IMAP4_SSL(imap_server, imap_port) mail.login(username, password) mail.select(folder) status, data = mail.search(None, "UNSEEN") if status != "OK": raise RuntimeError(f"IMAP search failed: {status}") ids = (data[0] or b"").split() ids = ids[-limit:] for eid in ids: fstatus, fdata = mail.fetch(eid, "(RFC822)") if fstatus != "OK" or not fdata or not fdata[0]: continue raw = fdata[0][1] msg = email.message_from_bytes(raw) message_id = (msg.get("Message-ID") or "").strip() or f"imap:{eid.decode(errors='ignore')}" subject = decode_mime_words(msg.get("Subject")) _, from_email = parseaddr(msg.get("From") or "") if not from_email: from_email = (msg.get("Reply-To") or "").strip() date = (msg.get("Date") or "").strip() or None body = _get_body(msg) emails_out.append( EmailMessage( message_id=message_id, from_email=from_email, subject=subject, body=body, date=date, ) ) return emails_out except Exception as e: if imap_server == "imap.example.com": log.warning("Detected default imap.example.com, returning mock emails for demo.") return [ EmailMessage( message_id="mock-1", from_email="vip.client@luxury.com", subject="Urgent Request for Private Jet Booking", body="Hello Concierge,\n\nI need a private jet from New York to London tomorrow at 10 AM. Please coordinate with the flight crew.\n\nBest,\nVIP Client", date="2026-04-02T10:00:00Z" ), EmailMessage( message_id="mock-2", from_email="support@tech-billing.com", subject="Invoice Dispute: #INV-99283", body="The invoice sent yesterday was incorrect. We need it adjusted immediately to avoid system lockout.\n\nRegards,\nBilling Support", date="2026-04-02T11:30:00Z" ), EmailMessage( message_id="mock-3", from_email="spammer@free-money.xyz", subject="CONGRATULATIONS!!! YOU WON A LOTTERY", body="You just won 10,000,000 USD! Click here to claim your prize now: http://malicious-link.net", date="2026-04-02T12:00:00Z" ) ] raise RuntimeError(f"IMAP error: {e}") from e finally: try: if mail is not None: mail.close() mail.logout() except Exception: pass