incognitolm commited on
Commit
42f1e7b
·
1 Parent(s): 811bf80

Update chatStream.js

Browse files
Files changed (1) hide show
  1. server/chatStream.js +56 -23
server/chatStream.js CHANGED
@@ -54,38 +54,63 @@ function makeClient(accessToken, clientId) {
54
  }
55
 
56
  export async function websocketChatStream(body, headers, onToken, abortSignal) {
 
 
 
 
57
  const wsURL =
58
  (process.env.LIGHTNING_BASE.startsWith("https")
59
  ? process.env.LIGHTNING_BASE.replace("https", "wss")
60
  : process.env.LIGHTNING_BASE.replace("http", "ws")) +
61
  "/ws/chat";
62
 
 
 
63
  const ws = new WebSocket(wsURL);
64
 
65
- // Wait for connection open
66
  await new Promise((resolve, reject) => {
67
- ws.on("open", resolve);
68
- ws.on("error", reject);
 
 
 
 
 
 
69
  });
70
 
71
  // Authenticate
72
  ws.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
 
73
 
74
  await new Promise((resolve, reject) => {
75
  ws.on("message", (data) => {
 
 
76
  try {
77
- const msg = JSON.parse(data.toString());
78
- if (msg.type === "auth" && msg.status === "ok") resolve();
79
- if (msg.error) reject(new Error(`WebSocket auth error: ${msg.error}`));
 
 
 
 
 
 
80
  } catch (e) {
 
81
  reject(e);
82
  }
83
  });
84
- ws.on("error", reject);
 
 
 
85
  });
86
 
87
- // Send body
88
  ws.send(JSON.stringify({ body, headers }));
 
89
 
90
  let assistantText = "";
91
  const toolCallBuffer = new Map();
@@ -93,6 +118,7 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
93
  return new Promise((resolve, reject) => {
94
  if (abortSignal) {
95
  abortSignal.addEventListener("abort", () => {
 
96
  ws.close();
97
  reject(new Error("AbortError"));
98
  });
@@ -105,20 +131,24 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
105
  try {
106
  payload = JSON.parse(line);
107
  } catch {
108
- // ignore non-JSON lines
109
  return;
110
  }
111
 
 
 
 
112
  // Handle errors
113
  if (payload.error) {
114
- onToken(`[ERROR] ${payload.error}`);
 
115
  return;
116
  }
117
 
118
  const delta = payload.choices?.[0]?.delta;
119
  if (!delta) return;
120
 
121
- // Stream text content
122
  if (delta.content) {
123
  assistantText += delta.content;
124
  onToken(delta.content);
@@ -132,33 +162,36 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
132
  if (call.function?.name) entry.name = call.function.name;
133
  if (call.function?.arguments) entry.arguments += call.function.arguments;
134
  toolCallBuffer.set(call.index, entry);
 
135
  }
136
  }
137
 
138
  // Finish
139
  if (payload.choices?.[0]?.finish_reason) {
 
140
  ws.close();
141
 
142
  const toolCalls = [...toolCallBuffer.values()].map((t) => ({
143
  id: t.id || `call_${crypto.randomUUID()}`,
144
  type: "function",
145
- function: {
146
- name: t.name,
147
- arguments: t.arguments,
148
- },
149
  }));
150
 
151
- resolve({
152
- assistantText,
153
- toolCalls,
154
- });
155
  }
156
  });
157
 
158
- ws.on("error", reject);
159
- ws.on("close", () => {
160
- // If close happens without finish_reason
161
- if (!assistantText) reject(new Error("WebSocket closed prematurely"));
 
 
 
 
 
 
 
162
  });
163
  });
164
  }
 
54
  }
55
 
56
  export async function websocketChatStream(body, headers, onToken, abortSignal) {
57
+ if (!process.env.LIGHTNING_BASE) {
58
+ throw new Error("LIGHTNING_BASE environment variable is not defined");
59
+ }
60
+
61
  const wsURL =
62
  (process.env.LIGHTNING_BASE.startsWith("https")
63
  ? process.env.LIGHTNING_BASE.replace("https", "wss")
64
  : process.env.LIGHTNING_BASE.replace("http", "ws")) +
65
  "/ws/chat";
66
 
67
+ console.log("[WS] Connecting to", wsURL);
68
+
69
  const ws = new WebSocket(wsURL);
70
 
 
71
  await new Promise((resolve, reject) => {
72
+ ws.on("open", () => {
73
+ console.log("[WS] Connection opened");
74
+ resolve();
75
+ });
76
+ ws.on("error", (err) => {
77
+ console.error("[WS] Connection error:", err);
78
+ reject(err);
79
+ });
80
  });
81
 
82
  // Authenticate
83
  ws.send(JSON.stringify({ key: process.env.WEBSOCKET_KEY }));
84
+ console.log("[WS] Auth sent");
85
 
86
  await new Promise((resolve, reject) => {
87
  ws.on("message", (data) => {
88
+ const msgStr = data.toString();
89
+ console.log("[WS] Auth message:", msgStr);
90
  try {
91
+ const msg = JSON.parse(msgStr);
92
+ if (msg.type === "auth" && msg.status === "ok") {
93
+ console.log("[WS] Auth successful");
94
+ resolve();
95
+ }
96
+ if (msg.error) {
97
+ console.error("[WS] Auth error:", msg.error);
98
+ reject(new Error(`WebSocket auth error: ${msg.error}`));
99
+ }
100
  } catch (e) {
101
+ console.error("[WS] Auth message parse error:", e);
102
  reject(e);
103
  }
104
  });
105
+ ws.on("error", (err) => {
106
+ console.error("[WS] Auth error event:", err);
107
+ reject(err);
108
+ });
109
  });
110
 
111
+ // Send request body
112
  ws.send(JSON.stringify({ body, headers }));
113
+ console.log("[WS] Body sent");
114
 
115
  let assistantText = "";
116
  const toolCallBuffer = new Map();
 
118
  return new Promise((resolve, reject) => {
119
  if (abortSignal) {
120
  abortSignal.addEventListener("abort", () => {
121
+ console.log("[WS] Aborted by signal");
122
  ws.close();
123
  reject(new Error("AbortError"));
124
  });
 
131
  try {
132
  payload = JSON.parse(line);
133
  } catch {
134
+ console.log("[WS] Non-JSON message received:", line);
135
  return;
136
  }
137
 
138
+ // Log every event
139
+ console.log("[WS] Payload received:", JSON.stringify(payload));
140
+
141
  // Handle errors
142
  if (payload.error) {
143
+ console.error("[WS] Payload error:", payload.error);
144
+ onToken(`[ERROR] ${payload.error}`); // still sent to client
145
  return;
146
  }
147
 
148
  const delta = payload.choices?.[0]?.delta;
149
  if (!delta) return;
150
 
151
+ // Stream content only through onToken, but do not log the token
152
  if (delta.content) {
153
  assistantText += delta.content;
154
  onToken(delta.content);
 
162
  if (call.function?.name) entry.name = call.function.name;
163
  if (call.function?.arguments) entry.arguments += call.function.arguments;
164
  toolCallBuffer.set(call.index, entry);
165
+ console.log("[WS] Tool call received:", JSON.stringify(entry));
166
  }
167
  }
168
 
169
  // Finish
170
  if (payload.choices?.[0]?.finish_reason) {
171
+ console.log("[WS] Finish reason:", payload.choices[0].finish_reason);
172
  ws.close();
173
 
174
  const toolCalls = [...toolCallBuffer.values()].map((t) => ({
175
  id: t.id || `call_${crypto.randomUUID()}`,
176
  type: "function",
177
+ function: { name: t.name, arguments: t.arguments },
 
 
 
178
  }));
179
 
180
+ resolve({ assistantText, toolCalls });
 
 
 
181
  }
182
  });
183
 
184
+ ws.on("error", (err) => {
185
+ console.error("[WS] Error event:", err);
186
+ // still propagate error to caller
187
+ reject(err);
188
+ });
189
+
190
+ ws.on("close", (code, reason) => {
191
+ console.log(`[WS] Connection closed (code: ${code}, reason: ${reason})`);
192
+ if (!assistantText) {
193
+ reject(new Error("WebSocket closed prematurely"));
194
+ }
195
  });
196
  });
197
  }