Spaces:
Build error
Build error
| from __future__ import annotations | |
| import email | |
| import imaplib | |
| from email.message import Message | |
| from email.utils import parseaddr | |
| from typing import List, Optional | |
| from .schemas import EmailMessage | |
| from .utils import decode_mime_words, get_logger, strip_html | |
| log = get_logger(__name__) | |
| def _decode_payload(part: Message) -> str: | |
| payload = part.get_payload(decode=True) | |
| if payload is None: | |
| return "" | |
| charset = part.get_content_charset() or "utf-8" | |
| try: | |
| return payload.decode(charset, errors="replace") | |
| except Exception: | |
| return payload.decode("utf-8", errors="replace") | |
def _get_body(msg: Message) -> str:
    """Extract a best-effort text body from an email message.

    Multipart messages: every part is visited with ``msg.walk()``; parts
    whose Content-Disposition is ``attachment`` are skipped. All
    ``text/plain`` parts are stripped, joined with newlines and returned if
    the result is non-empty. Otherwise the ``text/html`` parts are joined the
    same way and passed through ``strip_html``.

    Single-part messages: the payload is decoded; ``text/html`` content is
    passed through ``strip_html``, anything else is returned with surrounding
    whitespace stripped.

    Returns an empty string when no usable text is found.
    """
    if not msg.is_multipart():
        decoded = _decode_payload(msg)
        # get_content_type() already returns a lower-cased "maintype/subtype".
        if msg.get_content_type() == "text/html":
            return strip_html(decoded)
        return decoded.strip()

    text_parts: List[str] = []
    html_parts: List[str] = []
    for part in msg.walk():
        # get_content_disposition() parses the header and returns only the
        # lower-cased disposition token, so an inline part whose *filename*
        # happens to contain "attachment" is no longer skipped by mistake.
        if part.get_content_disposition() == "attachment":
            continue

        content_type = part.get_content_type()
        if content_type == "text/plain":
            text_parts.append(_decode_payload(part))
        elif content_type == "text/html":
            html_parts.append(_decode_payload(part))

    # Prefer plain text; fall back to HTML only when no plain text survived.
    text = "\n".join(p.strip() for p in text_parts if p.strip()).strip()
    if text:
        return text

    html = "\n".join(p.strip() for p in html_parts if p.strip()).strip()
    return strip_html(html) if html else ""
def _demo_emails() -> List[EmailMessage]:
    """Build the canned messages returned for the placeholder demo server."""
    return [
        EmailMessage(
            message_id="mock-1",
            from_email="vip.client@luxury.com",
            subject="Urgent Request for Private Jet Booking",
            body="Hello Concierge,\n\nI need a private jet from New York to London tomorrow at 10 AM. Please coordinate with the flight crew.\n\nBest,\nVIP Client",
            date="2026-04-02T10:00:00Z",
        ),
        EmailMessage(
            message_id="mock-2",
            from_email="support@tech-billing.com",
            subject="Invoice Dispute: #INV-99283",
            body="The invoice sent yesterday was incorrect. We need it adjusted immediately to avoid system lockout.\n\nRegards,\nBilling Support",
            date="2026-04-02T11:30:00Z",
        ),
        EmailMessage(
            message_id="mock-3",
            from_email="spammer@free-money.xyz",
            subject="CONGRATULATIONS!!! YOU WON A LOTTERY",
            body="You just won 10,000,000 USD! Click here to claim your prize now: http://malicious-link.net",
            date="2026-04-02T12:00:00Z",
        ),
    ]


def fetch_unread_emails(
    *,
    imap_server: str,
    imap_port: int,
    username: str,
    password: str,
    folder: str = "INBOX",
    limit: int = 20,
) -> List[EmailMessage]:
    """Connect via IMAP over SSL, find UNSEEN emails, fetch and parse them.

    The mailbox is opened read-only and messages are fetched with
    ``BODY.PEEK[]``, so message flags are not modified (messages stay unread)
    and closing the mailbox does not expunge anything.

    Args:
        imap_server: Host name of the IMAP server.
        imap_port: Port of the IMAP-over-SSL service.
        username: Login name.
        password: Login password.
        folder: Mailbox to search.
        limit: Maximum number of messages to return; the last ``limit`` ids
            reported by the UNSEEN search are used. A value of zero or less
            returns no messages.

    Returns:
        One ``EmailMessage`` per successfully fetched message. Messages whose
        fetch fails or returns an unexpected response are skipped.

    Raises:
        RuntimeError: On any failure while talking to the server, chained to
            the original exception. Exception: when ``imap_server`` is the
            placeholder ``imap.example.com``, a warning is logged and a fixed
            set of mock messages is returned instead.
    """
    emails_out: List[EmailMessage] = []
    mail: Optional[imaplib.IMAP4_SSL] = None
    try:
        mail = imaplib.IMAP4_SSL(imap_server, imap_port)
        mail.login(username, password)

        # Read-only select (EXAMINE): the server will not set \Seen on fetch
        # and CLOSE will not expunge messages flagged \Deleted.
        select_status, _ = mail.select(folder, readonly=True)
        if select_status != "OK":
            raise RuntimeError(f"IMAP select failed: {select_status}")

        status, data = mail.search(None, "UNSEEN")
        if status != "OK":
            raise RuntimeError(f"IMAP search failed: {status}")

        ids = (data[0] or b"").split() if data else []
        # ids[-0:] would be the *whole* list, so guard non-positive limits.
        ids = ids[-limit:] if limit > 0 else []

        for eid in ids:
            # BODY.PEEK[] returns the full RFC 822 message without the
            # implicit \Seen side effect of a plain RFC822 fetch.
            fstatus, fdata = mail.fetch(eid, "(BODY.PEEK[])")
            if fstatus != "OK" or not fdata:
                continue

            # A message literal arrives as an (envelope, bytes) tuple; any
            # other shape carries no message content.
            first = fdata[0]
            if not isinstance(first, tuple) or len(first) < 2:
                continue

            msg = email.message_from_bytes(first[1])

            message_id = (msg.get("Message-ID") or "").strip()
            if not message_id:
                # Synthesize a stable-looking id from the sequence number.
                message_id = f"imap:{eid.decode(errors='ignore')}"

            subject = decode_mime_words(msg.get("Subject"))

            _, from_email = parseaddr(msg.get("From") or "")
            if not from_email:
                reply_to = (msg.get("Reply-To") or "").strip()
                # Prefer the bare address; keep the raw header if unparsable.
                from_email = parseaddr(reply_to)[1] or reply_to

            date = (msg.get("Date") or "").strip() or None

            emails_out.append(
                EmailMessage(
                    message_id=message_id,
                    from_email=from_email,
                    subject=subject,
                    body=_get_body(msg),
                    date=date,
                )
            )

        return emails_out
    except Exception as e:
        # Demo mode: the placeholder host can never be reached, so serve
        # canned data instead of failing.
        if imap_server == "imap.example.com":
            log.warning("Detected default imap.example.com, returning mock emails for demo.")
            return _demo_emails()
        raise RuntimeError(f"IMAP error: {e}") from e
    finally:
        if mail is not None:
            # Run each cleanup step independently: close() raises when no
            # mailbox is selected (e.g. login failed), and that must not
            # prevent logout() from releasing the connection.
            for cleanup in (mail.close, mail.logout):
                try:
                    cleanup()
                except Exception:
                    # Deliberate best-effort: a cleanup failure must not mask
                    # the result or the original error.
                    pass