Spaces:
Paused
Paused
| """ | |
| Tempmail.lol 邮箱服务实现 | |
| """ | |
| import re | |
| import time | |
| import logging | |
| from typing import Optional, Dict, Any, List | |
| import json | |
| from curl_cffi import requests as cffi_requests | |
| from .base import BaseEmailService, EmailServiceError, EmailServiceType | |
| from ..core.http_client import HTTPClient, RequestConfig | |
| from ..config.constants import OTP_CODE_PATTERN | |
| logger = logging.getLogger(__name__) | |
| class TempmailService(BaseEmailService): | |
| """ | |
| Tempmail.lol 邮箱服务 | |
| 基于 Tempmail.lol API v2 | |
| """ | |
| def __init__(self, config: Dict[str, Any] = None, name: str = None): | |
| """ | |
| 初始化 Tempmail 服务 | |
| Args: | |
| config: 配置字典,支持以下键: | |
| - base_url: API 基础地址 (默认: https://api.tempmail.lol/v2) | |
| - timeout: 请求超时时间 (默认: 30) | |
| - max_retries: 最大重试次数 (默认: 3) | |
| - proxy_url: 代理 URL | |
| name: 服务名称 | |
| """ | |
| super().__init__(EmailServiceType.TEMPMAIL, name) | |
| # 默认配置 | |
| default_config = { | |
| "base_url": "https://api.tempmail.lol/v2", | |
| "timeout": 30, | |
| "max_retries": 3, | |
| "proxy_url": None, | |
| } | |
| self.config = {**default_config, **(config or {})} | |
| # 创建 HTTP 客户端 | |
| http_config = RequestConfig( | |
| timeout=self.config["timeout"], | |
| max_retries=self.config["max_retries"], | |
| ) | |
| self.http_client = HTTPClient( | |
| proxy_url=self.config.get("proxy_url"), | |
| config=http_config | |
| ) | |
| # 状态变量 | |
| self._email_cache: Dict[str, Dict[str, Any]] = {} | |
| self._last_check_time: float = 0 | |
| def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]: | |
| """ | |
| 创建新的临时邮箱 | |
| Args: | |
| config: 配置参数(Tempmail.lol 目前不支持自定义配置) | |
| Returns: | |
| 包含邮箱信息的字典: | |
| - email: 邮箱地址 | |
| - service_id: 邮箱 token | |
| - token: 邮箱 token(同 service_id) | |
| - created_at: 创建时间戳 | |
| """ | |
| try: | |
| # 发送创建请求 | |
| response = self.http_client.post( | |
| f"{self.config['base_url']}/inbox/create", | |
| headers={ | |
| "Accept": "application/json", | |
| "Content-Type": "application/json", | |
| }, | |
| json={} | |
| ) | |
| if response.status_code not in (200, 201): | |
| self.update_status(False, EmailServiceError(f"请求失败,状态码: {response.status_code}")) | |
| raise EmailServiceError(f"Tempmail.lol 请求失败,状态码: {response.status_code}") | |
| data = response.json() | |
| email = str(data.get("address", "")).strip() | |
| token = str(data.get("token", "")).strip() | |
| if not email or not token: | |
| self.update_status(False, EmailServiceError("返回数据不完整")) | |
| raise EmailServiceError("Tempmail.lol 返回数据不完整") | |
| # 缓存邮箱信息 | |
| email_info = { | |
| "email": email, | |
| "service_id": token, | |
| "token": token, | |
| "created_at": time.time(), | |
| } | |
| self._email_cache[email] = email_info | |
| logger.info(f"Tempmail.lol 邮箱创建成功,新鲜热乎: {email}") | |
| self.update_status(True) | |
| return email_info | |
| except Exception as e: | |
| self.update_status(False, e) | |
| if isinstance(e, EmailServiceError): | |
| raise | |
| raise EmailServiceError(f"创建 Tempmail.lol 邮箱失败: {e}") | |
| def get_verification_code( | |
| self, | |
| email: str, | |
| email_id: str = None, | |
| timeout: int = 120, | |
| pattern: str = OTP_CODE_PATTERN, | |
| otp_sent_at: Optional[float] = None, | |
| ) -> Optional[str]: | |
| """ | |
| 从 Tempmail.lol 获取验证码 | |
| Args: | |
| email: 邮箱地址 | |
| email_id: 邮箱 token(如果不提供,从缓存中查找) | |
| timeout: 超时时间(秒) | |
| pattern: 验证码正则表达式 | |
| otp_sent_at: OTP 发送时间戳(Tempmail 服务暂不使用此参数) | |
| Returns: | |
| 验证码字符串,如果超时或未找到返回 None | |
| """ | |
| token = email_id | |
| if not token: | |
| # 从缓存中查找 token | |
| if email in self._email_cache: | |
| token = self._email_cache[email].get("token") | |
| else: | |
| logger.warning(f"未找到邮箱 {email} 的 token,无法获取验证码") | |
| return None | |
| if not token: | |
| logger.warning(f"邮箱 {email} 没有 token,无法获取验证码") | |
| return None | |
| logger.info(f"正在等邮箱 {email} 的验证码,邮差应该在路上了...") | |
| start_time = time.time() | |
| seen_ids = set() | |
| while time.time() - start_time < timeout: | |
| try: | |
| # 获取邮件列表 | |
| response = self.http_client.get( | |
| f"{self.config['base_url']}/inbox", | |
| params={"token": token}, | |
| headers={"Accept": "application/json"} | |
| ) | |
| if response.status_code != 200: | |
| time.sleep(3) | |
| continue | |
| data = response.json() | |
| # 检查 inbox 是否过期 | |
| if data is None or (isinstance(data, dict) and not data): | |
| logger.warning(f"邮箱 {email} 已过期") | |
| return None | |
| email_list = data.get("emails", []) if isinstance(data, dict) else [] | |
| if not isinstance(email_list, list): | |
| time.sleep(3) | |
| continue | |
| for msg in email_list: | |
| if not isinstance(msg, dict): | |
| continue | |
| # 使用 date 作为唯一标识 | |
| msg_date = msg.get("date", 0) | |
| if not msg_date or msg_date in seen_ids: | |
| continue | |
| seen_ids.add(msg_date) | |
| sender = str(msg.get("from", "")).lower() | |
| subject = str(msg.get("subject", "")) | |
| body = str(msg.get("body", "")) | |
| html = str(msg.get("html") or "") | |
| content = "\n".join([sender, subject, body, html]) | |
| # 检查是否是 OpenAI 邮件 | |
| if "openai" not in sender and "openai" not in content.lower(): | |
| continue | |
| # 提取验证码 | |
| match = re.search(pattern, content) | |
| if match: | |
| code = match.group(1) | |
| logger.info(f"找到验证码了,六位嘉宾登场: {code}") | |
| self.update_status(True) | |
| return code | |
| except Exception as e: | |
| logger.debug(f"检查邮件时出错: {e}") | |
| # 等待一段时间再检查 | |
| time.sleep(3) | |
| logger.warning(f"等验证码等到超时了: {email}") | |
| return None | |
| def list_emails(self, **kwargs) -> List[Dict[str, Any]]: | |
| """ | |
| 列出所有缓存的邮箱 | |
| Note: | |
| Tempmail.lol API 不支持列出所有邮箱,这里返回缓存的邮箱 | |
| """ | |
| return list(self._email_cache.values()) | |
| def delete_email(self, email_id: str) -> bool: | |
| """ | |
| 删除邮箱 | |
| Note: | |
| Tempmail.lol API 不支持删除邮箱,这里从缓存中移除 | |
| """ | |
| # 从缓存中查找并移除 | |
| emails_to_delete = [] | |
| for email, info in self._email_cache.items(): | |
| if info.get("token") == email_id: | |
| emails_to_delete.append(email) | |
| for email in emails_to_delete: | |
| del self._email_cache[email] | |
| logger.info(f"从缓存中移除邮箱: {email}") | |
| return len(emails_to_delete) > 0 | |
| def check_health(self) -> bool: | |
| """检查 Tempmail.lol 服务是否可用""" | |
| try: | |
| response = self.http_client.get( | |
| f"{self.config['base_url']}/inbox/create", | |
| timeout=10 | |
| ) | |
| # 即使返回错误状态码也认为服务可用(只要可以连接) | |
| self.update_status(True) | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Tempmail.lol 健康检查失败: {e}") | |
| self.update_status(False, e) | |
| return False | |
| def get_inbox(self, token: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| 获取邮箱收件箱内容 | |
| Args: | |
| token: 邮箱 token | |
| Returns: | |
| 收件箱数据 | |
| """ | |
| try: | |
| response = self.http_client.get( | |
| f"{self.config['base_url']}/inbox", | |
| params={"token": token}, | |
| headers={"Accept": "application/json"} | |
| ) | |
| if response.status_code != 200: | |
| return None | |
| return response.json() | |
| except Exception as e: | |
| logger.error(f"获取收件箱失败: {e}") | |
| return None | |
| def wait_for_verification_code_with_callback( | |
| self, | |
| email: str, | |
| token: str, | |
| callback: callable = None, | |
| timeout: int = 120 | |
| ) -> Optional[str]: | |
| """ | |
| 等待验证码并支持回调函数 | |
| Args: | |
| email: 邮箱地址 | |
| token: 邮箱 token | |
| callback: 回调函数,接收当前状态信息 | |
| timeout: 超时时间 | |
| Returns: | |
| 验证码或 None | |
| """ | |
| start_time = time.time() | |
| seen_ids = set() | |
| check_count = 0 | |
| while time.time() - start_time < timeout: | |
| check_count += 1 | |
| if callback: | |
| callback({ | |
| "status": "checking", | |
| "email": email, | |
| "check_count": check_count, | |
| "elapsed_time": time.time() - start_time, | |
| }) | |
| try: | |
| data = self.get_inbox(token) | |
| if not data: | |
| time.sleep(3) | |
| continue | |
| # 检查 inbox 是否过期 | |
| if data is None or (isinstance(data, dict) and not data): | |
| if callback: | |
| callback({ | |
| "status": "expired", | |
| "email": email, | |
| "message": "邮箱已过期" | |
| }) | |
| return None | |
| email_list = data.get("emails", []) if isinstance(data, dict) else [] | |
| for msg in email_list: | |
| msg_date = msg.get("date", 0) | |
| if not msg_date or msg_date in seen_ids: | |
| continue | |
| seen_ids.add(msg_date) | |
| sender = str(msg.get("from", "")).lower() | |
| subject = str(msg.get("subject", "")) | |
| body = str(msg.get("body", "")) | |
| html = str(msg.get("html") or "") | |
| content = "\n".join([sender, subject, body, html]) | |
| # 检查是否是 OpenAI 邮件 | |
| if "openai" not in sender and "openai" not in content.lower(): | |
| continue | |
| # 提取验证码 | |
| match = re.search(OTP_CODE_PATTERN, content) | |
| if match: | |
| code = match.group(1) | |
| if callback: | |
| callback({ | |
| "status": "found", | |
| "email": email, | |
| "code": code, | |
| "message": "找到验证码" | |
| }) | |
| return code | |
| if callback and check_count % 5 == 0: | |
| callback({ | |
| "status": "waiting", | |
| "email": email, | |
| "check_count": check_count, | |
| "message": f"已检查 {len(seen_ids)} 封邮件,等待验证码..." | |
| }) | |
| except Exception as e: | |
| logger.debug(f"检查邮件时出错: {e}") | |
| if callback: | |
| callback({ | |
| "status": "error", | |
| "email": email, | |
| "error": str(e), | |
| "message": "检查邮件时出错" | |
| }) | |
| time.sleep(3) | |
| if callback: | |
| callback({ | |
| "status": "timeout", | |
| "email": email, | |
| "message": "等待验证码超时" | |
| }) | |
| return None | |