icebear0828 Claude Opus 4.6 committed on
Commit
b1107bc
·
1 Parent(s): 347f81b

refactor: architecture audit fixes (P0-P2 stability & reliability)

Browse files

P0: curl header parse timeout (30s), kill curl on stream disconnect
via AbortController, cap error response body at 1MB.

P1: acquireLock TTL (5min auto-release), atomic config hot-reload,
encapsulate 429 counting in AccountPool, graceful shutdown with
server.close() + 5s drain period.

P2: send error SSE event on stream failure, structured JSON logger,
O(1) session LRU eviction via Map insertion order.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

src/auth/account-pool.ts CHANGED
@@ -31,9 +31,12 @@ import type {
31
  const ACCOUNTS_FILE = resolve(process.cwd(), "data", "accounts.json");
32
  const LEGACY_AUTH_FILE = resolve(process.cwd(), "data", "auth.json");
33
 
 
 
 
34
  export class AccountPool {
35
  private accounts: Map<string, AccountEntry> = new Map();
36
- private acquireLocks: Set<string> = new Set();
37
  private roundRobinIndex = 0;
38
  private persistTimer: ReturnType<typeof setTimeout> | null = null;
39
 
@@ -61,12 +64,21 @@ export class AccountPool {
61
  acquire(): AcquiredAccount | null {
62
  const config = getConfig();
63
  const now = new Date();
 
64
 
65
  // Update statuses before selecting
66
  for (const entry of this.accounts.values()) {
67
  this.refreshStatus(entry, now);
68
  }
69
 
 
 
 
 
 
 
 
 
70
  // Filter available accounts
71
  const available = [...this.accounts.values()].filter(
72
  (a) => a.status === "active" && !this.acquireLocks.has(a.id),
@@ -91,7 +103,7 @@ export class AccountPool {
91
  selected = available[0];
92
  }
93
 
94
- this.acquireLocks.add(selected.id);
95
  return {
96
  entryId: selected.id,
97
  token: selected.token,
@@ -121,18 +133,31 @@ export class AccountPool {
121
 
122
  /**
123
  * Mark an account as rate-limited after a 429.
 
124
  */
125
- markRateLimited(entryId: string, retryAfterSec?: number): void {
 
 
 
126
  this.acquireLocks.delete(entryId);
127
  const entry = this.accounts.get(entryId);
128
  if (!entry) return;
129
 
130
  const config = getConfig();
131
- const backoff = jitter(retryAfterSec ?? config.auth.rate_limit_backoff_seconds, 0.2);
 
 
 
132
  const until = new Date(Date.now() + backoff * 1000);
133
 
134
  entry.status = "rate_limited";
135
  entry.usage.rate_limit_until = until.toISOString();
 
 
 
 
 
 
136
  this.schedulePersist();
137
  }
138
 
 
31
  const ACCOUNTS_FILE = resolve(process.cwd(), "data", "accounts.json");
32
  const LEGACY_AUTH_FILE = resolve(process.cwd(), "data", "auth.json");
33
 
34
+ // P1-4: Lock TTL — auto-release locks older than this
35
+ const ACQUIRE_LOCK_TTL_MS = 5 * 60 * 1000; // 5 minutes
36
+
37
  export class AccountPool {
38
  private accounts: Map<string, AccountEntry> = new Map();
39
+ private acquireLocks: Map<string, number> = new Map(); // entryId → timestamp
40
  private roundRobinIndex = 0;
41
  private persistTimer: ReturnType<typeof setTimeout> | null = null;
42
 
 
64
  acquire(): AcquiredAccount | null {
65
  const config = getConfig();
66
  const now = new Date();
67
+ const nowMs = now.getTime();
68
 
69
  // Update statuses before selecting
70
  for (const entry of this.accounts.values()) {
71
  this.refreshStatus(entry, now);
72
  }
73
 
74
+ // P1-4: Auto-release stale locks (older than TTL)
75
+ for (const [id, lockedAt] of this.acquireLocks) {
76
+ if (nowMs - lockedAt > ACQUIRE_LOCK_TTL_MS) {
77
+ console.warn(`[AccountPool] Auto-releasing stale lock for ${id} (locked ${Math.round((nowMs - lockedAt) / 1000)}s ago)`);
78
+ this.acquireLocks.delete(id);
79
+ }
80
+ }
81
+
82
  // Filter available accounts
83
  const available = [...this.accounts.values()].filter(
84
  (a) => a.status === "active" && !this.acquireLocks.has(a.id),
 
103
  selected = available[0];
104
  }
105
 
106
+ this.acquireLocks.set(selected.id, Date.now());
107
  return {
108
  entryId: selected.id,
109
  token: selected.token,
 
133
 
134
  /**
135
  * Mark an account as rate-limited after a 429.
136
+ * P1-6: countRequest option to track 429s as usage without exposing entry internals.
137
  */
138
+ markRateLimited(
139
+ entryId: string,
140
+ options?: { retryAfterSec?: number; countRequest?: boolean },
141
+ ): void {
142
  this.acquireLocks.delete(entryId);
143
  const entry = this.accounts.get(entryId);
144
  if (!entry) return;
145
 
146
  const config = getConfig();
147
+ const backoff = jitter(
148
+ options?.retryAfterSec ?? config.auth.rate_limit_backoff_seconds,
149
+ 0.2,
150
+ );
151
  const until = new Date(Date.now() + backoff * 1000);
152
 
153
  entry.status = "rate_limited";
154
  entry.usage.rate_limit_until = until.toISOString();
155
+
156
+ if (options?.countRequest) {
157
+ entry.usage.request_count++;
158
+ entry.usage.last_used = new Date().toISOString();
159
+ }
160
+
161
  this.schedulePersist();
162
  }
163
 
src/config.ts CHANGED
@@ -134,16 +134,25 @@ export function mutateClientConfig(patch: Partial<AppConfig["client"]>): void {
134
  Object.assign(_config.client, patch);
135
  }
136
 
137
- /** Reload config from disk (hot-reload after full-update). */
 
138
  export function reloadConfig(configDir?: string): AppConfig {
139
- _config = null;
140
- return loadConfig(configDir);
 
 
 
 
141
  }
142
 
143
- /** Reload fingerprint from disk (hot-reload after full-update). */
 
144
  export function reloadFingerprint(configDir?: string): FingerprintConfig {
145
- _fingerprint = null;
146
- return loadFingerprint(configDir);
 
 
 
147
  }
148
 
149
  /** Reload both config and fingerprint from disk. */
 
134
  Object.assign(_config.client, patch);
135
  }
136
 
137
+ /** Reload config from disk (hot-reload after full-update).
138
+ * P1-5: Load to temp first, then swap atomically to avoid null window. */
139
  export function reloadConfig(configDir?: string): AppConfig {
140
+ const dir = configDir ?? resolve(process.cwd(), "config");
141
+ const raw = loadYaml(resolve(dir, "default.yaml")) as Record<string, unknown>;
142
+ applyEnvOverrides(raw);
143
+ const fresh = ConfigSchema.parse(raw);
144
+ _config = fresh;
145
+ return _config;
146
  }
147
 
148
+ /** Reload fingerprint from disk (hot-reload after full-update).
149
+ * P1-5: Load to temp first, then swap atomically. */
150
  export function reloadFingerprint(configDir?: string): FingerprintConfig {
151
+ const dir = configDir ?? resolve(process.cwd(), "config");
152
+ const raw = loadYaml(resolve(dir, "fingerprint.yaml"));
153
+ const fresh = FingerprintSchema.parse(raw);
154
+ _fingerprint = fresh;
155
+ return _fingerprint;
156
  }
157
 
158
  /** Reload both config and fingerprint from disk. */
src/index.ts CHANGED
@@ -94,35 +94,54 @@ async function main() {
94
  // Start background update checker
95
  startUpdateChecker();
96
 
97
- serve({
98
  fetch: app.fetch,
99
  hostname: host,
100
  port,
101
  });
102
 
103
- // Graceful shutdown with timeout protection
104
  let shutdownCalled = false;
 
105
  const shutdown = () => {
106
  if (shutdownCalled) return;
107
  shutdownCalled = true;
108
- console.log("\n[Shutdown] Cleaning up...");
 
109
  const forceExit = setTimeout(() => {
110
  console.error("[Shutdown] Timeout after 10s — forcing exit");
111
  process.exit(1);
112
  }, 10_000);
113
  if (forceExit.unref) forceExit.unref();
114
 
115
- try {
116
- stopUpdateChecker();
117
- refreshScheduler.destroy(); // Cancel timers first
118
- sessionManager.destroy();
119
- cookieJar.destroy(); // Flush cookies
120
- accountPool.destroy(); // Flush accounts
121
- } catch (err) {
122
- console.error("[Shutdown] Error during cleanup:", err instanceof Error ? err.message : err);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  }
124
- clearTimeout(forceExit);
125
- process.exit(0);
126
  };
127
 
128
  process.on("SIGINT", shutdown);
 
94
  // Start background update checker
95
  startUpdateChecker();
96
 
97
+ const server = serve({
98
  fetch: app.fetch,
99
  hostname: host,
100
  port,
101
  });
102
 
103
+ // P1-7: Graceful shutdown — stop accepting, drain, then clean up
104
  let shutdownCalled = false;
105
+ const DRAIN_TIMEOUT_MS = 5_000;
106
  const shutdown = () => {
107
  if (shutdownCalled) return;
108
  shutdownCalled = true;
109
+ console.log("\n[Shutdown] Stopping new connections...");
110
+
111
  const forceExit = setTimeout(() => {
112
  console.error("[Shutdown] Timeout after 10s — forcing exit");
113
  process.exit(1);
114
  }, 10_000);
115
  if (forceExit.unref) forceExit.unref();
116
 
117
+ // 1. Stop accepting new connections
118
+ server.close(() => {
119
+ console.log("[Shutdown] Server closed, cleaning up resources...");
120
+ cleanup();
121
+ });
122
+
123
+ // 2. Grace period for active streams, then force cleanup
124
+ setTimeout(() => {
125
+ console.log("[Shutdown] Drain timeout reached, cleaning up...");
126
+ cleanup();
127
+ }, DRAIN_TIMEOUT_MS);
128
+
129
+ let cleanupDone = false;
130
+ function cleanup() {
131
+ if (cleanupDone) return;
132
+ cleanupDone = true;
133
+ try {
134
+ stopUpdateChecker();
135
+ refreshScheduler.destroy();
136
+ sessionManager.destroy();
137
+ cookieJar.destroy();
138
+ accountPool.destroy();
139
+ } catch (err) {
140
+ console.error("[Shutdown] Error during cleanup:", err instanceof Error ? err.message : err);
141
+ }
142
+ clearTimeout(forceExit);
143
+ process.exit(0);
144
  }
 
 
145
  };
146
 
147
  process.on("SIGINT", shutdown);
src/middleware/logger.ts CHANGED
@@ -1,4 +1,5 @@
1
  import type { Context, Next } from "hono";
 
2
 
3
  export async function logger(c: Context, next: Next): Promise<void> {
4
  const start = Date.now();
@@ -6,11 +7,11 @@ export async function logger(c: Context, next: Next): Promise<void> {
6
  const path = c.req.path;
7
  const rid = c.get("requestId") ?? "-";
8
 
9
- console.log(`→ ${method} ${path} [${rid}]`);
10
 
11
  await next();
12
 
13
  const ms = Date.now() - start;
14
  const status = c.res.status;
15
- console.log(`← ${method} ${path} ${status} ${ms}ms [${rid}]`);
16
  }
 
1
  import type { Context, Next } from "hono";
2
+ import { log } from "../utils/logger.js";
3
 
4
  export async function logger(c: Context, next: Next): Promise<void> {
5
  const start = Date.now();
 
7
  const path = c.req.path;
8
  const rid = c.get("requestId") ?? "-";
9
 
10
+ log.info(`→ ${method} ${path}`, { rid, method, path });
11
 
12
  await next();
13
 
14
  const ms = Date.now() - start;
15
  const status = c.res.status;
16
+ log.info(`← ${method} ${path} ${status} ${ms}ms`, { rid, method, path, status, ms });
17
  }
src/proxy/codex-api.ts CHANGED
@@ -150,6 +150,16 @@ export class CodexApi {
150
  let headersParsed = false;
151
  let bodyController: ReadableStreamDefaultController<Uint8Array> | null = null;
152
 
 
 
 
 
 
 
 
 
 
 
153
  const bodyStream = new ReadableStream<Uint8Array>({
154
  start(c) {
155
  bodyController = c;
@@ -171,6 +181,7 @@ export class CodexApi {
171
  if (separatorIdx === -1) return;
172
 
173
  headersParsed = true;
 
174
  const headerBlock = headerBuf.subarray(0, separatorIdx).toString("utf-8");
175
  const remaining = headerBuf.subarray(separatorIdx + 4);
176
 
@@ -200,6 +211,7 @@ export class CodexApi {
200
  });
201
 
202
  child.on("close", (code) => {
 
203
  if (signal) {
204
  signal.removeEventListener("abort", onAbort);
205
  }
@@ -210,6 +222,7 @@ export class CodexApi {
210
  });
211
 
212
  child.on("error", (err) => {
 
213
  if (signal) {
214
  signal.removeEventListener("abort", onAbort);
215
  }
@@ -297,13 +310,26 @@ export class CodexApi {
297
  this.captureCookiesFromCurl(curlRes.setCookieHeaders);
298
 
299
  if (curlRes.status < 200 || curlRes.status >= 300) {
300
- // Read the body for error details
 
301
  const reader = curlRes.body.getReader();
302
  const chunks: Uint8Array[] = [];
 
303
  while (true) {
304
  const { done, value } = await reader.read();
305
  if (done) break;
306
- chunks.push(value);
 
 
 
 
 
 
 
 
 
 
 
307
  }
308
  const errorBody = Buffer.concat(chunks).toString("utf-8");
309
  throw new CodexApiError(curlRes.status, errorBody);
 
150
  let headersParsed = false;
151
  let bodyController: ReadableStreamDefaultController<Uint8Array> | null = null;
152
 
153
+ // P0-1: Header parse timeout — kill curl if headers aren't received within 30s
154
+ const HEADER_TIMEOUT_MS = 30_000;
155
+ const headerTimer = setTimeout(() => {
156
+ if (!headersParsed) {
157
+ child.kill("SIGTERM");
158
+ reject(new CodexApiError(0, `curl header parse timeout after ${HEADER_TIMEOUT_MS}ms`));
159
+ }
160
+ }, HEADER_TIMEOUT_MS);
161
+ if (headerTimer.unref) headerTimer.unref();
162
+
163
  const bodyStream = new ReadableStream<Uint8Array>({
164
  start(c) {
165
  bodyController = c;
 
181
  if (separatorIdx === -1) return;
182
 
183
  headersParsed = true;
184
+ clearTimeout(headerTimer);
185
  const headerBlock = headerBuf.subarray(0, separatorIdx).toString("utf-8");
186
  const remaining = headerBuf.subarray(separatorIdx + 4);
187
 
 
211
  });
212
 
213
  child.on("close", (code) => {
214
+ clearTimeout(headerTimer);
215
  if (signal) {
216
  signal.removeEventListener("abort", onAbort);
217
  }
 
222
  });
223
 
224
  child.on("error", (err) => {
225
+ clearTimeout(headerTimer);
226
  if (signal) {
227
  signal.removeEventListener("abort", onAbort);
228
  }
 
310
  this.captureCookiesFromCurl(curlRes.setCookieHeaders);
311
 
312
  if (curlRes.status < 200 || curlRes.status >= 300) {
313
+ // Read the body for error details (P0-3: cap at 1MB to prevent memory spikes)
314
+ const MAX_ERROR_BODY = 1024 * 1024; // 1MB
315
  const reader = curlRes.body.getReader();
316
  const chunks: Uint8Array[] = [];
317
+ let totalSize = 0;
318
  while (true) {
319
  const { done, value } = await reader.read();
320
  if (done) break;
321
+ totalSize += value.byteLength;
322
+ if (totalSize <= MAX_ERROR_BODY) {
323
+ chunks.push(value);
324
+ } else {
325
+ // Truncate: push only the part that fits
326
+ const overshoot = totalSize - MAX_ERROR_BODY;
327
+ if (value.byteLength > overshoot) {
328
+ chunks.push(value.subarray(0, value.byteLength - overshoot));
329
+ }
330
+ reader.cancel();
331
+ break;
332
+ }
333
  }
334
  const errorBody = Buffer.concat(chunks).toString("utf-8");
335
  throw new CodexApiError(curlRes.status, errorBody);
src/routes/shared/proxy-handler.ts CHANGED
@@ -89,10 +89,13 @@ export async function handleProxyRequest(
89
 
90
  let usageInfo: { input_tokens: number; output_tokens: number } | undefined;
91
 
 
 
 
92
  try {
93
  // 3. Retry + send to Codex
94
  const rawResponse = await withRetry(
95
- () => codexApi.createResponse(req.codexRequest),
96
  { tag: fmt.tag },
97
  );
98
 
@@ -126,7 +129,16 @@ export async function handleProxyRequest(
126
  )) {
127
  await s.write(chunk);
128
  }
 
 
 
 
 
 
 
129
  } finally {
 
 
130
  accountPool.release(entryId, usageInfo);
131
  }
132
  });
@@ -156,15 +168,8 @@ export async function handleProxyRequest(
156
  err.message,
157
  );
158
  if (err.status === 429) {
159
- accountPool.markRateLimited(entryId);
160
- // Note: markRateLimited releases the lock but does not increment
161
- // request_count. We intentionally count 429s as requests for
162
- // accurate load tracking across accounts.
163
- const entry = accountPool.getEntry(entryId);
164
- if (entry) {
165
- entry.usage.request_count++;
166
- entry.usage.last_used = new Date().toISOString();
167
- }
168
  c.status(429);
169
  return c.json(fmt.format429(err.message));
170
  }
 
89
 
90
  let usageInfo: { input_tokens: number; output_tokens: number } | undefined;
91
 
92
+ // P0-2: AbortController to kill curl when client disconnects
93
+ const abortController = new AbortController();
94
+
95
  try {
96
  // 3. Retry + send to Codex
97
  const rawResponse = await withRetry(
98
+ () => codexApi.createResponse(req.codexRequest, abortController.signal),
99
  { tag: fmt.tag },
100
  );
101
 
 
129
  )) {
130
  await s.write(chunk);
131
  }
132
+ } catch (err) {
133
+ // P2-8: Send error SSE event to client before closing
134
+ try {
135
+ const errMsg = err instanceof Error ? err.message : "Stream interrupted";
136
+ await s.write(`data: ${JSON.stringify({ error: { message: errMsg, type: "stream_error" } })}\n\n`);
137
+ } catch { /* client already gone */ }
138
+ throw err;
139
  } finally {
140
+ // P0-2: Kill curl subprocess if still running
141
+ abortController.abort();
142
  accountPool.release(entryId, usageInfo);
143
  }
144
  });
 
168
  err.message,
169
  );
170
  if (err.status === 429) {
171
+ // P1-6: Count 429s as requests via encapsulated API (no direct entry mutation)
172
+ accountPool.markRateLimited(entryId, { countRequest: true });
 
 
 
 
 
 
 
173
  c.status(429);
174
  return c.json(fmt.format429(err.message));
175
  }
src/session/manager.ts CHANGED
@@ -71,16 +71,9 @@ export class SessionManager {
71
  messages: Array<{ role: string; content: string }>,
72
  ): void {
73
  const hash = this.hashMessages(messages);
74
- // Evict oldest session if at capacity
75
  if (this.sessions.size >= MAX_SESSIONS) {
76
- let oldestKey: string | null = null;
77
- let oldestTime = Infinity;
78
- for (const [key, s] of this.sessions) {
79
- if (s.createdAt < oldestTime) {
80
- oldestTime = s.createdAt;
81
- oldestKey = key;
82
- }
83
- }
84
  if (oldestKey) this.sessions.delete(oldestKey);
85
  }
86
  this.sessions.set(taskId, {
 
71
  messages: Array<{ role: string; content: string }>,
72
  ): void {
73
  const hash = this.hashMessages(messages);
74
+ // P2-10: O(1) LRU eviction using Map insertion order
75
  if (this.sessions.size >= MAX_SESSIONS) {
76
+ const oldestKey = this.sessions.keys().next().value;
 
 
 
 
 
 
 
77
  if (oldestKey) this.sessions.delete(oldestKey);
78
  }
79
  this.sessions.set(taskId, {
src/utils/logger.ts ADDED
@@ -0,0 +1,66 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ /**
2
+ * Structured logger — JSON in production, readable in development.
3
+ *
4
+ * Usage:
5
+ * import { log } from "../utils/logger.js";
6
+ * log.info("Request received", { method: "POST", path: "/v1/chat/completions" });
7
+ * log.warn("Stale lock", { entryId: "abc" });
8
+ * log.error("Curl failed", { code: 1, stderr: "..." });
9
+ */
10
+
11
+ type LogLevel = "debug" | "info" | "warn" | "error";
12
+
13
+ const LEVEL_PRIORITY: Record<LogLevel, number> = {
14
+ debug: 0,
15
+ info: 1,
16
+ warn: 2,
17
+ error: 3,
18
+ };
19
+
20
+ const isProduction = process.env.NODE_ENV === "production";
21
+ const minLevel: LogLevel = (process.env.LOG_LEVEL as LogLevel) ?? (isProduction ? "info" : "debug");
22
+
23
+ function shouldLog(level: LogLevel): boolean {
24
+ return LEVEL_PRIORITY[level] >= LEVEL_PRIORITY[minLevel];
25
+ }
26
+
27
+ function emit(level: LogLevel, message: string, extra?: Record<string, unknown>): void {
28
+ if (!shouldLog(level)) return;
29
+
30
+ if (isProduction) {
31
+ // JSON structured output for container/log aggregator consumption
32
+ const entry: Record<string, unknown> = {
33
+ ts: new Date().toISOString(),
34
+ level,
35
+ msg: message,
36
+ ...extra,
37
+ };
38
+ const line = JSON.stringify(entry);
39
+ if (level === "error") {
40
+ process.stderr.write(line + "\n");
41
+ } else {
42
+ process.stdout.write(line + "\n");
43
+ }
44
+ } else {
45
+ // Human-readable for development
46
+ const prefix = `[${level.toUpperCase()}]`;
47
+ const parts: unknown[] = [prefix, message];
48
+ if (extra && Object.keys(extra).length > 0) {
49
+ parts.push(extra);
50
+ }
51
+ if (level === "error") {
52
+ console.error(...parts);
53
+ } else if (level === "warn") {
54
+ console.warn(...parts);
55
+ } else {
56
+ console.log(...parts);
57
+ }
58
+ }
59
+ }
60
+
61
+ export const log = {
62
+ debug: (msg: string, extra?: Record<string, unknown>) => emit("debug", msg, extra),
63
+ info: (msg: string, extra?: Record<string, unknown>) => emit("info", msg, extra),
64
+ warn: (msg: string, extra?: Record<string, unknown>) => emit("warn", msg, extra),
65
+ error: (msg: string, extra?: Record<string, unknown>) => emit("error", msg, extra),
66
+ };