incognitolm commited on
Commit
17bf060
·
1 Parent(s): c3f3ac9

Server restart issues

Browse files
Files changed (3) hide show
  1. server/chatStream.js +283 -43
  2. server/index.js +11 -3
  3. server/wsHandler.js +9 -3
server/chatStream.js CHANGED
@@ -22,17 +22,66 @@ let requestIdCounter = 0;
22
  let activeStreamHandlers = new Map(); // Track active stream handlers by request ID
23
  let errorHandlers = new Map(); // Track error handlers by request ID
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  async function getSafeWebSocket() {
26
  if (persistentWs && persistentWs.readyState === WebSocket.OPEN) return persistentWs;
27
  if (wsAuthPromise) return wsAuthPromise;
28
 
29
  wsAuthPromise = (async () => {
 
30
  const wsURL =
31
- (process.env.LIGHTNING_BASE.startsWith("https")
32
- ? process.env.LIGHTNING_BASE.replace("https", "wss")
33
- : process.env.LIGHTNING_BASE.replace("http", "ws")) + "/ws/chat";
34
 
35
- persistentWs = new WebSocket(wsURL);
 
36
 
37
  const safeParse = (str) => {
38
  try {
@@ -45,50 +94,112 @@ async function getSafeWebSocket() {
45
  };
46
 
47
  await new Promise((resolve, reject) => {
48
- const timer = setTimeout(() => reject(new Error("WS connection timeout")), 5000);
49
- persistentWs.on("open", () => {
50
  clearTimeout(timer);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
  resolve();
52
- });
53
- persistentWs.on("error", (err) => {
 
54
  console.error("[WS] Connection error", err);
55
- clearTimeout(timer);
56
- persistentWs = null;
57
- wsAuthPromise = null;
 
58
  reject(err);
59
- });
 
 
 
 
 
 
 
 
 
 
 
 
60
  });
61
 
62
- persistentWs.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
63
 
64
  await new Promise((resolve, reject) => {
65
- const timer = setTimeout(() => reject(new Error("WS auth timeout")), 5000);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
  const authHandler = (data) => {
67
  const msg = safeParse(data.toString());
68
  if (!msg) return;
69
  if (msg.type === "auth" && msg.status === "ok") {
70
- persistentWs.removeListener("message", authHandler);
71
- clearTimeout(timer);
72
  resolve();
73
  }
74
  if (msg.error) {
 
75
  console.error("[WS] Auth error", msg.error);
76
- persistentWs.removeListener("message", authHandler);
77
- clearTimeout(timer);
78
- persistentWs = null;
79
- wsAuthPromise = null;
80
  reject(new Error(`WS auth error: ${msg.error}`));
81
  }
82
  };
83
- persistentWs.on("message", authHandler);
84
- persistentWs.on("error", (err) => {
85
  console.error("[WS] Auth error event", err);
86
- persistentWs.removeListener("message", authHandler);
87
- clearTimeout(timer);
88
- persistentWs = null;
89
- wsAuthPromise = null;
90
  reject(err);
91
- });
 
 
 
 
 
 
 
 
 
 
 
 
92
  });
93
 
94
  const globalMessageHandler = (data) => {
@@ -100,18 +211,27 @@ async function getSafeWebSocket() {
100
 
101
  const globalErrorHandler = (err) => {
102
  console.error("[WS ERROR]", err);
103
- for (const [id, handler] of errorHandlers.entries()) {
104
- handler(err);
105
- }
106
  };
107
 
108
- persistentWs.on("message", globalMessageHandler);
109
- persistentWs.on("error", globalErrorHandler);
 
 
 
 
 
 
 
 
 
 
110
  activeStreamHandlers.set("__messageListener__", globalMessageHandler);
111
  activeStreamHandlers.set("__errorHandler__", globalErrorHandler);
 
112
 
113
  wsAuthPromise = null;
114
- return persistentWs;
115
  })();
116
 
117
  return wsAuthPromise;
@@ -182,9 +302,13 @@ const HISTORY_SUMMARY_TOKEN_BUDGET = 600;
182
  const NOTES_TOKEN_BUDGET = 900;
183
  const MAX_DYNAMIC_MESSAGES = 10;
184
  const MAX_UPSTREAM_RATE_LIMIT_RETRIES = 4;
 
185
  const DEFAULT_UPSTREAM_RETRY_MS = 4000;
186
  const MAX_UPSTREAM_RETRY_MS = 15000;
187
  const UPSTREAM_RETRY_BUFFER_MS = 350;
 
 
 
188
 
189
  // In-memory stores for staged prompt resources and assistant notes
190
  const promptContextStore = new Map(); // sessionId -> { resources, resourcesById }
@@ -892,6 +1016,15 @@ class RetryableRateLimitError extends Error {
892
  }
893
  }
894
 
 
 
 
 
 
 
 
 
 
895
  class UpstreamProviderError extends Error {
896
  constructor(publicMessage, internalMessage = null) {
897
  super(publicMessage);
@@ -1038,6 +1171,9 @@ function getPublicErrorMessage(err) {
1038
  if (err.name === "RetryableRateLimitError") {
1039
  return "The model provider is temporarily rate limited. Please try again in a few seconds.";
1040
  }
 
 
 
1041
  if (err.publicMessage) return err.publicMessage;
1042
  return String(err);
1043
  }
@@ -1049,14 +1185,23 @@ async function websocketChatStreamWithRetry(body, headers, onToken, abortSignal)
1049
  } catch (err) {
1050
  if (err?.name === "AbortError") throw err;
1051
 
1052
- const retryable = err?.name === "RetryableRateLimitError";
1053
- if (!retryable || retryIndex >= MAX_UPSTREAM_RATE_LIMIT_RETRIES) {
 
 
 
 
 
 
 
1054
  throw err;
1055
  }
1056
 
1057
  const waitMs = getRetryDelayMs(err, retryIndex);
1058
  console.warn(
1059
- `[streamChat] Upstream rate limited, retrying in ${waitMs}ms (${retryIndex + 1}/${MAX_UPSTREAM_RATE_LIMIT_RETRIES})`
 
 
1060
  );
1061
  await sleepWithAbort(waitMs, abortSignal);
1062
  }
@@ -1080,28 +1225,79 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
1080
  const safeParse = (str) => {
1081
  try { return JSON.parse(str.startsWith("data: ") ? str.slice(6) : str); } catch { return null; }
1082
  };
1083
- ws.send(JSON.stringify({ body, headers }));
1084
  let assistantText = "";
1085
  const toolCallBuffer = new Map();
1086
  let finished = false;
 
1087
 
1088
  return new Promise((resolve, reject) => {
1089
- const timeoutId = setTimeout(() => {
 
1090
  if (!finished) {
1091
  finished = true;
1092
  cleanup();
1093
  const toolCalls = serializeToolCalls(toolCallBuffer);
1094
  resolve({ assistantText, toolCalls });
1095
  }
1096
- }, 120000);
1097
 
1098
  const cleanup = () => {
1099
  activeStreamHandlers.delete(currentRequestId);
1100
  errorHandlers.delete(currentRequestId);
1101
- clearTimeout(timeoutId);
 
1102
  if (abortSignal) abortSignal.removeEventListener("abort", abortHandler);
1103
  };
1104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1105
  const messageHandler = (line) => {
1106
  const colonIdx = line.indexOf(':');
1107
  if (colonIdx === -1) return;
@@ -1110,6 +1306,9 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
1110
  if (msgRequestId !== String(currentRequestId)) return;
1111
  if (!payload) return;
1112
 
 
 
 
1113
  if (payload.error && !payload.choices) {
1114
  if (!finished) {
1115
  finished = true;
@@ -1198,12 +1397,53 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
1198
  }
1199
  };
1200
 
1201
- const errorHandler = (err) => { if (!finished) { finished = true; cleanup(); reject(err); } };
1202
- const abortHandler = () => { if (!finished) { finished = true; cleanup(); reject(new Error("AbortError")); } };
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1203
 
1204
  activeStreamHandlers.set(currentRequestId, messageHandler);
1205
  errorHandlers.set(currentRequestId, errorHandler);
1206
  if (abortSignal) abortSignal.addEventListener("abort", abortHandler);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1207
  });
1208
  }
1209
 
@@ -1384,11 +1624,11 @@ export async function streamChat({
1384
  clearPromptState(sessionId);
1385
 
1386
  } catch (err) {
1387
- console.error("streamChat error:", err?.internalMessage || err);
1388
  clearPromptState(sessionId);
1389
  if (err.name === "AbortError" || err.message === "AbortError") {
1390
  if (typeof onDone === "function") onDone(null, null, true, null);
1391
  } else {
 
1392
  if (typeof onError === "function") onError(getPublicErrorMessage(err));
1393
  }
1394
  }
 
22
  let activeStreamHandlers = new Map(); // Track active stream handlers by request ID
23
  let errorHandlers = new Map(); // Track error handlers by request ID
24
 
25
+ function buildUpstreamSocketMessage(reason, err = null) {
26
+ return [reason, err?.message].filter(Boolean).join(": ");
27
+ }
28
+
29
+ function invalidatePersistentWebSocket(ws, reason, err = null) {
30
+ if (!ws || persistentWs !== ws) return;
31
+
32
+ persistentWs = null;
33
+ wsAuthPromise = null;
34
+ activeStreamHandlers.delete("__messageListener__");
35
+ activeStreamHandlers.delete("__errorHandler__");
36
+ activeStreamHandlers.delete("__closeHandler__");
37
+
38
+ const failure =
39
+ err?.name === "RetryableUpstreamConnectionError"
40
+ ? err
41
+ : new RetryableUpstreamConnectionError(
42
+ "The model connection was interrupted. Reconnecting automatically.",
43
+ buildUpstreamSocketMessage(reason, err)
44
+ );
45
+
46
+ const pendingErrorHandlers = [...errorHandlers.values()];
47
+ for (const handler of pendingErrorHandlers) {
48
+ try {
49
+ handler(failure);
50
+ } catch {
51
+ // Ignore request-specific cleanup errors while invalidating the shared socket.
52
+ }
53
+ }
54
+
55
+ try {
56
+ ws.removeAllListeners("message");
57
+ ws.removeAllListeners("error");
58
+ ws.removeAllListeners("close");
59
+ } catch {
60
+ // Ignore listener cleanup failures.
61
+ }
62
+
63
+ if (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING) {
64
+ try {
65
+ ws.terminate();
66
+ } catch {
67
+ // Ignore termination failures while resetting the shared socket.
68
+ }
69
+ }
70
+ }
71
+
72
  async function getSafeWebSocket() {
73
  if (persistentWs && persistentWs.readyState === WebSocket.OPEN) return persistentWs;
74
  if (wsAuthPromise) return wsAuthPromise;
75
 
76
  wsAuthPromise = (async () => {
77
+ const lightningBase = process.env.LIGHTNING_BASE || LIGHTNING_BASE;
78
  const wsURL =
79
+ (lightningBase.startsWith("https")
80
+ ? lightningBase.replace("https", "wss")
81
+ : lightningBase.replace("http", "ws")) + "/ws/chat";
82
 
83
+ const ws = new WebSocket(wsURL);
84
+ persistentWs = ws;
85
 
86
  const safeParse = (str) => {
87
  try {
 
94
  };
95
 
96
  await new Promise((resolve, reject) => {
97
+ const cleanup = () => {
 
98
  clearTimeout(timer);
99
+ ws.removeListener("open", onOpen);
100
+ ws.removeListener("error", onError);
101
+ ws.removeListener("close", onClose);
102
+ };
103
+ const timer = setTimeout(() => {
104
+ cleanup();
105
+ if (persistentWs === ws) {
106
+ persistentWs = null;
107
+ wsAuthPromise = null;
108
+ }
109
+ try {
110
+ ws.terminate();
111
+ } catch {
112
+ // Ignore termination failures while timing out the handshake.
113
+ }
114
+ reject(new Error("WS connection timeout"));
115
+ }, 5000);
116
+ const onOpen = () => {
117
+ cleanup();
118
  resolve();
119
+ };
120
+ const onError = (err) => {
121
+ cleanup();
122
  console.error("[WS] Connection error", err);
123
+ if (persistentWs === ws) {
124
+ persistentWs = null;
125
+ wsAuthPromise = null;
126
+ }
127
  reject(err);
128
+ };
129
+ const onClose = (code, reasonBuffer) => {
130
+ cleanup();
131
+ const reason = reasonBuffer?.toString?.() || "";
132
+ if (persistentWs === ws) {
133
+ persistentWs = null;
134
+ wsAuthPromise = null;
135
+ }
136
+ reject(new Error(`WS connection closed (${code})${reason ? `: ${reason}` : ""}`));
137
+ };
138
+ ws.on("open", onOpen);
139
+ ws.on("error", onError);
140
+ ws.on("close", onClose);
141
  });
142
 
143
+ ws.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
144
 
145
  await new Promise((resolve, reject) => {
146
+ const cleanup = () => {
147
+ clearTimeout(timer);
148
+ ws.removeListener("message", authHandler);
149
+ ws.removeListener("error", onError);
150
+ ws.removeListener("close", onClose);
151
+ };
152
+ const timer = setTimeout(() => {
153
+ cleanup();
154
+ if (persistentWs === ws) {
155
+ persistentWs = null;
156
+ wsAuthPromise = null;
157
+ }
158
+ try {
159
+ ws.terminate();
160
+ } catch {
161
+ // Ignore termination failures while timing out auth.
162
+ }
163
+ reject(new Error("WS auth timeout"));
164
+ }, 5000);
165
  const authHandler = (data) => {
166
  const msg = safeParse(data.toString());
167
  if (!msg) return;
168
  if (msg.type === "auth" && msg.status === "ok") {
169
+ cleanup();
 
170
  resolve();
171
  }
172
  if (msg.error) {
173
+ cleanup();
174
  console.error("[WS] Auth error", msg.error);
175
+ if (persistentWs === ws) {
176
+ persistentWs = null;
177
+ wsAuthPromise = null;
178
+ }
179
  reject(new Error(`WS auth error: ${msg.error}`));
180
  }
181
  };
182
+ const onError = (err) => {
183
+ cleanup();
184
  console.error("[WS] Auth error event", err);
185
+ if (persistentWs === ws) {
186
+ persistentWs = null;
187
+ wsAuthPromise = null;
188
+ }
189
  reject(err);
190
+ };
191
+ const onClose = (code, reasonBuffer) => {
192
+ cleanup();
193
+ const reason = reasonBuffer?.toString?.() || "";
194
+ if (persistentWs === ws) {
195
+ persistentWs = null;
196
+ wsAuthPromise = null;
197
+ }
198
+ reject(new Error(`WS auth closed (${code})${reason ? `: ${reason}` : ""}`));
199
+ };
200
+ ws.on("message", authHandler);
201
+ ws.on("error", onError);
202
+ ws.on("close", onClose);
203
  });
204
 
205
  const globalMessageHandler = (data) => {
 
211
 
212
  const globalErrorHandler = (err) => {
213
  console.error("[WS ERROR]", err);
214
+ invalidatePersistentWebSocket(ws, "Upstream websocket error", err);
 
 
215
  };
216
 
217
+ const globalCloseHandler = (code, reasonBuffer) => {
218
+ const reason = reasonBuffer?.toString?.() || "";
219
+ console.warn(`[WS CLOSE] ${code}${reason ? `: ${reason}` : ""}`);
220
+ invalidatePersistentWebSocket(
221
+ ws,
222
+ `Upstream websocket closed (${code})${reason ? `: ${reason}` : ""}`
223
+ );
224
+ };
225
+
226
+ ws.on("message", globalMessageHandler);
227
+ ws.on("error", globalErrorHandler);
228
+ ws.on("close", globalCloseHandler);
229
  activeStreamHandlers.set("__messageListener__", globalMessageHandler);
230
  activeStreamHandlers.set("__errorHandler__", globalErrorHandler);
231
+ activeStreamHandlers.set("__closeHandler__", globalCloseHandler);
232
 
233
  wsAuthPromise = null;
234
+ return ws;
235
  })();
236
 
237
  return wsAuthPromise;
 
302
  const NOTES_TOKEN_BUDGET = 900;
303
  const MAX_DYNAMIC_MESSAGES = 10;
304
  const MAX_UPSTREAM_RATE_LIMIT_RETRIES = 4;
305
+ const MAX_UPSTREAM_CONNECTION_RETRIES = 2;
306
  const DEFAULT_UPSTREAM_RETRY_MS = 4000;
307
  const MAX_UPSTREAM_RETRY_MS = 15000;
308
  const UPSTREAM_RETRY_BUFFER_MS = 350;
309
+ const UPSTREAM_FIRST_RESPONSE_TIMEOUT_MS = 15000;
310
+ const UPSTREAM_IDLE_TIMEOUT_MS = 45000;
311
+ const UPSTREAM_STREAM_TIMEOUT_MS = 120000;
312
 
313
  // In-memory stores for staged prompt resources and assistant notes
314
  const promptContextStore = new Map(); // sessionId -> { resources, resourcesById }
 
1016
  }
1017
  }
1018
 
1019
+ class RetryableUpstreamConnectionError extends Error {
1020
+ constructor(publicMessage, internalMessage = null) {
1021
+ super(publicMessage);
1022
+ this.name = "RetryableUpstreamConnectionError";
1023
+ this.publicMessage = publicMessage;
1024
+ this.internalMessage = internalMessage || publicMessage;
1025
+ }
1026
+ }
1027
+
1028
  class UpstreamProviderError extends Error {
1029
  constructor(publicMessage, internalMessage = null) {
1030
  super(publicMessage);
 
1171
  if (err.name === "RetryableRateLimitError") {
1172
  return "The model provider is temporarily rate limited. Please try again in a few seconds.";
1173
  }
1174
+ if (err.name === "RetryableUpstreamConnectionError") {
1175
+ return "The model connection was interrupted. Please try again.";
1176
+ }
1177
  if (err.publicMessage) return err.publicMessage;
1178
  return String(err);
1179
  }
 
1185
  } catch (err) {
1186
  if (err?.name === "AbortError") throw err;
1187
 
1188
+ const retryable =
1189
+ err?.name === "RetryableRateLimitError" ||
1190
+ err?.name === "RetryableUpstreamConnectionError";
1191
+ const maxRetries =
1192
+ err?.name === "RetryableUpstreamConnectionError"
1193
+ ? MAX_UPSTREAM_CONNECTION_RETRIES
1194
+ : MAX_UPSTREAM_RATE_LIMIT_RETRIES;
1195
+
1196
+ if (!retryable || retryIndex >= maxRetries) {
1197
  throw err;
1198
  }
1199
 
1200
  const waitMs = getRetryDelayMs(err, retryIndex);
1201
  console.warn(
1202
+ err?.name === "RetryableUpstreamConnectionError"
1203
+ ? `[streamChat] Upstream websocket interrupted, retrying in ${waitMs}ms (${retryIndex + 1}/${maxRetries})`
1204
+ : `[streamChat] Upstream rate limited, retrying in ${waitMs}ms (${retryIndex + 1}/${maxRetries})`
1205
  );
1206
  await sleepWithAbort(waitMs, abortSignal);
1207
  }
 
1225
  const safeParse = (str) => {
1226
  try { return JSON.parse(str.startsWith("data: ") ? str.slice(6) : str); } catch { return null; }
1227
  };
 
1228
  let assistantText = "";
1229
  const toolCallBuffer = new Map();
1230
  let finished = false;
1231
+ let sawAnyPayload = false;
1232
 
1233
  return new Promise((resolve, reject) => {
1234
+ let inactivityTimeoutId = null;
1235
+ const overallTimeoutId = setTimeout(() => {
1236
  if (!finished) {
1237
  finished = true;
1238
  cleanup();
1239
  const toolCalls = serializeToolCalls(toolCallBuffer);
1240
  resolve({ assistantText, toolCalls });
1241
  }
1242
+ }, UPSTREAM_STREAM_TIMEOUT_MS);
1243
 
1244
  const cleanup = () => {
1245
  activeStreamHandlers.delete(currentRequestId);
1246
  errorHandlers.delete(currentRequestId);
1247
+ clearTimeout(overallTimeoutId);
1248
+ clearTimeout(inactivityTimeoutId);
1249
  if (abortSignal) abortSignal.removeEventListener("abort", abortHandler);
1250
  };
1251
 
1252
+ const rejectWithSocketReset = (err, reason) => {
1253
+ if (finished) return;
1254
+ finished = true;
1255
+ cleanup();
1256
+ invalidatePersistentWebSocket(ws, reason, err);
1257
+ reject(err);
1258
+ };
1259
+
1260
+ const rejectUnexpectedClose = (publicMessage, internalMessage) => {
1261
+ const hasPartialOutput = assistantText.trim().length > 0 || toolCallBuffer.size > 0;
1262
+ if (!hasPartialOutput) {
1263
+ rejectWithSocketReset(
1264
+ new RetryableUpstreamConnectionError(
1265
+ "The model connection was interrupted. Reconnecting automatically.",
1266
+ internalMessage
1267
+ ),
1268
+ internalMessage
1269
+ );
1270
+ return;
1271
+ }
1272
+
1273
+ if (finished) return;
1274
+ finished = true;
1275
+ cleanup();
1276
+ invalidatePersistentWebSocket(ws, internalMessage);
1277
+ reject(new UpstreamProviderError(publicMessage, internalMessage));
1278
+ };
1279
+
1280
+ const refreshInactivityTimeout = () => {
1281
+ clearTimeout(inactivityTimeoutId);
1282
+ inactivityTimeoutId = setTimeout(() => {
1283
+ if (sawAnyPayload) {
1284
+ rejectUnexpectedClose(
1285
+ "The model provider interrupted the response. Please try again.",
1286
+ "Upstream websocket became idle before the response finished."
1287
+ );
1288
+ return;
1289
+ }
1290
+
1291
+ rejectWithSocketReset(
1292
+ new RetryableUpstreamConnectionError(
1293
+ "The model connection did not respond in time. Reconnecting automatically.",
1294
+ "Upstream websocket produced no response before timeout."
1295
+ ),
1296
+ "Upstream websocket produced no response before timeout."
1297
+ );
1298
+ }, sawAnyPayload ? UPSTREAM_IDLE_TIMEOUT_MS : UPSTREAM_FIRST_RESPONSE_TIMEOUT_MS);
1299
+ };
1300
+
1301
  const messageHandler = (line) => {
1302
  const colonIdx = line.indexOf(':');
1303
  if (colonIdx === -1) return;
 
1306
  if (msgRequestId !== String(currentRequestId)) return;
1307
  if (!payload) return;
1308
 
1309
+ sawAnyPayload = true;
1310
+ refreshInactivityTimeout();
1311
+
1312
  if (payload.error && !payload.choices) {
1313
  if (!finished) {
1314
  finished = true;
 
1397
  }
1398
  };
1399
 
1400
+ const errorHandler = (err) => {
1401
+ if (finished) return;
1402
+ finished = true;
1403
+ cleanup();
1404
+
1405
+ const hasPartialOutput = assistantText.trim().length > 0 || toolCallBuffer.size > 0;
1406
+ if (hasPartialOutput && err?.name === "RetryableUpstreamConnectionError") {
1407
+ reject(
1408
+ new UpstreamProviderError(
1409
+ "The model provider interrupted the response. Please try again.",
1410
+ err?.internalMessage || err?.message || "Upstream websocket interrupted after partial output."
1411
+ )
1412
+ );
1413
+ return;
1414
+ }
1415
+
1416
+ reject(err);
1417
+ };
1418
+ const abortHandler = () => { if (!finished) { finished = true; cleanup(); reject(createAbortError()); } };
1419
 
1420
  activeStreamHandlers.set(currentRequestId, messageHandler);
1421
  errorHandlers.set(currentRequestId, errorHandler);
1422
  if (abortSignal) abortSignal.addEventListener("abort", abortHandler);
1423
+
1424
+ refreshInactivityTimeout();
1425
+
1426
+ try {
1427
+ ws.send(JSON.stringify({ body, headers }), (err) => {
1428
+ if (err) {
1429
+ rejectWithSocketReset(
1430
+ new RetryableUpstreamConnectionError(
1431
+ "The model connection was interrupted. Reconnecting automatically.",
1432
+ buildUpstreamSocketMessage("Failed to send upstream websocket request", err)
1433
+ ),
1434
+ "Failed to send upstream websocket request"
1435
+ );
1436
+ }
1437
+ });
1438
+ } catch (err) {
1439
+ rejectWithSocketReset(
1440
+ new RetryableUpstreamConnectionError(
1441
+ "The model connection was interrupted. Reconnecting automatically.",
1442
+ buildUpstreamSocketMessage("Failed to send upstream websocket request", err)
1443
+ ),
1444
+ "Failed to send upstream websocket request"
1445
+ );
1446
+ }
1447
  });
1448
  }
1449
 
 
1624
  clearPromptState(sessionId);
1625
 
1626
  } catch (err) {
 
1627
  clearPromptState(sessionId);
1628
  if (err.name === "AbortError" || err.message === "AbortError") {
1629
  if (typeof onDone === "function") onDone(null, null, true, null);
1630
  } else {
1631
+ console.error("streamChat error:", err?.internalMessage || err);
1632
  if (typeof onError === "function") onError(getPublicErrorMessage(err));
1633
  }
1634
  }
server/index.js CHANGED
@@ -9,7 +9,7 @@ import fetch from 'node-fetch';
9
  import rateLimit from 'express-rate-limit';
10
  import fs from 'fs';
11
  import { registerFeedbackRoutes } from './handleFeedback.js';
12
- import { handleWsMessage } from './wsHandler.js';
13
  import { sessionStore, initStoreConfig } from './sessionStore.js';
14
  import { SUPABASE_URL, SUPABASE_ANON_KEY } from './config.js';
15
  import { safeSend } from './helpers.js';
@@ -732,8 +732,16 @@ wss.on('connection',(ws,req)=>{
732
  catch(ex){ console.error("Invalid message error:",ex.message,"\nStack:",ex.stack); safeSend(ws,{type:'error',message:'Invalid message: '+ex.message}); }
733
  });
734
 
735
- ws.on('close',()=>{ const c=wsClients.get(ws); if(c?.userId) sessionStore.markOffline(c.userId,ws); wsClients.delete(ws); });
736
- ws.on('error',()=>wsClients.delete(ws));
 
 
 
 
 
 
 
 
737
  safeSend(ws,{type:'connected', tempId:wsClients.get(ws)?.tempId});
738
  });
739
 
 
9
  import rateLimit from 'express-rate-limit';
10
  import fs from 'fs';
11
  import { registerFeedbackRoutes } from './handleFeedback.js';
12
+ import { abortActiveStream, handleWsMessage } from './wsHandler.js';
13
  import { sessionStore, initStoreConfig } from './sessionStore.js';
14
  import { SUPABASE_URL, SUPABASE_ANON_KEY } from './config.js';
15
  import { safeSend } from './helpers.js';
 
732
  catch(ex){ console.error("Invalid message error:",ex.message,"\nStack:",ex.stack); safeSend(ws,{type:'error',message:'Invalid message: '+ex.message}); }
733
  });
734
 
735
+ ws.on('close',()=>{
736
+ abortActiveStream(ws);
737
+ const c=wsClients.get(ws);
738
+ if(c?.userId) sessionStore.markOffline(c.userId,ws);
739
+ wsClients.delete(ws);
740
+ });
741
+ ws.on('error',()=>{
742
+ abortActiveStream(ws);
743
+ wsClients.delete(ws);
744
+ });
745
  safeSend(ws,{type:'connected', tempId:wsClients.get(ws)?.tempId});
746
  });
747
 
server/wsHandler.js CHANGED
@@ -51,6 +51,12 @@ const CONTINUE_ASSISTANT_PROMPT =
51
  'Continue your previous response exactly where it left off. Do not restart, summarize, or repeat the opening. Preserve the same formatting and only add the missing continuation.';
52
  const FREE_WEB_SEARCH_LIMIT = 15;
53
 
 
 
 
 
 
 
54
  initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err));
55
 
56
  function usageOwnerKey(client, clientId = '') {
@@ -306,7 +312,7 @@ const handlers = {
306
  : sessionStore.getTempSession(client.tempId, sessionId);
307
  if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
308
 
309
- if (activeStreams.has(ws)) activeStreams.get(ws).abort();
310
  const abort = new AbortController();
311
  activeStreams.set(ws, abort);
312
  safeSend(ws, { type: 'chat:start', sessionId });
@@ -437,7 +443,7 @@ const handlers = {
437
  });
438
  },
439
 
440
- 'chat:stop': (ws) => { if (activeStreams.has(ws)) { activeStreams.get(ws).abort(); activeStreams.delete(ws); } },
441
 
442
  'chat:editMessage': async (ws, msg, client) => {
443
  const { sessionId, messageIndex, newContent } = msg;
@@ -552,7 +558,7 @@ const handlers = {
552
  return safeSend(ws, { type: 'error', message: 'Assistant message not found' });
553
  }
554
 
555
- if (activeStreams.has(ws)) activeStreams.get(ws).abort();
556
  const abort = new AbortController();
557
  activeStreams.set(ws, abort);
558
 
 
51
  'Continue your previous response exactly where it left off. Do not restart, summarize, or repeat the opening. Preserve the same formatting and only add the missing continuation.';
52
  const FREE_WEB_SEARCH_LIMIT = 15;
53
 
54
+ export function abortActiveStream(ws) {
55
+ if (!activeStreams.has(ws)) return;
56
+ activeStreams.get(ws).abort();
57
+ activeStreams.delete(ws);
58
+ }
59
+
60
  initGuestRequestLimiter().catch(err => console.error('Failed to initialize guest request limiter:', err));
61
 
62
  function usageOwnerKey(client, clientId = '') {
 
312
  : sessionStore.getTempSession(client.tempId, sessionId);
313
  if (!session) return safeSend(ws, { type: 'error', message: 'Session not found' });
314
 
315
+ abortActiveStream(ws);
316
  const abort = new AbortController();
317
  activeStreams.set(ws, abort);
318
  safeSend(ws, { type: 'chat:start', sessionId });
 
443
  });
444
  },
445
 
446
+ 'chat:stop': (ws) => { abortActiveStream(ws); },
447
 
448
  'chat:editMessage': async (ws, msg, client) => {
449
  const { sessionId, messageIndex, newContent } = msg;
 
558
  return safeSend(ws, { type: 'error', message: 'Assistant message not found' });
559
  }
560
 
561
+ abortActiveStream(ws);
562
  const abort = new AbortController();
563
  activeStreams.set(ws, abort);
564