/** * pool.js - 账号池管理器 * * 管理 chataibot.pro 账号的生命周期: * 加载 → 验证 session → 分配 → 使用 → 归还 → 轮换 * 额度耗尽 → 自动注册新账号补充 */ import fs from 'fs'; import path from 'path'; import { post, sleep } from './http.js'; import { getUserInfo, getQuota } from './chat.js'; import { register } from './protocol.js'; import { generateAccountInfo } from './account.js'; import { createMailProvider, closeMail } from './mail.js'; import config from './config.js'; import { isDbEnabled, initDb, loadAccounts, saveNewAccount, updateCookies, markDead, closeDb } from './db.js'; const State = { ACTIVE: 'active', IN_USE: 'in_use', NEEDS_LOGIN: 'needs_login', EXHAUSTED: 'exhausted', DEAD: 'dead', }; export class AccountPool { constructor() { this.accounts = []; this._registeringCount = 0; this._maxConcurrentRegister = 10; this._timer = null; this._logs = []; this._maxLogs = 200; this._logId = 0; this._acquireLock = Promise.resolve(); // 互斥锁:防止并发 acquire 分配同一账号 } /** * 添加一条注册日志 */ _addLog(message, level = 'info') { this._logId++; const entry = { id: this._logId, time: new Date().toISOString(), message, level }; this._logs.push(entry); if (this._logs.length > this._maxLogs) { this._logs.splice(0, this._logs.length - this._maxLogs); } console.log(`[Pool] ${message}`); } /** * 获取日志 (支持增量: since 参数为上次拿到的最大 id) */ getLogs(since = 0) { return this._logs.filter(l => l.id > since); } /** * 初始化: 从数据库或文件目录加载已有账号 */ async init() { if (isDbEnabled()) { // PostgreSQL 模式 await initDb(); this._addLog('从 PostgreSQL 加载账号...'); const rows = await loadAccounts(); for (const row of rows) { const cookies = new Map(Object.entries(row.cookies || {})); this.accounts.push({ email: row.email, password: row.password, cookies, state: State.ACTIVE, remainingQuota: null, lastUsedAt: 0, lastCheckedAt: 0, errorCount: 0, filePath: null, }); } this._addLog(`加载 ${rows.length} 个账号`); } else { // 文件系统模式 const dir = path.resolve(config.outputDir); if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } const files = fs.readdirSync(dir).filter(f => f.endsWith('.json')); this._addLog(`加载 ${files.length} 个账号文件...`); for (const file of files) { try { const filePath = path.join(dir, file); const data = JSON.parse(fs.readFileSync(filePath, 'utf8')); if (!data.email || !data.password) continue; const cookies = new Map(); if (data.cookies) { for (const [k, v] of Object.entries(data.cookies)) { cookies.set(k, v); } } this.accounts.push({ email: data.email, password: data.password, cookies, state: State.ACTIVE, remainingQuota: null, lastUsedAt: 0, lastCheckedAt: 0, errorCount: 0, filePath, }); } catch {} } } // 并行验证所有账号 session this._addLog(`验证 ${this.accounts.length} 个账号 session...`); const checks = this.accounts.map(async (acc) => { const user = await getUserInfo(acc.cookies); if (user) { acc.state = State.ACTIVE; acc.lastCheckedAt = Date.now(); const quota = await getQuota(acc.cookies); if (quota && typeof quota.remaining === 'number') { acc.remainingQuota = quota.remaining; } // quota 未知时不赋值,保持 null — release 时用 cost 会跳过扣减 // 但 probeStream 会兜底检测 "Insufficient credits" 流错误 } else { acc.state = State.NEEDS_LOGIN; } }); await Promise.allSettled(checks); const active = this.accounts.filter(a => a.state === State.ACTIVE).length; const needsLogin = this.accounts.filter(a => a.state === State.NEEDS_LOGIN).length; this._addLog(`就绪: ${active} 可用, ${needsLogin} 需重新登录`); // 并行刷新需要登录的账号 (限制并发,避免刷屏) if (needsLogin > 0) { this._addLog(`并行刷新 ${needsLogin} 个账号 session...`); const needLoginAccounts = this.accounts.filter(a => a.state === State.NEEDS_LOGIN); const BATCH = 10; for (let i = 0; i < needLoginAccounts.length; i += BATCH) { const batch = needLoginAccounts.slice(i, i + BATCH); await Promise.allSettled(batch.map(acc => this._refreshSession(acc))); } const recovered = needLoginAccounts.filter(a => a.state === State.ACTIVE).length; const dead = needLoginAccounts.filter(a => a.state === State.DEAD).length; this._addLog(`刷新结果: ${recovered} 恢复, ${dead} 失效`, recovered > 0 ? 'success' : 'warn'); } // 将 dead 账号文件移到 dead/ 子目录,避免下次还加载 this._archiveDeadAccounts(); // 启动后台任务 this._startBackgroundTasks(); // 启动后主动补充池到 minAvailable const activeAfterInit = this.accounts.filter(a => a.state === State.ACTIVE).length; if (activeAfterInit < config.pool.minAvailable && config.pool.autoRegister) { const needed = config.pool.minAvailable - activeAfterInit; this._addLog(`可用账号不足 (${activeAfterInit}/${config.pool.minAvailable}),自动注册 ${needed} 个...`); await this._registerBatch(needed); } } /** * 获取一个可用账号 (互斥 + Round-Robin + 额度感知) */ acquire() { // 串行化: 每次 acquire 必须等前一个完成,防止并发分配同一账号 const prev = this._acquireLock; let unlock; this._acquireLock = new Promise(r => { unlock = r; }); return prev.then(() => this._doAcquire()).finally(unlock); } async _doAcquire() { // 先清理卡死的 IN_USE 账号 (超过 5 分钟未释放) const stuckTimeout = 5 * 60 * 1000; for (const acc of this.accounts) { if (acc.state === State.IN_USE && Date.now() - acc.lastUsedAt > stuckTimeout) { acc.state = State.ACTIVE; this._addLog(`回收卡死账号: ${acc.email}`, 'warn'); } } // 优先选有额度的 (quota > 0 或 quota 未知) let candidates = this.accounts.filter(a => a.state === State.ACTIVE && (a.remainingQuota === null || a.remainingQuota > 0) ); // 放宽: 所有 ACTIVE 的 (quota 可能未刷新) if (candidates.length === 0) { candidates = this.accounts.filter(a => a.state === State.ACTIVE); } if (candidates.length === 0) { // 尝试刷新 NEEDS_LOGIN 的 for (const acc of this.accounts.filter(a => a.state === State.NEEDS_LOGIN)) { const ok = await this._refreshSession(acc); if (ok) { candidates = [acc]; break; } } } // 尝试恢复 EXHAUSTED 账号 (额度可能已刷新) if (candidates.length === 0) { const exhausted = this.accounts.filter(a => a.state === State.EXHAUSTED); for (const acc of exhausted) { const quota = await getQuota(acc.cookies); if (quota && typeof quota.remaining === 'number' && quota.remaining > 0) { acc.remainingQuota = quota.remaining; acc.state = State.ACTIVE; acc.lastCheckedAt = Date.now(); this._addLog(`额度恢复: ${acc.email} (${quota.remaining})`, 'success'); candidates = [acc]; break; } } } if (candidates.length === 0 && config.pool.autoRegister) { // 自动注册新账号 const acc = await this._registerNew(); if (acc) candidates = [acc]; } if (candidates.length === 0) { throw new Error('No available account in pool'); } // 负载均衡: Round-Robin + 额度加权 // 1. quota=0 或 EXHAUSTED 的排最后 // 2. 额度未知 (null) 的正常参与轮询 // 3. 在 lastUsedAt 相近的账号中,优先选剩余额度多的 candidates.sort((a, b) => { const aLow = typeof a.remainingQuota === 'number' && a.remainingQuota <= 0; const bLow = typeof b.remainingQuota === 'number' && b.remainingQuota <= 0; if (aLow !== bLow) return aLow ? 1 : -1; // 时间窗口分组: 同一秒内视为"同时",比较额度 const timeDiff = Math.floor(a.lastUsedAt / 1000) - Math.floor(b.lastUsedAt / 1000); if (timeDiff !== 0) return timeDiff; // 同一时间窗口内,额度多的优先 (null 视为中等额度 50) const aQuota = typeof a.remainingQuota === 'number' ? a.remainingQuota : 50; const bQuota = typeof b.remainingQuota === 'number' ? b.remainingQuota : 50; return bQuota - aQuota; // 降序,额度多的排前面 }); const account = candidates[0]; account.state = State.IN_USE; account.lastUsedAt = Date.now(); return account; } /** * 归还账号 * @param {object} opts * @param {number} [opts.cost] - 本次请求消耗的 requests 数 (由 adapter 传入) */ release(account, { success = true, quotaExhausted = false, sessionExpired = false, cost = 0 } = {}) { if (sessionExpired) { account.state = State.NEEDS_LOGIN; account.errorCount++; } else if (quotaExhausted) { account.state = State.EXHAUSTED; account.remainingQuota = 0; } else if (success) { account.state = State.ACTIVE; account.errorCount = 0; // 本地扣减额度 (仅当有已知的 remaining 时) if (cost > 0 && typeof account.remainingQuota === 'number') { account.remainingQuota = Math.max(0, account.remainingQuota - cost); if (account.remainingQuota <= 0) { account.state = State.EXHAUSTED; this._addLog(`额度耗尽 (本地扣减): ${account.email} (0 remaining)`, 'warn'); } } } else { account.errorCount++; account.state = account.errorCount >= 5 ? State.DEAD : State.ACTIVE; } // 补充池 const active = this.accounts.filter(a => a.state === State.ACTIVE).length; if (active < config.pool.minAvailable && config.pool.autoRegister) { const needed = config.pool.minAvailable - active; this._registerBatch(needed).catch(() => {}); } } /** * 获取池状态 */ getStatus() { const counts = {}; for (const s of Object.values(State)) counts[s] = 0; for (const a of this.accounts) counts[a.state]++; return { total: this.accounts.length, ...counts, accounts: this.accounts.map(a => ({ email: a.email, state: a.state, remaining: a.remainingQuota, lastUsed: a.lastUsedAt ? new Date(a.lastUsedAt).toISOString() : null, })), }; } /** * 刷新账号 session (重新登录) */ async _refreshSession(account) { try { const resp = await post(`${config.siteBase}/api/login`, { email: account.email, password: account.password, }, { headers: { 'Origin': config.siteBase, 'Referer': `${config.siteBase}/app/auth/sign-in`, 'Accept-Language': 'en', }, }); if (resp.ok) { account.cookies = resp.cookies; account.state = State.ACTIVE; account.errorCount = 0; this._saveCookies(account); return true; } account.errorCount++; if (resp.status === 401) { account.state = State.DEAD; } else if (resp.status >= 500) { account.state = State.NEEDS_LOGIN; } else { account.state = account.errorCount >= 3 ? State.DEAD : State.NEEDS_LOGIN; } return false; } catch (e) { account.errorCount++; account.state = account.errorCount >= 5 ? State.DEAD : State.NEEDS_LOGIN; return false; } } /** * 批量并行注册多个账号 * @param {number} count * @param {string} [provider] - 可选邮箱 provider 覆盖 */ async _registerBatch(count, provider) { const tasks = []; for (let i = 0; i < count; i++) { tasks.push(this._registerNew(provider)); // 每个注册之间间隔一小段避免邮箱服务压力 if (i < count - 1) await sleep(500); } const results = await Promise.allSettled(tasks); const success = results.filter(r => r.status === 'fulfilled' && r.value).length; this._addLog(`批量注册完成: ${success}/${count} 成功`, success > 0 ? 'success' : 'error'); return success; } /** * 手动触发注册 (供 Dashboard 调用) * @param {number} count * @param {string} [provider] - 可选邮箱 provider 覆盖 * @param {number} [concurrency] - 可选并发数覆盖 * @returns {{ registering: number, queued: number }} */ manualRegister(count = 5, provider, concurrency) { count = Math.min(Math.max(1, count), 50); if (concurrency) this._maxConcurrentRegister = Math.min(Math.max(1, concurrency), 20); const available = this._maxConcurrentRegister - this._registeringCount; const actual = Math.min(count, available); if (actual <= 0) { return { registering: this._registeringCount, queued: 0 }; } this._registerBatch(actual, provider).catch(() => {}); return { registering: this._registeringCount + actual, queued: actual }; } /** * 自动注册新账号 (支持并行) * @param {string} [overrideProvider] - 可选邮箱 provider 覆盖 */ async _registerNew(overrideProvider) { if (this._registeringCount >= this._maxConcurrentRegister) return null; this._registeringCount++; try { this._addLog('开始注册新账号...'); // 优先用 overrideProvider,否则用配置的 provider,如果不可用则降级到 mailtm let providerName = overrideProvider || (config.mailProvider === 'manual' ? 'mailtm' : config.mailProvider); let providerOpts = config[providerName] || {}; // 检查 provider 是否配置完整 if (providerName === 'moemail' && !providerOpts.apiUrl) providerName = 'mailtm'; if (providerName === 'duckmail' && !providerOpts.apiKey) providerName = 'mailtm'; if (providerName === 'catchall' && !providerOpts.domain) providerName = 'mailtm'; if (providerName === 'custom' && !providerOpts.createUrl) providerName = 'mailtm'; providerOpts = config[providerName] || {}; const mailProvider = createMailProvider(providerName, providerOpts); // 重试逻辑 let lastError; for (let attempt = 1; attempt <= 2; attempt++) { try { this._addLog(`[${attempt}/2] 创建临时邮箱 (${providerName})...`); await mailProvider.createInbox(); this._addLog(`[${attempt}/2] 邮箱就绪: ${mailProvider.address}`); const accountInfo = generateAccountInfo(); accountInfo.email = mailProvider.address; this._addLog(`[${attempt}/2] 提交注册请求...`); const result = await register(accountInfo, mailProvider); await mailProvider.cleanup(); if (result.success) { const cookies = result.cookies || new Map(); const entry = { email: accountInfo.email, password: accountInfo.password, cookies, state: State.ACTIVE, remainingQuota: null, lastUsedAt: 0, lastCheckedAt: Date.now(), errorCount: 0, filePath: isDbEnabled() ? null : path.resolve(config.outputDir, `chataibot-${accountInfo.email.replace('@', '_at_')}.json`), }; // 持久化 if (isDbEnabled()) { await saveNewAccount(entry.email, entry.password, Object.fromEntries(cookies)); } else { const saveData = { site: 'chataibot.pro', email: entry.email, password: entry.password, cookies: Object.fromEntries(cookies), registeredAt: new Date().toISOString(), }; fs.writeFileSync(entry.filePath, JSON.stringify(saveData, null, 2)); } this.accounts.push(entry); this._addLog(`新账号就绪: ${entry.email}`, 'success'); return entry; } lastError = result.error; this._addLog(`[${attempt}/2] 注册失败: ${result.error}`, 'error'); } catch (e) { lastError = e.message; this._addLog(`[${attempt}/2] 注册出错: ${e.message}`, 'error'); } if (attempt < 2) { await sleep(3000); // 重新创建邮箱 provider try { await mailProvider.cleanup(); } catch {} } } this._addLog(`注册最终失败: ${lastError}`, 'error'); return null; } finally { this._registeringCount--; } } /** * 将 dead 账号归档 (DB: 标记 state=dead / 文件: 移到 dead/ 子目录) */ _archiveDeadAccounts() { const deadAccounts = this.accounts.filter(a => a.state === State.DEAD); if (deadAccounts.length === 0) return; if (isDbEnabled()) { for (const acc of deadAccounts) { markDead(acc.email).catch(() => {}); this._addLog(`归档 dead 账号: ${acc.email}`); } } else { const deadDir = path.resolve(config.outputDir, 'dead'); if (!fs.existsSync(deadDir)) fs.mkdirSync(deadDir, { recursive: true }); for (const acc of deadAccounts) { if (!acc.filePath || !fs.existsSync(acc.filePath)) continue; try { const dest = path.join(deadDir, path.basename(acc.filePath)); fs.renameSync(acc.filePath, dest); this._addLog(`归档 dead 账号: ${acc.email}`); } catch {} } } // 从池中移除 dead 账号 this.accounts = this.accounts.filter(a => a.state !== State.DEAD); } /** * 保存更新后的 cookies (DB: UPDATE / 文件: 读写 JSON) */ _saveCookies(account) { if (isDbEnabled()) { updateCookies(account.email, Object.fromEntries(account.cookies)).catch(() => {}); return; } if (!account.filePath) return; try { const data = JSON.parse(fs.readFileSync(account.filePath, 'utf8')); data.cookies = Object.fromEntries(account.cookies); fs.writeFileSync(account.filePath, JSON.stringify(data, null, 2)); } catch {} } /** * 后台定时任务 */ _startBackgroundTasks() { const interval = config.pool.checkInterval || 300000; this._timer = setInterval(async () => { // 检查额度 for (const acc of this.accounts.filter(a => a.state === State.ACTIVE || a.state === State.EXHAUSTED)) { if (Date.now() - acc.lastCheckedAt < interval) continue; const quota = await getQuota(acc.cookies); if (quota && typeof quota.remaining === 'number') { acc.remainingQuota = quota.remaining; acc.lastCheckedAt = Date.now(); if (acc.state === State.EXHAUSTED && quota.remaining > 0) { acc.state = State.ACTIVE; this._addLog(`额度恢复: ${acc.email} (${quota.remaining})`, 'success'); } } } // 并行刷新需要登录的 const needLogin = this.accounts.filter(a => a.state === State.NEEDS_LOGIN); if (needLogin.length > 0) { await Promise.allSettled(needLogin.map(acc => this._refreshSession(acc))); } // 归档 DEAD 账号 this._archiveDeadAccounts(); // 补充池 const active = this.accounts.filter(a => a.state === State.ACTIVE).length; if (active < config.pool.minAvailable && config.pool.autoRegister) { const needed = config.pool.minAvailable - active; await this._registerBatch(needed); } }, interval); // 不阻止进程退出 if (this._timer.unref) this._timer.unref(); } destroy() { if (this._timer) clearInterval(this._timer); if (isDbEnabled()) closeDb().catch(() => {}); } }