// ikun2 / pool.js
// bingn's picture
// Upload 19 files
// f1357b6 verified
/**
* pool.js - 账号池管理器
*
* 管理 chataibot.pro 账号的生命周期:
* 加载 → 验证 session → 分配 → 使用 → 归还 → 轮换
* 额度耗尽 → 自动注册新账号补充
*/
import fs from 'fs';
import path from 'path';
import { post, sleep } from './http.js';
import { getUserInfo, getQuota } from './chat.js';
import { register } from './protocol.js';
import { generateAccountInfo } from './account.js';
import { createMailProvider, closeMail } from './mail.js';
import config from './config.js';
import { isDbEnabled, initDb, loadAccounts, saveNewAccount, updateCookies, markDead, closeDb } from './db.js';
/**
 * Lifecycle states of a pooled account.
 *
 * Frozen so that an accidental assignment (e.g. `State.ACTIVE = ...`) throws in
 * module (strict) code instead of silently breaking every state comparison.
 */
const State = Object.freeze({
  /** Idle and eligible to be handed out by `acquire()`. */
  ACTIVE: 'active',
  /** Currently handed out to a caller; returned via `release()`. */
  IN_USE: 'in_use',
  /** Session was rejected or expired; a re-login is required before reuse. */
  NEEDS_LOGIN: 'needs_login',
  /** Quota is used up; may become ACTIVE again once quota is observed > 0. */
  EXHAUSTED: 'exhausted',
  /** Permanently failed; archived and removed from the pool. */
  DEAD: 'dead',
});
/**
 * Pool of chataibot.pro accounts.
 *
 * Lifecycle handled here: load accounts (PostgreSQL or JSON files) → verify
 * sessions → hand accounts out (`acquire`) → take them back (`release`) →
 * periodically re-check quota / re-login / archive dead accounts → register
 * new accounts when the number of ACTIVE accounts drops below
 * `config.pool.minAvailable` (only when `config.pool.autoRegister` is set).
 *
 * Account records are plain objects with the fields:
 * `email`, `password`, `cookies` (Map), `state` (one of `State`),
 * `remainingQuota` (number, or null when unknown), `lastUsedAt`,
 * `lastCheckedAt` (epoch ms), `errorCount`, `filePath` (null in DB mode).
 */
export class AccountPool {
  constructor() {
    this.accounts = [];
    this._registeringCount = 0;
    this._maxConcurrentRegister = 10;
    this._timer = null;
    this._logs = [];
    this._maxLogs = 200;
    this._logId = 0;
    // Promise-chain mutex: each acquire() waits for the previous one so two
    // concurrent callers can never be handed the same account.
    this._acquireLock = Promise.resolve();
    // Promise of the replenish batch currently in flight (null when idle).
    this._replenishing = null;
    // True while a background maintenance pass is running.
    this._maintenanceRunning = false;
  }

  /**
   * Append an entry to the in-memory log ring (capped at `_maxLogs`) and echo
   * it to the console.
   * @param {string} message
   * @param {string} [level='info'] - e.g. 'info' | 'success' | 'warn' | 'error'
   */
  _addLog(message, level = 'info') {
    this._logId++;
    const entry = { id: this._logId, time: new Date().toISOString(), message, level };
    this._logs.push(entry);
    if (this._logs.length > this._maxLogs) {
      // Drop the oldest entries so only the newest `_maxLogs` remain.
      this._logs.splice(0, this._logs.length - this._maxLogs);
    }
    console.log(`[Pool] ${message}`);
  }

  /**
   * Return log entries newer than `since` (incremental polling).
   * @param {number} [since=0] - largest log id the caller has already seen
   * @returns {Array<{id: number, time: string, message: string, level: string}>}
   */
  getLogs(since = 0) {
    return this._logs.filter(l => l.id > since);
  }

  /**
   * Human-readable text for a caught value (which may not be an Error).
   * @param {*} err
   * @returns {string}
   */
  _errorText(err) {
    return err && err.message ? err.message : String(err);
  }

  /**
   * Build a fresh account record in the ACTIVE state with unknown quota.
   * @param {{email: string, password: string, cookies: Map, filePath: (string|null), lastCheckedAt?: number}} fields
   * @returns {object} account record
   */
  _makeAccount({ email, password, cookies, filePath, lastCheckedAt = 0 }) {
    return {
      email,
      password,
      cookies,
      state: State.ACTIVE,
      remainingQuota: null,
      lastUsedAt: 0,
      lastCheckedAt,
      errorCount: 0,
      filePath,
    };
  }

  /**
   * Load existing accounts (from PostgreSQL when enabled, otherwise from the
   * JSON files in `config.outputDir`), verify their sessions, re-login the
   * ones that need it, archive dead ones, start the background timer, and
   * top the pool up to `config.pool.minAvailable` when auto-register is on.
   * @returns {Promise<void>}
   */
  async init() {
    if (isDbEnabled()) {
      // PostgreSQL mode
      await initDb();
      this._addLog('从 PostgreSQL 加载账号...');
      const rows = await loadAccounts();
      for (const row of rows) {
        this.accounts.push(this._makeAccount({
          email: row.email,
          password: row.password,
          cookies: new Map(Object.entries(row.cookies || {})),
          filePath: null,
        }));
      }
      this._addLog(`加载 ${rows.length} 个账号`);
    } else {
      // File-system mode
      const dir = path.resolve(config.outputDir);
      if (!fs.existsSync(dir)) {
        fs.mkdirSync(dir, { recursive: true });
      }
      const files = fs.readdirSync(dir).filter(f => f.endsWith('.json'));
      this._addLog(`加载 ${files.length} 个账号文件...`);
      for (const file of files) {
        const filePath = path.join(dir, file);
        try {
          const data = JSON.parse(fs.readFileSync(filePath, 'utf8'));
          if (!data.email || !data.password) continue;
          this.accounts.push(this._makeAccount({
            email: data.email,
            password: data.password,
            cookies: new Map(Object.entries(data.cookies || {})),
            filePath,
          }));
        } catch (e) {
          // One unreadable/corrupt file must not abort loading; report it.
          this._addLog(`跳过无法读取的账号文件 ${file}: ${this._errorText(e)}`, 'warn');
        }
      }
    }

    // Verify every session in parallel. A check that throws is ignored by
    // allSettled, leaving that account in its initial ACTIVE state.
    this._addLog(`验证 ${this.accounts.length} 个账号 session...`);
    await Promise.allSettled(this.accounts.map(acc => this._verifySession(acc)));

    const active = this.accounts.filter(a => a.state === State.ACTIVE).length;
    const needsLogin = this.accounts.filter(a => a.state === State.NEEDS_LOGIN).length;
    this._addLog(`就绪: ${active} 可用, ${needsLogin} 需重新登录`);

    // Re-login accounts whose session is gone, at most BATCH at a time.
    if (needsLogin > 0) {
      this._addLog(`并行刷新 ${needsLogin} 个账号 session...`);
      const needLoginAccounts = this.accounts.filter(a => a.state === State.NEEDS_LOGIN);
      const BATCH = 10;
      for (let i = 0; i < needLoginAccounts.length; i += BATCH) {
        const batch = needLoginAccounts.slice(i, i + BATCH);
        await Promise.allSettled(batch.map(acc => this._refreshSession(acc)));
      }
      const recovered = needLoginAccounts.filter(a => a.state === State.ACTIVE).length;
      const dead = needLoginAccounts.filter(a => a.state === State.DEAD).length;
      this._addLog(`刷新结果: ${recovered} 恢复, ${dead} 失效`, recovered > 0 ? 'success' : 'warn');
    }

    // Archive dead accounts so they are not loaded again next time.
    this._archiveDeadAccounts();

    this._startBackgroundTasks();

    // Proactively top the pool up to minAvailable.
    const activeAfterInit = this.accounts.filter(a => a.state === State.ACTIVE).length;
    if (activeAfterInit < config.pool.minAvailable && config.pool.autoRegister) {
      const needed = config.pool.minAvailable - activeAfterInit;
      this._addLog(`可用账号不足 (${activeAfterInit}/${config.pool.minAvailable}),自动注册 ${needed} 个...`);
      await this._replenish();
    }
  }

  /**
   * Check one account's session; on success also record its remaining quota
   * when the quota endpoint reports a numeric `remaining`.
   * Unknown quota stays `null`, in which case release() skips the local
   * deduction.
   * @param {object} acc - account record (mutated)
   * @returns {Promise<void>}
   */
  async _verifySession(acc) {
    const user = await getUserInfo(acc.cookies);
    if (!user) {
      acc.state = State.NEEDS_LOGIN;
      return;
    }
    acc.state = State.ACTIVE;
    acc.lastCheckedAt = Date.now();
    const quota = await getQuota(acc.cookies);
    if (quota && typeof quota.remaining === 'number') {
      acc.remainingQuota = quota.remaining;
    }
  }

  /**
   * Hand out one usable account (serialized + round-robin + quota aware).
   * The returned account is marked IN_USE and must be given back through
   * `release()`.
   * @returns {Promise<object>} account record
   * @throws {Error} 'No available account in pool' when nothing can be found,
   *   recovered or registered
   */
  acquire() {
    // Chain onto the previous acquire so selections never interleave.
    const prev = this._acquireLock;
    let unlock;
    this._acquireLock = new Promise(r => { unlock = r; });
    return prev.then(() => this._doAcquire()).finally(unlock);
  }

  /**
   * Selection logic behind acquire(); must only run under the acquire lock.
   * Fallback order: ACTIVE with quota → any ACTIVE → re-login a NEEDS_LOGIN
   * account → re-check an EXHAUSTED account → register a new account.
   * @returns {Promise<object>} account record, now IN_USE
   */
  async _doAcquire() {
    // Reclaim accounts stuck IN_USE for more than 5 minutes (never released).
    const stuckTimeout = 5 * 60 * 1000;
    for (const acc of this.accounts) {
      if (acc.state === State.IN_USE && Date.now() - acc.lastUsedAt > stuckTimeout) {
        acc.state = State.ACTIVE;
        this._addLog(`回收卡死账号: ${acc.email}`, 'warn');
      }
    }

    // Prefer ACTIVE accounts with quota left (or quota unknown).
    let candidates = this.accounts.filter(a =>
      a.state === State.ACTIVE && (a.remainingQuota === null || a.remainingQuota > 0)
    );

    // Relax: any ACTIVE account (the cached quota may simply be stale).
    if (candidates.length === 0) {
      candidates = this.accounts.filter(a => a.state === State.ACTIVE);
    }

    // Try to re-login accounts, stopping at the first success.
    if (candidates.length === 0) {
      for (const acc of this.accounts.filter(a => a.state === State.NEEDS_LOGIN)) {
        const ok = await this._refreshSession(acc);
        if (ok) { candidates = [acc]; break; }
      }
    }

    // Try to revive an EXHAUSTED account whose quota may have been renewed.
    if (candidates.length === 0) {
      const exhausted = this.accounts.filter(a => a.state === State.EXHAUSTED);
      for (const acc of exhausted) {
        const quota = await getQuota(acc.cookies);
        if (quota && typeof quota.remaining === 'number' && quota.remaining > 0) {
          acc.remainingQuota = quota.remaining;
          acc.state = State.ACTIVE;
          acc.lastCheckedAt = Date.now();
          this._addLog(`额度恢复: ${acc.email} (${quota.remaining})`, 'success');
          candidates = [acc];
          break;
        }
      }
    }

    // Last resort: register a brand-new account.
    if (candidates.length === 0 && config.pool.autoRegister) {
      const acc = await this._registerNew();
      if (acc) candidates = [acc];
    }

    if (candidates.length === 0) {
      throw new Error('No available account in pool');
    }

    // Ordering:
    // 1. accounts known to have quota <= 0 go last;
    // 2. least recently used first (compared at one-second granularity);
    // 3. within the same second, more remaining quota first
    //    (unknown quota counts as a medium value of 50).
    candidates.sort((a, b) => {
      const aLow = typeof a.remainingQuota === 'number' && a.remainingQuota <= 0;
      const bLow = typeof b.remainingQuota === 'number' && b.remainingQuota <= 0;
      if (aLow !== bLow) return aLow ? 1 : -1;
      const timeDiff = Math.floor(a.lastUsedAt / 1000) - Math.floor(b.lastUsedAt / 1000);
      if (timeDiff !== 0) return timeDiff;
      const aQuota = typeof a.remainingQuota === 'number' ? a.remainingQuota : 50;
      const bQuota = typeof b.remainingQuota === 'number' ? b.remainingQuota : 50;
      return bQuota - aQuota;
    });

    const account = candidates[0];
    account.state = State.IN_USE;
    account.lastUsedAt = Date.now();
    return account;
  }

  /**
   * Give an account back to the pool and update its state from the outcome.
   * Flags are evaluated in priority order: sessionExpired, quotaExhausted,
   * success, otherwise failure (5 consecutive failures mark the account DEAD).
   * Afterwards a replenish is kicked off in the background if needed.
   * @param {object} account - record previously returned by acquire()
   * @param {object} [opts]
   * @param {boolean} [opts.success=true]
   * @param {boolean} [opts.quotaExhausted=false]
   * @param {boolean} [opts.sessionExpired=false]
   * @param {number} [opts.cost=0] - requests consumed by this call (passed in by the adapter)
   */
  release(account, { success = true, quotaExhausted = false, sessionExpired = false, cost = 0 } = {}) {
    if (sessionExpired) {
      account.state = State.NEEDS_LOGIN;
      account.errorCount++;
    } else if (quotaExhausted) {
      account.state = State.EXHAUSTED;
      account.remainingQuota = 0;
    } else if (success) {
      account.state = State.ACTIVE;
      account.errorCount = 0;
      // Deduct locally only when the remaining quota is actually known.
      if (cost > 0 && typeof account.remainingQuota === 'number') {
        account.remainingQuota = Math.max(0, account.remainingQuota - cost);
        if (account.remainingQuota <= 0) {
          account.state = State.EXHAUSTED;
          this._addLog(`额度耗尽 (本地扣减): ${account.email} (0 remaining)`, 'warn');
        }
      }
    } else {
      account.errorCount++;
      account.state = account.errorCount >= 5 ? State.DEAD : State.ACTIVE;
    }

    // Fire-and-forget top-up; failures are already logged by the batch itself.
    this._replenish().catch(() => {});
  }

  /**
   * Register enough accounts to bring the ACTIVE count up to
   * `config.pool.minAvailable`, when auto-register is enabled.
   *
   * Single-flight: while one replenish batch is running, further calls return
   * that same promise instead of starting another batch, so a burst of
   * release() calls cannot start overlapping batches for the same shortage.
   * @returns {Promise<number>} number of accounts registered (0 when nothing was needed)
   */
  _replenish() {
    if (!config.pool.autoRegister) return Promise.resolve(0);
    if (this._replenishing) return this._replenishing;
    const active = this.accounts.filter(a => a.state === State.ACTIVE).length;
    const needed = config.pool.minAvailable - active;
    if (!(needed > 0)) return Promise.resolve(0);
    const run = this._registerBatch(needed).finally(() => {
      if (this._replenishing === run) this._replenishing = null;
    });
    this._replenishing = run;
    return run;
  }

  /**
   * Snapshot of the pool: total, per-state counts, and a per-account summary.
   * @returns {object}
   */
  getStatus() {
    const counts = {};
    for (const s of Object.values(State)) counts[s] = 0;
    for (const a of this.accounts) counts[a.state]++;
    return {
      total: this.accounts.length,
      ...counts,
      accounts: this.accounts.map(a => ({
        email: a.email,
        state: a.state,
        remaining: a.remainingQuota,
        lastUsed: a.lastUsedAt ? new Date(a.lastUsedAt).toISOString() : null,
      })),
    };
  }

  /**
   * Re-login an account to obtain a fresh session.
   * On success the cookies are replaced and persisted and the account becomes
   * ACTIVE. On failure: HTTP 401 → DEAD; 5xx → NEEDS_LOGIN; other statuses →
   * DEAD after 3 errors; thrown errors → DEAD after 5 errors.
   * @param {object} account - account record (mutated)
   * @returns {Promise<boolean>} true when the login succeeded
   */
  async _refreshSession(account) {
    try {
      const resp = await post(`${config.siteBase}/api/login`, {
        email: account.email,
        password: account.password,
      }, {
        headers: {
          'Origin': config.siteBase,
          'Referer': `${config.siteBase}/app/auth/sign-in`,
          'Accept-Language': 'en',
        },
      });
      if (resp.ok) {
        account.cookies = resp.cookies;
        account.state = State.ACTIVE;
        account.errorCount = 0;
        this._saveCookies(account);
        return true;
      }
      account.errorCount++;
      if (resp.status === 401) {
        account.state = State.DEAD;
      } else if (resp.status >= 500) {
        account.state = State.NEEDS_LOGIN;
      } else {
        account.state = account.errorCount >= 3 ? State.DEAD : State.NEEDS_LOGIN;
      }
      return false;
    } catch (e) {
      account.errorCount++;
      account.state = account.errorCount >= 5 ? State.DEAD : State.NEEDS_LOGIN;
      return false;
    }
  }

  /**
   * Register several accounts in parallel, starting them 500 ms apart to go
   * easy on the mail service.
   * @param {number} count
   * @param {string} [provider] - optional mail provider override
   * @returns {Promise<number>} how many registrations succeeded
   */
  async _registerBatch(count, provider) {
    const tasks = [];
    for (let i = 0; i < count; i++) {
      tasks.push(this._registerNew(provider));
      if (i < count - 1) await sleep(500);
    }
    const results = await Promise.allSettled(tasks);
    const success = results.filter(r => r.status === 'fulfilled' && r.value).length;
    this._addLog(`批量注册完成: ${success}/${count} 成功`, success > 0 ? 'success' : 'error');
    return success;
  }

  /**
   * Manually trigger registration (called by the dashboard). Runs in the
   * background; the return value only reports what was started.
   * @param {number} [count=5] - clamped to 1..50
   * @param {string} [provider] - optional mail provider override
   * @param {number} [concurrency] - optional concurrency override, clamped to 1..20
   * @returns {{ registering: number, queued: number }}
   */
  manualRegister(count = 5, provider, concurrency) {
    count = Math.min(Math.max(1, count), 50);
    if (concurrency) this._maxConcurrentRegister = Math.min(Math.max(1, concurrency), 20);
    const available = this._maxConcurrentRegister - this._registeringCount;
    const actual = Math.min(count, available);
    if (actual <= 0) {
      return { registering: this._registeringCount, queued: 0 };
    }
    this._registerBatch(actual, provider).catch(() => {});
    return { registering: this._registeringCount + actual, queued: actual };
  }

  /**
   * Register one new account (up to 2 attempts) and add it to the pool.
   * Returns null immediately when `_maxConcurrentRegister` registrations are
   * already running.
   * @param {string} [overrideProvider] - optional mail provider override
   * @returns {Promise<object|null>} the new account record, or null on failure
   */
  async _registerNew(overrideProvider) {
    if (this._registeringCount >= this._maxConcurrentRegister) return null;
    this._registeringCount++;
    try {
      this._addLog('开始注册新账号...');
      // Provider priority: explicit override, then the configured provider
      // ('manual' cannot be automated, so it maps to mailtm).
      let providerName = overrideProvider || (config.mailProvider === 'manual' ? 'mailtm' : config.mailProvider);
      let providerOpts = config[providerName] || {};
      // Fall back to mailtm when the chosen provider is not fully configured.
      if (providerName === 'moemail' && !providerOpts.apiUrl) providerName = 'mailtm';
      if (providerName === 'duckmail' && !providerOpts.apiKey) providerName = 'mailtm';
      if (providerName === 'catchall' && !providerOpts.domain) providerName = 'mailtm';
      if (providerName === 'custom' && !providerOpts.createUrl) providerName = 'mailtm';
      providerOpts = config[providerName] || {};
      const mailProvider = createMailProvider(providerName, providerOpts);

      let lastError;
      for (let attempt = 1; attempt <= 2; attempt++) {
        try {
          this._addLog(`[${attempt}/2] 创建临时邮箱 (${providerName})...`);
          await mailProvider.createInbox();
          this._addLog(`[${attempt}/2] 邮箱就绪: ${mailProvider.address}`);
          const accountInfo = generateAccountInfo();
          accountInfo.email = mailProvider.address;
          this._addLog(`[${attempt}/2] 提交注册请求...`);
          const result = await register(accountInfo, mailProvider);
          if (result.success) {
            const cookies = result.cookies || new Map();
            const entry = this._makeAccount({
              email: accountInfo.email,
              password: accountInfo.password,
              cookies,
              lastCheckedAt: Date.now(),
              filePath: isDbEnabled() ? null : path.resolve(config.outputDir, `chataibot-${accountInfo.email.replace('@', '_at_')}.json`),
            });
            // Persist
            if (isDbEnabled()) {
              await saveNewAccount(entry.email, entry.password, Object.fromEntries(cookies));
            } else {
              const saveData = {
                site: 'chataibot.pro',
                email: entry.email,
                password: entry.password,
                cookies: Object.fromEntries(cookies),
                registeredAt: new Date().toISOString(),
              };
              fs.writeFileSync(entry.filePath, JSON.stringify(saveData, null, 2));
            }
            this.accounts.push(entry);
            this._addLog(`新账号就绪: ${entry.email}`, 'success');
            return entry;
          }
          lastError = result.error;
          this._addLog(`[${attempt}/2] 注册失败: ${result.error}`, 'error');
        } catch (e) {
          lastError = this._errorText(e);
          this._addLog(`[${attempt}/2] 注册出错: ${lastError}`, 'error');
        } finally {
          // Exactly one best-effort cleanup per attempt. A cleanup failure
          // must never discard an account that was registered successfully.
          await this._cleanupMailbox(mailProvider);
        }
        // Pause before retrying; the same provider instance creates a new inbox.
        if (attempt < 2) await sleep(3000);
      }
      this._addLog(`注册最终失败: ${lastError}`, 'error');
      return null;
    } finally {
      this._registeringCount--;
    }
  }

  /**
   * Best-effort cleanup of a temporary mailbox; failures are logged, never thrown.
   * @param {{cleanup: function(): Promise<*>}} mailProvider
   * @returns {Promise<void>}
   */
  async _cleanupMailbox(mailProvider) {
    try {
      await mailProvider.cleanup();
    } catch (e) {
      this._addLog(`临时邮箱清理失败: ${this._errorText(e)}`, 'warn');
    }
  }

  /**
   * Archive DEAD accounts and drop them from the in-memory pool.
   * DB mode: mark the row dead. File mode: move the JSON file to `dead/`.
   * Archiving is best-effort; failures are logged and the account is still
   * removed from memory.
   */
  _archiveDeadAccounts() {
    const deadAccounts = this.accounts.filter(a => a.state === State.DEAD);
    if (deadAccounts.length === 0) return;
    if (isDbEnabled()) {
      for (const acc of deadAccounts) {
        markDead(acc.email).catch((e) => {
          this._addLog(`标记 dead 失败: ${acc.email}: ${this._errorText(e)}`, 'warn');
        });
        this._addLog(`归档 dead 账号: ${acc.email}`);
      }
    } else {
      const deadDir = path.resolve(config.outputDir, 'dead');
      if (!fs.existsSync(deadDir)) fs.mkdirSync(deadDir, { recursive: true });
      for (const acc of deadAccounts) {
        if (!acc.filePath || !fs.existsSync(acc.filePath)) continue;
        try {
          const dest = path.join(deadDir, path.basename(acc.filePath));
          fs.renameSync(acc.filePath, dest);
          this._addLog(`归档 dead 账号: ${acc.email}`);
        } catch (e) {
          this._addLog(`归档 dead 账号失败: ${acc.email}: ${this._errorText(e)}`, 'warn');
        }
      }
    }
    // Remove dead accounts from the pool.
    this.accounts = this.accounts.filter(a => a.state !== State.DEAD);
  }

  /**
   * Persist an account's current cookies (DB: update row / file: rewrite the
   * JSON). Best-effort in both modes: a persistence problem is logged and
   * never propagates to the caller (so it cannot turn a successful login into
   * a failure).
   * @param {object} account
   */
  _saveCookies(account) {
    try {
      if (isDbEnabled()) {
        updateCookies(account.email, Object.fromEntries(account.cookies)).catch((e) => {
          this._addLog(`保存 cookies 失败: ${account.email}: ${this._errorText(e)}`, 'warn');
        });
        return;
      }
      if (!account.filePath) return;
      const data = JSON.parse(fs.readFileSync(account.filePath, 'utf8'));
      data.cookies = Object.fromEntries(account.cookies);
      fs.writeFileSync(account.filePath, JSON.stringify(data, null, 2));
    } catch (e) {
      this._addLog(`保存 cookies 失败: ${account.email}: ${this._errorText(e)}`, 'warn');
    }
  }

  /**
   * Start the periodic maintenance timer (every `config.pool.checkInterval`
   * ms, default 5 minutes). Any timer started earlier is cleared first so
   * repeated calls do not leak intervals.
   */
  _startBackgroundTasks() {
    const interval = config.pool.checkInterval || 300000;
    if (this._timer) clearInterval(this._timer);
    this._timer = setInterval(() => {
      // _runMaintenance handles its own errors, so this promise never rejects.
      void this._runMaintenance(interval);
    }, interval);
    // Do not keep the process alive just for this timer.
    if (this._timer.unref) this._timer.unref();
  }

  /**
   * One maintenance pass: refresh quotas that have not been checked for
   * `interval` ms, re-login NEEDS_LOGIN accounts, archive DEAD accounts and
   * replenish the pool.
   *
   * Never rejects: errors are logged instead, because this runs from a timer
   * where nothing could handle a rejection. A pass that is still running
   * causes the next one to be skipped rather than overlap.
   * @param {number} [interval] - minimum age (ms) of a quota check before it is redone
   * @returns {Promise<void>}
   */
  async _runMaintenance(interval = config.pool.checkInterval || 300000) {
    if (this._maintenanceRunning) return;
    this._maintenanceRunning = true;
    try {
      // Quota checks
      const checkable = this.accounts.filter(a => a.state === State.ACTIVE || a.state === State.EXHAUSTED);
      for (const acc of checkable) {
        if (Date.now() - acc.lastCheckedAt < interval) continue;
        try {
          const quota = await getQuota(acc.cookies);
          if (quota && typeof quota.remaining === 'number') {
            acc.remainingQuota = quota.remaining;
            acc.lastCheckedAt = Date.now();
            if (acc.state === State.EXHAUSTED && quota.remaining > 0) {
              acc.state = State.ACTIVE;
              this._addLog(`额度恢复: ${acc.email} (${quota.remaining})`, 'success');
            }
          }
        } catch (e) {
          // One failing account must not block the remaining checks.
          this._addLog(`额度检查失败: ${acc.email}: ${this._errorText(e)}`, 'warn');
        }
      }

      // Re-login in parallel
      const needLogin = this.accounts.filter(a => a.state === State.NEEDS_LOGIN);
      if (needLogin.length > 0) {
        await Promise.allSettled(needLogin.map(acc => this._refreshSession(acc)));
      }

      this._archiveDeadAccounts();

      await this._replenish();
    } catch (e) {
      this._addLog(`后台任务出错: ${this._errorText(e)}`, 'error');
    } finally {
      this._maintenanceRunning = false;
    }
  }

  /**
   * Stop the background timer and close the database connection (DB mode).
   */
  destroy() {
    if (this._timer) {
      clearInterval(this._timer);
      this._timer = null;
    }
    if (isDbEnabled()) closeDb().catch(() => {});
  }
}