codex-console / src /services /tempmail.py
cjovs's picture
Deploy codex-console to HF Space
7482820 verified
"""
Tempmail.lol 邮箱服务实现
"""
import re
import time
import logging
from typing import Optional, Dict, Any, List
import json
from curl_cffi import requests as cffi_requests
from .base import BaseEmailService, EmailServiceError, EmailServiceType
from ..core.http_client import HTTPClient, RequestConfig
from ..config.constants import OTP_CODE_PATTERN
logger = logging.getLogger(__name__)
class TempmailService(BaseEmailService):
"""
Tempmail.lol 邮箱服务
基于 Tempmail.lol API v2
"""
def __init__(self, config: Dict[str, Any] = None, name: str = None):
"""
初始化 Tempmail 服务
Args:
config: 配置字典,支持以下键:
- base_url: API 基础地址 (默认: https://api.tempmail.lol/v2)
- timeout: 请求超时时间 (默认: 30)
- max_retries: 最大重试次数 (默认: 3)
- proxy_url: 代理 URL
name: 服务名称
"""
super().__init__(EmailServiceType.TEMPMAIL, name)
# 默认配置
default_config = {
"base_url": "https://api.tempmail.lol/v2",
"timeout": 30,
"max_retries": 3,
"proxy_url": None,
}
self.config = {**default_config, **(config or {})}
# 创建 HTTP 客户端
http_config = RequestConfig(
timeout=self.config["timeout"],
max_retries=self.config["max_retries"],
)
self.http_client = HTTPClient(
proxy_url=self.config.get("proxy_url"),
config=http_config
)
# 状态变量
self._email_cache: Dict[str, Dict[str, Any]] = {}
self._last_check_time: float = 0
def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
"""
创建新的临时邮箱
Args:
config: 配置参数(Tempmail.lol 目前不支持自定义配置)
Returns:
包含邮箱信息的字典:
- email: 邮箱地址
- service_id: 邮箱 token
- token: 邮箱 token(同 service_id)
- created_at: 创建时间戳
"""
try:
# 发送创建请求
response = self.http_client.post(
f"{self.config['base_url']}/inbox/create",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json={}
)
if response.status_code not in (200, 201):
self.update_status(False, EmailServiceError(f"请求失败,状态码: {response.status_code}"))
raise EmailServiceError(f"Tempmail.lol 请求失败,状态码: {response.status_code}")
data = response.json()
email = str(data.get("address", "")).strip()
token = str(data.get("token", "")).strip()
if not email or not token:
self.update_status(False, EmailServiceError("返回数据不完整"))
raise EmailServiceError("Tempmail.lol 返回数据不完整")
# 缓存邮箱信息
email_info = {
"email": email,
"service_id": token,
"token": token,
"created_at": time.time(),
}
self._email_cache[email] = email_info
logger.info(f"Tempmail.lol 邮箱创建成功,新鲜热乎: {email}")
self.update_status(True)
return email_info
except Exception as e:
self.update_status(False, e)
if isinstance(e, EmailServiceError):
raise
raise EmailServiceError(f"创建 Tempmail.lol 邮箱失败: {e}")
def get_verification_code(
self,
email: str,
email_id: str = None,
timeout: int = 120,
pattern: str = OTP_CODE_PATTERN,
otp_sent_at: Optional[float] = None,
) -> Optional[str]:
"""
从 Tempmail.lol 获取验证码
Args:
email: 邮箱地址
email_id: 邮箱 token(如果不提供,从缓存中查找)
timeout: 超时时间(秒)
pattern: 验证码正则表达式
otp_sent_at: OTP 发送时间戳(Tempmail 服务暂不使用此参数)
Returns:
验证码字符串,如果超时或未找到返回 None
"""
token = email_id
if not token:
# 从缓存中查找 token
if email in self._email_cache:
token = self._email_cache[email].get("token")
else:
logger.warning(f"未找到邮箱 {email} 的 token,无法获取验证码")
return None
if not token:
logger.warning(f"邮箱 {email} 没有 token,无法获取验证码")
return None
logger.info(f"正在等邮箱 {email} 的验证码,邮差应该在路上了...")
start_time = time.time()
seen_ids = set()
while time.time() - start_time < timeout:
try:
# 获取邮件列表
response = self.http_client.get(
f"{self.config['base_url']}/inbox",
params={"token": token},
headers={"Accept": "application/json"}
)
if response.status_code != 200:
time.sleep(3)
continue
data = response.json()
# 检查 inbox 是否过期
if data is None or (isinstance(data, dict) and not data):
logger.warning(f"邮箱 {email} 已过期")
return None
email_list = data.get("emails", []) if isinstance(data, dict) else []
if not isinstance(email_list, list):
time.sleep(3)
continue
for msg in email_list:
if not isinstance(msg, dict):
continue
# 使用 date 作为唯一标识
msg_date = msg.get("date", 0)
if not msg_date or msg_date in seen_ids:
continue
seen_ids.add(msg_date)
sender = str(msg.get("from", "")).lower()
subject = str(msg.get("subject", ""))
body = str(msg.get("body", ""))
html = str(msg.get("html") or "")
content = "\n".join([sender, subject, body, html])
# 检查是否是 OpenAI 邮件
if "openai" not in sender and "openai" not in content.lower():
continue
# 提取验证码
match = re.search(pattern, content)
if match:
code = match.group(1)
logger.info(f"找到验证码了,六位嘉宾登场: {code}")
self.update_status(True)
return code
except Exception as e:
logger.debug(f"检查邮件时出错: {e}")
# 等待一段时间再检查
time.sleep(3)
logger.warning(f"等验证码等到超时了: {email}")
return None
def list_emails(self, **kwargs) -> List[Dict[str, Any]]:
"""
列出所有缓存的邮箱
Note:
Tempmail.lol API 不支持列出所有邮箱,这里返回缓存的邮箱
"""
return list(self._email_cache.values())
def delete_email(self, email_id: str) -> bool:
"""
删除邮箱
Note:
Tempmail.lol API 不支持删除邮箱,这里从缓存中移除
"""
# 从缓存中查找并移除
emails_to_delete = []
for email, info in self._email_cache.items():
if info.get("token") == email_id:
emails_to_delete.append(email)
for email in emails_to_delete:
del self._email_cache[email]
logger.info(f"从缓存中移除邮箱: {email}")
return len(emails_to_delete) > 0
def check_health(self) -> bool:
"""检查 Tempmail.lol 服务是否可用"""
try:
response = self.http_client.get(
f"{self.config['base_url']}/inbox/create",
timeout=10
)
# 即使返回错误状态码也认为服务可用(只要可以连接)
self.update_status(True)
return True
except Exception as e:
logger.warning(f"Tempmail.lol 健康检查失败: {e}")
self.update_status(False, e)
return False
def get_inbox(self, token: str) -> Optional[Dict[str, Any]]:
"""
获取邮箱收件箱内容
Args:
token: 邮箱 token
Returns:
收件箱数据
"""
try:
response = self.http_client.get(
f"{self.config['base_url']}/inbox",
params={"token": token},
headers={"Accept": "application/json"}
)
if response.status_code != 200:
return None
return response.json()
except Exception as e:
logger.error(f"获取收件箱失败: {e}")
return None
def wait_for_verification_code_with_callback(
self,
email: str,
token: str,
callback: callable = None,
timeout: int = 120
) -> Optional[str]:
"""
等待验证码并支持回调函数
Args:
email: 邮箱地址
token: 邮箱 token
callback: 回调函数,接收当前状态信息
timeout: 超时时间
Returns:
验证码或 None
"""
start_time = time.time()
seen_ids = set()
check_count = 0
while time.time() - start_time < timeout:
check_count += 1
if callback:
callback({
"status": "checking",
"email": email,
"check_count": check_count,
"elapsed_time": time.time() - start_time,
})
try:
data = self.get_inbox(token)
if not data:
time.sleep(3)
continue
# 检查 inbox 是否过期
if data is None or (isinstance(data, dict) and not data):
if callback:
callback({
"status": "expired",
"email": email,
"message": "邮箱已过期"
})
return None
email_list = data.get("emails", []) if isinstance(data, dict) else []
for msg in email_list:
msg_date = msg.get("date", 0)
if not msg_date or msg_date in seen_ids:
continue
seen_ids.add(msg_date)
sender = str(msg.get("from", "")).lower()
subject = str(msg.get("subject", ""))
body = str(msg.get("body", ""))
html = str(msg.get("html") or "")
content = "\n".join([sender, subject, body, html])
# 检查是否是 OpenAI 邮件
if "openai" not in sender and "openai" not in content.lower():
continue
# 提取验证码
match = re.search(OTP_CODE_PATTERN, content)
if match:
code = match.group(1)
if callback:
callback({
"status": "found",
"email": email,
"code": code,
"message": "找到验证码"
})
return code
if callback and check_count % 5 == 0:
callback({
"status": "waiting",
"email": email,
"check_count": check_count,
"message": f"已检查 {len(seen_ids)} 封邮件,等待验证码..."
})
except Exception as e:
logger.debug(f"检查邮件时出错: {e}")
if callback:
callback({
"status": "error",
"email": email,
"error": str(e),
"message": "检查邮件时出错"
})
time.sleep(3)
if callback:
callback({
"status": "timeout",
"email": email,
"message": "等待验证码超时"
})
return None