codex-proxy / src /routes /shared /proxy-handler.ts
icebear0828
feat: add proxy pool for per-account proxy routing
4ebb914
raw
history blame
8.54 kB
/**
* Shared proxy handler β€” encapsulates the account acquire β†’ retry β†’ stream/collect β†’ release
* lifecycle that is common to all API format routes (OpenAI, Anthropic, Gemini).
*
* Each route provides its own schema parsing, auth checking, and format adapter.
* This handler takes over once a CodexResponsesRequest is prepared.
*/
import type { Context } from "hono";
import type { StatusCode } from "hono/utils/http-status";
import { stream } from "hono/streaming";
import { CodexApi, CodexApiError } from "../../proxy/codex-api.js";
import type { CodexResponsesRequest } from "../../proxy/codex-api.js";
import { EmptyResponseError } from "../../translation/codex-event-extractor.js";
import type { AccountPool } from "../../auth/account-pool.js";
import type { CookieJar } from "../../proxy/cookie-jar.js";
import type { ProxyPool } from "../../proxy/proxy-pool.js";
import { withRetry } from "../../utils/retry.js";
/** Data prepared by each route after parsing and translating the request. */
export interface ProxyRequest {
codexRequest: CodexResponsesRequest;
model: string;
isStreaming: boolean;
}
/** Format-specific adapter provided by each route. */
export interface FormatAdapter {
tag: string;
noAccountStatus: StatusCode;
formatNoAccount: () => unknown;
format429: (message: string) => unknown;
formatError: (status: number, message: string) => unknown;
streamTranslator: (
api: CodexApi,
response: Response,
model: string,
onUsage: (u: { input_tokens: number; output_tokens: number }) => void,
onResponseId: (id: string) => void,
) => AsyncGenerator<string>;
collectTranslator: (
api: CodexApi,
response: Response,
model: string,
) => Promise<{
response: unknown;
usage: { input_tokens: number; output_tokens: number };
responseId: string | null;
}>;
}
/**
* Core shared handler β€” from account acquire to release.
*
* Handles: acquire, session lookup, retry, stream/collect, release, error formatting.
*/
export async function handleProxyRequest(
c: Context,
accountPool: AccountPool,
cookieJar: CookieJar | undefined,
req: ProxyRequest,
fmt: FormatAdapter,
proxyPool?: ProxyPool,
): Promise<Response> {
// 1. Acquire account
const acquired = accountPool.acquire();
if (!acquired) {
c.status(fmt.noAccountStatus);
return c.json(fmt.formatNoAccount());
}
const { entryId, token, accountId } = acquired;
const proxyUrl = proxyPool?.resolveProxyUrl(entryId);
const codexApi = new CodexApi(token, accountId, cookieJar, entryId, proxyUrl);
// Tracks which account the outer catch should release (updated by retry loop)
let activeEntryId = entryId;
console.log(
`[${fmt.tag}] Account ${entryId} | Codex request:`,
JSON.stringify(req.codexRequest).slice(0, 300),
);
let usageInfo: { input_tokens: number; output_tokens: number } | undefined;
// P0-2: AbortController to kill curl when client disconnects
const abortController = new AbortController();
c.req.raw.signal.addEventListener("abort", () => abortController.abort(), { once: true });
try {
// 3. Retry + send to Codex
const rawResponse = await withRetry(
() => codexApi.createResponse(req.codexRequest, abortController.signal),
{ tag: fmt.tag },
);
// 4. Stream or collect
if (req.isStreaming) {
c.header("Content-Type", "text/event-stream");
c.header("Cache-Control", "no-cache");
c.header("Connection", "keep-alive");
return stream(c, async (s) => {
s.onAbort(() => abortController.abort());
try {
for await (const chunk of fmt.streamTranslator(
codexApi,
rawResponse,
req.model,
(u) => {
usageInfo = u;
},
() => {},
)) {
await s.write(chunk);
}
} catch (err) {
// P2-8: Send error SSE event to client before closing
try {
const errMsg = err instanceof Error ? err.message : "Stream interrupted";
await s.write(`data: ${JSON.stringify({ error: { message: errMsg, type: "stream_error" } })}\n\n`);
} catch { /* client already gone */ }
throw err;
} finally {
// P0-2: Kill curl subprocess if still running
abortController.abort();
accountPool.release(entryId, usageInfo);
}
});
} else {
// Non-streaming: retry loop for empty responses (switch accounts)
const MAX_EMPTY_RETRIES = 2;
let currentEntryId = entryId;
let currentCodexApi = codexApi;
let currentRawResponse = rawResponse;
for (let attempt = 1; ; attempt++) {
try {
const result = await fmt.collectTranslator(
currentCodexApi,
currentRawResponse,
req.model,
);
accountPool.release(currentEntryId, result.usage);
return c.json(result.response);
} catch (collectErr) {
if (collectErr instanceof EmptyResponseError && attempt <= MAX_EMPTY_RETRIES) {
const emptyEmail = accountPool.getEntry(currentEntryId)?.email ?? "?";
console.warn(
`[${fmt.tag}] Account ${currentEntryId} (${emptyEmail}) | Empty response (attempt ${attempt}/${MAX_EMPTY_RETRIES + 1}), switching account...`,
);
accountPool.recordEmptyResponse(currentEntryId);
accountPool.release(currentEntryId, collectErr.usage);
// Acquire a new account
const newAcquired = accountPool.acquire();
if (!newAcquired) {
console.warn(`[${fmt.tag}] No available account for retry`);
c.status(502);
return c.json(fmt.formatError(502, "Codex returned an empty response and no other accounts are available for retry"));
}
currentEntryId = newAcquired.entryId;
activeEntryId = currentEntryId;
const retryProxyUrl = proxyPool?.resolveProxyUrl(newAcquired.entryId);
currentCodexApi = new CodexApi(newAcquired.token, newAcquired.accountId, cookieJar, newAcquired.entryId, retryProxyUrl);
try {
currentRawResponse = await withRetry(
() => currentCodexApi.createResponse(req.codexRequest, abortController.signal),
{ tag: fmt.tag },
);
} catch (retryErr) {
accountPool.release(currentEntryId);
if (retryErr instanceof CodexApiError) {
const code = (retryErr.status >= 400 && retryErr.status < 600 ? retryErr.status : 502) as StatusCode;
c.status(code);
return c.json(fmt.formatError(code, retryErr.message));
}
throw retryErr;
}
continue;
}
// Not an empty response error, or retries exhausted
accountPool.release(currentEntryId);
if (collectErr instanceof EmptyResponseError) {
const exhaustedEmail = accountPool.getEntry(currentEntryId)?.email ?? "?";
console.warn(
`[${fmt.tag}] Account ${currentEntryId} (${exhaustedEmail}) | Empty response (attempt ${attempt}/${MAX_EMPTY_RETRIES + 1}), all retries exhausted`,
);
accountPool.recordEmptyResponse(currentEntryId);
c.status(502);
return c.json(fmt.formatError(502, "Codex returned empty responses across all available accounts"));
}
const msg = collectErr instanceof Error ? collectErr.message : "Unknown error";
c.status(502);
return c.json(fmt.formatError(502, msg));
}
}
}
} catch (err) {
// 5. Error handling with format-specific responses
if (err instanceof CodexApiError) {
console.error(
`[${fmt.tag}] Account ${activeEntryId} | Codex API error:`,
err.message,
);
if (err.status === 429) {
// P1-6: Count 429s as requests via encapsulated API (no direct entry mutation)
accountPool.markRateLimited(activeEntryId, { countRequest: true });
c.status(429);
return c.json(fmt.format429(err.message));
}
accountPool.release(activeEntryId);
const code = (
err.status >= 400 && err.status < 600 ? err.status : 502
) as StatusCode;
c.status(code);
return c.json(fmt.formatError(code, err.message));
}
accountPool.release(activeEntryId);
throw err;
}
}