gemini-business2api-github / core /register_service.py
lijunke
deploy: clean start with hf metadata
18081cf
import asyncio
import logging
import os
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from core.account import load_accounts_from_source
from core.base_task_service import BaseTask, BaseTaskService, TaskCancelledError, TaskStatus
from core.config import config
from core.mail_providers import create_temp_mail_client
from core.gemini_automation import GeminiAutomation
from core.proxy_utils import parse_proxy_setting
logger = logging.getLogger("gemini.register")
@dataclass
class RegisterTask(BaseTask):
"""注册任务数据类"""
count: int = 0
domain: Optional[str] = None
mail_provider: Optional[str] = None
def to_dict(self) -> dict:
"""转换为字典"""
base_dict = super().to_dict()
base_dict["count"] = self.count
base_dict["domain"] = self.domain
base_dict["mail_provider"] = self.mail_provider
return base_dict
class RegisterService(BaseTaskService[RegisterTask]):
"""注册服务类"""
def __init__(
self,
multi_account_mgr,
http_client,
user_agent: str,
retry_policy,
session_cache_ttl_seconds: int,
global_stats_provider: Callable[[], dict],
set_multi_account_mgr: Optional[Callable[[Any], None]] = None,
) -> None:
super().__init__(
multi_account_mgr,
http_client,
user_agent,
retry_policy,
session_cache_ttl_seconds,
global_stats_provider,
set_multi_account_mgr,
log_prefix="REGISTER",
)
def _get_running_task(self) -> Optional[RegisterTask]:
"""获取正在运行或等待中的任务"""
for task in self._tasks.values():
if isinstance(task, RegisterTask) and task.status in (TaskStatus.PENDING, TaskStatus.RUNNING):
return task
return None
async def start_register(self, count: Optional[int] = None, domain: Optional[str] = None, mail_provider: Optional[str] = None) -> RegisterTask:
"""
启动注册任务 - 统一任务管理
- 如果有正在运行的任务,将新数量添加到该任务
- 如果没有正在运行的任务,创建新任务
"""
async with self._lock:
if os.environ.get("ACCOUNTS_CONFIG"):
raise ValueError("已设置 ACCOUNTS_CONFIG 环境变量,注册功能已禁用")
# 先确定使用哪个邮箱服务提供商
mail_provider_value = (mail_provider or "").strip().lower()
if not mail_provider_value:
mail_provider_value = (config.basic.temp_mail_provider or "duckmail").lower()
# 再确定使用哪个域名(只有 DuckMail 使用 register_domain 配置)
domain_value = (domain or "").strip()
if not domain_value:
if mail_provider_value == "duckmail":
domain_value = (config.basic.register_domain or "").strip() or None
else:
domain_value = None
register_count = count or config.basic.register_default_count
register_count = max(1, int(register_count))
# 检查是否有正在运行的任务
running_task = self._get_running_task()
if running_task:
# 将新数量添加到现有任务
running_task.count += register_count
self._append_log(
running_task,
"info",
f"📝 添加 {register_count} 个账户到现有任务 (总计: {running_task.count})"
)
return running_task
# 创建新任务
task = RegisterTask(id=str(uuid.uuid4()), count=register_count, domain=domain_value, mail_provider=mail_provider_value)
self._tasks[task.id] = task
self._append_log(task, "info", f"📝 创建注册任务 (数量: {register_count}, 域名: {domain_value or 'default'}, 提供商: {mail_provider_value})")
# 直接启动任务
self._current_task_id = task.id
asyncio.create_task(self._run_task_directly(task))
return task
async def _run_task_directly(self, task: RegisterTask) -> None:
"""直接执行任务"""
try:
await self._run_one_task(task)
finally:
# 任务完成后清理
async with self._lock:
if self._current_task_id == task.id:
self._current_task_id = None
def _execute_task(self, task: RegisterTask):
return self._run_register_async(task, task.domain, task.mail_provider)
async def _run_register_async(self, task: RegisterTask, domain: Optional[str], mail_provider: Optional[str]) -> None:
"""异步执行注册任务(支持取消)。"""
loop = asyncio.get_running_loop()
self._append_log(task, "info", f"🚀 注册任务已启动 (共 {task.count} 个账号)")
for idx in range(task.count):
if task.cancel_requested:
self._append_log(task, "warning", f"register task cancelled: {task.cancel_reason or 'cancelled'}")
task.status = TaskStatus.CANCELLED
task.finished_at = time.time()
return
try:
self._append_log(task, "info", f"📊 进度: {idx + 1}/{task.count}")
result = await loop.run_in_executor(self._executor, self._register_one, domain, mail_provider, task)
except TaskCancelledError:
task.status = TaskStatus.CANCELLED
task.finished_at = time.time()
return
except Exception as exc:
result = {"success": False, "error": str(exc)}
task.progress += 1
task.results.append(result)
if result.get("success"):
task.success_count += 1
email = result.get('email', '未知')
self._append_log(task, "info", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
self._append_log(task, "info", f"✅ 注册成功: {email}")
self._append_log(task, "info", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
else:
task.fail_count += 1
error = result.get('error', '未知错误')
self._append_log(task, "error", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
self._append_log(task, "error", f"❌ 注册失败: {error}")
self._append_log(task, "error", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
# 账号之间等待 10 秒,避免资源争抢和风控
if idx < task.count - 1 and not task.cancel_requested:
self._append_log(task, "info", "⏳ 等待 10 秒后处理下一个账号...")
await asyncio.sleep(10)
if task.cancel_requested:
task.status = TaskStatus.CANCELLED
else:
task.status = TaskStatus.SUCCESS if task.fail_count == 0 else TaskStatus.FAILED
task.finished_at = time.time()
self._current_task_id = None
self._append_log(task, "info", f"🏁 注册任务完成 (成功: {task.success_count}, 失败: {task.fail_count}, 总计: {task.count})")
def _register_one(self, domain: Optional[str], mail_provider: Optional[str], task: RegisterTask) -> dict:
"""注册单个账户"""
log_cb = lambda level, message: self._append_log(task, level, message)
log_cb("info", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
log_cb("info", "🆕 开始注册新账户")
log_cb("info", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
# 使用传递的邮件提供商参数,如果未提供则从配置读取
temp_mail_provider = (mail_provider or "").strip().lower()
if not temp_mail_provider:
temp_mail_provider = (config.basic.temp_mail_provider or "duckmail").lower()
log_cb("info", f"📧 步骤 1/3: 注册临时邮箱 (提供商={temp_mail_provider})...")
if temp_mail_provider == "freemail" and not config.basic.freemail_jwt_token:
log_cb("error", "❌ Freemail JWT Token 未配置")
return {"success": False, "error": "Freemail JWT Token 未配置"}
client = create_temp_mail_client(
temp_mail_provider,
domain=domain,
log_cb=log_cb,
)
if not client.register_account(domain=domain):
log_cb("error", f"❌ {temp_mail_provider} 邮箱注册失败")
return {"success": False, "error": f"{temp_mail_provider} 注册失败"}
log_cb("info", f"✅ 邮箱注册成功: {client.email}")
headless = config.basic.browser_headless
proxy_for_auth, _ = parse_proxy_setting(config.basic.proxy_for_auth)
log_cb("info", f"🌐 步骤 2/3: 启动浏览器 (无头模式={headless})...")
automation = GeminiAutomation(
user_agent=self.user_agent,
proxy=proxy_for_auth,
headless=headless,
log_callback=log_cb,
)
# 允许外部取消时立刻关闭浏览器
self._add_cancel_hook(task.id, lambda: getattr(automation, "stop", lambda: None)())
try:
log_cb("info", "🔐 步骤 3/3: 执行 Gemini 自动登录...")
result = automation.login_and_extract(client.email, client, is_new_account=True)
except Exception as exc:
log_cb("error", f"❌ 自动登录异常: {exc}")
return {"success": False, "error": str(exc)}
if not result.get("success"):
error = result.get("error", "自动化流程失败")
log_cb("error", f"❌ 自动登录失败: {error}")
return {"success": False, "error": error}
log_cb("info", "✅ Gemini 登录成功,正在保存配置...")
config_data = result["config"]
config_data["mail_provider"] = temp_mail_provider
config_data["mail_address"] = client.email
# 保存邮箱自定义配置
if temp_mail_provider == "freemail":
config_data["mail_password"] = ""
config_data["mail_base_url"] = config.basic.freemail_base_url
config_data["mail_jwt_token"] = config.basic.freemail_jwt_token
config_data["mail_verify_ssl"] = config.basic.freemail_verify_ssl
config_data["mail_domain"] = config.basic.freemail_domain
elif temp_mail_provider == "gptmail":
config_data["mail_password"] = ""
config_data["mail_base_url"] = config.basic.gptmail_base_url
config_data["mail_api_key"] = config.basic.gptmail_api_key
config_data["mail_verify_ssl"] = config.basic.gptmail_verify_ssl
config_data["mail_domain"] = config.basic.gptmail_domain
elif temp_mail_provider == "cfmail":
config_data["mail_password"] = getattr(client, "jwt_token", "") or getattr(client, "password", "")
config_data["mail_base_url"] = config.basic.cfmail_base_url
config_data["mail_api_key"] = config.basic.cfmail_api_key
config_data["mail_verify_ssl"] = config.basic.cfmail_verify_ssl
config_data["mail_domain"] = config.basic.cfmail_domain
elif temp_mail_provider == "moemail":
config_data["mail_password"] = getattr(client, "email_id", "") or getattr(client, "password", "")
config_data["mail_base_url"] = config.basic.moemail_base_url
config_data["mail_api_key"] = config.basic.moemail_api_key
config_data["mail_domain"] = config.basic.moemail_domain
elif temp_mail_provider == "duckmail":
config_data["mail_password"] = getattr(client, "password", "")
config_data["mail_base_url"] = config.basic.duckmail_base_url
config_data["mail_api_key"] = config.basic.duckmail_api_key
else:
config_data["mail_password"] = getattr(client, "password", "")
accounts_data = load_accounts_from_source()
updated = False
for acc in accounts_data:
if acc.get("id") == config_data["id"]:
acc.update(config_data)
updated = True
break
if not updated:
accounts_data.append(config_data)
self._apply_accounts_update(accounts_data)
log_cb("info", "✅ 配置已保存到数据库")
log_cb("info", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
log_cb("info", f"🎉 账户注册完成: {client.email}")
log_cb("info", "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
return {"success": True, "email": client.email, "config": config_data}