incognitolm commited on
Commit
ff954f1
·
1 Parent(s): 42f1e7b

Update chatStream.js

Browse files
Files changed (1) hide show
  1. server/chatStream.js +48 -60
server/chatStream.js CHANGED
@@ -54,107 +54,100 @@ function makeClient(accessToken, clientId) {
54
  }
55
 
56
  export async function websocketChatStream(body, headers, onToken, abortSignal) {
57
- if (!process.env.LIGHTNING_BASE) {
58
- throw new Error("LIGHTNING_BASE environment variable is not defined");
59
- }
60
-
61
  const wsURL =
62
  (process.env.LIGHTNING_BASE.startsWith("https")
63
  ? process.env.LIGHTNING_BASE.replace("https", "wss")
64
- : process.env.LIGHTNING_BASE.replace("http", "ws")) +
65
- "/ws/chat";
66
-
67
- console.log("[WS] Connecting to", wsURL);
68
 
69
  const ws = new WebSocket(wsURL);
70
 
 
 
 
 
 
 
 
71
  await new Promise((resolve, reject) => {
 
72
  ws.on("open", () => {
73
- console.log("[WS] Connection opened");
 
74
  resolve();
75
  });
76
  ws.on("error", (err) => {
 
77
  console.error("[WS] Connection error:", err);
78
  reject(err);
79
  });
80
  });
81
 
82
- // Authenticate
83
  ws.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
84
- console.log("[WS] Auth sent");
85
-
86
  await new Promise((resolve, reject) => {
 
87
  ws.on("message", (data) => {
88
- const msgStr = data.toString();
89
- console.log("[WS] Auth message:", msgStr);
90
- try {
91
- const msg = JSON.parse(msgStr);
92
- if (msg.type === "auth" && msg.status === "ok") {
93
- console.log("[WS] Auth successful");
94
- resolve();
95
- }
96
- if (msg.error) {
97
- console.error("[WS] Auth error:", msg.error);
98
- reject(new Error(`WebSocket auth error: ${msg.error}`));
99
- }
100
- } catch (e) {
101
- console.error("[WS] Auth message parse error:", e);
102
- reject(e);
103
  }
104
  });
105
  ws.on("error", (err) => {
106
- console.error("[WS] Auth error event:", err);
107
  reject(err);
108
  });
109
  });
110
 
111
- // Send request body
112
  ws.send(JSON.stringify({ body, headers }));
113
- console.log("[WS] Body sent");
114
 
115
  let assistantText = "";
116
  const toolCallBuffer = new Map();
 
117
 
118
  return new Promise((resolve, reject) => {
 
119
  if (abortSignal) {
120
  abortSignal.addEventListener("abort", () => {
121
  console.log("[WS] Aborted by signal");
122
  ws.close();
123
- reject(new Error("AbortError"));
124
  });
125
  }
126
 
 
127
  ws.on("message", (data) => {
128
  const line = data.toString();
129
- let payload;
 
 
130
 
131
- try {
132
- payload = JSON.parse(line);
133
- } catch {
134
- console.log("[WS] Non-JSON message received:", line);
135
- return;
136
- }
137
-
138
- // Log every event
139
- console.log("[WS] Payload received:", JSON.stringify(payload));
140
-
141
- // Handle errors
142
  if (payload.error) {
143
- console.error("[WS] Payload error:", payload.error);
144
- onToken(`[ERROR] ${payload.error}`); // still sent to client
145
  return;
146
  }
147
 
148
  const delta = payload.choices?.[0]?.delta;
149
  if (!delta) return;
150
 
151
- // Stream content only through onToken, but do not log the token
152
  if (delta.content) {
153
  assistantText += delta.content;
154
- onToken(delta.content);
155
  }
156
 
157
- // Handle tool calls
158
  if (delta.tool_calls) {
159
  for (const call of delta.tool_calls) {
160
  const entry = toolCallBuffer.get(call.index) ?? { arguments: "" };
@@ -162,36 +155,31 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
162
  if (call.function?.name) entry.name = call.function.name;
163
  if (call.function?.arguments) entry.arguments += call.function.arguments;
164
  toolCallBuffer.set(call.index, entry);
165
- console.log("[WS] Tool call received:", JSON.stringify(entry));
166
  }
167
  }
168
 
169
- // Finish
170
  if (payload.choices?.[0]?.finish_reason) {
171
- console.log("[WS] Finish reason:", payload.choices[0].finish_reason);
172
  ws.close();
173
-
174
  const toolCalls = [...toolCallBuffer.values()].map((t) => ({
175
  id: t.id || `call_${crypto.randomUUID()}`,
176
  type: "function",
177
  function: { name: t.name, arguments: t.arguments },
178
  }));
179
-
180
  resolve({ assistantText, toolCalls });
181
  }
182
  });
183
 
184
  ws.on("error", (err) => {
185
- console.error("[WS] Error event:", err);
186
- // still propagate error to caller
187
- reject(err);
188
  });
189
 
190
  ws.on("close", (code, reason) => {
191
- console.log(`[WS] Connection closed (code: ${code}, reason: ${reason})`);
192
- if (!assistantText) {
193
- reject(new Error("WebSocket closed prematurely"));
194
- }
195
  });
196
  });
197
  }
 
54
  }
55
 
56
  export async function websocketChatStream(body, headers, onToken, abortSignal) {
 
 
 
 
57
  const wsURL =
58
  (process.env.LIGHTNING_BASE.startsWith("https")
59
  ? process.env.LIGHTNING_BASE.replace("https", "wss")
60
+ : process.env.LIGHTNING_BASE.replace("http", "ws")) + "/ws/chat";
 
 
 
61
 
62
  const ws = new WebSocket(wsURL);
63
 
64
+ // Utility to safely parse JSON
65
+ const safeParse = (str) => {
66
+ try { return JSON.parse(str); }
67
+ catch { return null; }
68
+ };
69
+
70
+ // Wait for open with timeout
71
  await new Promise((resolve, reject) => {
72
+ const timer = setTimeout(() => reject(new Error("WS connection timeout")), 5000);
73
  ws.on("open", () => {
74
+ clearTimeout(timer);
75
+ console.log("[WS] Opened connection");
76
  resolve();
77
  });
78
  ws.on("error", (err) => {
79
+ clearTimeout(timer);
80
  console.error("[WS] Connection error:", err);
81
  reject(err);
82
  });
83
  });
84
 
85
+ // Authenticate with timeout
86
  ws.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
 
 
87
  await new Promise((resolve, reject) => {
88
+ const timer = setTimeout(() => reject(new Error("WS auth timeout")), 5000);
89
  ws.on("message", (data) => {
90
+ const msg = safeParse(data.toString());
91
+ console.log("[WS RAW MESSAGE]", data.toString());
92
+ if (!msg) return;
93
+ if (msg.type === "auth" && msg.status === "ok") {
94
+ clearTimeout(timer);
95
+ console.log("[WS] Auth successful");
96
+ resolve();
97
+ }
98
+ if (msg.error) {
99
+ clearTimeout(timer);
100
+ console.error("[WS] Auth error:", msg.error);
101
+ reject(new Error(`WS auth error: ${msg.error}`));
 
 
 
102
  }
103
  });
104
  ws.on("error", (err) => {
105
+ clearTimeout(timer);
106
  reject(err);
107
  });
108
  });
109
 
110
+ // Send the chat body
111
  ws.send(JSON.stringify({ body, headers }));
 
112
 
113
  let assistantText = "";
114
  const toolCallBuffer = new Map();
115
+ let finished = false;
116
 
117
  return new Promise((resolve, reject) => {
118
+ // Abort handling
119
  if (abortSignal) {
120
  abortSignal.addEventListener("abort", () => {
121
  console.log("[WS] Aborted by signal");
122
  ws.close();
123
+ if (!finished) reject(new Error("AbortError"));
124
  });
125
  }
126
 
127
+ // Handle messages
128
  ws.on("message", (data) => {
129
  const line = data.toString();
130
+ console.log("[WS MESSAGE RECEIVED]", line);
131
+ let payload = safeParse(line);
132
+ if (!payload) return;
133
 
134
+ // Handle errors from server
 
 
 
 
 
 
 
 
 
 
135
  if (payload.error) {
136
+ console.error("[WS PAYLOAD ERROR]", payload.error);
137
+ if (typeof onToken === "function") onToken(`[ERROR] ${payload.error}`);
138
  return;
139
  }
140
 
141
  const delta = payload.choices?.[0]?.delta;
142
  if (!delta) return;
143
 
144
+ // Stream content tokens
145
  if (delta.content) {
146
  assistantText += delta.content;
147
+ if (typeof onToken === "function") onToken(delta.content);
148
  }
149
 
150
+ // Collect tool calls
151
  if (delta.tool_calls) {
152
  for (const call of delta.tool_calls) {
153
  const entry = toolCallBuffer.get(call.index) ?? { arguments: "" };
 
155
  if (call.function?.name) entry.name = call.function.name;
156
  if (call.function?.arguments) entry.arguments += call.function.arguments;
157
  toolCallBuffer.set(call.index, entry);
 
158
  }
159
  }
160
 
161
+ // Finish detection
162
  if (payload.choices?.[0]?.finish_reason) {
163
+ finished = true;
164
  ws.close();
 
165
  const toolCalls = [...toolCallBuffer.values()].map((t) => ({
166
  id: t.id || `call_${crypto.randomUUID()}`,
167
  type: "function",
168
  function: { name: t.name, arguments: t.arguments },
169
  }));
170
+ console.log("[WS] Finished streaming");
171
  resolve({ assistantText, toolCalls });
172
  }
173
  });
174
 
175
  ws.on("error", (err) => {
176
+ console.error("[WS ERROR]", err);
177
+ if (!finished) reject(err);
 
178
  });
179
 
180
  ws.on("close", (code, reason) => {
181
+ console.log(`[WS CLOSED] Code: ${code}, Reason: ${reason}`);
182
+ if (!finished) reject(new Error("WebSocket closed prematurely"));
 
 
183
  });
184
  });
185
  }