codex-proxy / src /auth /account-pool.ts
icebear
fix: fall back to all accounts when model plan doesn't match (#91)
5abff38 unverified
raw
history blame
24.4 kB
/**
* AccountPool β€” multi-account manager with least-used rotation.
* Replaces the single-account AuthManager.
*/
import {
readFileSync,
writeFileSync,
renameSync,
existsSync,
mkdirSync,
} from "fs";
import { resolve, dirname } from "path";
import { randomBytes } from "crypto";
import { getConfig } from "../config.js";
import { getDataDir } from "../paths.js";
import { jitter } from "../utils/jitter.js";
import {
decodeJwtPayload,
extractChatGptAccountId,
extractUserProfile,
isTokenExpired,
} from "./jwt-utils.js";
import { getModelPlanTypes } from "../models/model-store.js";
import type {
AccountEntry,
AccountInfo,
AccountUsage,
AcquiredAccount,
AccountsFile,
} from "./types.js";
function getAccountsFile(): string {
return resolve(getDataDir(), "accounts.json");
}
function getLegacyAuthFile(): string {
return resolve(getDataDir(), "auth.json");
}
// P1-4: Lock TTL β€” auto-release locks older than this
const ACQUIRE_LOCK_TTL_MS = 5 * 60 * 1000; // 5 minutes
export class AccountPool {
private accounts: Map<string, AccountEntry> = new Map();
private acquireLocks: Map<string, number> = new Map(); // entryId β†’ timestamp
private roundRobinIndex = 0;
private persistTimer: ReturnType<typeof setTimeout> | null = null;
constructor() {
this.migrateFromLegacy();
this.loadPersisted();
// Override with config jwt_token if set
const config = getConfig();
if (config.auth.jwt_token) {
this.addAccount(config.auth.jwt_token);
}
const envToken = process.env.CODEX_JWT_TOKEN;
if (envToken) {
this.addAccount(envToken);
}
}
// ── Core operations ─────────────────────────────────────────────
/**
* Acquire the best available account for a request.
* Returns null if no accounts are available.
*
* @param options.model - Prefer accounts whose planType matches this model's known plans
* @param options.excludeIds - Entry IDs to exclude (e.g. already tried)
*/
acquire(options?: { model?: string; excludeIds?: string[] }): AcquiredAccount | null {
const now = new Date();
const nowMs = now.getTime();
// Update statuses before selecting
for (const entry of this.accounts.values()) {
this.refreshStatus(entry, now);
}
// P1-4: Auto-release stale locks (older than TTL)
for (const [id, lockedAt] of this.acquireLocks) {
if (nowMs - lockedAt > ACQUIRE_LOCK_TTL_MS) {
console.warn(`[AccountPool] Auto-releasing stale lock for ${id} (locked ${Math.round((nowMs - lockedAt) / 1000)}s ago)`);
this.acquireLocks.delete(id);
}
}
const excludeSet = new Set(options?.excludeIds ?? []);
// Filter available accounts
const available = [...this.accounts.values()].filter(
(a) => a.status === "active" && !this.acquireLocks.has(a.id) && !excludeSet.has(a.id),
);
if (available.length === 0) return null;
// Model-aware selection: prefer accounts whose planType matches the model's known plans.
// If no account matches, fall back to all available accounts β€” the backend API's
// model list per plan is incomplete (e.g. free can use gpt-5.4 but /codex/models
// doesn't return it), so hard-blocking would reject valid requests.
let candidates = available;
if (options?.model) {
const preferredPlans = getModelPlanTypes(options.model);
if (preferredPlans.length > 0) {
const planSet = new Set(preferredPlans);
const matched = available.filter((a) => a.planType && planSet.has(a.planType));
if (matched.length > 0) {
candidates = matched;
}
// else: no matching plan accounts β€” fall back to all available
}
}
const selected = this.selectByStrategy(candidates);
this.acquireLocks.set(selected.id, Date.now());
return {
entryId: selected.id,
token: selected.token,
accountId: selected.accountId,
};
}
/**
* Select an account from candidates using the configured rotation strategy.
*/
private selectByStrategy(candidates: AccountEntry[]): AccountEntry {
const config = getConfig();
if (config.auth.rotation_strategy === "round_robin") {
this.roundRobinIndex = this.roundRobinIndex % candidates.length;
const selected = candidates[this.roundRobinIndex];
this.roundRobinIndex++;
return selected;
}
// least_used: sort by request_count asc β†’ window_reset_at asc β†’ last_used asc (LRU)
candidates.sort((a, b) => {
const diff = a.usage.request_count - b.usage.request_count;
if (diff !== 0) return diff;
// Prefer accounts whose quota window resets sooner (more fresh capacity)
const aReset = a.usage.window_reset_at ?? Infinity;
const bReset = b.usage.window_reset_at ?? Infinity;
if (aReset !== bReset) return aReset - bReset;
const aTime = a.usage.last_used ? new Date(a.usage.last_used).getTime() : 0;
const bTime = b.usage.last_used ? new Date(b.usage.last_used).getTime() : 0;
return aTime - bTime;
});
return candidates[0];
}
/**
* Get one account per distinct planType for model discovery.
* Each returned account is locked (caller must release).
*/
getDistinctPlanAccounts(): Array<{ planType: string; entryId: string; token: string; accountId: string | null }> {
const now = new Date();
for (const entry of this.accounts.values()) {
this.refreshStatus(entry, now);
}
const available = [...this.accounts.values()].filter(
(a) => a.status === "active" && !this.acquireLocks.has(a.id) && a.planType,
);
// Group by planType, pick least-used from each group
const byPlan = new Map<string, AccountEntry[]>();
for (const a of available) {
const plan = a.planType!;
let group = byPlan.get(plan);
if (!group) {
group = [];
byPlan.set(plan, group);
}
group.push(a);
}
const result: Array<{ planType: string; entryId: string; token: string; accountId: string | null }> = [];
for (const [plan, group] of byPlan) {
const selected = this.selectByStrategy(group);
this.acquireLocks.set(selected.id, Date.now());
result.push({
planType: plan,
entryId: selected.id,
token: selected.token,
accountId: selected.accountId,
});
}
return result;
}
/**
* Release an account after a request completes.
*/
release(
entryId: string,
usage?: { input_tokens?: number; output_tokens?: number },
): void {
this.acquireLocks.delete(entryId);
const entry = this.accounts.get(entryId);
if (!entry) return;
entry.usage.request_count++;
entry.usage.last_used = new Date().toISOString();
if (usage) {
entry.usage.input_tokens += usage.input_tokens ?? 0;
entry.usage.output_tokens += usage.output_tokens ?? 0;
}
// Increment window counters
entry.usage.window_request_count = (entry.usage.window_request_count ?? 0) + 1;
if (usage) {
entry.usage.window_input_tokens = (entry.usage.window_input_tokens ?? 0) + (usage.input_tokens ?? 0);
entry.usage.window_output_tokens = (entry.usage.window_output_tokens ?? 0) + (usage.output_tokens ?? 0);
}
this.schedulePersist();
}
/**
* Release an account without counting usage (for diagnostics).
*/
releaseWithoutCounting(entryId: string): void {
this.acquireLocks.delete(entryId);
}
/**
* Mark an account as rate-limited after a 429.
* P1-6: countRequest option to track 429s as usage without exposing entry internals.
*/
markRateLimited(
entryId: string,
options?: { retryAfterSec?: number; countRequest?: boolean },
): void {
this.acquireLocks.delete(entryId);
const entry = this.accounts.get(entryId);
if (!entry) return;
const config = getConfig();
const backoff = jitter(
options?.retryAfterSec ?? config.auth.rate_limit_backoff_seconds,
0.2,
);
const until = new Date(Date.now() + backoff * 1000);
entry.status = "rate_limited";
entry.usage.rate_limit_until = until.toISOString();
if (options?.countRequest) {
entry.usage.request_count++;
entry.usage.last_used = new Date().toISOString();
entry.usage.window_request_count = (entry.usage.window_request_count ?? 0) + 1;
}
this.schedulePersist();
}
// ── Account management ──────────────────────────────────────────
/**
* Add an account from a raw JWT token. Returns the entry ID.
* Deduplicates by accountId.
*/
addAccount(token: string, refreshToken?: string | null): string {
const accountId = extractChatGptAccountId(token);
const profile = extractUserProfile(token);
// Deduplicate by accountId
if (accountId) {
for (const existing of this.accounts.values()) {
if (existing.accountId === accountId) {
// Update the existing entry's token
existing.token = token;
if (refreshToken !== undefined) {
existing.refreshToken = refreshToken ?? null;
}
existing.email = profile?.email ?? existing.email;
existing.planType = profile?.chatgpt_plan_type ?? existing.planType;
existing.status = isTokenExpired(token) ? "expired" : "active";
this.persistNow(); // Critical data β€” persist immediately
return existing.id;
}
}
}
const id = randomBytes(8).toString("hex");
const entry: AccountEntry = {
id,
token,
refreshToken: refreshToken ?? null,
email: profile?.email ?? null,
accountId,
planType: profile?.chatgpt_plan_type ?? null,
proxyApiKey: "codex-proxy-" + randomBytes(24).toString("hex"),
status: isTokenExpired(token) ? "expired" : "active",
usage: {
request_count: 0,
input_tokens: 0,
output_tokens: 0,
empty_response_count: 0,
last_used: null,
rate_limit_until: null,
window_request_count: 0,
window_input_tokens: 0,
window_output_tokens: 0,
window_counters_reset_at: null,
limit_window_seconds: null,
},
addedAt: new Date().toISOString(),
};
this.accounts.set(id, entry);
this.persistNow(); // Critical data β€” persist immediately
return id;
}
/**
* Record an empty response for an account (HTTP 200 but zero text deltas).
*/
recordEmptyResponse(entryId: string): void {
const entry = this.accounts.get(entryId);
if (!entry) return;
entry.usage.empty_response_count++;
this.schedulePersist();
}
removeAccount(id: string): boolean {
this.acquireLocks.delete(id);
const deleted = this.accounts.delete(id);
if (deleted) this.schedulePersist();
return deleted;
}
/**
* Update an account's token (used by refresh scheduler).
*/
updateToken(entryId: string, newToken: string, refreshToken?: string | null): void {
const entry = this.accounts.get(entryId);
if (!entry) return;
entry.token = newToken;
if (refreshToken !== undefined) {
entry.refreshToken = refreshToken ?? null;
}
const profile = extractUserProfile(newToken);
entry.email = profile?.email ?? entry.email;
entry.planType = profile?.chatgpt_plan_type ?? entry.planType;
entry.accountId = extractChatGptAccountId(newToken) ?? entry.accountId;
entry.status = "active";
this.persistNow(); // Critical data β€” persist immediately
}
markStatus(entryId: string, status: AccountEntry["status"]): void {
const entry = this.accounts.get(entryId);
if (!entry) return;
entry.status = status;
this.schedulePersist();
}
resetUsage(entryId: string): boolean {
const entry = this.accounts.get(entryId);
if (!entry) return false;
entry.usage = {
request_count: 0,
input_tokens: 0,
output_tokens: 0,
empty_response_count: 0,
last_used: null,
rate_limit_until: null,
window_reset_at: entry.usage.window_reset_at ?? null,
window_request_count: 0,
window_input_tokens: 0,
window_output_tokens: 0,
window_counters_reset_at: new Date().toISOString(),
limit_window_seconds: entry.usage.limit_window_seconds ?? null,
};
this.schedulePersist();
return true;
}
/**
* Check if the rate limit window has rolled over.
* If so, auto-reset local usage counters to stay in sync.
* Called after fetching quota from OpenAI API.
*/
syncRateLimitWindow(
entryId: string,
newResetAt: number | null,
limitWindowSeconds: number | null,
): void {
if (newResetAt == null) return;
const entry = this.accounts.get(entryId);
if (!entry) return;
const oldResetAt = entry.usage.window_reset_at;
if (oldResetAt != null && oldResetAt !== newResetAt) {
// Codex API reset_at drifts slightly between calls (activity-based sliding window).
// Only reset counters on a true window rollover: when reset_at jumps by a large amount
// (at least half the window size), not just a small drift of a few hundred seconds.
const drift = Math.abs(newResetAt - oldResetAt);
const windowSec = limitWindowSeconds ?? entry.usage.limit_window_seconds ?? 0;
const threshold = windowSec > 0 ? windowSec * 0.5 : 3600;
if (drift >= threshold) {
console.log(`[AccountPool] Rate limit window rolled for ${entryId} (${entry.email ?? "?"}), resetting window counters (drift=${drift}s, threshold=${threshold}s)`);
entry.usage.window_request_count = 0;
entry.usage.window_input_tokens = 0;
entry.usage.window_output_tokens = 0;
entry.usage.window_counters_reset_at = new Date().toISOString();
}
}
entry.usage.window_reset_at = newResetAt;
if (limitWindowSeconds != null) {
entry.usage.limit_window_seconds = limitWindowSeconds;
}
this.schedulePersist();
}
// ── Query ───────────────────────────────────────────────────────
getAccounts(): AccountInfo[] {
const now = new Date();
return [...this.accounts.values()].map((a) => {
this.refreshStatus(a, now);
return this.toInfo(a);
});
}
getEntry(entryId: string): AccountEntry | undefined {
return this.accounts.get(entryId);
}
getAllEntries(): AccountEntry[] {
return [...this.accounts.values()];
}
isAuthenticated(): boolean {
const now = new Date();
for (const entry of this.accounts.values()) {
this.refreshStatus(entry, now);
if (entry.status === "active") return true;
}
return false;
}
/** @deprecated Use getAccounts() instead. */
getUserInfo(): { email?: string; accountId?: string; planType?: string } | null {
const first = [...this.accounts.values()].find((a) => a.status === "active");
if (!first) return null;
return {
email: first.email ?? undefined,
accountId: first.accountId ?? undefined,
planType: first.planType ?? undefined,
};
}
/** @deprecated Use getAccounts() instead. */
getProxyApiKey(): string | null {
const first = [...this.accounts.values()].find((a) => a.status === "active");
return first?.proxyApiKey ?? null;
}
validateProxyApiKey(key: string): boolean {
const configKey = getConfig().server.proxy_api_key;
if (configKey && key === configKey) return true;
for (const entry of this.accounts.values()) {
if (entry.proxyApiKey === key) return true;
}
return false;
}
/** @deprecated Use removeAccount() instead. */
clearToken(): void {
this.accounts.clear();
this.acquireLocks.clear();
this.persistNow();
}
// ── Pool summary ────────────────────────────────────────────────
getPoolSummary(): {
total: number;
active: number;
expired: number;
rate_limited: number;
refreshing: number;
disabled: number;
} {
const now = new Date();
let active = 0, expired = 0, rate_limited = 0, refreshing = 0, disabled = 0;
for (const entry of this.accounts.values()) {
this.refreshStatus(entry, now);
switch (entry.status) {
case "active": active++; break;
case "expired": expired++; break;
case "rate_limited": rate_limited++; break;
case "refreshing": refreshing++; break;
case "disabled": disabled++; break;
}
}
return {
total: this.accounts.size,
active,
expired,
rate_limited,
refreshing,
disabled,
};
}
// ── Internal ────────────────────────────────────────────────────
private refreshStatus(entry: AccountEntry, now: Date): void {
// Auto-recover rate-limited accounts
if (entry.status === "rate_limited" && entry.usage.rate_limit_until) {
if (now >= new Date(entry.usage.rate_limit_until)) {
entry.status = "active";
entry.usage.rate_limit_until = null;
}
}
// Mark expired tokens
if (entry.status === "active" && isTokenExpired(entry.token)) {
entry.status = "expired";
}
// Auto-reset window counters when window has expired
const windowResetAt = entry.usage.window_reset_at;
const nowSec = now.getTime() / 1000;
if (windowResetAt != null && nowSec >= windowResetAt) {
console.log(`[AccountPool] Window expired for ${entry.id} (${entry.email ?? "?"}), resetting window counters`);
entry.usage.window_request_count = 0;
entry.usage.window_input_tokens = 0;
entry.usage.window_output_tokens = 0;
entry.usage.window_counters_reset_at = now.toISOString();
// Jump to the correct current window (handles multi-window catch-up in one step)
const windowSec = entry.usage.limit_window_seconds;
if (windowSec && windowSec > 0) {
let nextReset = windowResetAt + windowSec;
while (nextReset <= nowSec) nextReset += windowSec;
entry.usage.window_reset_at = nextReset;
} else {
entry.usage.window_reset_at = null; // Wait for backend sync to correct
}
this.schedulePersist();
}
}
private toInfo(entry: AccountEntry): AccountInfo {
const payload = decodeJwtPayload(entry.token);
const exp = payload?.exp;
return {
id: entry.id,
email: entry.email,
accountId: entry.accountId,
planType: entry.planType,
status: entry.status,
usage: { ...entry.usage },
addedAt: entry.addedAt,
expiresAt:
typeof exp === "number"
? new Date(exp * 1000).toISOString()
: null,
};
}
// ── Persistence ─────────────────────────────────────────────────
private schedulePersist(): void {
if (this.persistTimer) return;
this.persistTimer = setTimeout(() => {
this.persistTimer = null;
this.persistNow();
}, 1000);
}
persistNow(): void {
if (this.persistTimer) {
clearTimeout(this.persistTimer);
this.persistTimer = null;
}
try {
const accountsFile = getAccountsFile();
const dir = dirname(accountsFile);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const data: AccountsFile = { accounts: [...this.accounts.values()] };
const tmpFile = accountsFile + ".tmp";
writeFileSync(tmpFile, JSON.stringify(data, null, 2), "utf-8");
renameSync(tmpFile, accountsFile);
} catch (err) {
console.error("[AccountPool] Failed to persist accounts:", err instanceof Error ? err.message : err);
}
}
private loadPersisted(): void {
try {
const accountsFile = getAccountsFile();
if (!existsSync(accountsFile)) return;
const raw = readFileSync(accountsFile, "utf-8");
const data = JSON.parse(raw) as AccountsFile;
if (Array.isArray(data.accounts)) {
let needsPersist = false;
for (const entry of data.accounts) {
if (entry.id && entry.token) {
// Backfill missing fields from JWT (e.g. planType was null before fix)
if (!entry.planType || !entry.email || !entry.accountId) {
const profile = extractUserProfile(entry.token);
const accountId = extractChatGptAccountId(entry.token);
if (!entry.planType && profile?.chatgpt_plan_type) {
entry.planType = profile.chatgpt_plan_type;
needsPersist = true;
}
if (!entry.email && profile?.email) {
entry.email = profile.email;
needsPersist = true;
}
if (!entry.accountId && accountId) {
entry.accountId = accountId;
needsPersist = true;
}
}
// Backfill empty_response_count for old entries
if (entry.usage.empty_response_count == null) {
entry.usage.empty_response_count = 0;
needsPersist = true;
}
// Backfill window counter fields for old entries
if (entry.usage.window_request_count == null) {
entry.usage.window_request_count = 0;
entry.usage.window_input_tokens = 0;
entry.usage.window_output_tokens = 0;
entry.usage.window_counters_reset_at = null;
entry.usage.limit_window_seconds = null;
needsPersist = true;
}
this.accounts.set(entry.id, entry);
}
}
if (needsPersist) this.persistNow();
}
} catch (err) {
console.warn("[AccountPool] Failed to load accounts:", err instanceof Error ? err.message : err);
}
}
private migrateFromLegacy(): void {
try {
const accountsFile = getAccountsFile();
const legacyAuthFile = getLegacyAuthFile();
if (existsSync(accountsFile)) return; // already migrated
if (!existsSync(legacyAuthFile)) return;
const raw = readFileSync(legacyAuthFile, "utf-8");
const data = JSON.parse(raw) as {
token: string;
proxyApiKey?: string | null;
userInfo?: { email?: string; accountId?: string; planType?: string } | null;
};
if (!data.token) return;
const id = randomBytes(8).toString("hex");
const accountId = extractChatGptAccountId(data.token);
const entry: AccountEntry = {
id,
token: data.token,
refreshToken: null,
email: data.userInfo?.email ?? null,
accountId: accountId,
planType: data.userInfo?.planType ?? null,
proxyApiKey: data.proxyApiKey ?? "codex-proxy-" + randomBytes(24).toString("hex"),
status: isTokenExpired(data.token) ? "expired" : "active",
usage: {
request_count: 0,
input_tokens: 0,
output_tokens: 0,
empty_response_count: 0,
last_used: null,
rate_limit_until: null,
window_request_count: 0,
window_input_tokens: 0,
window_output_tokens: 0,
window_counters_reset_at: null,
limit_window_seconds: null,
},
addedAt: new Date().toISOString(),
};
this.accounts.set(id, entry);
// Write new format
const dir = dirname(accountsFile);
if (!existsSync(dir)) mkdirSync(dir, { recursive: true });
const accountsData: AccountsFile = { accounts: [entry] };
writeFileSync(accountsFile, JSON.stringify(accountsData, null, 2), "utf-8");
// Rename old file
renameSync(legacyAuthFile, legacyAuthFile + ".bak");
console.log("[AccountPool] Migrated from auth.json β†’ accounts.json");
} catch (err) {
console.warn("[AccountPool] Migration failed:", err);
}
}
/** Flush pending writes on shutdown */
destroy(): void {
if (this.persistTimer) {
clearTimeout(this.persistTimer);
this.persistTimer = null;
}
this.persistNow();
}
}