File size: 16,296 Bytes
8b777a2
 
 
 
 
 
 
 
 
 
 
 
 
9d6278d
8b777a2
 
4ebb914
8b777a2
 
 
 
 
 
 
e25a730
 
8b777a2
 
 
 
 
 
 
 
 
 
 
 
 
fadda70
8b777a2
e25a730
8b777a2
 
 
 
 
e25a730
8b777a2
 
fadda70
8b777a2
 
 
 
 
 
 
 
 
6220911
 
 
 
 
8cde2e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba262d0
 
 
 
 
 
 
 
 
 
 
 
 
90c89b6
 
 
 
 
 
 
 
 
 
 
8b777a2
 
 
 
 
 
4ebb914
8b777a2
90c89b6
 
8b777a2
 
 
 
 
 
4ebb914
90c89b6
9d6278d
 
90c89b6
 
 
8b777a2
 
 
 
 
 
fadda70
8b777a2
b1107bc
 
d6c3bb0
b1107bc
90c89b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e25a730
90c89b6
45525c6
 
 
 
 
 
 
90c89b6
 
 
 
 
 
 
 
 
 
 
8b777a2
90c89b6
 
 
 
 
 
 
 
 
b1107bc
90c89b6
 
 
 
e25a730
9d6278d
90c89b6
 
 
 
 
 
 
 
 
 
9d6278d
90c89b6
 
 
 
 
 
 
9d6278d
90c89b6
 
 
 
 
 
 
 
 
 
 
 
6220911
90c89b6
 
 
 
9d6278d
90c89b6
9d6278d
 
90c89b6
 
 
 
 
 
 
 
 
 
 
 
4779e44
 
 
6220911
4779e44
 
9d6278d
2def35e
8b777a2
90c89b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6220911
90c89b6
 
 
 
 
 
 
 
 
8cde2e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90c89b6
 
 
ba262d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90c89b6
d9ac395
90c89b6
 
8b777a2
9d6278d
90c89b6
8b777a2
90c89b6
 
8b777a2
90c89b6
 
 
 
8b777a2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
/**
 * Shared proxy handler — encapsulates the account acquire → retry → stream/collect → release
 * lifecycle that is common to all API format routes (OpenAI, Anthropic, Gemini).
 *
 * Each route provides its own schema parsing, auth checking, and format adapter.
 * This handler takes over once a CodexResponsesRequest is prepared.
 */

import type { Context } from "hono";
import type { StatusCode } from "hono/utils/http-status";
import { stream } from "hono/streaming";
import { CodexApi, CodexApiError } from "../../proxy/codex-api.js";
import type { CodexResponsesRequest } from "../../proxy/codex-api.js";
import { EmptyResponseError } from "../../translation/codex-event-extractor.js";
import type { AccountPool } from "../../auth/account-pool.js";
import type { CookieJar } from "../../proxy/cookie-jar.js";
import type { ProxyPool } from "../../proxy/proxy-pool.js";
import { withRetry } from "../../utils/retry.js";

/** Data prepared by each route after parsing and translating the request. */
export interface ProxyRequest {
  codexRequest: CodexResponsesRequest;
  model: string;
  isStreaming: boolean;
  /** Original schema before tuple→object conversion (for response reconversion). */
  tupleSchema?: Record<string, unknown> | null;
}

/** Format-specific adapter provided by each route. */
export interface FormatAdapter {
  tag: string;
  noAccountStatus: StatusCode;
  formatNoAccount: () => unknown;
  format429: (message: string) => unknown;
  formatError: (status: number, message: string) => unknown;
  streamTranslator: (
    api: CodexApi,
    response: Response,
    model: string,
    onUsage: (u: { input_tokens: number; output_tokens: number; cached_tokens?: number; reasoning_tokens?: number }) => void,
    onResponseId: (id: string) => void,
    tupleSchema?: Record<string, unknown> | null,
  ) => AsyncGenerator<string>;
  collectTranslator: (
    api: CodexApi,
    response: Response,
    model: string,
    tupleSchema?: Record<string, unknown> | null,
  ) => Promise<{
    response: unknown;
    usage: { input_tokens: number; output_tokens: number; cached_tokens?: number; reasoning_tokens?: number };
    responseId: string | null;
  }>;
}

/**
 * Core shared handler — from account acquire to release.
 *
 * Handles: acquire, session lookup, retry, stream/collect, release, error formatting.
 */
/** Clamp an HTTP status to a valid error StatusCode, defaulting to 502 for non-error codes. */
function toErrorStatus(status: number): StatusCode {
  return (status >= 400 && status < 600 ? status : 502) as StatusCode;
}

/** Extract the rate-limit reset duration from a 429 error body, if available. */
function extractRetryAfterSec(body: string): number | undefined {
  try {
    const parsed = JSON.parse(body) as Record<string, unknown>;
    const error = parsed.error as Record<string, unknown> | undefined;
    if (!error) return undefined;
    if (typeof error.resets_in_seconds === "number" && error.resets_in_seconds > 0) {
      return error.resets_in_seconds;
    }
    if (typeof error.resets_at === "number" && error.resets_at > 0) {
      const diff = error.resets_at - Date.now() / 1000;
      return diff > 0 ? diff : undefined;
    }
  } catch { /* use default backoff */ }
  return undefined;
}

/** Check if a CodexApiError indicates the account is banned/suspended (non-CF 403). */
function isBanError(err: CodexApiError): boolean {
  if (err.status !== 403) return false;
  const body = err.body.toLowerCase();
  if (body.includes("cf_chl") || body.includes("<!doctype") || body.includes("<html")) return false;
  return true;
}

/** Check if a CodexApiError is a 401 token invalidation (revoked/expired upstream). */
function isTokenInvalidError(err: CodexApiError): boolean {
  return err.status === 401;
}

/** Check if a CodexApiError indicates the model is not supported on the account's plan. */
function isModelNotSupportedError(err: CodexApiError): boolean {
  // Only 4xx client errors (exclude 429 rate-limit)
  if (err.status < 400 || err.status >= 500 || err.status === 429) return false;
  const lower = err.message.toLowerCase();
  // Must contain "model" to avoid false positives like "feature not supported"
  if (!lower.includes("model")) return false;
  return lower.includes("not supported") || lower.includes("not_supported")
    || lower.includes("not available") || lower.includes("not_available");
}

export async function handleProxyRequest(
  c: Context,
  accountPool: AccountPool,
  cookieJar: CookieJar | undefined,
  req: ProxyRequest,
  fmt: FormatAdapter,
  proxyPool?: ProxyPool,
): Promise<Response> {
  // 1. Acquire account (model-aware)
  const acquired = accountPool.acquire({ model: req.codexRequest.model });
  if (!acquired) {
    c.status(fmt.noAccountStatus);
    return c.json(fmt.formatNoAccount());
  }

  const { entryId, token, accountId } = acquired;
  const proxyUrl = proxyPool?.resolveProxyUrl(entryId);
  let codexApi = new CodexApi(token, accountId, cookieJar, entryId, proxyUrl);
  // Tracks which account the outer catch should release (updated by retry loop)
  let activeEntryId = entryId;
  // Track tried accounts for model retry exclusion
  const triedEntryIds: string[] = [entryId];
  let modelRetried = false;

  console.log(
    `[${fmt.tag}] Account ${entryId} | Codex request:`,
    JSON.stringify(req.codexRequest).slice(0, 300),
  );

  let usageInfo: { input_tokens: number; output_tokens: number; cached_tokens?: number; reasoning_tokens?: number } | undefined;

  // P0-2: AbortController to kill curl when client disconnects
  const abortController = new AbortController();
  c.req.raw.signal.addEventListener("abort", () => abortController.abort(), { once: true });

  for (;;) { // model retry loop (max 1 retry)
    try {
      // 3. Retry + send to Codex
      const rawResponse = await withRetry(
        () => codexApi.createResponse(req.codexRequest, abortController.signal),
        { tag: fmt.tag },
      );

      // 4. Stream or collect
      if (req.isStreaming) {
        c.header("Content-Type", "text/event-stream");
        c.header("Cache-Control", "no-cache");
        c.header("Connection", "keep-alive");

        return stream(c, async (s) => {
          s.onAbort(() => abortController.abort());
          try {
            for await (const chunk of fmt.streamTranslator(
              codexApi,
              rawResponse,
              req.model,
              (u) => {
                usageInfo = u;
              },
              () => {},
              req.tupleSchema,
            )) {
              try {
                await s.write(chunk);
              } catch {
                // Client disconnected mid-stream — stop reading upstream
                abortController.abort();
                return;
              }
            }
          } catch (err) {
            // P2-8: Send error SSE event to client before closing
            try {
              const errMsg = err instanceof Error ? err.message : "Stream interrupted";
              await s.write(`data: ${JSON.stringify({ error: { message: errMsg, type: "stream_error" } })}\n\n`);
            } catch { /* client already gone */ }
          } finally {
            // P0-2: Kill curl subprocess if still running
            abortController.abort();
            accountPool.release(activeEntryId, usageInfo);
          }
        });
      } else {
        // Non-streaming: retry loop for empty responses (switch accounts)
        const MAX_EMPTY_RETRIES = 2;
        let currentEntryId = activeEntryId;
        let currentCodexApi = codexApi;
        let currentRawResponse = rawResponse;

        for (let attempt = 1; ; attempt++) {
          try {
            const result = await fmt.collectTranslator(
              currentCodexApi,
              currentRawResponse,
              req.model,
              req.tupleSchema,
            );
            accountPool.release(currentEntryId, result.usage);
            return c.json(result.response);
          } catch (collectErr) {
            if (collectErr instanceof EmptyResponseError && attempt <= MAX_EMPTY_RETRIES) {
              const emptyEmail = accountPool.getEntry(currentEntryId)?.email ?? "?";
              console.warn(
                `[${fmt.tag}] Account ${currentEntryId} (${emptyEmail}) | Empty response (attempt ${attempt}/${MAX_EMPTY_RETRIES + 1}), switching account...`,
              );
              accountPool.recordEmptyResponse(currentEntryId);
              accountPool.release(currentEntryId, collectErr.usage);

              // Acquire a new account (model-aware)
              const newAcquired = accountPool.acquire({ model: req.codexRequest.model });
              if (!newAcquired) {
                console.warn(`[${fmt.tag}] No available account for retry`);
                c.status(502);
                return c.json(fmt.formatError(502, "Codex returned an empty response and no other accounts are available for retry"));
              }

              currentEntryId = newAcquired.entryId;
              activeEntryId = currentEntryId;
              const retryProxyUrl = proxyPool?.resolveProxyUrl(newAcquired.entryId);
              currentCodexApi = new CodexApi(newAcquired.token, newAcquired.accountId, cookieJar, newAcquired.entryId, retryProxyUrl);
              try {
                currentRawResponse = await withRetry(
                  () => currentCodexApi.createResponse(req.codexRequest, abortController.signal),
                  { tag: fmt.tag },
                );
              } catch (retryErr) {
                accountPool.release(currentEntryId);
                if (retryErr instanceof CodexApiError) {
                  const code = toErrorStatus(retryErr.status);
                  c.status(code);
                  return c.json(fmt.formatError(code, retryErr.message));
                }
                throw retryErr;
              }
              continue;
            }

            // Not an empty response error, or retries exhausted
            accountPool.release(currentEntryId);
            if (collectErr instanceof EmptyResponseError) {
              const exhaustedEmail = accountPool.getEntry(currentEntryId)?.email ?? "?";
              console.warn(
                `[${fmt.tag}] Account ${currentEntryId} (${exhaustedEmail}) | Empty response (attempt ${attempt}/${MAX_EMPTY_RETRIES + 1}), all retries exhausted`,
              );
              accountPool.recordEmptyResponse(currentEntryId);
              c.status(502);
              return c.json(fmt.formatError(502, "Codex returned empty responses across all available accounts"));
            }
            const msg = collectErr instanceof Error ? collectErr.message : "Unknown error";
            // Extract upstream status from error message (e.g. "HTTP/1.1 400 Bad Request")
            const statusMatch = msg.match(/HTTP\/[\d.]+ (\d{3})/);
            const upstreamStatus = statusMatch ? parseInt(statusMatch[1], 10) : 0;
            const code = toErrorStatus(upstreamStatus);
            c.status(code);
            return c.json(fmt.formatError(code, msg));
          }
        }
      }
    } catch (err) {
      // 5. Error handling with format-specific responses
      if (err instanceof CodexApiError) {
        // Model not supported on this account's plan → try a different account
        if (!modelRetried && isModelNotSupportedError(err)) {
          modelRetried = true;
          const failedEmail = accountPool.getEntry(activeEntryId)?.email ?? "?";
          console.warn(
            `[${fmt.tag}] Account ${activeEntryId} (${failedEmail}) | Model "${req.codexRequest.model}" not supported, trying different account...`,
          );
          accountPool.release(activeEntryId);

          const retry = accountPool.acquire({
            model: req.codexRequest.model,
            excludeIds: triedEntryIds,
          });
          if (retry) {
            activeEntryId = retry.entryId;
            triedEntryIds.push(retry.entryId);
            const retryProxyUrl = proxyPool?.resolveProxyUrl(retry.entryId);
            codexApi = new CodexApi(retry.token, retry.accountId, cookieJar, retry.entryId, retryProxyUrl);
            console.log(`[${fmt.tag}] Retrying with account ${retry.entryId}`);
            continue; // re-enter model retry loop
          }
          // No other account available — return error (already released above)
          const code = toErrorStatus(err.status);
          c.status(code);
          return c.json(fmt.formatError(code, err.message));
        }

        console.error(
          `[${fmt.tag}] Account ${activeEntryId} | Codex API error:`,
          err.message,
        );
        if (err.status === 429) {
          const retryAfterSec = extractRetryAfterSec(err.body);
          accountPool.markRateLimited(activeEntryId, { retryAfterSec, countRequest: true });

          const failedEmail = accountPool.getEntry(activeEntryId)?.email ?? "?";
          console.warn(
            `[${fmt.tag}] Account ${activeEntryId} (${failedEmail}) | 429 rate limited` +
            (retryAfterSec != null ? ` (resets in ${Math.round(retryAfterSec)}s)` : "") +
            `, trying different account...`,
          );

          const retry = accountPool.acquire({
            model: req.codexRequest.model,
            excludeIds: triedEntryIds,
          });
          if (retry) {
            activeEntryId = retry.entryId;
            triedEntryIds.push(retry.entryId);
            const retryProxyUrl = proxyPool?.resolveProxyUrl(retry.entryId);
            codexApi = new CodexApi(retry.token, retry.accountId, cookieJar, retry.entryId, retryProxyUrl);
            console.log(`[${fmt.tag}] 429 fallback → account ${retry.entryId}`);
            continue;
          }

          c.status(429);
          return c.json(fmt.format429(err.message));
        }
        if (isBanError(err)) {
          accountPool.markStatus(activeEntryId, "banned");
          const failedEmail = accountPool.getEntry(activeEntryId)?.email ?? "?";
          console.warn(
            `[${fmt.tag}] Account ${activeEntryId} (${failedEmail}) | 403 banned, trying different account...`,
          );

          const retry = accountPool.acquire({
            model: req.codexRequest.model,
            excludeIds: triedEntryIds,
          });
          if (retry) {
            activeEntryId = retry.entryId;
            triedEntryIds.push(retry.entryId);
            const retryProxyUrl = proxyPool?.resolveProxyUrl(retry.entryId);
            codexApi = new CodexApi(retry.token, retry.accountId, cookieJar, retry.entryId, retryProxyUrl);
            console.log(`[${fmt.tag}] 403 ban fallback → account ${retry.entryId}`);
            continue;
          }

          c.status(403);
          return c.json(fmt.formatError(403, err.message));
        }
        if (isTokenInvalidError(err)) {
          accountPool.markStatus(activeEntryId, "expired");
          const failedEmail = accountPool.getEntry(activeEntryId)?.email ?? "?";
          console.warn(
            `[${fmt.tag}] Account ${activeEntryId} (${failedEmail}) | 401 token invalidated, trying different account...`,
          );

          const retry = accountPool.acquire({
            model: req.codexRequest.model,
            excludeIds: triedEntryIds,
          });
          if (retry) {
            activeEntryId = retry.entryId;
            triedEntryIds.push(retry.entryId);
            const retryProxyUrl = proxyPool?.resolveProxyUrl(retry.entryId);
            codexApi = new CodexApi(retry.token, retry.accountId, cookieJar, retry.entryId, retryProxyUrl);
            console.log(`[${fmt.tag}] 401 fallback → account ${retry.entryId}`);
            continue;
          }

          c.status(401);
          return c.json(fmt.formatError(401, err.message));
        }
        accountPool.release(activeEntryId);
        const code = toErrorStatus(err.status);
        c.status(code);
        return c.json(fmt.formatError(code, err.message));
      }
      accountPool.release(activeEntryId);
      throw err;
    }

    break; // normal exit from model retry loop
  }

  // Should never reach here, but TypeScript needs a return
  c.status(500);
  return c.json(fmt.formatError(500, "Unexpected proxy handler exit"));
}