File size: 8,542 Bytes
8b777a2
 
 
 
 
 
 
 
 
 
 
 
 
9d6278d
8b777a2
 
4ebb914
8b777a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ebb914
8b777a2
 
 
 
 
 
 
 
 
4ebb914
 
9d6278d
 
8b777a2
 
 
 
 
 
 
 
b1107bc
 
d6c3bb0
b1107bc
8b777a2
 
 
b1107bc
8b777a2
 
 
 
 
 
 
 
 
 
d6c3bb0
8b777a2
 
 
 
 
 
 
 
3c0eaf7
8b777a2
 
 
b1107bc
 
 
 
 
 
 
8b777a2
b1107bc
 
8b777a2
 
 
 
9d6278d
 
 
 
 
 
 
 
 
 
 
 
2def35e
9d6278d
 
 
 
b3dfc6c
9d6278d
b3dfc6c
9d6278d
b3dfc6c
9d6278d
 
 
 
 
 
 
 
 
 
 
 
4ebb914
 
9d6278d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b3dfc6c
9d6278d
b3dfc6c
9d6278d
b3dfc6c
9d6278d
 
 
 
 
 
2def35e
8b777a2
 
 
 
 
 
9d6278d
8b777a2
 
 
b1107bc
9d6278d
8b777a2
 
 
9d6278d
8b777a2
 
 
 
 
 
9d6278d
8b777a2
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/**
 * Shared proxy handler — encapsulates the account acquire → retry → stream/collect → release
 * lifecycle that is common to all API format routes (OpenAI, Anthropic, Gemini).
 *
 * Each route provides its own schema parsing, auth checking, and format adapter.
 * This handler takes over once a CodexResponsesRequest is prepared.
 */

import type { Context } from "hono";
import type { StatusCode } from "hono/utils/http-status";
import { stream } from "hono/streaming";
import { CodexApi, CodexApiError } from "../../proxy/codex-api.js";
import type { CodexResponsesRequest } from "../../proxy/codex-api.js";
import { EmptyResponseError } from "../../translation/codex-event-extractor.js";
import type { AccountPool } from "../../auth/account-pool.js";
import type { CookieJar } from "../../proxy/cookie-jar.js";
import type { ProxyPool } from "../../proxy/proxy-pool.js";
import { withRetry } from "../../utils/retry.js";

/**
 * Data prepared by each route after parsing and translating the request.
 *
 * This is the hand-off point between a format-specific route and
 * `handleProxyRequest`: everything in here is already validated/translated
 * by the route.
 */
export interface ProxyRequest {
  /** Translated upstream request; passed as-is to `CodexApi.createResponse`. */
  codexRequest: CodexResponsesRequest;
  /** Model name handed unchanged to the adapter's stream/collect translator. */
  model: string;
  /**
   * Selects the response mode: `true` streams chunks as `text/event-stream`,
   * `false` collects the upstream response and returns a single JSON body.
   */
  isStreaming: boolean;
}

/**
 * Format-specific adapter provided by each route.
 *
 * `handleProxyRequest` owns the account/retry lifecycle and delegates every
 * format-dependent decision (error body shape, stream framing, response
 * assembly) to this object.
 */
export interface FormatAdapter {
  /** Short label used as the `[tag]` prefix in log lines and passed to `withRetry`. */
  tag: string;
  /** HTTP status set on the response when no account can be acquired. */
  noAccountStatus: StatusCode;
  /** Builds the JSON body returned together with `noAccountStatus`. */
  formatNoAccount: () => unknown;
  /** Builds the JSON body for an upstream 429; receives the upstream error message. */
  format429: (message: string) => unknown;
  /** Builds the JSON body for any other error, given the status being returned and a message. */
  formatError: (status: number, message: string) => unknown;
  /**
   * Translates the upstream response into string chunks that the handler
   * writes to the client stream one by one.
   *
   * `onUsage` lets the translator report token usage; the handler stores the
   * last reported value and passes it to `accountPool.release` when the
   * stream ends. `onResponseId` is part of the contract, but the shared
   * handler currently passes a no-op for it.
   */
  streamTranslator: (
    api: CodexApi,
    response: Response,
    model: string,
    onUsage: (u: { input_tokens: number; output_tokens: number }) => void,
    onResponseId: (id: string) => void,
  ) => AsyncGenerator<string>;
  /**
   * Consumes the upstream response and resolves with the complete
   * format-specific body plus usage.
   *
   * The handler returns `response` as JSON and passes `usage` to
   * `accountPool.release`; `responseId` is not read by the shared handler.
   * A rejection with `EmptyResponseError` makes the handler retry on a
   * different account; any other rejection is reported as a 502.
   */
  collectTranslator: (
    api: CodexApi,
    response: Response,
    model: string,
  ) => Promise<{
    response: unknown;
    usage: { input_tokens: number; output_tokens: number };
    responseId: string | null;
  }>;
}

/**
 * Core shared handler — from account acquire to release.
 *
 * Handles: acquire, retry, stream/collect, release, error formatting.
 *
 * Lifecycle invariant: every acquired account is handed back to the pool
 * exactly once per attempt — either via `accountPool.release` or, for an
 * upstream 429, via `accountPool.markRateLimited`.
 *
 * @param c - Hono context of the incoming request; used for status, headers and the body.
 * @param accountPool - Pool that accounts are acquired from and released to.
 * @param cookieJar - Optional cookie jar forwarded to every `CodexApi` instance.
 * @param req - Translated request plus model name and streaming flag.
 * @param fmt - Format adapter supplying error bodies and stream/collect translators.
 * @param proxyPool - Optional pool used to resolve a per-account proxy URL.
 * @returns The HTTP response: an SSE stream, a JSON body, or a formatted error.
 * @throws Re-throws any error from the upstream call that is not a `CodexApiError`
 *   (after releasing the active account).
 */
export async function handleProxyRequest(
  c: Context,
  accountPool: AccountPool,
  cookieJar: CookieJar | undefined,
  req: ProxyRequest,
  fmt: FormatAdapter,
  proxyPool?: ProxyPool,
): Promise<Response> {
  // 1. Acquire account
  const acquired = accountPool.acquire();
  if (!acquired) {
    c.status(fmt.noAccountStatus);
    return c.json(fmt.formatNoAccount());
  }

  const { entryId, token, accountId } = acquired;
  const proxyUrl = proxyPool?.resolveProxyUrl(entryId);
  const codexApi = new CodexApi(token, accountId, cookieJar, entryId, proxyUrl);
  // Tracks which account the outer catch should release (updated by retry loop)
  let activeEntryId = entryId;

  console.log(
    `[${fmt.tag}] Account ${entryId} | Codex request:`,
    JSON.stringify(req.codexRequest).slice(0, 300),
  );

  let usageInfo: { input_tokens: number; output_tokens: number } | undefined;

  // P0-2: AbortController to kill curl when client disconnects
  const abortController = new AbortController();
  c.req.raw.signal.addEventListener("abort", () => abortController.abort(), { once: true });

  try {
    // 2. Retry + send to Codex
    const rawResponse = await withRetry(
      () => codexApi.createResponse(req.codexRequest, abortController.signal),
      { tag: fmt.tag },
    );

    // 3. Stream or collect
    if (req.isStreaming) {
      c.header("Content-Type", "text/event-stream");
      c.header("Cache-Control", "no-cache");
      c.header("Connection", "keep-alive");

      return stream(c, async (s) => {
        s.onAbort(() => abortController.abort());
        try {
          for await (const chunk of fmt.streamTranslator(
            codexApi,
            rawResponse,
            req.model,
            (u) => {
              usageInfo = u;
            },
            () => {},
          )) {
            await s.write(chunk);
          }
        } catch (err) {
          // P2-8: Send error SSE event to client before closing
          try {
            const errMsg = err instanceof Error ? err.message : "Stream interrupted";
            await s.write(`data: ${JSON.stringify({ error: { message: errMsg, type: "stream_error" } })}\n\n`);
          } catch { /* client already gone */ }
          throw err;
        } finally {
          // P0-2: Kill curl subprocess if still running
          abortController.abort();
          // The stream callback outlives the outer try/catch, so the account
          // is released here rather than by the outer error handler.
          accountPool.release(entryId, usageInfo);
        }
      });
    } else {
      // Non-streaming: retry loop for empty responses (switch accounts)
      const MAX_EMPTY_RETRIES = 2;
      let currentEntryId = entryId;
      let currentCodexApi = codexApi;
      let currentRawResponse = rawResponse;

      for (let attempt = 1; ; attempt++) {
        try {
          const result = await fmt.collectTranslator(
            currentCodexApi,
            currentRawResponse,
            req.model,
          );
          accountPool.release(currentEntryId, result.usage);
          return c.json(result.response);
        } catch (collectErr) {
          if (collectErr instanceof EmptyResponseError && attempt <= MAX_EMPTY_RETRIES) {
            const emptyEmail = accountPool.getEntry(currentEntryId)?.email ?? "?";
            console.warn(
              `[${fmt.tag}] Account ${currentEntryId} (${emptyEmail}) | Empty response (attempt ${attempt}/${MAX_EMPTY_RETRIES + 1}), switching account...`,
            );
            accountPool.recordEmptyResponse(currentEntryId);
            accountPool.release(currentEntryId, collectErr.usage);

            // Acquire a new account
            const newAcquired = accountPool.acquire();
            if (!newAcquired) {
              console.warn(`[${fmt.tag}] No available account for retry`);
              c.status(502);
              return c.json(fmt.formatError(502, "Codex returned an empty response and no other accounts are available for retry"));
            }

            currentEntryId = newAcquired.entryId;
            activeEntryId = currentEntryId;
            const retryProxyUrl = proxyPool?.resolveProxyUrl(newAcquired.entryId);
            currentCodexApi = new CodexApi(newAcquired.token, newAcquired.accountId, cookieJar, newAcquired.entryId, retryProxyUrl);
            try {
              currentRawResponse = await withRetry(
                () => currentCodexApi.createResponse(req.codexRequest, abortController.signal),
                { tag: fmt.tag },
              );
            } catch (retryErr) {
              if (retryErr instanceof CodexApiError) {
                // Handled right here, so this branch owns the release.
                // NOTE(review): a 429 on the retry account is reported via
                // formatError and is not marked rate-limited, unlike the
                // outer handler — confirm whether that is intended.
                accountPool.release(currentEntryId);
                const code = (retryErr.status >= 400 && retryErr.status < 600 ? retryErr.status : 502) as StatusCode;
                c.status(code);
                return c.json(fmt.formatError(code, retryErr.message));
              }
              // Anything else propagates to the outer catch, which releases
              // `activeEntryId` (the same account). Releasing here as well
              // would hand the account back to the pool twice.
              throw retryErr;
            }
            continue;
          }

          // Not an empty response error, or retries exhausted
          if (collectErr instanceof EmptyResponseError) {
            const exhaustedEmail = accountPool.getEntry(currentEntryId)?.email ?? "?";
            console.warn(
              `[${fmt.tag}] Account ${currentEntryId} (${exhaustedEmail}) | Empty response (attempt ${attempt}/${MAX_EMPTY_RETRIES + 1}), all retries exhausted`,
            );
            // Same order and usage accounting as the retry branch above, so
            // the final empty attempt is booked like the earlier ones.
            accountPool.recordEmptyResponse(currentEntryId);
            accountPool.release(currentEntryId, collectErr.usage);
            c.status(502);
            return c.json(fmt.formatError(502, "Codex returned empty responses across all available accounts"));
          }
          accountPool.release(currentEntryId);
          const msg = collectErr instanceof Error ? collectErr.message : "Unknown error";
          c.status(502);
          return c.json(fmt.formatError(502, msg));
        }
      }
    }
  } catch (err) {
    // 4. Error handling with format-specific responses
    if (err instanceof CodexApiError) {
      console.error(
        `[${fmt.tag}] Account ${activeEntryId} | Codex API error:`,
        err.message,
      );
      if (err.status === 429) {
        // P1-6: Count 429s as requests via encapsulated API (no direct entry mutation)
        accountPool.markRateLimited(activeEntryId, { countRequest: true });
        c.status(429);
        return c.json(fmt.format429(err.message));
      }
      accountPool.release(activeEntryId);
      // Only pass through statuses that are valid HTTP error codes; anything
      // else is reported as a bad-gateway error.
      const code = (
        err.status >= 400 && err.status < 600 ? err.status : 502
      ) as StatusCode;
      c.status(code);
      return c.json(fmt.formatError(code, err.message));
    }
    accountPool.release(activeEntryId);
    throw err;
  }
}