gemini-business2api-github / core /moemail_client.py
lijunke
chore: remove hajimi moemail defaults for HF compliance
a55f7df
"""
Moemail临时邮箱客户端
API文档参考:
- 获取系统配置: GET /api/config
- 生成临时邮箱: POST /api/emails/generate
- 获取邮件列表: GET /api/emails/{emailId}
- 获取单封邮件: GET /api/emails/{emailId}/{messageId}
"""
import random
import string
import time
from typing import Optional
import requests
from core.mail_utils import extract_verification_code
from core.proxy_utils import request_with_proxy_fallback
class MoemailClient:
"""Moemail临时邮箱客户端"""
def __init__(
self,
base_url: str = "https://moemail.app",
proxy: str = "",
api_key: str = "",
domain: str = "",
log_callback=None,
) -> None:
self.base_url = base_url.rstrip("/")
self.proxies = {"http": proxy, "https": proxy} if proxy else None
self.api_key = api_key.strip()
self.domain = domain.strip() if domain else ""
self.log_callback = log_callback
self.email: Optional[str] = None
self.email_id: Optional[str] = None
self.password: Optional[str] = None # 兼容 DuckMailClient 接口
# 缓存可用域名列表
self._available_domains: list = []
def set_credentials(self, email: str, password: str = "") -> None:
"""设置凭据(兼容 DuckMailClient 接口)"""
self.email = email
self.password = password
def _request(self, method: str, url: str, **kwargs) -> requests.Response:
"""发送请求并打印详细日志"""
headers = kwargs.pop("headers", None) or {}
if self.api_key and "X-API-Key" not in headers:
headers["X-API-Key"] = self.api_key
headers.setdefault("Content-Type", "application/json")
kwargs["headers"] = headers
self._log("info", f"📤 发送 {method} 请求: {url}")
if "json" in kwargs:
self._log("info", f"📦 请求体: {kwargs['json']}")
try:
res = request_with_proxy_fallback(
requests.request,
method,
url,
proxies=self.proxies,
timeout=kwargs.pop("timeout", 30),
**kwargs,
)
self._log("info", f"📥 收到响应: HTTP {res.status_code}")
if res.content and res.status_code >= 400:
try:
self._log("error", f"📄 响应内容: {res.text[:500]}")
except Exception:
pass
return res
except Exception as e:
self._log("error", f"❌ 网络请求失败: {e}")
raise
def _get_available_domains(self) -> list:
"""获取可用的邮箱域名列表"""
if self._available_domains:
return self._available_domains
try:
res = self._request("GET", f"{self.base_url}/api/config")
if res.status_code == 200:
data = res.json()
email_domains_str = data.get("emailDomains", "")
if email_domains_str:
self._available_domains = [d.strip() for d in email_domains_str.split(",") if d.strip()]
self._log("info", f"🌐 Moemail 可用域名: {self._available_domains}")
return self._available_domains
except Exception as e:
self._log("error", f"❌ 获取可用域名失败: {e}")
# 默认域名
self._available_domains = ["moemail.app"]
return self._available_domains
def register_account(self, domain: Optional[str] = None) -> bool:
"""注册新邮箱账号
API: POST /api/emails/generate
"""
# 确定使用的域名
selected_domain = domain
if not selected_domain:
selected_domain = self.domain
if not selected_domain:
# 从可用域名中随机选择
available = self._get_available_domains()
if available:
selected_domain = random.choice(available)
else:
selected_domain = "moemail.app"
self._log("info", f"📧 使用域名: {selected_domain}")
# 生成随机邮箱名称
rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=10))
timestamp = str(int(time.time()))[-4:]
name = f"t{timestamp}{rand}"
self._log("info", f"🎲 生成邮箱: {name}@{selected_domain}")
try:
# 设置为 0 表示永久有效
self._log("info", f"⏰ 设置过期时间: 永久有效")
res = self._request(
"POST",
f"{self.base_url}/api/emails/generate",
json={
"name": name,
"expiryTime": 0,
"domain": selected_domain,
},
)
if res.status_code in (200, 201):
data = res.json() if res.content else {}
self.email = data.get("email", "")
self.email_id = data.get("id", "")
self.password = self.email_id # 用 email_id 作为 password 存储
if self.email and self.email_id:
self._log("info", f"✅ Moemail 注册成功: {self.email}")
self._log("info", f"🔑 Email ID: {self.email_id}")
return True
self._log("error", f"❌ Moemail 注册失败: HTTP {res.status_code}")
if res.content:
self._log("error", f"📄 响应内容: {res.text[:500]}")
return False
except Exception as e:
self._log("error", f"❌ Moemail 注册异常: {e}")
return False
def login(self) -> bool:
"""登录(Moemail 无需登录,返回 True)"""
# Moemail 使用 API Key 认证,无需单独登录
return True
def fetch_verification_code(self, since_time=None) -> Optional[str]:
"""获取验证码
API: GET /api/emails/{emailId}
API: GET /api/emails/{emailId}/{messageId}
"""
if not self.email_id:
self._log("error", "❌ 缺少 email_id,无法获取邮件")
return None
try:
self._log("info", "📬 正在拉取 Moemail 邮件列表...")
# 获取邮件列表
res = self._request(
"GET",
f"{self.base_url}/api/emails/{self.email_id}",
)
if res.status_code != 200:
self._log("error", f"❌ 获取邮件列表失败: HTTP {res.status_code}")
return None
data = res.json() if res.content else {}
messages = data.get("messages", [])
if not messages:
self._log("info", "📭 邮箱为空,暂无邮件")
return None
self._log("info", f"📨 收到 {len(messages)} 封邮件,开始检查验证码...")
from datetime import datetime
def _parse_message_time(msg_obj) -> Optional[datetime]:
import re
time_keys = [
"createdAt",
"receivedAt",
"sentAt",
"created_at",
"received_at",
"sent_at",
]
raw_time = None
for key in time_keys:
if msg_obj.get(key) is not None:
raw_time = msg_obj.get(key)
break
if raw_time is None:
return None
if isinstance(raw_time, (int, float)):
timestamp = float(raw_time)
if timestamp > 1e12:
timestamp = timestamp / 1000.0
return datetime.fromtimestamp(timestamp)
if isinstance(raw_time, str):
raw_time = raw_time.strip()
if raw_time.isdigit():
timestamp = float(raw_time)
if timestamp > 1e12:
timestamp = timestamp / 1000.0
return datetime.fromtimestamp(timestamp)
# 处理 ISO 时间字符串
try:
# 截断纳秒到微秒
raw_time = re.sub(r"(\.\d{6})\d+", r"\1", raw_time)
return datetime.fromisoformat(raw_time.replace("Z", "+00:00")).astimezone().replace(tzinfo=None)
except Exception:
return None
return None
def _looks_like_verification(msg_obj) -> bool:
subject = (msg_obj.get("subject") or "").strip()
if not subject:
return False
import re
return re.search(r"(验证码|验证|verification|verify|passcode|security\s*code|one[-\s]?time|otp)", subject, re.IGNORECASE) is not None
messages_with_time = [(msg, _parse_message_time(msg)) for msg in messages]
if any(item[1] for item in messages_with_time):
messages_with_time.sort(key=lambda item: item[1] or datetime.min, reverse=True)
messages = [item[0] for item in messages_with_time]
# 遍历邮件
for idx, msg in enumerate(messages, 1):
msg_id = msg.get("id")
if not msg_id:
continue
# 时间过滤
if since_time:
msg_time = _parse_message_time(msg)
if msg_time:
if msg_time < since_time:
continue
if not _looks_like_verification(msg):
continue
# 优先从邮件列表的 content 字段提取验证码(更高效)
list_content = msg.get("content") or ""
if list_content:
code = extract_verification_code(list_content)
if code:
self._log("info", f"✅ 找到验证码: {code}")
return code
# 如果列表没有 content,则获取邮件详情
self._log("info", f"🔍 正在读取邮件 {idx}/{len(messages)} 详情...")
detail_res = self._request(
"GET",
f"{self.base_url}/api/emails/{self.email_id}/{msg_id}",
)
if detail_res.status_code != 200:
self._log("warning", f"⚠️ 读取邮件详情失败: HTTP {detail_res.status_code}")
continue
detail = detail_res.json() if detail_res.content else {}
# 处理 {'message': {...}} 格式
if "message" in detail and isinstance(detail["message"], dict):
detail = detail["message"]
# 获取邮件内容
text_content = detail.get("text") or detail.get("textContent") or detail.get("content") or ""
html_content = detail.get("html") or detail.get("htmlContent") or ""
if isinstance(html_content, list):
html_content = "".join(str(item) for item in html_content)
if isinstance(text_content, list):
text_content = "".join(str(item) for item in text_content)
content = text_content + html_content
if content:
code = extract_verification_code(content)
if code:
self._log("info", f"✅ 找到验证码: {code}")
return code
else:
self._log("info", f"❌ 邮件 {idx} 中未找到验证码")
self._log("warning", "⚠️ 所有邮件中均未找到验证码")
return None
except Exception as e:
self._log("error", f"❌ 获取验证码异常: {e}")
return None
def poll_for_code(
self,
timeout: int = 120,
interval: int = 4,
since_time=None,
) -> Optional[str]:
"""轮询获取验证码"""
max_retries = max(1, timeout // interval)
self._log("info", f"⏱️ 开始轮询验证码 (超时 {timeout}秒, 间隔 {interval}秒, 最多 {max_retries} 次)")
for i in range(1, max_retries + 1):
self._log("info", f"🔄 第 {i}/{max_retries} 次轮询...")
code = self.fetch_verification_code(since_time=since_time)
if code:
self._log("info", f"🎉 验证码获取成功: {code}")
return code
if i < max_retries:
self._log("info", f"⏳ 等待 {interval} 秒后重试...")
time.sleep(interval)
self._log("error", f"⏰ 验证码获取超时 ({timeout}秒)")
return None
def _log(self, level: str, message: str) -> None:
if self.log_callback:
try:
self.log_callback(level, message)
except Exception:
pass