Spaces:
Paused
Paused
| /** | |
| * AccountPool β multi-account manager with least-used rotation. | |
| * Replaces the single-account AuthManager. | |
| */ | |
| import { randomBytes } from "crypto"; | |
| import { getConfig } from "../config.js"; | |
| import { jitter } from "../utils/jitter.js"; | |
| import { | |
| decodeJwtPayload, | |
| extractChatGptAccountId, | |
| extractUserProfile, | |
| isTokenExpired, | |
| } from "./jwt-utils.js"; | |
| import { getModelPlanTypes, isPlanFetched } from "../models/model-store.js"; | |
| import { getRotationStrategy } from "./rotation-strategy.js"; | |
| import { createFsPersistence } from "./account-persistence.js"; | |
| import type { AccountPersistence } from "./account-persistence.js"; | |
| import type { RotationStrategy, RotationState } from "./rotation-strategy.js"; | |
| import type { | |
| AccountEntry, | |
| AccountInfo, | |
| AccountUsage, | |
| AcquiredAccount, | |
| AccountsFile, | |
| CodexQuota, | |
| } from "./types.js"; | |
| // P1-4: Lock TTL β auto-release locks older than this | |
| const ACQUIRE_LOCK_TTL_MS = 5 * 60 * 1000; // 5 minutes | |
| export class AccountPool { | |
| private accounts: Map<string, AccountEntry> = new Map(); | |
| private acquireLocks: Map<string, number> = new Map(); // entryId β timestamp | |
| private strategy: RotationStrategy; | |
| private rotationState: RotationState = { roundRobinIndex: 0 }; | |
| private persistence: AccountPersistence; | |
| private persistTimer: ReturnType<typeof setTimeout> | null = null; | |
| constructor(options?: { persistence?: AccountPersistence }) { | |
| this.persistence = options?.persistence ?? createFsPersistence(); | |
| const config = getConfig(); | |
| this.strategy = getRotationStrategy(config.auth.rotation_strategy); | |
| // Load persisted accounts (handles migration from legacy format) | |
| const { entries } = this.persistence.load(); | |
| for (const entry of entries) { | |
| this.accounts.set(entry.id, entry); | |
| } | |
| // Override with config jwt_token if set | |
| if (config.auth.jwt_token) { | |
| this.addAccount(config.auth.jwt_token); | |
| } | |
| const envToken = process.env.CODEX_JWT_TOKEN; | |
| if (envToken) { | |
| this.addAccount(envToken); | |
| } | |
| } | |
| // ββ Core operations βββββββββββββββββββββββββββββββββββββββββββββ | |
| /** | |
| * Acquire the best available account for a request. | |
| * Returns null if no accounts are available. | |
| * | |
| * @param options.model - Prefer accounts whose planType matches this model's known plans | |
| * @param options.excludeIds - Entry IDs to exclude (e.g. already tried) | |
| */ | |
| acquire(options?: { model?: string; excludeIds?: string[] }): AcquiredAccount | null { | |
| const now = new Date(); | |
| const nowMs = now.getTime(); | |
| // Update statuses before selecting | |
| for (const entry of this.accounts.values()) { | |
| this.refreshStatus(entry, now); | |
| } | |
| // P1-4: Auto-release stale locks (older than TTL) | |
| for (const [id, lockedAt] of this.acquireLocks) { | |
| if (nowMs - lockedAt > ACQUIRE_LOCK_TTL_MS) { | |
| console.warn(`[AccountPool] Auto-releasing stale lock for ${id} (locked ${Math.round((nowMs - lockedAt) / 1000)}s ago)`); | |
| this.acquireLocks.delete(id); | |
| } | |
| } | |
| const excludeSet = new Set(options?.excludeIds ?? []); | |
| // Filter available accounts | |
| const available = [...this.accounts.values()].filter( | |
| (a) => a.status === "active" && !this.acquireLocks.has(a.id) && !excludeSet.has(a.id), | |
| ); | |
| if (available.length === 0) return null; | |
| // Model-aware selection: prefer accounts whose planType matches the model's known plans. | |
| // Accounts whose planType has never been fetched are treated as "possibly compatible" | |
| // to avoid false 503s when a plan's model fetch failed at startup. | |
| let candidates = available; | |
| if (options?.model) { | |
| const preferredPlans = getModelPlanTypes(options.model); | |
| if (preferredPlans.length > 0) { | |
| const planSet = new Set(preferredPlans); | |
| const matched = available.filter((a) => { | |
| if (!a.planType) return false; | |
| // Plan is known to support this model | |
| if (planSet.has(a.planType)) return true; | |
| // Plan has never been fetched β include as possibly compatible | |
| return !isPlanFetched(a.planType); | |
| }); | |
| if (matched.length > 0) { | |
| candidates = matched; | |
| } else { | |
| // All accounts belong to plans that were fetched but don't include this model | |
| return null; | |
| } | |
| } | |
| } | |
| const selected = this.strategy.select(candidates, this.rotationState); | |
| this.acquireLocks.set(selected.id, Date.now()); | |
| return { | |
| entryId: selected.id, | |
| token: selected.token, | |
| accountId: selected.accountId, | |
| }; | |
| } | |
| /** | |
| * Switch rotation strategy at runtime (e.g. from admin API). | |
| */ | |
| setRotationStrategy(name: "least_used" | "round_robin" | "sticky"): void { | |
| this.strategy = getRotationStrategy(name); | |
| this.rotationState.roundRobinIndex = 0; | |
| } | |
| /** | |
| * Get one account per distinct planType for model discovery. | |
| * Each returned account is locked (caller must release). | |
| */ | |
| getDistinctPlanAccounts(): Array<{ planType: string; entryId: string; token: string; accountId: string | null }> { | |
| const now = new Date(); | |
| for (const entry of this.accounts.values()) { | |
| this.refreshStatus(entry, now); | |
| } | |
| const available = [...this.accounts.values()].filter( | |
| (a) => a.status === "active" && !this.acquireLocks.has(a.id) && a.planType, | |
| ); | |
| // Group by planType, pick least-used from each group | |
| const byPlan = new Map<string, AccountEntry[]>(); | |
| for (const a of available) { | |
| const plan = a.planType!; | |
| let group = byPlan.get(plan); | |
| if (!group) { | |
| group = []; | |
| byPlan.set(plan, group); | |
| } | |
| group.push(a); | |
| } | |
| const result: Array<{ planType: string; entryId: string; token: string; accountId: string | null }> = []; | |
| for (const [plan, group] of byPlan) { | |
| const selected = this.strategy.select(group, this.rotationState); | |
| this.acquireLocks.set(selected.id, Date.now()); | |
| result.push({ | |
| planType: plan, | |
| entryId: selected.id, | |
| token: selected.token, | |
| accountId: selected.accountId, | |
| }); | |
| } | |
| return result; | |
| } | |
| /** | |
| * Release an account after a request completes. | |
| */ | |
| release( | |
| entryId: string, | |
| usage?: { input_tokens?: number; output_tokens?: number }, | |
| ): void { | |
| this.acquireLocks.delete(entryId); | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| entry.usage.request_count++; | |
| entry.usage.last_used = new Date().toISOString(); | |
| if (usage) { | |
| entry.usage.input_tokens += usage.input_tokens ?? 0; | |
| entry.usage.output_tokens += usage.output_tokens ?? 0; | |
| } | |
| // Increment window counters | |
| entry.usage.window_request_count = (entry.usage.window_request_count ?? 0) + 1; | |
| if (usage) { | |
| entry.usage.window_input_tokens = (entry.usage.window_input_tokens ?? 0) + (usage.input_tokens ?? 0); | |
| entry.usage.window_output_tokens = (entry.usage.window_output_tokens ?? 0) + (usage.output_tokens ?? 0); | |
| } | |
| this.schedulePersist(); | |
| } | |
| /** | |
| * Release an account without counting usage (for diagnostics). | |
| */ | |
| releaseWithoutCounting(entryId: string): void { | |
| this.acquireLocks.delete(entryId); | |
| } | |
| /** | |
| * Mark an account as rate-limited after a 429. | |
| * P1-6: countRequest option to track 429s as usage without exposing entry internals. | |
| */ | |
| markRateLimited( | |
| entryId: string, | |
| options?: { retryAfterSec?: number; countRequest?: boolean }, | |
| ): void { | |
| this.acquireLocks.delete(entryId); | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| const config = getConfig(); | |
| const backoff = jitter( | |
| options?.retryAfterSec ?? config.auth.rate_limit_backoff_seconds, | |
| 0.2, | |
| ); | |
| const until = new Date(Date.now() + backoff * 1000); | |
| entry.status = "rate_limited"; | |
| entry.usage.rate_limit_until = until.toISOString(); | |
| if (options?.countRequest) { | |
| entry.usage.request_count++; | |
| entry.usage.last_used = new Date().toISOString(); | |
| entry.usage.window_request_count = (entry.usage.window_request_count ?? 0) + 1; | |
| } | |
| this.schedulePersist(); | |
| } | |
| // ββ Account management ββββββββββββββββββββββββββββββββββββββββββ | |
| /** | |
| * Add an account from a raw JWT token. Returns the entry ID. | |
| * Deduplicates by accountId. | |
| */ | |
| addAccount(token: string, refreshToken?: string | null): string { | |
| const accountId = extractChatGptAccountId(token); | |
| const profile = extractUserProfile(token); | |
| const userId = profile?.chatgpt_user_id ?? null; | |
| // Deduplicate by accountId + userId (team members share accountId but have distinct userId) | |
| if (accountId) { | |
| for (const existing of this.accounts.values()) { | |
| if ( | |
| existing.accountId === accountId && | |
| existing.userId === userId | |
| ) { | |
| // Update the existing entry's token | |
| existing.token = token; | |
| if (refreshToken !== undefined) { | |
| existing.refreshToken = refreshToken ?? null; | |
| } | |
| existing.email = profile?.email ?? existing.email; | |
| existing.planType = profile?.chatgpt_plan_type ?? existing.planType; | |
| existing.status = isTokenExpired(token) ? "expired" : "active"; | |
| this.persistNow(); // Critical data β persist immediately | |
| return existing.id; | |
| } | |
| } | |
| } | |
| const id = randomBytes(8).toString("hex"); | |
| const entry: AccountEntry = { | |
| id, | |
| token, | |
| refreshToken: refreshToken ?? null, | |
| email: profile?.email ?? null, | |
| accountId, | |
| userId, | |
| planType: profile?.chatgpt_plan_type ?? null, | |
| proxyApiKey: "codex-proxy-" + randomBytes(24).toString("hex"), | |
| status: isTokenExpired(token) ? "expired" : "active", | |
| usage: { | |
| request_count: 0, | |
| input_tokens: 0, | |
| output_tokens: 0, | |
| empty_response_count: 0, | |
| last_used: null, | |
| rate_limit_until: null, | |
| window_request_count: 0, | |
| window_input_tokens: 0, | |
| window_output_tokens: 0, | |
| window_counters_reset_at: null, | |
| limit_window_seconds: null, | |
| }, | |
| addedAt: new Date().toISOString(), | |
| cachedQuota: null, | |
| quotaFetchedAt: null, | |
| }; | |
| this.accounts.set(id, entry); | |
| this.persistNow(); // Critical data β persist immediately | |
| return id; | |
| } | |
| /** | |
| * Record an empty response for an account (HTTP 200 but zero text deltas). | |
| */ | |
| recordEmptyResponse(entryId: string): void { | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| entry.usage.empty_response_count++; | |
| this.schedulePersist(); | |
| } | |
| /** | |
| * Update cached quota for an account (called by background quota refresher). | |
| */ | |
| updateCachedQuota(entryId: string, quota: CodexQuota): void { | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| entry.cachedQuota = quota; | |
| entry.quotaFetchedAt = new Date().toISOString(); | |
| this.schedulePersist(); | |
| } | |
| /** | |
| * Mark an account as quota-exhausted by setting it rate_limited until resetAt. | |
| * Reuses the rate_limited mechanism so acquire() skips it and refreshStatus() auto-recovers. | |
| */ | |
| markQuotaExhausted(entryId: string, resetAtUnix: number | null): void { | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| // Don't override disabled, expired, or banned states | |
| if (entry.status === "disabled" || entry.status === "expired" || entry.status === "banned") return; | |
| const until = resetAtUnix | |
| ? new Date(resetAtUnix * 1000).toISOString() | |
| : new Date(Date.now() + 300_000).toISOString(); // fallback 5 min | |
| // Only extend rate_limit_until, never shorten it | |
| if (entry.status === "rate_limited" && entry.usage.rate_limit_until) { | |
| const existing = new Date(entry.usage.rate_limit_until).getTime(); | |
| const proposed = new Date(until).getTime(); | |
| if (proposed <= existing) return; | |
| } | |
| entry.status = "rate_limited"; | |
| entry.usage.rate_limit_until = until; | |
| this.acquireLocks.delete(entryId); | |
| this.schedulePersist(); | |
| } | |
| removeAccount(id: string): boolean { | |
| this.acquireLocks.delete(id); | |
| const deleted = this.accounts.delete(id); | |
| if (deleted) this.schedulePersist(); | |
| return deleted; | |
| } | |
| /** | |
| * Update an account's token (used by refresh scheduler). | |
| */ | |
| updateToken(entryId: string, newToken: string, refreshToken?: string | null): void { | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| entry.token = newToken; | |
| if (refreshToken !== undefined) { | |
| entry.refreshToken = refreshToken ?? null; | |
| } | |
| const profile = extractUserProfile(newToken); | |
| entry.email = profile?.email ?? entry.email; | |
| entry.planType = profile?.chatgpt_plan_type ?? entry.planType; | |
| entry.accountId = extractChatGptAccountId(newToken) ?? entry.accountId; | |
| entry.userId = profile?.chatgpt_user_id ?? entry.userId; | |
| entry.status = "active"; | |
| this.persistNow(); // Critical data β persist immediately | |
| } | |
| markStatus(entryId: string, status: AccountEntry["status"]): void { | |
| this.acquireLocks.delete(entryId); | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| entry.status = status; | |
| this.schedulePersist(); | |
| } | |
| resetUsage(entryId: string): boolean { | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return false; | |
| entry.usage = { | |
| request_count: 0, | |
| input_tokens: 0, | |
| output_tokens: 0, | |
| empty_response_count: 0, | |
| last_used: null, | |
| rate_limit_until: null, | |
| window_reset_at: entry.usage.window_reset_at ?? null, | |
| window_request_count: 0, | |
| window_input_tokens: 0, | |
| window_output_tokens: 0, | |
| window_counters_reset_at: new Date().toISOString(), | |
| limit_window_seconds: entry.usage.limit_window_seconds ?? null, | |
| }; | |
| this.schedulePersist(); | |
| return true; | |
| } | |
| /** | |
| * Check if the rate limit window has rolled over. | |
| * If so, auto-reset local usage counters to stay in sync. | |
| * Called after fetching quota from OpenAI API. | |
| */ | |
| syncRateLimitWindow( | |
| entryId: string, | |
| newResetAt: number | null, | |
| limitWindowSeconds: number | null, | |
| ): void { | |
| if (newResetAt == null) return; | |
| const entry = this.accounts.get(entryId); | |
| if (!entry) return; | |
| const oldResetAt = entry.usage.window_reset_at; | |
| if (oldResetAt != null && oldResetAt !== newResetAt) { | |
| // Codex API reset_at drifts slightly between calls (activity-based sliding window). | |
| // Only reset counters on a true window rollover: when reset_at jumps by a large amount | |
| // (at least half the window size), not just a small drift of a few hundred seconds. | |
| const drift = Math.abs(newResetAt - oldResetAt); | |
| const windowSec = limitWindowSeconds ?? entry.usage.limit_window_seconds ?? 0; | |
| const threshold = windowSec > 0 ? windowSec * 0.5 : 3600; | |
| if (drift >= threshold) { | |
| console.log(`[AccountPool] Rate limit window rolled for ${entryId} (${entry.email ?? "?"}), resetting window counters (drift=${drift}s, threshold=${threshold}s)`); | |
| entry.usage.window_request_count = 0; | |
| entry.usage.window_input_tokens = 0; | |
| entry.usage.window_output_tokens = 0; | |
| entry.usage.window_counters_reset_at = new Date().toISOString(); | |
| } | |
| } | |
| entry.usage.window_reset_at = newResetAt; | |
| if (limitWindowSeconds != null) { | |
| entry.usage.limit_window_seconds = limitWindowSeconds; | |
| } | |
| this.schedulePersist(); | |
| } | |
| // ββ Query βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| getAccounts(): AccountInfo[] { | |
| const now = new Date(); | |
| return [...this.accounts.values()].map((a) => { | |
| this.refreshStatus(a, now); | |
| return this.toInfo(a); | |
| }); | |
| } | |
| getEntry(entryId: string): AccountEntry | undefined { | |
| return this.accounts.get(entryId); | |
| } | |
| getAllEntries(): AccountEntry[] { | |
| return [...this.accounts.values()]; | |
| } | |
| isAuthenticated(): boolean { | |
| const now = new Date(); | |
| for (const entry of this.accounts.values()) { | |
| this.refreshStatus(entry, now); | |
| if (entry.status === "active") return true; | |
| } | |
| return false; | |
| } | |
| /** @deprecated Use getAccounts() instead. */ | |
| getUserInfo(): { email?: string; accountId?: string; planType?: string } | null { | |
| const first = [...this.accounts.values()].find((a) => a.status === "active"); | |
| if (!first) return null; | |
| return { | |
| email: first.email ?? undefined, | |
| accountId: first.accountId ?? undefined, | |
| planType: first.planType ?? undefined, | |
| }; | |
| } | |
| /** @deprecated Use getAccounts() instead. */ | |
| getProxyApiKey(): string | null { | |
| const first = [...this.accounts.values()].find((a) => a.status === "active"); | |
| return first?.proxyApiKey ?? null; | |
| } | |
| validateProxyApiKey(key: string): boolean { | |
| const configKey = getConfig().server.proxy_api_key; | |
| if (configKey && key === configKey) return true; | |
| for (const entry of this.accounts.values()) { | |
| if (entry.proxyApiKey === key) return true; | |
| } | |
| return false; | |
| } | |
| /** @deprecated Use removeAccount() instead. */ | |
| clearToken(): void { | |
| this.accounts.clear(); | |
| this.acquireLocks.clear(); | |
| this.persistNow(); | |
| } | |
| // ββ Pool summary ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| getPoolSummary(): { | |
| total: number; | |
| active: number; | |
| expired: number; | |
| rate_limited: number; | |
| refreshing: number; | |
| disabled: number; | |
| banned: number; | |
| } { | |
| const now = new Date(); | |
| let active = 0, expired = 0, rate_limited = 0, refreshing = 0, disabled = 0, banned = 0; | |
| for (const entry of this.accounts.values()) { | |
| this.refreshStatus(entry, now); | |
| switch (entry.status) { | |
| case "active": active++; break; | |
| case "expired": expired++; break; | |
| case "rate_limited": rate_limited++; break; | |
| case "refreshing": refreshing++; break; | |
| case "disabled": disabled++; break; | |
| case "banned": banned++; break; | |
| } | |
| } | |
| return { | |
| total: this.accounts.size, | |
| active, | |
| expired, | |
| rate_limited, | |
| refreshing, | |
| disabled, | |
| banned, | |
| }; | |
| } | |
| // ββ Internal ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| private refreshStatus(entry: AccountEntry, now: Date): void { | |
| // Auto-recover rate-limited accounts | |
| if (entry.status === "rate_limited" && entry.usage.rate_limit_until) { | |
| if (now >= new Date(entry.usage.rate_limit_until)) { | |
| entry.status = "active"; | |
| entry.usage.rate_limit_until = null; | |
| } | |
| } | |
| // Mark expired tokens | |
| if (entry.status === "active" && isTokenExpired(entry.token)) { | |
| entry.status = "expired"; | |
| } | |
| // Auto-reset window counters when window has expired | |
| const windowResetAt = entry.usage.window_reset_at; | |
| const nowSec = now.getTime() / 1000; | |
| if (windowResetAt != null && nowSec >= windowResetAt) { | |
| console.log(`[AccountPool] Window expired for ${entry.id} (${entry.email ?? "?"}), resetting window counters`); | |
| entry.usage.window_request_count = 0; | |
| entry.usage.window_input_tokens = 0; | |
| entry.usage.window_output_tokens = 0; | |
| entry.usage.window_counters_reset_at = now.toISOString(); | |
| // Jump to the correct current window (handles multi-window catch-up in one step) | |
| const windowSec = entry.usage.limit_window_seconds; | |
| if (windowSec && windowSec > 0) { | |
| let nextReset = windowResetAt + windowSec; | |
| while (nextReset <= nowSec) nextReset += windowSec; | |
| entry.usage.window_reset_at = nextReset; | |
| } else { | |
| entry.usage.window_reset_at = null; // Wait for backend sync to correct | |
| } | |
| this.schedulePersist(); | |
| } | |
| } | |
| private toInfo(entry: AccountEntry): AccountInfo { | |
| const payload = decodeJwtPayload(entry.token); | |
| const exp = payload?.exp; | |
| const info: AccountInfo = { | |
| id: entry.id, | |
| email: entry.email, | |
| accountId: entry.accountId, | |
| userId: entry.userId, | |
| planType: entry.planType, | |
| status: entry.status, | |
| usage: { ...entry.usage }, | |
| addedAt: entry.addedAt, | |
| expiresAt: | |
| typeof exp === "number" | |
| ? new Date(exp * 1000).toISOString() | |
| : null, | |
| }; | |
| if (entry.cachedQuota) { | |
| info.quota = entry.cachedQuota; | |
| info.quotaFetchedAt = entry.quotaFetchedAt; | |
| } | |
| return info; | |
| } | |
| // ββ Persistence βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| private schedulePersist(): void { | |
| if (this.persistTimer) return; | |
| this.persistTimer = setTimeout(() => { | |
| this.persistTimer = null; | |
| this.persistNow(); | |
| }, 1000); | |
| } | |
| persistNow(): void { | |
| if (this.persistTimer) { | |
| clearTimeout(this.persistTimer); | |
| this.persistTimer = null; | |
| } | |
| this.persistence.save([...this.accounts.values()]); | |
| } | |
| /** Flush pending writes on shutdown */ | |
| destroy(): void { | |
| if (this.persistTimer) { | |
| clearTimeout(this.persistTimer); | |
| this.persistTimer = null; | |
| } | |
| this.persistNow(); | |
| } | |
| } | |