incognitolm commited on
Commit
a3777ea
·
1 Parent(s): 4f6c84f

Server stuff

Browse files
Files changed (3) hide show
  1. server/chatStream.js +290 -48
  2. server/index.js +11 -3
  3. server/wsHandler.js +11 -3
server/chatStream.js CHANGED
@@ -18,21 +18,71 @@ const WORKER_PATH = path.join(__dirname, "searchWorker.js");
18
  // Persistent WebSocket pool
19
  let persistentWs = null;
20
  let wsAuthPromise = null;
21
- let requestIdCounter = 0;
22
  let activeStreamHandlers = new Map(); // Track active stream handlers by request ID
23
  let errorHandlers = new Map(); // Track error handlers by request ID
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  async function getSafeWebSocket() {
26
  if (persistentWs && persistentWs.readyState === WebSocket.OPEN) return persistentWs;
27
  if (wsAuthPromise) return wsAuthPromise;
28
 
29
  wsAuthPromise = (async () => {
 
30
  const wsURL =
31
- (process.env.LIGHTNING_BASE.startsWith("https")
32
- ? process.env.LIGHTNING_BASE.replace("https", "wss")
33
- : process.env.LIGHTNING_BASE.replace("http", "ws")) + "/ws/chat";
34
 
35
- persistentWs = new WebSocket(wsURL);
 
36
 
37
  const safeParse = (str) => {
38
  try {
@@ -45,50 +95,112 @@ async function getSafeWebSocket() {
45
  };
46
 
47
  await new Promise((resolve, reject) => {
48
- const timer = setTimeout(() => reject(new Error("WS connection timeout")), 5000);
49
- persistentWs.on("open", () => {
50
  clearTimeout(timer);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  resolve();
52
- });
53
- persistentWs.on("error", (err) => {
 
54
  console.error("[WS] Connection error", err);
55
- clearTimeout(timer);
56
- persistentWs = null;
57
- wsAuthPromise = null;
 
58
  reject(err);
59
- });
 
 
 
 
 
 
 
 
 
 
 
 
60
  });
61
 
62
- persistentWs.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
63
 
64
  await new Promise((resolve, reject) => {
65
- const timer = setTimeout(() => reject(new Error("WS auth timeout")), 5000);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
  const authHandler = (data) => {
67
  const msg = safeParse(data.toString());
68
  if (!msg) return;
69
  if (msg.type === "auth" && msg.status === "ok") {
70
- persistentWs.removeListener("message", authHandler);
71
- clearTimeout(timer);
72
  resolve();
73
  }
74
  if (msg.error) {
 
75
  console.error("[WS] Auth error", msg.error);
76
- persistentWs.removeListener("message", authHandler);
77
- clearTimeout(timer);
78
- persistentWs = null;
79
- wsAuthPromise = null;
80
  reject(new Error(`WS auth error: ${msg.error}`));
81
  }
82
  };
83
- persistentWs.on("message", authHandler);
84
- persistentWs.on("error", (err) => {
85
  console.error("[WS] Auth error event", err);
86
- persistentWs.removeListener("message", authHandler);
87
- clearTimeout(timer);
88
- persistentWs = null;
89
- wsAuthPromise = null;
90
  reject(err);
91
- });
 
 
 
 
 
 
 
 
 
 
 
 
92
  });
93
 
94
  const globalMessageHandler = (data) => {
@@ -100,18 +212,28 @@ async function getSafeWebSocket() {
100
 
101
  const globalErrorHandler = (err) => {
102
  console.error("[WS ERROR]", err);
103
- for (const [id, handler] of errorHandlers.entries()) {
104
- handler(err);
105
- }
106
  };
107
 
108
- persistentWs.on("message", globalMessageHandler);
109
- persistentWs.on("error", globalErrorHandler);
 
 
 
 
 
 
 
 
 
 
 
110
  activeStreamHandlers.set("__messageListener__", globalMessageHandler);
111
  activeStreamHandlers.set("__errorHandler__", globalErrorHandler);
 
112
 
113
  wsAuthPromise = null;
114
- return persistentWs;
115
  })();
116
 
117
  return wsAuthPromise;
@@ -182,9 +304,13 @@ const HISTORY_SUMMARY_TOKEN_BUDGET = 600;
182
  const NOTES_TOKEN_BUDGET = 900;
183
  const MAX_DYNAMIC_MESSAGES = 10;
184
  const MAX_UPSTREAM_RATE_LIMIT_RETRIES = 4;
 
185
  const DEFAULT_UPSTREAM_RETRY_MS = 4000;
186
  const MAX_UPSTREAM_RETRY_MS = 15000;
187
  const UPSTREAM_RETRY_BUFFER_MS = 350;
 
 
 
188
 
189
  // In-memory stores for staged prompt resources and assistant notes
190
  const promptContextStore = new Map(); // sessionId -> { resources, resourcesById }
@@ -892,6 +1018,15 @@ class RetryableRateLimitError extends Error {
892
  }
893
  }
894
 
 
 
 
 
 
 
 
 
 
895
  class UpstreamProviderError extends Error {
896
  constructor(publicMessage, internalMessage = null) {
897
  super(publicMessage);
@@ -1038,6 +1173,9 @@ function getPublicErrorMessage(err) {
1038
  if (err.name === "RetryableRateLimitError") {
1039
  return "The model provider is temporarily rate limited. Please try again in a few seconds.";
1040
  }
 
 
 
1041
  if (err.publicMessage) return err.publicMessage;
1042
  return String(err);
1043
  }
@@ -1049,14 +1187,23 @@ async function websocketChatStreamWithRetry(body, headers, onToken, abortSignal)
1049
  } catch (err) {
1050
  if (err?.name === "AbortError") throw err;
1051
 
1052
- const retryable = err?.name === "RetryableRateLimitError";
1053
- if (!retryable || retryIndex >= MAX_UPSTREAM_RATE_LIMIT_RETRIES) {
 
 
 
 
 
 
 
1054
  throw err;
1055
  }
1056
 
1057
  const waitMs = getRetryDelayMs(err, retryIndex);
1058
  console.warn(
1059
- `[streamChat] Upstream rate limited, retrying in ${waitMs}ms (${retryIndex + 1}/${MAX_UPSTREAM_RATE_LIMIT_RETRIES})`
 
 
1060
  );
1061
  await sleepWithAbort(waitMs, abortSignal);
1062
  }
@@ -1080,28 +1227,79 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
1080
  const safeParse = (str) => {
1081
  try { return JSON.parse(str.startsWith("data: ") ? str.slice(6) : str); } catch { return null; }
1082
  };
1083
- ws.send(JSON.stringify({ body, headers }));
1084
  let assistantText = "";
1085
  const toolCallBuffer = new Map();
1086
  let finished = false;
 
1087
 
1088
  return new Promise((resolve, reject) => {
1089
- const timeoutId = setTimeout(() => {
 
1090
  if (!finished) {
1091
  finished = true;
1092
  cleanup();
1093
  const toolCalls = serializeToolCalls(toolCallBuffer);
1094
  resolve({ assistantText, toolCalls });
1095
  }
1096
- }, 120000);
1097
 
1098
  const cleanup = () => {
1099
  activeStreamHandlers.delete(currentRequestId);
1100
  errorHandlers.delete(currentRequestId);
1101
- clearTimeout(timeoutId);
 
1102
  if (abortSignal) abortSignal.removeEventListener("abort", abortHandler);
1103
  };
1104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1105
  const messageHandler = (line) => {
1106
  const colonIdx = line.indexOf(':');
1107
  if (colonIdx === -1) return;
@@ -1110,6 +1308,9 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
1110
  if (msgRequestId !== String(currentRequestId)) return;
1111
  if (!payload) return;
1112
 
 
 
 
1113
  if (payload.error && !payload.choices) {
1114
  if (!finished) {
1115
  finished = true;
@@ -1198,12 +1399,53 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
1198
  }
1199
  };
1200
 
1201
- const errorHandler = (err) => { if (!finished) { finished = true; cleanup(); reject(err); } };
1202
- const abortHandler = () => { if (!finished) { finished = true; cleanup(); reject(new Error("AbortError")); } };
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1203
 
1204
  activeStreamHandlers.set(currentRequestId, messageHandler);
1205
  errorHandlers.set(currentRequestId, errorHandler);
1206
  if (abortSignal) abortSignal.addEventListener("abort", abortHandler);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1207
  });
1208
  }
1209
 
@@ -1347,7 +1589,7 @@ export async function streamChat({
1347
  ...buildModelMessages(baseMessages, workingMessages, sessionId),
1348
  {
1349
  role: "system",
1350
- content: "Tool-use budget is exhausted for this response. Do not call tools. Answer directly using the information already gathered. If something is still missing, briefly say what is missing without calling tools.",
1351
  },
1352
  ];
1353
 
@@ -1378,18 +1620,18 @@ export async function streamChat({
1378
  const sessionName = extractSessionName(assistantText);
1379
 
1380
  if (typeof onDone === "function") {
1381
- onDone(assistantText, allToolCalls, false, sessionName, responseEdits, responseSegments);
1382
  }
1383
 
1384
  clearPromptState(sessionId);
1385
 
1386
  } catch (err) {
1387
- console.error("streamChat error:", err?.internalMessage || err);
1388
  clearPromptState(sessionId);
1389
  if (err.name === "AbortError" || err.message === "AbortError") {
1390
- if (typeof onDone === "function") onDone(null, null, true, null);
1391
  } else {
1392
- if (typeof onError === "function") onError(getPublicErrorMessage(err));
 
1393
  }
1394
  }
1395
  }
 
18
  // Persistent WebSocket pool
19
  let persistentWs = null;
20
  let wsAuthPromise = null;
21
+ let requestIdCounter = 0; // Upstream request IDs are scoped to the current websocket connection.
22
  let activeStreamHandlers = new Map(); // Track active stream handlers by request ID
23
  let errorHandlers = new Map(); // Track error handlers by request ID
24
 
25
+ function buildUpstreamSocketMessage(reason, err = null) {
26
+ return [reason, err?.message].filter(Boolean).join(": ");
27
+ }
28
+
29
+ function invalidatePersistentWebSocket(ws, reason, err = null) {
30
+ if (!ws || persistentWs !== ws) return;
31
+
32
+ persistentWs = null;
33
+ wsAuthPromise = null;
34
+ requestIdCounter = 0;
35
+ activeStreamHandlers.delete("__messageListener__");
36
+ activeStreamHandlers.delete("__errorHandler__");
37
+ activeStreamHandlers.delete("__closeHandler__");
38
+
39
+ const failure =
40
+ err?.name === "RetryableUpstreamConnectionError"
41
+ ? err
42
+ : new RetryableUpstreamConnectionError(
43
+ "The model connection was interrupted. Reconnecting automatically.",
44
+ buildUpstreamSocketMessage(reason, err)
45
+ );
46
+
47
+ const pendingErrorHandlers = [...errorHandlers.values()];
48
+ for (const handler of pendingErrorHandlers) {
49
+ try {
50
+ handler(failure);
51
+ } catch {
52
+ // Ignore request-specific cleanup errors while invalidating the shared socket.
53
+ }
54
+ }
55
+
56
+ try {
57
+ ws.removeAllListeners("message");
58
+ ws.removeAllListeners("error");
59
+ ws.removeAllListeners("close");
60
+ } catch {
61
+ // Ignore listener cleanup failures.
62
+ }
63
+
64
+ if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
65
+ try {
66
+ ws.terminate();
67
+ } catch {
68
+ // Ignore termination failures while resetting the shared socket.
69
+ }
70
+ }
71
+ }
72
+
73
  async function getSafeWebSocket() {
74
  if (persistentWs && persistentWs.readyState === WebSocket.OPEN) return persistentWs;
75
  if (wsAuthPromise) return wsAuthPromise;
76
 
77
  wsAuthPromise = (async () => {
78
+ const lightningBase = process.env.LIGHTNING_BASE || LIGHTNING_BASE;
79
  const wsURL =
80
+ (lightningBase.startsWith("https")
81
+ ? lightningBase.replace("https", "wss")
82
+ : lightningBase.replace("http", "ws")) + "/ws/chat";
83
 
84
+ const ws = new WebSocket(wsURL);
85
+ persistentWs = ws;
86
 
87
  const safeParse = (str) => {
88
  try {
 
95
  };
96
 
97
  await new Promise((resolve, reject) => {
98
+ const cleanup = () => {
 
99
  clearTimeout(timer);
100
+ ws.removeListener("open", onOpen);
101
+ ws.removeListener("error", onError);
102
+ ws.removeListener("close", onClose);
103
+ };
104
+ const timer = setTimeout(() => {
105
+ cleanup();
106
+ if (persistentWs === ws) {
107
+ persistentWs = null;
108
+ wsAuthPromise = null;
109
+ }
110
+ try {
111
+ ws.terminate();
112
+ } catch {
113
+ // Ignore termination failures while timing out the handshake.
114
+ }
115
+ reject(new Error("WS connection timeout"));
116
+ }, 5000);
117
+ const onOpen = () => {
118
+ cleanup();
119
  resolve();
120
+ };
121
+ const onError = (err) => {
122
+ cleanup();
123
  console.error("[WS] Connection error", err);
124
+ if (persistentWs === ws) {
125
+ persistentWs = null;
126
+ wsAuthPromise = null;
127
+ }
128
  reject(err);
129
+ };
130
+ const onClose = (code, reasonBuffer) => {
131
+ cleanup();
132
+ const reason = reasonBuffer?.toString?.() || "";
133
+ if (persistentWs === ws) {
134
+ persistentWs = null;
135
+ wsAuthPromise = null;
136
+ }
137
+ reject(new Error(`WS connection closed (${code})${reason ? `: ${reason}` : ""}`));
138
+ };
139
+ ws.on("open", onOpen);
140
+ ws.on("error", onError);
141
+ ws.on("close", onClose);
142
  });
143
 
144
+ ws.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
145
 
146
  await new Promise((resolve, reject) => {
147
+ const cleanup = () => {
148
+ clearTimeout(timer);
149
+ ws.removeListener("message", authHandler);
150
+ ws.removeListener("error", onError);
151
+ ws.removeListener("close", onClose);
152
+ };
153
+ const timer = setTimeout(() => {
154
+ cleanup();
155
+ if (persistentWs === ws) {
156
+ persistentWs = null;
157
+ wsAuthPromise = null;
158
+ }
159
+ try {
160
+ ws.terminate();
161
+ } catch {
162
+ // Ignore termination failures while timing out auth.
163
+ }
164
+ reject(new Error("WS auth timeout"));
165
+ }, 5000);
166
  const authHandler = (data) => {
167
  const msg = safeParse(data.toString());
168
  if (!msg) return;
169
  if (msg.type === "auth" && msg.status === "ok") {
170
+ cleanup();
 
171
  resolve();
172
  }
173
  if (msg.error) {
174
+ cleanup();
175
  console.error("[WS] Auth error", msg.error);
176
+ if (persistentWs === ws) {
177
+ persistentWs = null;
178
+ wsAuthPromise = null;
179
+ }
180
  reject(new Error(`WS auth error: ${msg.error}`));
181
  }
182
  };
183
+ const onError = (err) => {
184
+ cleanup();
185
  console.error("[WS] Auth error event", err);
186
+ if (persistentWs === ws) {
187
+ persistentWs = null;
188
+ wsAuthPromise = null;
189
+ }
190
  reject(err);
191
+ };
192
+ const onClose = (code, reasonBuffer) => {
193
+ cleanup();
194
+ const reason = reasonBuffer?.toString?.() || "";
195
+ if (persistentWs === ws) {
196
+ persistentWs = null;
197
+ wsAuthPromise = null;
198
+ }
199
+ reject(new Error(`WS auth closed (${code})${reason ? `: ${reason}` : ""}`));
200
+ };
201
+ ws.on("message", authHandler);
202
+ ws.on("error", onError);
203
+ ws.on("close", onClose);
204
  });
205
 
206
  const globalMessageHandler = (data) => {
 
212
 
213
  const globalErrorHandler = (err) => {
214
  console.error("[WS ERROR]", err);
215
+ invalidatePersistentWebSocket(ws, "Upstream websocket error", err);
 
 
216
  };
217
 
218
+ const globalCloseHandler = (code, reasonBuffer) => {
219
+ const reason = reasonBuffer?.toString?.() || "";
220
+ console.warn(`[WS CLOSE] ${code}${reason ? `: ${reason}` : ""}`);
221
+ invalidatePersistentWebSocket(
222
+ ws,
223
+ `Upstream websocket closed (${code})${reason ? `: ${reason}` : ""}`
224
+ );
225
+ };
226
+
227
+ ws.on("message", globalMessageHandler);
228
+ ws.on("error", globalErrorHandler);
229
+ ws.on("close", globalCloseHandler);
230
+ requestIdCounter = 0;
231
  activeStreamHandlers.set("__messageListener__", globalMessageHandler);
232
  activeStreamHandlers.set("__errorHandler__", globalErrorHandler);
233
+ activeStreamHandlers.set("__closeHandler__", globalCloseHandler);
234
 
235
  wsAuthPromise = null;
236
+ return ws;
237
  })();
238
 
239
  return wsAuthPromise;
 
304
  const NOTES_TOKEN_BUDGET = 900;
305
  const MAX_DYNAMIC_MESSAGES = 10;
306
  const MAX_UPSTREAM_RATE_LIMIT_RETRIES = 4;
307
+ const MAX_UPSTREAM_CONNECTION_RETRIES = 2;
308
  const DEFAULT_UPSTREAM_RETRY_MS = 4000;
309
  const MAX_UPSTREAM_RETRY_MS = 15000;
310
  const UPSTREAM_RETRY_BUFFER_MS = 350;
311
+ const UPSTREAM_FIRST_RESPONSE_TIMEOUT_MS = 15000;
312
+ const UPSTREAM_IDLE_TIMEOUT_MS = 45000;
313
+ const UPSTREAM_STREAM_TIMEOUT_MS = 120000;
314
 
315
  // In-memory stores for staged prompt resources and assistant notes
316
  const promptContextStore = new Map(); // sessionId -> { resources, resourcesById }
 
1018
  }
1019
  }
1020
 
1021
+ class RetryableUpstreamConnectionError extends Error {
1022
+ constructor(publicMessage, internalMessage = null) {
1023
+ super(publicMessage);
1024
+ this.name = "RetryableUpstreamConnectionError";
1025
+ this.publicMessage = publicMessage;
1026
+ this.internalMessage = internalMessage || publicMessage;
1027
+ }
1028
+ }
1029
+
1030
  class UpstreamProviderError extends Error {
1031
  constructor(publicMessage, internalMessage = null) {
1032
  super(publicMessage);
 
1173
  if (err.name === "RetryableRateLimitError") {
1174
  return "The model provider is temporarily rate limited. Please try again in a few seconds.";
1175
  }
1176
+ if (err.name === "RetryableUpstreamConnectionError") {
1177
+ return "The model connection was interrupted. Please try again.";
1178
+ }
1179
  if (err.publicMessage) return err.publicMessage;
1180
  return String(err);
1181
  }
 
1187
  } catch (err) {
1188
  if (err?.name === "AbortError") throw err;
1189
 
1190
+ const retryable =
1191
+ err?.name === "RetryableRateLimitError" ||
1192
+ err?.name === "RetryableUpstreamConnectionError";
1193
+ const maxRetries =
1194
+ err?.name === "RetryableUpstreamConnectionError"
1195
+ ? MAX_UPSTREAM_CONNECTION_RETRIES
1196
+ : MAX_UPSTREAM_RATE_LIMIT_RETRIES;
1197
+
1198
+ if (!retryable || retryIndex >= maxRetries) {
1199
  throw err;
1200
  }
1201
 
1202
  const waitMs = getRetryDelayMs(err, retryIndex);
1203
  console.warn(
1204
+ err?.name === "RetryableUpstreamConnectionError"
1205
+ ? `[streamChat] Upstream websocket interrupted, retrying in ${waitMs}ms (${retryIndex + 1}/${maxRetries})`
1206
+ : `[streamChat] Upstream rate limited, retrying in ${waitMs}ms (${retryIndex + 1}/${maxRetries})`
1207
  );
1208
  await sleepWithAbort(waitMs, abortSignal);
1209
  }
 
1227
  const safeParse = (str) => {
1228
  try { return JSON.parse(str.startsWith("data: ") ? str.slice(6) : str); } catch { return null; }
1229
  };
 
1230
  let assistantText = "";
1231
  const toolCallBuffer = new Map();
1232
  let finished = false;
1233
+ let sawAnyPayload = false;
1234
 
1235
  return new Promise((resolve, reject) => {
1236
+ let inactivityTimeoutId = null;
1237
+ const overallTimeoutId = setTimeout(() => {
1238
  if (!finished) {
1239
  finished = true;
1240
  cleanup();
1241
  const toolCalls = serializeToolCalls(toolCallBuffer);
1242
  resolve({ assistantText, toolCalls });
1243
  }
1244
+ }, UPSTREAM_STREAM_TIMEOUT_MS);
1245
 
1246
  const cleanup = () => {
1247
  activeStreamHandlers.delete(currentRequestId);
1248
  errorHandlers.delete(currentRequestId);
1249
+ clearTimeout(overallTimeoutId);
1250
+ clearTimeout(inactivityTimeoutId);
1251
  if (abortSignal) abortSignal.removeEventListener("abort", abortHandler);
1252
  };
1253
 
1254
+ const rejectWithSocketReset = (err, reason) => {
1255
+ if (finished) return;
1256
+ finished = true;
1257
+ cleanup();
1258
+ invalidatePersistentWebSocket(ws, reason, err);
1259
+ reject(err);
1260
+ };
1261
+
1262
+ const rejectUnexpectedClose = (publicMessage, internalMessage) => {
1263
+ const hasPartialOutput = assistantText.trim().length > 0 || toolCallBuffer.size > 0;
1264
+ if (!hasPartialOutput) {
1265
+ rejectWithSocketReset(
1266
+ new RetryableUpstreamConnectionError(
1267
+ "The model connection was interrupted. Reconnecting automatically.",
1268
+ internalMessage
1269
+ ),
1270
+ internalMessage
1271
+ );
1272
+ return;
1273
+ }
1274
+
1275
+ if (finished) return;
1276
+ finished = true;
1277
+ cleanup();
1278
+ invalidatePersistentWebSocket(ws, internalMessage);
1279
+ reject(new UpstreamProviderError(publicMessage, internalMessage));
1280
+ };
1281
+
1282
+ const refreshInactivityTimeout = () => {
1283
+ clearTimeout(inactivityTimeoutId);
1284
+ inactivityTimeoutId = setTimeout(() => {
1285
+ if (sawAnyPayload) {
1286
+ rejectUnexpectedClose(
1287
+ "The model provider interrupted the response. Please try again.",
1288
+ "Upstream websocket became idle before the response finished."
1289
+ );
1290
+ return;
1291
+ }
1292
+
1293
+ rejectWithSocketReset(
1294
+ new RetryableUpstreamConnectionError(
1295
+ "The model connection did not respond in time. Reconnecting automatically.",
1296
+ "Upstream websocket produced no response before timeout."
1297
+ ),
1298
+ "Upstream websocket produced no response before timeout."
1299
+ );
1300
+ }, sawAnyPayload ? UPSTREAM_IDLE_TIMEOUT_MS : UPSTREAM_FIRST_RESPONSE_TIMEOUT_MS);
1301
+ };
1302
+
1303
  const messageHandler = (line) => {
1304
  const colonIdx = line.indexOf(':');
1305
  if (colonIdx === -1) return;
 
1308
  if (msgRequestId !== String(currentRequestId)) return;
1309
  if (!payload) return;
1310
 
1311
+ sawAnyPayload = true;
1312
+ refreshInactivityTimeout();
1313
+
1314
  if (payload.error && !payload.choices) {
1315
  if (!finished) {
1316
  finished = true;
 
1399
  }
1400
  };
1401
 
1402
+ const errorHandler = (err) => {
1403
+ if (finished) return;
1404
+ finished = true;
1405
+ cleanup();
1406
+
1407
+ const hasPartialOutput = assistantText.trim().length > 0 || toolCallBuffer.size > 0;
1408
+ if (hasPartialOutput && err?.name === "RetryableUpstreamConnectionError") {
1409
+ reject(
1410
+ new UpstreamProviderError(
1411
+ "The model provider interrupted the response. Please try again.",
1412
+ err?.internalMessage || err?.message || "Upstream websocket interrupted after partial output."
1413
+ )
1414
+ );
1415
+ return;
1416
+ }
1417
+
1418
+ reject(err);
1419
+ };
1420
+ const abortHandler = () => { if (!finished) { finished = true; cleanup(); reject(createAbortError()); } };
1421
 
1422
  activeStreamHandlers.set(currentRequestId, messageHandler);
1423
  errorHandlers.set(currentRequestId, errorHandler);
1424
  if (abortSignal) abortSignal.addEventListener("abort", abortHandler);
1425
+
1426
+ refreshInactivityTimeout();
1427
+
1428
+ try {
1429
+ ws.send(JSON.stringify({ body, headers }), (err) => {
1430
+ if (err) {
1431
+ rejectWithSocketReset(
1432
+ new RetryableUpstreamConnectionError(
1433
+ "The model connection was interrupted. Reconnecting automatically.",
1434
+ buildUpstreamSocketMessage("Failed to send upstream websocket request", err)
1435
+ ),
1436
+ "Failed to send upstream websocket request"
1437
+ );
1438
+ }
1439
+ });
1440
+ } catch (err) {
1441
+ rejectWithSocketReset(
1442
+ new RetryableUpstreamConnectionError(
1443
+ "The model connection was interrupted. Reconnecting automatically.",
1444
+ buildUpstreamSocketMessage("Failed to send upstream websocket request", err)
1445
+ ),
1446
+ "Failed to send upstream websocket request"
1447
+ );
1448
+ }
1449
  });
1450
  }
1451
 
 
1589
  ...buildModelMessages(baseMessages, workingMessages, sessionId),
1590
  {
1591
  role: "system",
1592
+ content: "Tool-use budget is exhausted for this response. Do not call tools. Answer directly using the information already gathered. If something is still missing, briefly say what is missing without calling tools. You may ask the user to let you continue your response.",
1593
  },
1594
  ];
1595
 
 
1620
  const sessionName = extractSessionName(assistantText);
1621
 
1622
  if (typeof onDone === "function") {
1623
+ await onDone(assistantText, allToolCalls, false, sessionName, responseEdits, responseSegments);
1624
  }
1625
 
1626
  clearPromptState(sessionId);
1627
 
1628
  } catch (err) {
 
1629
  clearPromptState(sessionId);
1630
  if (err.name === "AbortError" || err.message === "AbortError") {
1631
+ if (typeof onDone === "function") await onDone(null, null, true, null);
1632
  } else {
1633
+ console.error("streamChat error:", err?.internalMessage || err);
1634
+ if (typeof onError === "function") await onError(getPublicErrorMessage(err));
1635
  }
1636
  }
1637
  }
server/index.js CHANGED
@@ -9,7 +9,7 @@ import fetch from 'node-fetch';
9
  import rateLimit from 'express-rate-limit';
10
  import fs from 'fs';
11
  import { registerFeedbackRoutes } from './handleFeedback.js';
12
- import { handleWsMessage } from './wsHandler.js';
13
  import { sessionStore, initStoreConfig } from './sessionStore.js';
14
  import { SUPABASE_URL, SUPABASE_ANON_KEY } from './config.js';
15
  import { safeSend } from './helpers.js';
@@ -732,8 +732,16 @@ wss.on('connection',(ws,req)=>{
732
  catch(ex){ console.error("Invalid message error:",ex.message,"\nStack:",ex.stack); safeSend(ws,{type:'error',message:'Invalid message: '+ex.message}); }
733
  });
734
 
735
- ws.on('close',()=>{ const c=wsClients.get(ws); if(c?.userId) sessionStore.markOffline(c.userId,ws); wsClients.delete(ws); });
736
- ws.on('error',()=>wsClients.delete(ws));
 
 
 
 
 
 
 
 
737
  safeSend(ws,{type:'connected', tempId:wsClients.get(ws)?.tempId});
738
  });
739
 
 
9
  import rateLimit from 'express-rate-limit';
10
  import fs from 'fs';
11
  import { registerFeedbackRoutes } from './handleFeedback.js';
12
+ import { abortActiveStream, handleWsMessage } from './wsHandler.js';
13
  import { sessionStore, initStoreConfig } from './sessionStore.js';
14
  import { SUPABASE_URL, SUPABASE_ANON_KEY } from './config.js';
15
  import { safeSend } from './helpers.js';
 
732
  catch(ex){ console.error("Invalid message error:",ex.message,"\nStack:",ex.stack); safeSend(ws,{type:'error',message:'Invalid message: '+ex.message}); }
733
  });
734
 
735
+ ws.on('close',()=>{
736
+ abortActiveStream(ws);
737
+ const c=wsClients.get(ws);
738
+ if(c?.userId) sessionStore.markOffline(c.userId,ws);
739
+ wsClients.delete(ws);
740
+ });
741
+ ws.on('error',()=>{
742
+ abortActiveStream(ws);
743
+ wsClients.delete(ws);
744
+ });
745
  safeSend(ws,{type:'connected', tempId:wsClients.get(ws)?.tempId});
746
  });
747
 
server/wsHandler.js CHANGED
@@ -51,6 +51,12 @@ const CONTINUE_ASSISTANT_PROMPT =
51
  'Continue your previous response exactly where it left off. Do not restart, summarize, or repeat the opening. Preserve the same formatting and only add the missing continuation.';
52
  const FREE_WEB_SEARCH_LIMIT = 15;
53
 
 
 
 
 
 
 
54
  initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err));
55
 
56
  function usageOwnerKey(client, clientId = '') {
@@ -306,7 +312,7 @@ const handlers = {
306
  : sessionStore.getTempSession(client.tempId, sessionId);
307
  if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
308
 
309
- if (activeStreams.has(ws)) activeStreams.get(ws).abort();
310
  const abort = new AbortController();
311
  activeStreams.set(ws, abort);
312
  safeSend(ws, { type: 'chat:start', sessionId });
@@ -433,11 +439,12 @@ const handlers = {
433
  activeStreams.delete(ws);
434
  console.error('streamChat error:', err);
435
  safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
 
436
  },
437
  });
438
  },
439
 
440
- 'chat:stop': (ws) => { if (activeStreams.has(ws)) { activeStreams.get(ws).abort(); activeStreams.delete(ws); } },
441
 
442
  'chat:editMessage': async (ws, msg, client) => {
443
  const { sessionId, messageIndex, newContent } = msg;
@@ -552,7 +559,7 @@ const handlers = {
552
  return safeSend(ws, { type: 'error', message: 'Assistant message not found' });
553
  }
554
 
555
- if (activeStreams.has(ws)) activeStreams.get(ws).abort();
556
  const abort = new AbortController();
557
  activeStreams.set(ws, abort);
558
 
@@ -687,6 +694,7 @@ const handlers = {
687
  activeStreams.delete(ws);
688
  console.error('assistant action streamChat error:', err);
689
  safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
 
690
  },
691
  });
692
  },
 
51
  'Continue your previous response exactly where it left off. Do not restart, summarize, or repeat the opening. Preserve the same formatting and only add the missing continuation.';
52
  const FREE_WEB_SEARCH_LIMIT = 15;
53
 
54
+ export function abortActiveStream(ws) {
55
+ if (!activeStreams.has(ws)) return;
56
+ activeStreams.get(ws).abort();
57
+ activeStreams.delete(ws);
58
+ }
59
+
60
  initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err));
61
 
62
  function usageOwnerKey(client, clientId = '') {
 
312
  : sessionStore.getTempSession(client.tempId, sessionId);
313
  if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
314
 
315
+ abortActiveStream(ws);
316
  const abort = new AbortController();
317
  activeStreams.set(ws, abort);
318
  safeSend(ws, { type: 'chat:start', sessionId });
 
439
  activeStreams.delete(ws);
440
  console.error('streamChat error:', err);
441
  safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
442
+ safeSend(ws, { type: 'chat:aborted', sessionId, reason: 'error' });
443
  },
444
  });
445
  },
446
 
447
+ 'chat:stop': (ws) => { abortActiveStream(ws); },
448
 
449
  'chat:editMessage': async (ws, msg, client) => {
450
  const { sessionId, messageIndex, newContent } = msg;
 
559
  return safeSend(ws, { type: 'error', message: 'Assistant message not found' });
560
  }
561
 
562
+ abortActiveStream(ws);
563
  const abort = new AbortController();
564
  activeStreams.set(ws, abort);
565
 
 
694
  activeStreams.delete(ws);
695
  console.error('assistant action streamChat error:', err);
696
  safeSend(ws, { type: 'chat:error', error: String(err), sessionId });
697
+ safeSend(ws, { type: 'chat:aborted', sessionId, reason: 'error' });
698
  },
699
  });
700
  },