incognitolm commited on
Commit
811bf80
·
1 Parent(s): 2b4a989

Update chatStream.js

Browse files
Files changed (1) hide show
  1. server/chatStream.js +21 -20
server/chatStream.js CHANGED
@@ -55,9 +55,9 @@ function makeClient(accessToken, clientId) {
55
 
56
  export async function websocketChatStream(body, headers, onToken, abortSignal) {
57
  const wsURL =
58
- (LIGHTNING_BASE.startsWith("https")
59
- ? LIGHTNING_BASE.replace("https", "wss")
60
- : LIGHTNING_BASE.replace("http", "ws")) +
61
  "/ws/chat";
62
 
63
  const ws = new WebSocket(wsURL);
@@ -76,12 +76,15 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
76
  try {
77
  const msg = JSON.parse(data.toString());
78
  if (msg.type === "auth" && msg.status === "ok") resolve();
79
- } catch {}
 
 
 
80
  });
81
  ws.on("error", reject);
82
  });
83
 
84
- // Send chat body
85
  ws.send(JSON.stringify({ body, headers }));
86
 
87
  let assistantText = "";
@@ -97,31 +100,28 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
97
 
98
  ws.on("message", (data) => {
99
  const line = data.toString();
100
-
101
- if (!line.startsWith("data:")) return;
102
-
103
  let payload;
 
104
  try {
105
- payload = JSON.parse(line.slice(5));
106
  } catch {
 
107
  return;
108
  }
109
 
110
- // ⚠️ Log any server error
111
  if (payload.error) {
112
- console.error("Received error from server:", payload.error);
 
113
  }
114
 
115
- // Optional: log all payloads for debugging
116
- // console.log("Server payload:", payload);
117
-
118
  const delta = payload.choices?.[0]?.delta;
119
  if (!delta) return;
120
 
121
- // Append assistant text
122
  if (delta.content) {
123
  assistantText += delta.content;
124
- if (onToken) onToken(delta.content);
125
  }
126
 
127
  // Handle tool calls
@@ -135,7 +135,7 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
135
  }
136
  }
137
 
138
- // Finish reason → resolve promise
139
  if (payload.choices?.[0]?.finish_reason) {
140
  ws.close();
141
 
@@ -155,9 +155,10 @@ export async function websocketChatStream(body, headers, onToken, abortSignal) {
155
  }
156
  });
157
 
158
- ws.on("error", (err) => {
159
- console.error("WebSocket error:", err);
160
- reject(err);
 
161
  });
162
  });
163
  }
 
55
 
56
  export async function websocketChatStream(body, headers, onToken, abortSignal) {
57
  const wsURL =
58
+ (process.env.LIGHTNING_BASE.startsWith("https")
59
+ ? process.env.LIGHTNING_BASE.replace("https", "wss")
60
+ : process.env.LIGHTNING_BASE.replace("http", "ws")) +
61
  "/ws/chat";
62
 
63
  const ws = new WebSocket(wsURL);
 
76
  try {
77
  const msg = JSON.parse(data.toString());
78
  if (msg.type === "auth" && msg.status === "ok") resolve();
79
+ if (msg.error) reject(new Error(`WebSocket auth error: ${msg.error}`));
80
+ } catch (e) {
81
+ reject(e);
82
+ }
83
  });
84
  ws.on("error", reject);
85
  });
86
 
87
+ // Send body
88
  ws.send(JSON.stringify({ body, headers }));
89
 
90
  let assistantText = "";
 
100
 
101
  ws.on("message", (data) => {
102
  const line = data.toString();
 
 
 
103
  let payload;
104
+
105
  try {
106
+ payload = JSON.parse(line);
107
  } catch {
108
+ // ignore non-JSON lines
109
  return;
110
  }
111
 
112
+ // Handle errors
113
  if (payload.error) {
114
+ onToken(`[ERROR] ${payload.error}`);
115
+ return;
116
  }
117
 
 
 
 
118
  const delta = payload.choices?.[0]?.delta;
119
  if (!delta) return;
120
 
121
+ // Stream text content
122
  if (delta.content) {
123
  assistantText += delta.content;
124
+ onToken(delta.content);
125
  }
126
 
127
  // Handle tool calls
 
135
  }
136
  }
137
 
138
+ // Finish
139
  if (payload.choices?.[0]?.finish_reason) {
140
  ws.close();
141
 
 
155
  }
156
  });
157
 
158
+ ws.on("error", reject);
159
+ ws.on("close", () => {
160
+ // If close happens without finish_reason
161
+ if (!assistantText) reject(new Error("WebSocket closed prematurely"));
162
  });
163
  });
164
  }