gemini-business2api-github / core /cfmail_client.py
lijunke
deploy: clean start with hf metadata
18081cf
"""
Cloudflare Temp Email 临时邮箱客户端
API 文档参考 (基于 Hono 框架,JWT 认证):
- 获取公开配置: GET /open_api/settings
- 创建新邮箱: POST /api/new_address body: {name, domain} → {address, jwt}
- 获取邮件列表: GET /api/mails Authorization: Bearer {jwt}
- 获取邮件详情: GET /api/mail/:mail_id Authorization: Bearer {jwt}
"""
import random
import string
import time
from datetime import datetime
from typing import Optional
import requests
from core.mail_utils import extract_verification_code
from core.proxy_utils import request_with_proxy_fallback
class CloudflareMailClient:
"""Cloudflare Temp Email 临时邮箱客户端"""
def __init__(
self,
base_url: str = "",
proxy: str = "",
api_key: str = "",
domain: str = "",
verify_ssl: bool = True,
log_callback=None,
) -> None:
self.base_url = (base_url or "").rstrip("/")
self.proxy_url = (proxy or "").strip()
self.api_key = (api_key or "").strip() # x-custom-auth 密码
self.domain = (domain or "").strip()
self.verify_ssl = verify_ssl
self.log_callback = log_callback
self.email: Optional[str] = None
self.password: Optional[str] = None # 兼容接口,存储 JWT token
self.jwt_token: Optional[str] = None # 创建地址时返回的 JWT
self._available_domains: list = []
# ------------------------------------------------------------------
# 内部工具
# ------------------------------------------------------------------
def _log(self, level: str, message: str) -> None:
if self.log_callback:
try:
self.log_callback(level, message)
except Exception:
pass
def _request(self, method: str, url: str, **kwargs) -> requests.Response:
headers = kwargs.pop("headers", None) or {}
# 实例密码认证(admin 路由使用 x-admin-auth)
if self.api_key and "x-admin-auth" not in {k.lower() for k in headers}:
headers["x-admin-auth"] = self.api_key
# 邮件操作时使用 JWT Bearer 认证
if self.jwt_token and "authorization" not in {k.lower() for k in headers}:
headers["Authorization"] = f"Bearer {self.jwt_token}"
kwargs["headers"] = headers
self._log("info", f"📤 发送 {method} 请求: {url}")
if "json" in kwargs and kwargs["json"] is not None:
self._log("info", f"📦 请求体: {kwargs['json']}")
proxies = {"http": self.proxy_url, "https": self.proxy_url} if self.proxy_url else None
try:
res = request_with_proxy_fallback(
requests.request,
method,
url,
proxies=proxies,
verify=self.verify_ssl,
timeout=kwargs.pop("timeout", 30),
**kwargs,
)
self._log("info", f"📥 收到响应: HTTP {res.status_code}")
if res.content and res.status_code >= 400:
try:
self._log("error", f"📄 响应内容: {res.text[:500]}")
except Exception:
pass
return res
except Exception as e:
self._log("error", f"❌ 网络请求失败: {e}")
raise
# ------------------------------------------------------------------
# 公开接口
# ------------------------------------------------------------------
def set_credentials(self, email: str, password: str = "") -> None:
"""设置凭据(兼容接口)。password 存储 JWT token。"""
self.email = email
self.password = password
if password:
self.jwt_token = password
def _get_available_domains(self) -> list:
"""GET /open_api/settings 获取可用域名列表"""
if self._available_domains:
return self._available_domains
try:
res = self._request("GET", f"{self.base_url}/open_api/settings")
if res.status_code == 200:
data = res.json() if res.content else {}
domains = data.get("domains", [])
if isinstance(domains, list) and domains:
self._available_domains = [str(d).strip() for d in domains if d]
self._log("info", f"🌐 CFMail 可用域名: {self._available_domains}")
return self._available_domains
except Exception as e:
self._log("error", f"❌ 获取可用域名失败: {e}")
return self._available_domains
def register_account(self, domain: Optional[str] = None) -> bool:
"""POST /api/new_address 创建新邮箱地址"""
if not self.base_url:
self._log("error", "❌ cfmail_base_url 未配置")
return False
# 确定域名
selected_domain = domain or self.domain
if not selected_domain:
available = self._get_available_domains()
if available:
selected_domain = random.choice(available)
# 生成随机用户名
rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=10))
timestamp = str(int(time.time()))[-4:]
name = f"t{timestamp}{rand}"
payload = {"name": name}
if selected_domain:
payload["domain"] = selected_domain
self._log("info", f"📧 使用域名: {selected_domain}")
self._log("info", f"🎲 创建邮箱: {name}")
try:
res = self._request("POST", f"{self.base_url}/admin/new_address", json=payload)
if res.status_code in (200, 201):
data = res.json() if res.content else {}
address = data.get("address", "")
jwt = data.get("jwt", "")
if address:
self.email = address
self.jwt_token = jwt
self.password = jwt # 兼容接口
self._log("info", f"✅ CFMail 注册成功: {self.email}")
return True
self._log("error", f"❌ CFMail 注册失败: HTTP {res.status_code}")
return False
except Exception as e:
self._log("error", f"❌ CFMail 注册异常: {e}")
return False
def login(self) -> bool:
"""无需登录,直接返回 True"""
return True
@staticmethod
def _extract_body_from_raw(raw: str) -> str:
"""从原始邮件中提取正文(text/plain + text/html),跳过 header"""
if not raw:
return ""
import email as _email
try:
msg = _email.message_from_string(raw)
parts = []
if msg.is_multipart():
for part in msg.walk():
ct = part.get_content_type()
if ct in ("text/plain", "text/html"):
payload = part.get_payload(decode=True)
if payload:
charset = part.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace"))
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
parts.append(payload.decode(charset, errors="replace"))
return "".join(parts)
except Exception:
return ""
def fetch_verification_code(self, since_time: Optional[datetime] = None) -> Optional[str]:
"""GET /api/mails 获取邮件列表,再 GET /api/mail/:id 获取详情,提取验证码"""
if not self.jwt_token:
self._log("error", "❌ 缺少 JWT token,无法获取邮件")
return None
try:
self._log("info", "📬 正在拉取 CFMail 邮件列表...")
res = self._request("GET", f"{self.base_url}/api/mails", params={"limit": 20, "offset": 0})
if res.status_code != 200:
self._log("error", f"❌ 获取邮件列表失败: HTTP {res.status_code}")
return None
data = res.json() if res.content else {}
# 响应格式: {"results": [...], "total": N}
messages = data.get("results", [])
if not isinstance(messages, list):
messages = []
if not messages:
self._log("info", "📭 邮箱为空,暂无邮件")
return None
self._log("info", f"📨 收到 {len(messages)} 封邮件,开始检查验证码...")
# 按 id 降序(新邮件优先)
try:
messages = sorted(messages, key=lambda m: int(m.get("id") or 0), reverse=True)
except Exception:
pass
for idx, msg in enumerate(messages, 1):
msg_id = msg.get("id")
if not msg_id:
continue
# 时间过滤
if since_time:
raw_time = msg.get("created_at") or msg.get("createdAt")
if raw_time:
try:
if isinstance(raw_time, (int, float)):
ts = float(raw_time)
if ts > 1e12:
ts /= 1000.0
msg_time = datetime.fromtimestamp(ts)
else:
import re
raw_time = re.sub(r"(\.\d{6})\d+", r"\1", str(raw_time))
# cfmail 的 created_at 是 UTC 无时区标记,显式加 +00:00 再转本地时间
if not raw_time.endswith("Z") and "+" not in raw_time and raw_time.count("-") <= 2:
raw_time = raw_time + "+00:00"
msg_time = datetime.fromisoformat(raw_time.replace("Z", "+00:00")).astimezone().replace(tzinfo=None)
if msg_time < since_time:
continue
except Exception:
pass
# 列表响应已包含 raw 字段,直接解析正文提取验证码
raw_in_list = msg.get("raw") or ""
if raw_in_list:
body = self._extract_body_from_raw(raw_in_list)
code = extract_verification_code(body)
if code:
self._log("info", f"✅ 找到验证码: {code}")
return code
# 兜底:尝试从其他摘要字段提取
summary = (msg.get("subject") or "") + (msg.get("text") or "") + (msg.get("html") or "")
if summary:
code = extract_verification_code(summary)
if code:
self._log("info", f"✅ 找到验证码: {code}")
return code
# 获取邮件详情
self._log("info", f"🔍 正在读取邮件 {idx}/{len(messages)} 详情...")
detail_res = self._request("GET", f"{self.base_url}/api/mail/{msg_id}")
if detail_res.status_code != 200:
self._log("warning", f"⚠️ 读取邮件详情失败: HTTP {detail_res.status_code}")
continue
detail = detail_res.json() if detail_res.content else {}
content = self._extract_body_from_raw(detail.get("raw") or "")
if content:
code = extract_verification_code(content)
if code:
self._log("info", f"✅ 找到验证码: {code}")
return code
else:
self._log("info", f"❌ 邮件 {idx} 中未找到验证码")
self._log("warning", "⚠️ 所有邮件中均未找到验证码")
return None
except Exception as e:
self._log("error", f"❌ 获取验证码异常: {e}")
return None
def poll_for_code(
self,
timeout: int = 120,
interval: int = 4,
since_time: Optional[datetime] = None,
) -> Optional[str]:
"""轮询获取验证码"""
if not self.email:
return None
max_retries = max(1, timeout // interval)
self._log("info", f"⏱️ 开始轮询验证码 (超时 {timeout}秒, 间隔 {interval}秒, 最多 {max_retries} 次)")
for i in range(1, max_retries + 1):
self._log("info", f"🔄 第 {i}/{max_retries} 次轮询...")
code = self.fetch_verification_code(since_time=since_time)
if code:
self._log("info", f"🎉 验证码获取成功: {code}")
return code
if i < max_retries:
self._log("info", f"⏳ 等待 {interval} 秒后重试...")
time.sleep(interval)
self._log("error", f"⏰ 验证码获取超时 ({timeout}秒)")
return None