incognitolm commited on
Commit
3adb92a
Β·
verified Β·
1 Parent(s): 4f18d5e

Update server/chatStream.js

Browse files
Files changed (1) hide show
  1. server/chatStream.js +79 -113
server/chatStream.js CHANGED
@@ -1,3 +1,4 @@
 
1
  import { LIGHTNING_BASE } from "./config.js";
2
 
3
  const SYSTEM_PROMPT =
@@ -15,6 +16,59 @@ const SYSTEM_PROMPT =
15
  "Never use single backslashes. You may use emojis where appropriate. " +
16
  "Use markdown for everything other than coloring your text. Use tables, lists, and other markdown elements.";
17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  export async function streamChat(ws, {
19
  sessionId,
20
  model,
@@ -30,12 +84,8 @@ export async function streamChat(ws, {
30
  onNewAsset,
31
  abortSignal,
32
  }) {
33
- const headers = {
34
- "Content-Type": "application/json",
35
- "Accept": "text/event-stream",
36
- };
37
- if (accessToken) headers["Authorization"] = `Bearer ${accessToken}`;
38
- if (clientId) headers["X-Client-ID"] = clientId;
39
 
40
  const messages = [
41
  { role: "system", content: SYSTEM_PROMPT },
@@ -43,128 +93,44 @@ export async function streamChat(ws, {
43
  { role: "user", content: userMessage },
44
  ];
45
 
46
- const enabledTools = buildToolList(tools);
47
-
48
  try {
49
- const response = await fetch(`${LIGHTNING_BASE}/gen/chat/completions`, {
50
- method: "POST",
51
- headers,
52
- body: JSON.stringify({
53
- model: model || "lightning",
54
- messages,
55
- tools: enabledTools.length > 0 ? enabledTools : undefined,
56
- stream: true,
57
- }),
58
- signal: abortSignal,
59
- });
60
-
61
- if (!response.ok) {
62
- const err = await response.text();
63
- onError(err);
64
- return;
65
- }
66
 
67
- const reader = response.body.getReader();
68
- const decoder = new TextDecoder();
69
- let assistantText = "";
70
- const toolCallBuffer = new Map();
71
- let buffer = "";
72
-
73
- while (true) {
74
- const { done, value } = await reader.read();
75
- if (done) break;
76
-
77
- buffer += decoder.decode(value, { stream: true });
78
- const lines = buffer.split("\n");
79
- buffer = lines.pop() || "";
80
-
81
- for (const line of lines) {
82
- if (!line.startsWith("data: ")) continue;
83
- const data = line.slice(6).trim();
84
- if (data === "[DONE]") continue;
85
-
86
- let chunk;
87
- try { chunk = JSON.parse(data); } catch { continue; }
88
-
89
- const delta = chunk.choices?.[0]?.delta;
90
- if (!delta) continue;
91
-
92
- if (delta.content) {
93
- assistantText += delta.content;
94
- onToken(delta.content);
95
- }
96
-
97
- if (delta.tool_calls) {
98
- for (const call of delta.tool_calls) {
99
- const entry = toolCallBuffer.get(call.index) || { arguments: "" };
100
- if (call.id) entry.id = call.id;
101
- if (call.function?.name) entry.name = call.function.name;
102
- if (call.function?.arguments) entry.arguments += call.function.arguments;
103
- toolCallBuffer.set(call.index, entry);
104
- }
105
- }
106
- }
107
- }
108
-
109
- // Process tool calls
110
- const toolCalls = [...toolCallBuffer.values()].map(t => ({
111
- id: t.id || `call_${crypto.randomUUID()}`,
112
- type: "function",
113
- function: { name: t.name, arguments: t.arguments },
114
- }));
115
 
 
116
  if (toolCalls.length > 0) {
117
- const toolResults = await processToolCalls(ws, toolCalls, tools, accessToken, clientId, abortSignal, onToolCall, onNewAsset);
 
 
118
 
119
- // Follow-up response after tool calls
120
  const followUpMessages = [
121
- { role: "system", content: SYSTEM_PROMPT },
122
  ...history.map(normalizeMessage).filter(Boolean),
123
- { role: "user", content: userMessage },
124
  { role: "assistant", content: assistantText || "", tool_calls: toolCalls },
125
  ...toolResults,
126
  ];
127
 
128
- const followUp = await fetch(`${LIGHTNING_BASE}/gen/chat/completions`, {
129
- method: "POST",
130
- headers: { ...headers, "Accept": "text/event-stream" },
131
- body: JSON.stringify({
132
- model: model || "lightning",
133
- messages: followUpMessages,
134
- stream: true,
135
- }),
136
- signal: abortSignal,
137
- });
138
-
139
- if (followUp.ok) {
140
- const fuReader = followUp.body.getReader();
141
- const fuDecoder = new TextDecoder(); // fresh decoder β€” reusing the first one corrupts the stream
142
- let fuBuffer = "";
143
- while (true) {
144
- const { done, value } = await fuReader.read();
145
- if (done) break;
146
- fuBuffer += fuDecoder.decode(value, { stream: true });
147
- const fuLines = fuBuffer.split("\n");
148
- fuBuffer = fuLines.pop() || "";
149
- for (const line of fuLines) {
150
- if (!line.startsWith("data: ")) continue;
151
- const data = line.slice(6).trim();
152
- if (data === "[DONE]") continue;
153
- let chunk;
154
- try { chunk = JSON.parse(data); } catch { continue; }
155
- const delta = chunk.choices?.[0]?.delta;
156
- if (delta?.content) {
157
- assistantText += delta.content;
158
- onToken(delta.content);
159
- }
160
- }
161
- }
162
- }
163
  }
164
 
165
  onDone(assistantText, toolCalls);
166
  } catch (err) {
167
- if (err.name === "AbortError") {
168
  onDone(null, null, true); // aborted
169
  } else {
170
  onError(String(err));
 
1
+ import OpenAI from "openai";
2
  import { LIGHTNING_BASE } from "./config.js";
3
 
4
  const SYSTEM_PROMPT =
 
16
  "Never use single backslashes. You may use emojis where appropriate. " +
17
  "Use markdown for everything other than coloring your text. Use tables, lists, and other markdown elements.";
18
 
19
+ /**
20
+ * Build a per-request OpenAI client pointed at the Lightning backend.
21
+ * A new client is created each call so per-user auth headers are always fresh.
22
+ */
23
+ function makeClient(accessToken, clientId) {
24
+ return new OpenAI({
25
+ apiKey: accessToken || "no-key",
26
+ baseURL: `${LIGHTNING_BASE}/gen`,
27
+ defaultHeaders: {
28
+ ...(accessToken ? { Authorization: `Bearer ${accessToken}` } : {}),
29
+ ...(clientId ? { "X-Client-ID": clientId } : {}),
30
+ },
31
+ });
32
+ }
33
+
34
+ /**
35
+ * Consume an OpenAI streaming response, firing onToken for each text delta
36
+ * and collecting any tool-call chunks into a finished toolCalls array.
37
+ * Returns { assistantText, toolCalls }.
38
+ */
39
+ async function consumeStream(stream, onToken) {
40
+ let assistantText = "";
41
+ const toolCallBuffer = new Map();
42
+
43
+ for await (const chunk of stream) {
44
+ const delta = chunk.choices?.[0]?.delta;
45
+ if (!delta) continue;
46
+
47
+ if (delta.content) {
48
+ assistantText += delta.content;
49
+ onToken(delta.content);
50
+ }
51
+
52
+ if (delta.tool_calls) {
53
+ for (const call of delta.tool_calls) {
54
+ const entry = toolCallBuffer.get(call.index) ?? { arguments: "" };
55
+ if (call.id) entry.id = call.id;
56
+ if (call.function?.name) entry.name = call.function.name;
57
+ if (call.function?.arguments) entry.arguments += call.function.arguments;
58
+ toolCallBuffer.set(call.index, entry);
59
+ }
60
+ }
61
+ }
62
+
63
+ const toolCalls = [...toolCallBuffer.values()].map(t => ({
64
+ id: t.id || `call_${crypto.randomUUID()}`,
65
+ type: "function",
66
+ function: { name: t.name, arguments: t.arguments },
67
+ }));
68
+
69
+ return { assistantText, toolCalls };
70
+ }
71
+
72
  export async function streamChat(ws, {
73
  sessionId,
74
  model,
 
84
  onNewAsset,
85
  abortSignal,
86
  }) {
87
+ const client = makeClient(accessToken, clientId);
88
+ const enabledTools = buildToolList(tools);
 
 
 
 
89
 
90
  const messages = [
91
  { role: "system", content: SYSTEM_PROMPT },
 
93
  { role: "user", content: userMessage },
94
  ];
95
 
 
 
96
  try {
97
+ // ── First stream ────────────────────────────────────────────────────────
98
+ const stream = await client.chat.completions.create({
99
+ model: model || "lightning",
100
+ messages,
101
+ tools: enabledTools.length > 0 ? enabledTools : undefined,
102
+ stream: true,
103
+ }, { signal: abortSignal });
 
 
 
 
 
 
 
 
 
 
104
 
105
+ let { assistantText, toolCalls } = await consumeStream(stream, onToken);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
+ // ── Tool calls β†’ follow-up stream ───────────────────────────────────────
108
  if (toolCalls.length > 0) {
109
+ const toolResults = await processToolCalls(
110
+ ws, toolCalls, tools, accessToken, clientId, abortSignal, onToolCall, onNewAsset,
111
+ );
112
 
 
113
  const followUpMessages = [
114
+ { role: "system", content: SYSTEM_PROMPT },
115
  ...history.map(normalizeMessage).filter(Boolean),
116
+ { role: "user", content: userMessage },
117
  { role: "assistant", content: assistantText || "", tool_calls: toolCalls },
118
  ...toolResults,
119
  ];
120
 
121
+ const followUpStream = await client.chat.completions.create({
122
+ model: model || "lightning",
123
+ messages: followUpMessages,
124
+ stream: true,
125
+ }, { signal: abortSignal });
126
+
127
+ const followUp = await consumeStream(followUpStream, onToken);
128
+ assistantText += followUp.assistantText;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
  }
130
 
131
  onDone(assistantText, toolCalls);
132
  } catch (err) {
133
+ if (err.name === "AbortError" || err.constructor?.name === "APIUserAbortError") {
134
  onDone(null, null, true); // aborted
135
  } else {
136
  onError(String(err));