Spaces:
Paused
Paused
| """ | |
| IMAP 邮箱服务 | |
| 支持 Gmail / QQ / 163 / Yahoo / Outlook 等标准 IMAP 协议邮件服务商。 | |
| 仅用于接收验证码,强制直连(imaplib 不支持代理)。 | |
| """ | |
| import imaplib | |
| import email | |
| import re | |
| import time | |
| import logging | |
| from email.header import decode_header | |
| from typing import Any, Dict, Optional | |
| from .base import BaseEmailService, EmailServiceError | |
| from ..config.constants import ( | |
| EmailServiceType, | |
| OPENAI_EMAIL_SENDERS, | |
| OTP_CODE_SEMANTIC_PATTERN, | |
| OTP_CODE_PATTERN, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class ImapMailService(BaseEmailService): | |
| """标准 IMAP 邮箱服务(仅接收验证码,强制直连)""" | |
| def __init__(self, config: Dict[str, Any] = None, name: str = None): | |
| super().__init__(EmailServiceType.IMAP_MAIL, name) | |
| cfg = config or {} | |
| required_keys = ["host", "email", "password"] | |
| missing_keys = [k for k in required_keys if not cfg.get(k)] | |
| if missing_keys: | |
| raise ValueError(f"缺少必需配置: {missing_keys}") | |
| self.host: str = str(cfg["host"]).strip() | |
| self.port: int = int(cfg.get("port", 993)) | |
| self.use_ssl: bool = bool(cfg.get("use_ssl", True)) | |
| self.email_addr: str = str(cfg["email"]).strip() | |
| self.password: str = str(cfg["password"]) | |
| self.timeout: int = int(cfg.get("timeout", 30)) | |
| self.max_retries: int = int(cfg.get("max_retries", 3)) | |
| def _connect(self) -> imaplib.IMAP4: | |
| """建立 IMAP 连接并登录,返回 mail 对象""" | |
| if self.use_ssl: | |
| mail = imaplib.IMAP4_SSL(self.host, self.port) | |
| else: | |
| mail = imaplib.IMAP4(self.host, self.port) | |
| mail.starttls() | |
| mail.login(self.email_addr, self.password) | |
| return mail | |
| def _decode_str(self, value) -> str: | |
| """解码邮件头部字段""" | |
| if value is None: | |
| return "" | |
| parts = decode_header(value) | |
| decoded = [] | |
| for part, charset in parts: | |
| if isinstance(part, bytes): | |
| decoded.append(part.decode(charset or "utf-8", errors="replace")) | |
| else: | |
| decoded.append(str(part)) | |
| return " ".join(decoded) | |
| def _get_text_body(self, msg) -> str: | |
| """提取邮件纯文本内容""" | |
| body = "" | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| if part.get_content_type() == "text/plain": | |
| charset = part.get_content_charset() or "utf-8" | |
| payload = part.get_payload(decode=True) | |
| if payload: | |
| body += payload.decode(charset, errors="replace") | |
| else: | |
| charset = msg.get_content_charset() or "utf-8" | |
| payload = msg.get_payload(decode=True) | |
| if payload: | |
| body = payload.decode(charset, errors="replace") | |
| return body | |
| def _is_openai_sender(self, from_addr: str) -> bool: | |
| """判断发件人是否为 OpenAI""" | |
| from_lower = from_addr.lower() | |
| for sender in OPENAI_EMAIL_SENDERS: | |
| if sender.startswith("@") or sender.startswith("."): | |
| if sender in from_lower: | |
| return True | |
| else: | |
| if sender in from_lower: | |
| return True | |
| return False | |
| def _extract_otp(self, text: str) -> Optional[str]: | |
| """从文本中提取 6 位验证码,优先语义匹配,回退简单匹配""" | |
| match = re.search(OTP_CODE_SEMANTIC_PATTERN, text, re.IGNORECASE) | |
| if match: | |
| return match.group(1) | |
| match = re.search(OTP_CODE_PATTERN, text) | |
| if match: | |
| return match.group(1) | |
| return None | |
| def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]: | |
| """IMAP 模式不创建新邮箱,直接返回配置中的固定地址""" | |
| self.update_status(True) | |
| return { | |
| "email": self.email_addr, | |
| "service_id": self.email_addr, | |
| "id": self.email_addr, | |
| } | |
| def get_verification_code( | |
| self, | |
| email: str, | |
| email_id: str = None, | |
| timeout: int = 60, | |
| pattern: str = None, | |
| otp_sent_at: Optional[float] = None, | |
| ) -> Optional[str]: | |
| """轮询 IMAP 收件箱,获取 OpenAI 验证码""" | |
| start_time = time.time() | |
| seen_ids: set = set() | |
| mail = None | |
| try: | |
| mail = self._connect() | |
| mail.select("INBOX") | |
| while time.time() - start_time < timeout: | |
| try: | |
| # 搜索所有未读邮件 | |
| status, data = mail.search(None, "UNSEEN") | |
| if status != "OK" or not data or not data[0]: | |
| time.sleep(3) | |
| continue | |
| msg_ids = data[0].split() | |
| for msg_id in reversed(msg_ids): # 最新的优先 | |
| id_str = msg_id.decode() | |
| if id_str in seen_ids: | |
| continue | |
| seen_ids.add(id_str) | |
| # 获取邮件 | |
| status, msg_data = mail.fetch(msg_id, "(RFC822)") | |
| if status != "OK" or not msg_data: | |
| continue | |
| raw = msg_data[0][1] | |
| msg = email.message_from_bytes(raw) | |
| # 检查发件人 | |
| from_addr = self._decode_str(msg.get("From", "")) | |
| if not self._is_openai_sender(from_addr): | |
| continue | |
| # 提取验证码 | |
| body = self._get_text_body(msg) | |
| code = self._extract_otp(body) | |
| if code: | |
| # 标记已读 | |
| mail.store(msg_id, "+FLAGS", "\\Seen") | |
| self.update_status(True) | |
| logger.info(f"IMAP 获取验证码成功: {code}") | |
| return code | |
| except imaplib.IMAP4.error as e: | |
| logger.debug(f"IMAP 搜索邮件失败: {e}") | |
| # 尝试重新连接 | |
| try: | |
| mail.select("INBOX") | |
| except Exception: | |
| pass | |
| time.sleep(3) | |
| except Exception as e: | |
| logger.warning(f"IMAP 连接/轮询失败: {e}") | |
| self.update_status(False, str(e)) | |
| finally: | |
| if mail: | |
| try: | |
| mail.logout() | |
| except Exception: | |
| pass | |
| return None | |
| def check_health(self) -> bool: | |
| """尝试 IMAP 登录并选择收件箱""" | |
| mail = None | |
| try: | |
| mail = self._connect() | |
| status, _ = mail.select("INBOX") | |
| return status == "OK" | |
| except Exception as e: | |
| logger.warning(f"IMAP 健康检查失败: {e}") | |
| return False | |
| finally: | |
| if mail: | |
| try: | |
| mail.logout() | |
| except Exception: | |
| pass | |
| def list_emails(self, **kwargs) -> list: | |
| """IMAP 单账号模式,返回固定地址""" | |
| return [{"email": self.email_addr, "id": self.email_addr}] | |
| def delete_email(self, email_id: str) -> bool: | |
| """IMAP 模式无需删除逻辑""" | |
| return True | |