AI_PROJECT / src /scripts /api-pool-proxy.mjs
chenchenaoyang's picture
Deploy from Codex on 2026-04-08
4b8c220 verified
#!/usr/bin/env node
import { spawn } from "node:child_process";
import http from "node:http";
import path from "node:path";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { ApiEndpointPool, normalizeEndpointType } from "../proxy/api-endpoint-pool.mjs";
import { classifyFailure } from "../proxy/codex-account-pool.mjs";
import { sanitizeForLogs } from "../shared/secret-sanitizer.mjs";
const DEFAULT_HOST = "127.0.0.1";
const DEFAULT_PORT = 8789;
const DEFAULT_PROVIDER = "codex";
const DEFAULT_POOL_DIR = path.resolve(process.cwd(), "api_pool", DEFAULT_PROVIDER);
const DEFAULT_LOCAL_API_KEY = "local-api-pool-proxy-key";
const DEFAULT_MAX_SWITCH_ATTEMPTS = 3;
const DEFAULT_REQUEST_TIMEOUT_MS = 60_000;
const DEFAULT_ENABLE_SCHEDULED_SWITCH = true;
const DEFAULT_SCHEDULED_SWITCH_INTERVAL_MS = 15 * 60_000;
const SUPPORTED_PATHS = {
codex: new Set(["/models", "/responses", "/v1/models", "/v1/responses", "/v1/chat/completions"]),
"claude-code": new Set(["/v1/messages", "/messages", "/v1/models"]),
};
function parseArgs(argv) {
const args = {};
for (const part of argv) {
if (!part.startsWith("--")) continue;
const raw = part.slice(2);
const idx = raw.indexOf("=");
if (idx < 0) {
args[raw] = "true";
continue;
}
args[raw.slice(0, idx)] = raw.slice(idx + 1);
}
return args;
}
function printUsage() {
console.log(`Usage:
node src/scripts/api-pool-proxy.mjs
Options:
--provider=codex
--pool-dir=api_pool/codex
--host=127.0.0.1
--port=8789
--local-api-key=local-api-pool-proxy-key
--max-switch-attempts=3
--request-timeout-ms=60000
--enable-scheduled-switch=true
--scheduled-switch-interval-ms=900000
--proxy-url=http://127.0.0.1:8118
--help
`);
}
function parseBooleanish(value, fallback) {
if (value == null || value === "") return fallback;
if (typeof value === "boolean") return value;
const text = String(value).trim().toLowerCase();
if (["true", "1", "yes", "on"].includes(text)) return true;
if (["false", "0", "no", "off"].includes(text)) return false;
return fallback;
}
async function maybeRespawnWithProxy(argv) {
const args = parseArgs(argv);
const proxyUrl = args["proxy-url"] || "";
if (!proxyUrl || process.env.CODEX_PROXY_BOOTSTRAPPED === "1") {
return false;
}
const child = spawn(process.execPath, [process.argv[1], ...argv], {
stdio: "inherit",
env: {
...process.env,
CODEX_PROXY_BOOTSTRAPPED: "1",
NODE_USE_ENV_PROXY: "1",
HTTPS_PROXY: proxyUrl,
HTTP_PROXY: proxyUrl,
ALL_PROXY: proxyUrl,
},
});
await new Promise((resolve, reject) => {
child.on("exit", (code) => {
process.exitCode = code ?? 1;
resolve();
});
child.on("error", reject);
});
return true;
}
function safeJson(value) {
return JSON.stringify(value, null, 2);
}
function getRequestPath(url = "/") {
try {
return new URL(url, "http://localhost").pathname;
} catch {
return "/";
}
}
function requireLocalAuth(req, expectedKey) {
return String(req.headers.authorization || "") === `Bearer ${expectedKey}`;
}
function copyHeadersToClient(res, upstreamHeaders) {
for (const [key, value] of upstreamHeaders.entries()) {
if (key.toLowerCase() === "transfer-encoding") continue;
res.setHeader(key, value);
}
}
function copyHeadersForUpstream(reqHeaders, apiKey, provider) {
const headers = {};
for (const [key, value] of Object.entries(reqHeaders || {})) {
if (value == null) continue;
const lower = key.toLowerCase();
if (["host", "authorization", "content-length", "connection", "x-api-key"].includes(lower)) {
continue;
}
headers[key] = value;
}
if (provider === "claude-code") {
headers["x-api-key"] = apiKey;
headers.authorization = `Bearer ${apiKey}`;
if (!headers["anthropic-version"]) {
headers["anthropic-version"] = "2023-06-01";
}
} else {
headers.authorization = `Bearer ${apiKey}`;
}
return headers;
}
function readRequestBody(req) {
return new Promise((resolve, reject) => {
const chunks = [];
req.on("data", (chunk) => chunks.push(chunk));
req.on("error", reject);
req.on("end", () => resolve(Buffer.concat(chunks)));
});
}
function classifyRetryableFailure(status, detail) {
const result = classifyFailure({ status, detail });
return {
...result,
retryable:
["auth", "rate_limit", "quota", "server", "network"].includes(result.category) ||
(result.category === "invalid" && isRetryableInvalidDetail(detail)),
};
}
function isRetryableInvalidDetail(detail) {
const lower = String(detail || "").toLowerCase();
return [
"model_not_found",
"no available channel for model",
"unsupported model",
"model not found",
"invalid model",
"does not support",
].some((needle) => lower.includes(needle));
}
export function endpointSummary(endpoint) {
if (!endpoint) return null;
return {
id: endpoint.id,
name: endpoint.name,
type: endpoint.type,
baseUrl: endpoint.baseUrl,
model: endpoint.model || "",
healthy: endpoint.healthy,
cooldownUntil: endpoint.cooldownUntilMs ? new Date(endpoint.cooldownUntilMs).toISOString() : null,
lastValidation: endpoint.lastValidation,
lastFailureReason: endpoint.lastFailureReason,
};
}
function createConsoleStartupLogger() {
return (event, payload = {}) => {
const time = new Date().toISOString();
const sanitized = sanitizeForLogs(payload);
const details = Object.entries(sanitized)
.filter(([, value]) => value !== undefined && value !== null && value !== "")
.map(([key, value]) => `${key}=${JSON.stringify(value)}`)
.join(" ");
console.log(`[startup] ${time} ${event}${details ? ` ${details}` : ""}`);
};
}
function resolveUpstreamUrl(baseUrl, reqUrl, requestPath) {
const upstream = new URL(baseUrl);
const requestUrl = new URL(reqUrl || "/", "http://localhost");
const basePath = upstream.pathname.replace(/\/+$/, "");
const incomingPath = requestPath;
let finalPath = incomingPath;
if (basePath && basePath !== "/" && incomingPath.startsWith(`${basePath}/`)) {
finalPath = incomingPath;
} else if (basePath === "/v1" && incomingPath.startsWith("/v1/")) {
finalPath = incomingPath;
} else if (basePath && basePath !== "/" && incomingPath.startsWith("/")) {
finalPath = `${basePath}${incomingPath}`;
}
upstream.pathname = finalPath;
upstream.search = requestUrl.search;
return upstream.toString();
}
async function createFetchWithProxy(proxyUrl) {
if (!proxyUrl) return fetch;
process.env.NODE_USE_ENV_PROXY = "1";
process.env.HTTPS_PROXY = proxyUrl;
process.env.HTTP_PROXY = proxyUrl;
process.env.ALL_PROXY = proxyUrl;
return fetch;
}
function unauthorized(res) {
res.statusCode = 401;
res.setHeader("content-type", "application/json");
res.end(safeJson({ error: { message: "Unauthorized local proxy key." } }));
}
export async function createApiPoolProxyService(options) {
const startupLogger =
typeof options.logger === "function" ? options.logger : createConsoleStartupLogger();
const fetchFn = options.fetchFn || (await createFetchWithProxy(options.proxyUrl));
const pool = new ApiEndpointPool({
poolDir: options.poolDir,
provider: options.provider,
fetchFn,
logger: startupLogger,
loadSnapshot: options.loadSnapshot,
sourcePath: options.sourcePath,
});
startupLogger("pool:load:start", {
provider: options.provider,
poolDir: options.poolDir,
});
await pool.load();
startupLogger("pool:load:done", { count: pool.listEndpoints().length });
const active = await pool.getInitialEndpoint();
if (!active) {
throw new Error(`No usable endpoint for provider=${options.provider}`);
}
const scheduledSwitchEnabled = parseBooleanish(
options.enableScheduledSwitch,
DEFAULT_ENABLE_SCHEDULED_SWITCH,
);
const scheduledSwitchIntervalMs = Math.max(
1_000,
Number(options.scheduledSwitchIntervalMs || DEFAULT_SCHEDULED_SWITCH_INTERVAL_MS),
);
const scheduleState = {
inflightRequests: 0,
lastScheduledSwitchAt: null,
nextScheduledSwitchAt: scheduledSwitchEnabled
? new Date(Date.now() + scheduledSwitchIntervalMs).toISOString()
: null,
lastScheduledSwitchReason: scheduledSwitchEnabled ? "waiting" : "disabled",
pendingScheduledSwitch: false,
timer: null,
closed: false,
};
function clearScheduledTimer() {
if (scheduleState.timer) {
clearTimeout(scheduleState.timer);
scheduleState.timer = null;
}
}
function scheduleNextSwitch(delayMs = scheduledSwitchIntervalMs) {
clearScheduledTimer();
if (!scheduledSwitchEnabled || scheduleState.closed) {
scheduleState.nextScheduledSwitchAt = null;
return;
}
scheduleState.nextScheduledSwitchAt = new Date(Date.now() + delayMs).toISOString();
scheduleState.timer = setTimeout(() => {
void runScheduledSwitch("timer");
}, delayMs);
scheduleState.timer.unref?.();
}
async function runScheduledSwitch(trigger = "timer") {
if (!scheduledSwitchEnabled || scheduleState.closed) {
scheduleState.lastScheduledSwitchReason = "disabled";
scheduleState.nextScheduledSwitchAt = null;
return { switched: false, reason: "disabled" };
}
if (scheduleState.inflightRequests > 0) {
scheduleState.pendingScheduledSwitch = true;
scheduleState.lastScheduledSwitchReason = "busy";
scheduleState.nextScheduledSwitchAt = null;
startupLogger("scheduled-switch:defer", {
provider: options.provider,
trigger,
inflightRequests: scheduleState.inflightRequests,
});
return { switched: false, reason: "busy" };
}
const currentActive = pool.getActiveEndpoint();
if (!currentActive) {
scheduleState.lastScheduledSwitchReason = "no-active";
scheduleState.pendingScheduledSwitch = false;
scheduleNextSwitch();
return { switched: false, reason: "no-active" };
}
const excluded = new Set([currentActive.id]);
for (let attempt = 0; attempt < pool.listEndpoints().length; attempt += 1) {
const candidate = pool.pickNextRotationCandidate(excluded);
if (!candidate) {
scheduleState.lastScheduledSwitchReason = "no-healthy-spare";
scheduleState.pendingScheduledSwitch = false;
scheduleNextSwitch();
startupLogger("scheduled-switch:skip", {
provider: options.provider,
trigger,
reason: "no-healthy-spare",
});
return { switched: false, reason: "no-healthy-spare" };
}
excluded.add(candidate.id);
const probe = await pool.probeEndpoint(candidate, {
activateOnSuccess: false,
activationMode: "scheduled",
});
if (!probe.ok) {
continue;
}
pool.markScheduledSwitch(candidate);
scheduleState.lastScheduledSwitchAt = new Date().toISOString();
scheduleState.lastScheduledSwitchReason = "switched";
scheduleState.pendingScheduledSwitch = false;
scheduleNextSwitch();
startupLogger("scheduled-switch:done", {
provider: options.provider,
trigger,
previousId: currentActive.id,
nextId: candidate.id,
nextName: candidate.name,
});
return { switched: true, reason: "switched", endpoint: candidate };
}
scheduleState.lastScheduledSwitchReason = "no-healthy-spare";
scheduleState.pendingScheduledSwitch = false;
scheduleNextSwitch();
return { switched: false, reason: "no-healthy-spare" };
}
function attachInflightTracker(res) {
scheduleState.inflightRequests += 1;
let done = false;
const finish = () => {
if (done) return;
done = true;
scheduleState.inflightRequests = Math.max(0, scheduleState.inflightRequests - 1);
if (scheduleState.inflightRequests === 0 && scheduleState.pendingScheduledSwitch) {
queueMicrotask(() => {
void runScheduledSwitch("idle-drain");
});
}
};
res.once("finish", finish);
res.once("close", finish);
}
scheduleNextSwitch();
async function reload() {
await pool.load();
const nextActive = await pool.getInitialEndpoint();
if (!nextActive) {
throw new Error(`No usable endpoint for provider=${options.provider}`);
}
scheduleState.pendingScheduledSwitch = false;
scheduleState.lastScheduledSwitchReason = scheduledSwitchEnabled ? "reloaded" : "disabled";
scheduleNextSwitch();
return nextActive;
}
function getAdminStatus() {
return {
provider: options.provider,
active: endpointSummary(pool.getActiveEndpoint()),
endpoints: pool.listEndpoints().map(endpointSummary),
inflightRequests: scheduleState.inflightRequests,
lastScheduledSwitchAt: scheduleState.lastScheduledSwitchAt,
nextScheduledSwitchAt: scheduleState.nextScheduledSwitchAt,
scheduledSwitchEnabled,
scheduledSwitchIntervalMs,
lastScheduledSwitchReason: scheduleState.lastScheduledSwitchReason,
};
}
function close() {
scheduleState.closed = true;
clearScheduledTimer();
}
function promoteNextEndpointAfterFailure(
failedEndpoint,
{ expectedId = null, expectedVersion = null, excluded = new Set() } = {},
) {
const currentActive = pool.getActiveEndpoint();
if (!failedEndpoint || !currentActive || currentActive.id !== failedEndpoint.id) {
return null;
}
const localExcluded = new Set(excluded);
localExcluded.add(failedEndpoint.id);
const candidate = pool.pickNextHealthyEndpoint(localExcluded);
if (!candidate) {
return null;
}
const activated = pool.setActiveEndpoint(candidate, "failover", {
expectedId,
expectedVersion,
});
return activated ? candidate : null;
}
async function handleRequest(
req,
res,
{ requestUrl = req.url, exposeHealthDetails = true, exposeStatus = true } = {},
) {
const requestPath = getRequestPath(requestUrl);
if (requestPath === "/healthz") {
res.statusCode = 200;
res.setHeader("content-type", "application/json");
res.end(
safeJson(
exposeHealthDetails
? {
ok: true,
provider: options.provider,
active: endpointSummary(pool.getActiveEndpoint()),
}
: { ok: true },
),
);
return;
}
if (requestPath === "/proxy/status" && exposeStatus) {
res.statusCode = 200;
res.setHeader("content-type", "application/json");
res.end(safeJson(getAdminStatus()));
return;
}
if (!requireLocalAuth(req, options.localApiKey)) {
unauthorized(res);
return;
}
const supported = SUPPORTED_PATHS[options.provider] || new Set();
if (!supported.has(requestPath)) {
res.statusCode = 404;
res.setHeader("content-type", "application/json");
res.end(
safeJson({
error: {
message: `Unsupported path: ${requestPath}. Supported: ${[...supported].join(", ")}`,
},
}),
);
return;
}
const requestBody =
req.method === "GET" || req.method === "HEAD" ? null : await readRequestBody(req);
attachInflightTracker(res);
const excluded = new Set();
const maxAttempts = Math.max(1, Number(options.maxSwitchAttempts) + 1);
let lastFailure = null;
let nextCandidate = null;
for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
const current =
nextCandidate ||
(attempt === 0
? pool.getActiveEndpoint() || (await pool.getInitialEndpoint())
: pool.pickNextHealthyEndpoint(excluded));
nextCandidate = null;
if (!current) break;
excluded.add(current.id);
if (pool.isCoolingDown(current)) continue;
const selectedActiveId = pool.getActiveEndpoint()?.id || null;
const selectedActiveVersion = pool.getActiveEndpointVersion();
try {
const upstreamUrl = resolveUpstreamUrl(current.baseUrl, requestUrl, requestPath);
const headers = copyHeadersForUpstream(req.headers, current.apiKey, current.type);
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), options.requestTimeoutMs);
let upstream;
try {
upstream = await fetchFn(upstreamUrl, {
method: req.method,
headers,
body: requestBody,
signal: controller.signal,
});
} finally {
clearTimeout(timeout);
}
if (!upstream.ok) {
const detail = await upstream.text();
const classified = classifyRetryableFailure(upstream.status, detail);
pool.markFailure(current, classified.category, detail || classified.reason);
lastFailure = {
status: upstream.status,
category: classified.category,
reason: detail || classified.reason,
};
if (classified.retryable) {
nextCandidate = promoteNextEndpointAfterFailure(current, {
expectedId: selectedActiveId,
expectedVersion: selectedActiveVersion,
excluded,
});
continue;
}
res.statusCode = upstream.status;
res.setHeader("content-type", "application/json");
res.end(
safeJson({
error: {
message: detail || "Upstream request failed.",
category: classified.category,
},
}),
);
return;
}
res.statusCode = upstream.status;
copyHeadersToClient(res, upstream.headers);
if (!upstream.body) {
const activated = pool.markSuccess(current, {
expectedId: selectedActiveId,
expectedVersion: selectedActiveVersion,
});
if (!activated) {
startupLogger("pool:active-endpoint:stale-success", {
provider: options.provider,
requestPath,
endpointId: current.id,
endpointName: current.name,
selectedActiveId,
selectedActiveVersion,
currentActiveId: pool.getActiveEndpoint()?.id || null,
currentActiveVersion: pool.getActiveEndpointVersion(),
message: `忽略旧请求成功回写:请求开始时活跃节点=${selectedActiveId || "none"}@v${selectedActiveVersion},当前活跃节点=${pool.getActiveEndpoint()?.id || "none"}@v${pool.getActiveEndpointVersion()},成功节点=${current.id}`,
});
}
res.end();
return;
}
try {
await pipeline(Readable.fromWeb(upstream.body), res);
} catch (error) {
const detail = error?.message || String(error);
const classified = classifyRetryableFailure(0, detail);
pool.markFailure(current, classified.category, detail);
lastFailure = {
status: 0,
category: classified.category,
reason: detail,
};
promoteNextEndpointAfterFailure(current, {
expectedId: selectedActiveId,
expectedVersion: selectedActiveVersion,
excluded,
});
if (!res.destroyed) {
res.destroy(error);
}
return;
}
const activated = pool.markSuccess(current, {
expectedId: selectedActiveId,
expectedVersion: selectedActiveVersion,
});
if (!activated) {
startupLogger("pool:active-endpoint:stale-success", {
provider: options.provider,
requestPath,
endpointId: current.id,
endpointName: current.name,
selectedActiveId,
selectedActiveVersion,
currentActiveId: pool.getActiveEndpoint()?.id || null,
currentActiveVersion: pool.getActiveEndpointVersion(),
message: `忽略旧请求成功回写:请求开始时活跃节点=${selectedActiveId || "none"}@v${selectedActiveVersion},当前活跃节点=${pool.getActiveEndpoint()?.id || "none"}@v${pool.getActiveEndpointVersion()},成功节点=${current.id}`,
});
}
return;
} catch (error) {
const detail = error?.message || String(error);
const classified = classifyRetryableFailure(0, detail);
pool.markFailure(current, classified.category, detail);
lastFailure = {
status: 0,
category: classified.category,
reason: detail,
};
nextCandidate = promoteNextEndpointAfterFailure(current, {
expectedId: selectedActiveId,
expectedVersion: selectedActiveVersion,
excluded,
});
}
}
res.statusCode = 503;
res.setHeader("content-type", "application/json");
res.end(
safeJson({
error: {
message: "No healthy endpoint available.",
lastFailure,
},
}),
);
}
return {
pool,
handleRequest,
reload,
getAdminStatus,
runScheduledSwitchNow: () => runScheduledSwitch("manual"),
close,
};
}
export async function createApiPoolProxyServer(options) {
const service = await createApiPoolProxyService(options);
const server = http.createServer((req, res) =>
service.handleRequest(req, res, {
exposeHealthDetails: true,
exposeStatus: true,
}),
);
const originalClose = server.close.bind(server);
server.close = (callback) => {
service.close?.();
return originalClose(callback);
};
return { server, pool: service.pool, service };
}
async function main() {
if (await maybeRespawnWithProxy(process.argv.slice(2))) {
return;
}
const args = parseArgs(process.argv.slice(2));
if (args.help === "true") {
printUsage();
return;
}
const provider = normalizeEndpointType(
args.provider || process.env.API_POOL_PROVIDER || DEFAULT_PROVIDER,
);
if (!provider) {
throw new Error("provider must be codex or claude-code");
}
const defaultPoolDir = path.resolve(process.cwd(), "api_pool", provider);
const options = {
provider,
poolDir: path.resolve(args["pool-dir"] || process.env.API_POOL_DIR || defaultPoolDir || DEFAULT_POOL_DIR),
host: args.host || process.env.API_POOL_HOST || DEFAULT_HOST,
port: Number(args.port || process.env.API_POOL_PORT || DEFAULT_PORT),
localApiKey: args["local-api-key"] || process.env.API_POOL_LOCAL_API_KEY || DEFAULT_LOCAL_API_KEY,
maxSwitchAttempts: Number(
args["max-switch-attempts"] || process.env.API_POOL_MAX_SWITCH_ATTEMPTS || DEFAULT_MAX_SWITCH_ATTEMPTS,
),
requestTimeoutMs: Number(
args["request-timeout-ms"] || process.env.API_POOL_REQUEST_TIMEOUT_MS || DEFAULT_REQUEST_TIMEOUT_MS,
),
enableScheduledSwitch: parseBooleanish(
args["enable-scheduled-switch"] ?? process.env.API_POOL_SCHEDULED_SWITCH_ENABLED,
DEFAULT_ENABLE_SCHEDULED_SWITCH,
),
scheduledSwitchIntervalMs: Number(
args["scheduled-switch-interval-ms"] ||
process.env.API_POOL_SCHEDULED_SWITCH_INTERVAL_MS ||
DEFAULT_SCHEDULED_SWITCH_INTERVAL_MS,
),
proxyUrl:
args["proxy-url"] ||
process.env.API_POOL_PROXY_URL ||
process.env.HTTPS_PROXY ||
process.env.HTTP_PROXY ||
"",
};
console.log("[startup] preparing api pool proxy");
console.log(`[startup] provider=${options.provider}`);
console.log(`[startup] host=${options.host} port=${options.port}`);
console.log(`[startup] poolDir=${options.poolDir}`);
console.log(`[startup] scheduledSwitch=${options.enableScheduledSwitch ? "on" : "off"}`);
console.log(`[startup] scheduledSwitchIntervalMs=${options.scheduledSwitchIntervalMs}`);
console.log(`[startup] upstreamProxy=${options.proxyUrl || "(none)"}`);
const { server, pool } = await createApiPoolProxyServer(options);
server.listen(options.port, options.host, () => {
const active = pool.getActiveEndpoint();
console.log(`API 池代理已启动:http://${options.host}:${options.port}`);
console.log(`Provider: ${options.provider}`);
console.log(`初始活跃节点:${active?.name || active?.id || "(none)"}`);
});
}
const directRun = process.argv[1] && path.resolve(process.argv[1]) === path.resolve(new URL(import.meta.url).pathname);
if (directRun) {
main().catch((error) => {
console.error(error?.message || error);
process.exitCode = 1;
});
}