File size: 5,095 Bytes
0387a1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import annotations

import email
import imaplib
from email.message import Message
from email.utils import parseaddr
from typing import List, Optional

from .schemas import EmailMessage
from .utils import decode_mime_words, get_logger, strip_html


# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's name; used below to report the demo-mode fallback.
log = get_logger(__name__)


def _decode_payload(part: Message) -> str:
    """
    Turn the transfer-decoded payload of a message part into text.

    Returns an empty string when the part yields no decoded payload.
    The part's declared charset is tried first (UTF-8 when none is
    declared); if decoding with it fails for any reason, UTF-8 is used
    instead. Undecodable bytes are replaced rather than raising.
    """
    raw = part.get_payload(decode=True)
    if raw is None:
        return ""

    declared = part.get_content_charset()
    encoding = declared if declared else "utf-8"
    try:
        text = raw.decode(encoding, errors="replace")
    except Exception:
        # Typically an unknown/bogus charset label; fall back to UTF-8.
        text = raw.decode("utf-8", errors="replace")
    return text


def _get_body(msg: Message) -> str:
    """
    Pull a best-effort textual body out of a parsed email.

    Single-part messages: the decoded payload is returned, passed through
    strip_html when the content type is text/html, otherwise whitespace-trimmed.

    Multipart messages: every non-attachment text/plain part is collected and
    joined with newlines; if that yields nothing, the text/html parts are
    joined and passed through strip_html. Returns "" when neither exists.
    """
    if not msg.is_multipart():
        kind = (msg.get_content_type() or "").lower()
        content = _decode_payload(msg)
        if kind == "text/html":
            return strip_html(content)
        return content.strip()

    plain_chunks: List[str] = []
    markup_chunks: List[str] = []
    for section in msg.walk():
        kind = (section.get_content_type() or "").lower()
        if "attachment" in (section.get("Content-Disposition") or "").lower():
            # Attached files are never treated as body text.
            continue
        if kind == "text/plain":
            bucket = plain_chunks
        elif kind == "text/html":
            bucket = markup_chunks
        else:
            continue
        # Blank parts are dropped here so they add no empty lines to the join.
        chunk = _decode_payload(section).strip()
        if chunk:
            bucket.append(chunk)

    plain = "\n".join(plain_chunks).strip()
    if plain:
        return plain

    markup = "\n".join(markup_chunks).strip()
    if not markup:
        return ""
    return strip_html(markup)


def _demo_emails() -> List[EmailMessage]:
    """Return the canned messages served when the placeholder demo server is configured."""
    return [
        EmailMessage(
            message_id="mock-1",
            from_email="vip.client@luxury.com",
            subject="Urgent Request for Private Jet Booking",
            body="Hello Concierge,\n\nI need a private jet from New York to London tomorrow at 10 AM. Please coordinate with the flight crew.\n\nBest,\nVIP Client",
            date="2026-04-02T10:00:00Z",
        ),
        EmailMessage(
            message_id="mock-2",
            from_email="support@tech-billing.com",
            subject="Invoice Dispute: #INV-99283",
            body="The invoice sent yesterday was incorrect. We need it adjusted immediately to avoid system lockout.\n\nRegards,\nBilling Support",
            date="2026-04-02T11:30:00Z",
        ),
        EmailMessage(
            message_id="mock-3",
            from_email="spammer@free-money.xyz",
            subject="CONGRATULATIONS!!! YOU WON A LOTTERY",
            body="You just won 10,000,000 USD! Click here to claim your prize now: http://malicious-link.net",
            date="2026-04-02T12:00:00Z",
        ),
    ]


def _disconnect(mail: Optional[imaplib.IMAP4_SSL]) -> None:
    """Best-effort teardown of an IMAP connection; never raises."""
    if mail is None:
        return
    # close() is only legal once a mailbox is selected. If login/select never
    # succeeded it raises, and that must not prevent logout() from running,
    # otherwise the socket is leaked.
    try:
        mail.close()
    except Exception:
        pass
    try:
        mail.logout()
    except Exception:
        # Nothing useful can be done if the server is already gone, and
        # raising from cleanup would mask the caller's real error.
        pass


def fetch_unread_emails(
    *,
    imap_server: str,
    imap_port: int,
    username: str,
    password: str,
    folder: str = "INBOX",
    limit: int = 20,
) -> List[EmailMessage]:
    """
    Connect via IMAP over SSL, find UNSEEN emails, fetch and parse them.

    The mailbox is opened read-only and bodies are fetched with BODY.PEEK[],
    so message flags are not modified (messages stay unread).

    Args:
        imap_server: IMAP host name.
        imap_port: IMAP SSL port.
        username: Login name.
        password: Login password.
        folder: Mailbox to inspect.
        limit: Maximum number of messages to return; the last `limit` ids
            reported by the server are used. A value <= 0 returns no messages.

    Returns:
        A list of parsed EmailMessage objects, in the order the server
        listed their ids.

    Raises:
        RuntimeError: On any connection, login, select, search or parsing
            failure (the original exception is chained). Exception: when
            imap_server is the placeholder "imap.example.com", a fixed set
            of mock emails is returned instead of raising.
    """
    emails_out: List[EmailMessage] = []
    mail: Optional[imaplib.IMAP4_SSL] = None
    try:
        mail = imaplib.IMAP4_SSL(imap_server, imap_port)
        mail.login(username, password)

        # Read-only select (EXAMINE) so fetching cannot set the \Seen flag.
        sstatus, _ = mail.select(folder, readonly=True)
        if sstatus != "OK":
            raise RuntimeError(f"IMAP select failed for folder {folder!r}: {sstatus}")

        status, data = mail.search(None, "UNSEEN")
        if status != "OK":
            raise RuntimeError(f"IMAP search failed: {status}")
        ids = (data[0] or b"").split() if data else []
        # Guard non-positive limits: ids[-0:] would otherwise yield *all* ids.
        ids = ids[-limit:] if limit > 0 else []

        for eid in ids:
            # BODY.PEEK[] returns the full message without marking it as read.
            fstatus, fdata = mail.fetch(eid, "(BODY.PEEK[])")
            if fstatus != "OK" or not fdata:
                continue
            first = fdata[0]
            # A usable response item is an (envelope, raw_bytes) tuple; skip
            # anything else (e.g. a bare status line or None for a vanished id).
            if not isinstance(first, tuple) or len(first) < 2:
                continue
            raw = first[1]
            if not isinstance(raw, (bytes, bytearray)):
                continue
            msg = email.message_from_bytes(raw)

            # Fall back to a synthetic id when the header is missing.
            message_id = (msg.get("Message-ID") or "").strip() or f"imap:{eid.decode(errors='ignore')}"
            subject = decode_mime_words(msg.get("Subject"))

            _, from_email = parseaddr(msg.get("From") or "")
            if not from_email:
                # Use the address portion of Reply-To; keep the raw header
                # value only if it cannot be parsed.
                reply_to = (msg.get("Reply-To") or "").strip()
                from_email = parseaddr(reply_to)[1] or reply_to

            date = (msg.get("Date") or "").strip() or None
            body = _get_body(msg)

            emails_out.append(
                EmailMessage(
                    message_id=message_id,
                    from_email=from_email,
                    subject=subject,
                    body=body,
                    date=date,
                )
            )

        return emails_out
    except Exception as e:
        # Demo mode: the placeholder host can never be reached, so serve
        # canned data instead of failing.
        if imap_server == "imap.example.com":
            log.warning("Detected default imap.example.com, returning mock emails for demo.")
            return _demo_emails()
        raise RuntimeError(f"IMAP error: {e}") from e
    finally:
        _disconnect(mail)