""" Cloudflare Temp Email 临时邮箱客户端 API 文档参考 (基于 Hono 框架,JWT 认证): - 获取公开配置: GET /open_api/settings - 创建新邮箱: POST /api/new_address body: {name, domain} → {address, jwt} - 获取邮件列表: GET /api/mails Authorization: Bearer {jwt} - 获取邮件详情: GET /api/mail/:mail_id Authorization: Bearer {jwt} """ import random import string import time from datetime import datetime from typing import Optional import requests from core.mail_utils import extract_verification_code from core.proxy_utils import request_with_proxy_fallback class CloudflareMailClient: """Cloudflare Temp Email 临时邮箱客户端""" def __init__( self, base_url: str = "", proxy: str = "", api_key: str = "", domain: str = "", verify_ssl: bool = True, log_callback=None, ) -> None: self.base_url = (base_url or "").rstrip("/") self.proxy_url = (proxy or "").strip() self.api_key = (api_key or "").strip() # x-custom-auth 密码 self.domain = (domain or "").strip() self.verify_ssl = verify_ssl self.log_callback = log_callback self.email: Optional[str] = None self.password: Optional[str] = None # 兼容接口,存储 JWT token self.jwt_token: Optional[str] = None # 创建地址时返回的 JWT self._available_domains: list = [] # ------------------------------------------------------------------ # 内部工具 # ------------------------------------------------------------------ def _log(self, level: str, message: str) -> None: if self.log_callback: try: self.log_callback(level, message) except Exception: pass def _request(self, method: str, url: str, **kwargs) -> requests.Response: headers = kwargs.pop("headers", None) or {} # 实例密码认证(admin 路由使用 x-admin-auth) if self.api_key and "x-admin-auth" not in {k.lower() for k in headers}: headers["x-admin-auth"] = self.api_key # 邮件操作时使用 JWT Bearer 认证 if self.jwt_token and "authorization" not in {k.lower() for k in headers}: headers["Authorization"] = f"Bearer {self.jwt_token}" kwargs["headers"] = headers self._log("info", f"📤 发送 {method} 请求: {url}") if "json" in kwargs and kwargs["json"] is not None: self._log("info", f"📦 请求体: {kwargs['json']}") proxies = {"http": self.proxy_url, "https": self.proxy_url} if self.proxy_url else None try: res = request_with_proxy_fallback( requests.request, method, url, proxies=proxies, verify=self.verify_ssl, timeout=kwargs.pop("timeout", 30), **kwargs, ) self._log("info", f"📥 收到响应: HTTP {res.status_code}") if res.content and res.status_code >= 400: try: self._log("error", f"📄 响应内容: {res.text[:500]}") except Exception: pass return res except Exception as e: self._log("error", f"❌ 网络请求失败: {e}") raise # ------------------------------------------------------------------ # 公开接口 # ------------------------------------------------------------------ def set_credentials(self, email: str, password: str = "") -> None: """设置凭据(兼容接口)。password 存储 JWT token。""" self.email = email self.password = password if password: self.jwt_token = password def _get_available_domains(self) -> list: """GET /open_api/settings 获取可用域名列表""" if self._available_domains: return self._available_domains try: res = self._request("GET", f"{self.base_url}/open_api/settings") if res.status_code == 200: data = res.json() if res.content else {} domains = data.get("domains", []) if isinstance(domains, list) and domains: self._available_domains = [str(d).strip() for d in domains if d] self._log("info", f"🌐 CFMail 可用域名: {self._available_domains}") return self._available_domains except Exception as e: self._log("error", f"❌ 获取可用域名失败: {e}") return self._available_domains def register_account(self, domain: Optional[str] = None) -> bool: """POST /api/new_address 创建新邮箱地址""" if not self.base_url: self._log("error", "❌ cfmail_base_url 未配置") return False # 确定域名 selected_domain = domain or self.domain if not selected_domain: available = self._get_available_domains() if available: selected_domain = random.choice(available) # 生成随机用户名 rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=10)) timestamp = str(int(time.time()))[-4:] name = f"t{timestamp}{rand}" payload = {"name": name} if selected_domain: payload["domain"] = selected_domain self._log("info", f"📧 使用域名: {selected_domain}") self._log("info", f"🎲 创建邮箱: {name}") try: res = self._request("POST", f"{self.base_url}/admin/new_address", json=payload) if res.status_code in (200, 201): data = res.json() if res.content else {} address = data.get("address", "") jwt = data.get("jwt", "") if address: self.email = address self.jwt_token = jwt self.password = jwt # 兼容接口 self._log("info", f"✅ CFMail 注册成功: {self.email}") return True self._log("error", f"❌ CFMail 注册失败: HTTP {res.status_code}") return False except Exception as e: self._log("error", f"❌ CFMail 注册异常: {e}") return False def login(self) -> bool: """无需登录,直接返回 True""" return True @staticmethod def _extract_body_from_raw(raw: str) -> str: """从原始邮件中提取正文(text/plain + text/html),跳过 header""" if not raw: return "" import email as _email try: msg = _email.message_from_string(raw) parts = [] if msg.is_multipart(): for part in msg.walk(): ct = part.get_content_type() if ct in ("text/plain", "text/html"): payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" parts.append(payload.decode(charset, errors="replace")) else: payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or "utf-8" parts.append(payload.decode(charset, errors="replace")) return "".join(parts) except Exception: return "" def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]: """GET /api/mails 获取邮件列表,再 GET /api/mail/:id 获取详情,提取验证码""" if not self.jwt_token: self._log("error", "❌ 缺少 JWT token,无法获取邮件") return None try: self._log("info", "📬 正在拉取 CFMail 邮件列表...") res = self._request("GET", f"{self.base_url}/api/mails", params={"limit": 20, "offset": 0}) if res.status_code != 200: self._log("error", f"❌ 获取邮件列表失败: HTTP {res.status_code}") return None data = res.json() if res.content else {} # 响应格式: {"results": [...], "total": N} messages = data.get("results", []) if not isinstance(messages, list): messages = [] if not messages: self._log("info", "📭 邮箱为空,暂无邮件") return None self._log("info", f"📨 收到 {len(messages)} 封邮件,开始检查验证码...") # 按 id 降序(新邮件优先) try: messages = sorted(messages, key=lambda m: int(m.get("id") or 0), reverse=True) except Exception: pass for idx, msg in enumerate(messages, 1): msg_id = msg.get("id") if not msg_id: continue # 时间过滤 if since_time: raw_time = msg.get("created_at") or msg.get("createdAt") if raw_time: try: if isinstance(raw_time, (int, float)): ts = float(raw_time) if ts > 1e12: ts /= 1000.0 msg_time = datetime.fromtimestamp(ts) else: import re raw_time = re.sub(r"(\.\d{6})\d+", r"\1", str(raw_time)) # cfmail 的 created_at 是 UTC 无时区标记,显式加 +00:00 再转本地时间 if not raw_time.endswith("Z") and "+" not in raw_time and raw_time.count("-") <= 2: raw_time = raw_time + "+00:00" msg_time = datetime.fromisoformat(raw_time.replace("Z", "+00:00")).astimezone().replace(tzinfo=None) if msg_time < since_time: continue except Exception: pass # 列表响应已包含 raw 字段,直接解析正文提取验证码 raw_in_list = msg.get("raw") or "" if raw_in_list: body = self._extract_body_from_raw(raw_in_list) code = extract_verification_code(body) if code: self._log("info", f"✅ 找到验证码: {code}") return code # 兜底:尝试从其他摘要字段提取 summary = (msg.get("subject") or "") + (msg.get("text") or "") + (msg.get("html") or "") if summary: code = extract_verification_code(summary) if code: self._log("info", f"✅ 找到验证码: {code}") return code # 获取邮件详情 self._log("info", f"🔍 正在读取邮件 {idx}/{len(messages)} 详情...") detail_res = self._request("GET", f"{self.base_url}/api/mail/{msg_id}") if detail_res.status_code != 200: self._log("warning", f"⚠️ 读取邮件详情失败: HTTP {detail_res.status_code}") continue detail = detail_res.json() if detail_res.content else {} content = self._extract_body_from_raw(detail.get("raw") or "") if content: code = extract_verification_code(content) if code: self._log("info", f"✅ 找到验证码: {code}") return code else: self._log("info", f"❌ 邮件 {idx} 中未找到验证码") self._log("warning", "⚠️ 所有邮件中均未找到验证码") return None except Exception as e: self._log("error", f"❌ 获取验证码异常: {e}") return None def poll_for_code( self, timeout: int = 120, interval: int = 4, since_time: Optional[datetime] = None, ) -> Optional[str]: """轮询获取验证码""" if not self.email: return None max_retries = max(1, timeout // interval) self._log("info", f"⏱️ 开始轮询验证码 (超时 {timeout}秒, 间隔 {interval}秒, 最多 {max_retries} 次)") for i in range(1, max_retries + 1): self._log("info", f"🔄 第 {i}/{max_retries} 次轮询...") code = self.fetch_verification_code(since_time=since_time) if code: self._log("info", f"🎉 验证码获取成功: {code}") return code if i < max_retries: self._log("info", f"⏳ 等待 {interval} 秒后重试...") time.sleep(interval) self._log("error", f"⏰ 验证码获取超时 ({timeout}秒)") return None