File size: 7,593 Bytes
7482820
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
"""
IMAP 邮箱服务
支持 Gmail / QQ / 163 / Yahoo / Outlook 等标准 IMAP 协议邮件服务商。
仅用于接收验证码,强制直连(imaplib 不支持代理)。
"""

import imaplib
import email
import re
import time
import logging
from email.header import decode_header
from typing import Any, Dict, Optional

from .base import BaseEmailService, EmailServiceError
from ..config.constants import (
    EmailServiceType,
    OPENAI_EMAIL_SENDERS,
    OTP_CODE_SEMANTIC_PATTERN,
    OTP_CODE_PATTERN,
)

logger = logging.getLogger(__name__)


class ImapMailService(BaseEmailService):
    """标准 IMAP 邮箱服务(仅接收验证码,强制直连)"""

    def __init__(self, config: Dict[str, Any] = None, name: str = None):
        super().__init__(EmailServiceType.IMAP_MAIL, name)

        cfg = config or {}
        required_keys = ["host", "email", "password"]
        missing_keys = [k for k in required_keys if not cfg.get(k)]
        if missing_keys:
            raise ValueError(f"缺少必需配置: {missing_keys}")

        self.host: str = str(cfg["host"]).strip()
        self.port: int = int(cfg.get("port", 993))
        self.use_ssl: bool = bool(cfg.get("use_ssl", True))
        self.email_addr: str = str(cfg["email"]).strip()
        self.password: str = str(cfg["password"])
        self.timeout: int = int(cfg.get("timeout", 30))
        self.max_retries: int = int(cfg.get("max_retries", 3))

    def _connect(self) -> imaplib.IMAP4:
        """建立 IMAP 连接并登录,返回 mail 对象"""
        if self.use_ssl:
            mail = imaplib.IMAP4_SSL(self.host, self.port)
        else:
            mail = imaplib.IMAP4(self.host, self.port)
            mail.starttls()
        mail.login(self.email_addr, self.password)
        return mail

    def _decode_str(self, value) -> str:
        """解码邮件头部字段"""
        if value is None:
            return ""
        parts = decode_header(value)
        decoded = []
        for part, charset in parts:
            if isinstance(part, bytes):
                decoded.append(part.decode(charset or "utf-8", errors="replace"))
            else:
                decoded.append(str(part))
        return " ".join(decoded)

    def _get_text_body(self, msg) -> str:
        """提取邮件纯文本内容"""
        body = ""
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() == "text/plain":
                    charset = part.get_content_charset() or "utf-8"
                    payload = part.get_payload(decode=True)
                    if payload:
                        body += payload.decode(charset, errors="replace")
        else:
            charset = msg.get_content_charset() or "utf-8"
            payload = msg.get_payload(decode=True)
            if payload:
                body = payload.decode(charset, errors="replace")
        return body

    def _is_openai_sender(self, from_addr: str) -> bool:
        """判断发件人是否为 OpenAI"""
        from_lower = from_addr.lower()
        for sender in OPENAI_EMAIL_SENDERS:
            if sender.startswith("@") or sender.startswith("."):
                if sender in from_lower:
                    return True
            else:
                if sender in from_lower:
                    return True
        return False

    def _extract_otp(self, text: str) -> Optional[str]:
        """从文本中提取 6 位验证码,优先语义匹配,回退简单匹配"""
        match = re.search(OTP_CODE_SEMANTIC_PATTERN, text, re.IGNORECASE)
        if match:
            return match.group(1)
        match = re.search(OTP_CODE_PATTERN, text)
        if match:
            return match.group(1)
        return None

    def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
        """IMAP 模式不创建新邮箱,直接返回配置中的固定地址"""
        self.update_status(True)
        return {
            "email": self.email_addr,
            "service_id": self.email_addr,
            "id": self.email_addr,
        }

    def get_verification_code(
        self,
        email: str,
        email_id: str = None,
        timeout: int = 60,
        pattern: str = None,
        otp_sent_at: Optional[float] = None,
    ) -> Optional[str]:
        """轮询 IMAP 收件箱,获取 OpenAI 验证码"""
        start_time = time.time()
        seen_ids: set = set()
        mail = None

        try:
            mail = self._connect()
            mail.select("INBOX")

            while time.time() - start_time < timeout:
                try:
                    # 搜索所有未读邮件
                    status, data = mail.search(None, "UNSEEN")
                    if status != "OK" or not data or not data[0]:
                        time.sleep(3)
                        continue

                    msg_ids = data[0].split()
                    for msg_id in reversed(msg_ids):  # 最新的优先
                        id_str = msg_id.decode()
                        if id_str in seen_ids:
                            continue
                        seen_ids.add(id_str)

                        # 获取邮件
                        status, msg_data = mail.fetch(msg_id, "(RFC822)")
                        if status != "OK" or not msg_data:
                            continue

                        raw = msg_data[0][1]
                        msg = email.message_from_bytes(raw)

                        # 检查发件人
                        from_addr = self._decode_str(msg.get("From", ""))
                        if not self._is_openai_sender(from_addr):
                            continue

                        # 提取验证码
                        body = self._get_text_body(msg)
                        code = self._extract_otp(body)
                        if code:
                            # 标记已读
                            mail.store(msg_id, "+FLAGS", "\\Seen")
                            self.update_status(True)
                            logger.info(f"IMAP 获取验证码成功: {code}")
                            return code

                except imaplib.IMAP4.error as e:
                    logger.debug(f"IMAP 搜索邮件失败: {e}")
                    # 尝试重新连接
                    try:
                        mail.select("INBOX")
                    except Exception:
                        pass

                time.sleep(3)

        except Exception as e:
            logger.warning(f"IMAP 连接/轮询失败: {e}")
            self.update_status(False, str(e))
        finally:
            if mail:
                try:
                    mail.logout()
                except Exception:
                    pass

        return None

    def check_health(self) -> bool:
        """尝试 IMAP 登录并选择收件箱"""
        mail = None
        try:
            mail = self._connect()
            status, _ = mail.select("INBOX")
            return status == "OK"
        except Exception as e:
            logger.warning(f"IMAP 健康检查失败: {e}")
            return False
        finally:
            if mail:
                try:
                    mail.logout()
                except Exception:
                    pass

    def list_emails(self, **kwargs) -> list:
        """IMAP 单账号模式,返回固定地址"""
        return [{"email": self.email_addr, "id": self.email_addr}]

    def delete_email(self, email_id: str) -> bool:
        """IMAP 模式无需删除逻辑"""
        return True