# codex-console/src/services/imap_mail.py
# cjovs's picture
# Deploy codex-console to HF Space
# 7482820 verified
"""
IMAP 邮箱服务
支持 Gmail / QQ / 163 / Yahoo / Outlook 等标准 IMAP 协议邮件服务商。
仅用于接收验证码,强制直连(imaplib 不支持代理)。
"""
import imaplib
import email
import re
import time
import logging
from email.header import decode_header
from typing import Any, Dict, Optional
from .base import BaseEmailService, EmailServiceError
from ..config.constants import (
EmailServiceType,
OPENAI_EMAIL_SENDERS,
OTP_CODE_SEMANTIC_PATTERN,
OTP_CODE_PATTERN,
)
logger = logging.getLogger(__name__)
class ImapMailService(BaseEmailService):
    """Standard IMAP mailbox service, used only to receive verification codes.

    The service logs in to one fixed mailbox (taken from ``config``) and polls
    its INBOX for unread mail from OpenAI senders. Connections are always made
    directly through ``imaplib``.
    """

    # Seconds to wait between two polls of the inbox.
    _POLL_INTERVAL_SECONDS = 3
    # A message whose Date header is older than ``otp_sent_at`` by more than
    # this many seconds is treated as belonging to an earlier OTP request.
    _OTP_CLOCK_SKEW_SECONDS = 60

    def __init__(self, config: Optional[Dict[str, Any]] = None, name: Optional[str] = None):
        """Validate ``config`` and store the connection settings.

        Args:
            config: Mapping with required keys ``host``, ``email`` and
                ``password``; optional keys ``port``, ``use_ssl``, ``timeout``
                and ``max_retries``.
            name: Service name forwarded to the base class.

        Raises:
            ValueError: If a required key is missing or empty.
        """
        super().__init__(EmailServiceType.IMAP_MAIL, name)
        cfg = config or {}
        required_keys = ["host", "email", "password"]
        missing_keys = [k for k in required_keys if not cfg.get(k)]
        if missing_keys:
            raise ValueError(f"缺少必需配置: {missing_keys}")

        self.host: str = str(cfg["host"]).strip()
        self.use_ssl: bool = self._as_bool(cfg.get("use_ssl", True))
        # Implicit TLS listens on 993; plain IMAP upgraded via STARTTLS on 143.
        self.port: int = int(cfg.get("port") or (993 if self.use_ssl else 143))
        self.email_addr: str = str(cfg["email"]).strip()
        self.password: str = str(cfg["password"])
        # A zero/empty timeout would make the socket non-blocking, so fall
        # back to the default instead.
        self.timeout: int = int(cfg.get("timeout") or 30)
        raw_retries = cfg.get("max_retries")
        self.max_retries: int = 3 if raw_retries is None else int(raw_retries)

    @staticmethod
    def _as_bool(value: Any) -> bool:
        """Interpret a config value as a boolean, accepting textual forms."""
        if isinstance(value, str):
            # bool("false") is True, which is never what a config file means.
            return value.strip().lower() not in ("", "0", "false", "no", "off")
        return bool(value)

    @staticmethod
    def _close_quietly(mail: Optional[imaplib.IMAP4]) -> None:
        """Log out of ``mail`` on a best-effort basis and release its socket."""
        if mail is None:
            return
        try:
            mail.logout()
        except Exception as exc:
            logger.debug("IMAP 注销失败: %s", exc)
            # logout() only closes the socket after a successful LOGOUT
            # command, so close it explicitly when the command failed.
            try:
                mail.shutdown()
            except Exception as shutdown_exc:
                logger.debug("IMAP 关闭连接失败: %s", shutdown_exc)

    @staticmethod
    def _decode_bytes(data: bytes, charset: Optional[str]) -> str:
        """Decode ``data`` with ``charset``, falling back to UTF-8."""
        try:
            return data.decode(charset or "utf-8", errors="replace")
        except LookupError:
            # Unknown charset label in the mail (e.g. "unknown-8bit").
            return data.decode("utf-8", errors="replace")

    @staticmethod
    def _strip_html(markup: str) -> str:
        """Reduce an HTML document to its visible text."""
        from html import unescape

        # Drop style/script blocks first: CSS colours such as "#202123" would
        # otherwise look like six-digit codes.
        text = re.sub(r"(?is)<(script|style)\b.*?</\1\s*>", " ", markup)
        text = re.sub(r"(?s)<[^>]+>", " ", text)
        return re.sub(r"\s+", " ", unescape(text)).strip()

    def _connect(self) -> imaplib.IMAP4:
        """Open an IMAP connection, log in, and return the client object.

        With ``use_ssl`` the connection uses implicit TLS; otherwise it is
        upgraded with STARTTLS before the credentials are sent.

        Raises:
            imaplib.IMAP4.error: If STARTTLS or login is rejected.
            OSError: If the connection cannot be established or times out.
        """
        # The ``timeout`` argument requires Python 3.9+.
        if self.use_ssl:
            mail = imaplib.IMAP4_SSL(self.host, self.port, timeout=self.timeout)
        else:
            mail = imaplib.IMAP4(self.host, self.port, timeout=self.timeout)
        try:
            if not self.use_ssl:
                mail.starttls()
            mail.login(self.email_addr, self.password)
        except Exception:
            # Do not leak the socket when the handshake or login fails.
            self._close_quietly(mail)
            raise
        return mail

    def _open_inbox(self) -> imaplib.IMAP4:
        """Connect, log in and select INBOX; return the client object."""
        mail = self._connect()
        try:
            mail.select("INBOX")
        except Exception:
            self._close_quietly(mail)
            raise
        return mail

    def _decode_str(self, value) -> str:
        """Decode a (possibly RFC 2047 encoded) header value to text."""
        if value is None:
            return ""
        from email.errors import HeaderParseError

        try:
            parts = decode_header(value)
        except HeaderParseError:
            # Malformed encoded-word: use the raw text rather than failing.
            return str(value)
        decoded = []
        for part, charset in parts:
            if isinstance(part, bytes):
                decoded.append(self._decode_bytes(part, charset))
            else:
                decoded.append(str(part))
        return " ".join(decoded)

    def _get_text_body(self, msg) -> str:
        """Extract the textual content of a message.

        For multipart messages all ``text/plain`` parts are concatenated; if
        there are none, the ``text/html`` parts are used with markup removed.
        A non-multipart message returns its decoded payload unchanged.
        """
        if not msg.is_multipart():
            payload = msg.get_payload(decode=True)
            if not payload:
                return ""
            return self._decode_bytes(payload, msg.get_content_charset())

        plain_parts = []
        html_parts = []
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                continue
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            text = self._decode_bytes(payload, part.get_content_charset())
            if content_type == "text/plain":
                plain_parts.append(text)
            else:
                html_parts.append(text)

        if plain_parts:
            return "".join(plain_parts)
        # HTML-only mail: fall back to the visible text of the HTML parts.
        return " ".join(self._strip_html(markup) for markup in html_parts)

    def _is_openai_sender(self, from_addr: str) -> bool:
        """Return True if ``from_addr`` contains any known OpenAI sender."""
        from_lower = (from_addr or "").lower()
        # Entries may be full addresses or "@domain"/".domain" suffixes; all
        # of them are matched as case-insensitive substrings.
        return any(
            sender and sender.lower() in from_lower
            for sender in OPENAI_EMAIL_SENDERS
        )

    def _extract_otp(self, text: str, pattern: Optional[str] = None) -> Optional[str]:
        """Extract a verification code from ``text``.

        Args:
            text: Message text to search.
            pattern: Optional caller-supplied regular expression, tried first.
                Its first group is returned, or the whole match if it has no
                groups.

        Returns:
            The code, or ``None``. Without a custom match, the semantic
            pattern is tried before the plain pattern.
        """
        if not text:
            return None
        if pattern:
            match = re.search(pattern, text)
            if match:
                code = match.group(1) if match.re.groups else match.group(0)
                if code:
                    return code
        match = re.search(OTP_CODE_SEMANTIC_PATTERN, text, re.IGNORECASE)
        if match:
            return match.group(1)
        match = re.search(OTP_CODE_PATTERN, text)
        if match:
            return match.group(1)
        return None

    def _fetch_message(self, mail, msg_id, parts: str) -> "Optional[email.message.Message]":
        """Fetch ``parts`` of message ``msg_id`` and parse them.

        Returns ``None`` if the fetch fails or the response has no literal
        data (for example when the message was expunged in the meantime).
        """
        status, msg_data = mail.fetch(msg_id, parts)
        if status != "OK" or not msg_data:
            return None
        for item in msg_data:
            # Literal data arrives as (envelope, payload) tuples; other items
            # (b")" or None) carry no message content.
            if (
                isinstance(item, tuple)
                and len(item) >= 2
                and isinstance(item[1], (bytes, bytearray))
            ):
                return email.message_from_bytes(bytes(item[1]))
        return None

    @classmethod
    def _is_stale(cls, msg, otp_sent_at: Optional[float]) -> bool:
        """Return True if ``msg`` was clearly sent before ``otp_sent_at``.

        ``otp_sent_at`` is assumed to be a Unix timestamp (TODO confirm with
        callers). Messages without a parsable Date header are never stale.
        """
        if not otp_sent_at:
            return False
        raw_date = msg.get("Date")
        if not raw_date:
            return False

        from datetime import timezone
        from email.utils import parsedate_to_datetime

        try:
            sent = parsedate_to_datetime(str(raw_date))
        except (TypeError, ValueError):
            return False
        if sent is None:
            return False
        if sent.tzinfo is None:
            # "-0000" dates parse as naive datetimes that are in UTC.
            sent = sent.replace(tzinfo=timezone.utc)
        return sent.timestamp() < float(otp_sent_at) - cls._OTP_CLOCK_SKEW_SECONDS

    def _poll_once(
        self,
        mail,
        seen_ids: set,
        pattern: Optional[str],
        otp_sent_at: Optional[float],
    ) -> Optional[str]:
        """Scan unread messages once and return the first code found.

        Message ids already in ``seen_ids`` are skipped, and every newly
        inspected id is added to it.
        """
        status, data = mail.search(None, "UNSEEN")
        if status != "OK" or not data or not data[0]:
            return None

        for msg_id in reversed(data[0].split()):  # newest first
            id_str = msg_id.decode()
            if id_str in seen_ids:
                continue
            seen_ids.add(id_str)

            # BODY.PEEK does not set \Seen, so unrelated mail stays unread.
            # Only the two headers needed for filtering are downloaded here.
            headers = self._fetch_message(
                mail, msg_id, "(BODY.PEEK[HEADER.FIELDS (FROM DATE)])"
            )
            if headers is None:
                continue
            from_addr = self._decode_str(headers.get("From", ""))
            if not self._is_openai_sender(from_addr):
                continue
            if self._is_stale(headers, otp_sent_at):
                continue

            msg = self._fetch_message(mail, msg_id, "(BODY.PEEK[])")
            if msg is None:
                continue
            code = self._extract_otp(self._get_text_body(msg), pattern)
            if code:
                # Mark only the consumed message as read.
                mail.store(msg_id, "+FLAGS", "\\Seen")
                return code
        return None

    def create_email(self, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Return the fixed configured address; no mailbox is created."""
        self.update_status(True)
        return {
            "email": self.email_addr,
            "service_id": self.email_addr,
            "id": self.email_addr,
        }

    def get_verification_code(
        self,
        email: str,
        email_id: str = None,
        timeout: int = 60,
        pattern: str = None,
        otp_sent_at: Optional[float] = None,
    ) -> Optional[str]:
        """Poll the INBOX until an OpenAI verification code arrives.

        Args:
            email: Unused; the mailbox is fixed by the service configuration.
            email_id: Unused.
            timeout: Maximum number of seconds to keep polling.
            pattern: Optional custom regular expression for the code.
            otp_sent_at: Optional time the code was requested; older messages
                are ignored.

        Returns:
            The code, or ``None`` on timeout or on a connection failure.
        """
        # NOTE: the ``email`` parameter shadows the ``email`` module in this
        # scope, so message parsing must live in helper methods.
        deadline = time.monotonic() + timeout
        seen_ids: set = set()
        reconnects = 0
        mail = None
        try:
            mail = self._open_inbox()
            while time.monotonic() < deadline:
                try:
                    code = self._poll_once(mail, seen_ids, pattern, otp_sent_at)
                    if code:
                        self.update_status(True)
                        logger.info(f"IMAP 获取验证码成功: {code}")
                        return code
                except (imaplib.IMAP4.abort, OSError) as e:
                    # The connection is unusable (dropped or timed out):
                    # re-selecting is pointless, open a fresh one.
                    reconnects += 1
                    if reconnects > self.max_retries:
                        raise
                    logger.debug(
                        "IMAP 连接中断,尝试重连 (%d/%d): %s",
                        reconnects,
                        self.max_retries,
                        e,
                    )
                    self._close_quietly(mail)
                    mail = None
                    mail = self._open_inbox()
                except imaplib.IMAP4.error as e:
                    logger.debug(f"IMAP 搜索邮件失败: {e}")
                    # Command-level failure: the session may have lost its
                    # selected mailbox, so select INBOX again.
                    try:
                        mail.select("INBOX")
                    except Exception as select_exc:
                        logger.debug("IMAP 重新选择收件箱失败: %s", select_exc)

                # Never sleep past the deadline.
                remaining = deadline - time.monotonic()
                if remaining > 0:
                    time.sleep(min(self._POLL_INTERVAL_SECONDS, remaining))
        except Exception as e:
            logger.warning(f"IMAP 连接/轮询失败: {e}")
            self.update_status(False, str(e))
        finally:
            self._close_quietly(mail)
        return None

    def check_health(self) -> bool:
        """Return True if logging in and selecting INBOX both succeed."""
        mail = None
        try:
            mail = self._connect()
            status, _ = mail.select("INBOX")
            return status == "OK"
        except Exception as e:
            logger.warning(f"IMAP 健康检查失败: {e}")
            return False
        finally:
            self._close_quietly(mail)

    def list_emails(self, **kwargs) -> list:
        """Return the single configured address (single-account mode)."""
        return [{"email": self.email_addr, "id": self.email_addr}]

    def delete_email(self, email_id: str) -> bool:
        """Report success without doing anything; nothing is deleted."""
        return True