Andrew commited on
Commit
e3a20a9
·
2 Parent(s): a16fa47155969c

Merge branch 'refactor/core-messaging'

Browse files
src/lib/types/MessageContext.ts ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import type { Message } from "./Message";
2
+ import type { v4 } from "uuid";
3
+ import type { goto, invalidate } from "$app/navigation";
4
+
5
+ export interface WriteMessageContext {
6
+ page: { params: { id: string } };
7
+ messages: Message[];
8
+ messagesPath: Message[];
9
+ data: { rootMessageId: string };
10
+ files: File[];
11
+ settings: {
12
+ disableStream: boolean;
13
+ personas?: Array<{ id: string; name: string }>;
14
+ };
15
+ isAborted: () => boolean;
16
+ branchState: {
17
+ messageId: string;
18
+ personaId: string;
19
+ personaName: string;
20
+ } | null;
21
+
22
+ setLoading: (val: boolean) => void;
23
+ setPending: (val: boolean) => void;
24
+ setFiles: (val: File[]) => void;
25
+ setError: (val: string) => void;
26
+ setIsAborted: (val: boolean) => void;
27
+ setTitleUpdate: (val: { title: string; convId: string }) => void;
28
+ onTitleUpdate?: (title: string) => void;
29
+ onMessageCreated?: (id: string) => void;
30
+ updateBranchState: (val: unknown) => void;
31
+ invalidate: typeof invalidate;
32
+ goto: typeof goto;
33
+ }
34
+
35
+ export interface WriteMessageParams {
36
+ prompt?: string;
37
+ messageId?: ReturnType<typeof v4>;
38
+ isRetry?: boolean;
39
+ isContinue?: boolean;
40
+ personaId?: string;
41
+ }
src/lib/utils/message/ConversationTreeManager.ts ADDED
@@ -0,0 +1,268 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import type { Message, MessageFile } from "$lib/types/Message";
2
+ import { MessageRole } from "$lib/types/Message";
3
+ import { addChildren } from "$lib/utils/tree/addChildren";
4
+ import { addSibling } from "$lib/utils/tree/addSibling";
5
+ import type { WriteMessageContext, WriteMessageParams } from "$lib/types/MessageContext";
6
+
7
+ export class ConversationTreeManager {
8
+ constructor(private ctx: WriteMessageContext) {}
9
+
10
+ public prepareMessageForWrite(
11
+ params: WriteMessageParams,
12
+ base64Files: MessageFile[] = []
13
+ ): {
14
+ messageToWriteToId: string;
15
+ navigateToMessageId: string | null;
16
+ } {
17
+ const {
18
+ prompt,
19
+ messageId = this.ctx.messagesPath.at(-1)?.id ?? undefined,
20
+ isRetry = false,
21
+ isContinue = false,
22
+ personaId,
23
+ } = params;
24
+
25
+ let messageToWriteToId: string | undefined;
26
+ let navigateToMessageId: string | null = null;
27
+
28
+ if (isContinue && messageId) {
29
+ const msg = this.ctx.messages.find((m) => m.id === messageId);
30
+ if ((msg?.children?.length ?? 0) > 0) {
31
+ throw new Error("Can only continue the last message");
32
+ }
33
+ messageToWriteToId = messageId;
34
+ } else if (isRetry && messageId) {
35
+ const messageToRetry = this.ctx.messages.find((m) => m.id === messageId);
36
+ if (!messageToRetry) {
37
+ throw new Error("Message not found");
38
+ }
39
+
40
+ if (messageToRetry.from === MessageRole.User && prompt !== undefined) {
41
+ const newUserMessageId = addSibling(
42
+ {
43
+ messages: this.ctx.messages,
44
+ rootMessageId: this.ctx.data.rootMessageId,
45
+ },
46
+ {
47
+ from: MessageRole.User,
48
+ content: prompt,
49
+ files: messageToRetry.files,
50
+ ...(messageToRetry.branchedFrom && {
51
+ branchedFrom: messageToRetry.branchedFrom,
52
+ }),
53
+ },
54
+ messageId
55
+ );
56
+
57
+ const targetPersonaId =
58
+ this.ctx.branchState?.personaId || messageToRetry.branchedFrom?.personaId;
59
+ const initialResponses: Message["personaResponses"] = [];
60
+ if (targetPersonaId) {
61
+ const persona = this.ctx.settings.personas?.find((p) => p.id === targetPersonaId);
62
+ initialResponses.push({
63
+ personaId: targetPersonaId,
64
+ personaName: this.ctx.branchState?.personaName || persona?.name || targetPersonaId,
65
+ content: "",
66
+ });
67
+ }
68
+
69
+ messageToWriteToId = addChildren(
70
+ {
71
+ messages: this.ctx.messages,
72
+ rootMessageId: this.ctx.data.rootMessageId,
73
+ },
74
+ {
75
+ from: MessageRole.Assistant,
76
+ content: "",
77
+ personaResponses: initialResponses,
78
+ ...((this.ctx.branchState || messageToRetry.branchedFrom) && {
79
+ branchedFrom: this.ctx.branchState
80
+ ? {
81
+ messageId: this.ctx.branchState.messageId,
82
+ personaId: this.ctx.branchState.personaId,
83
+ }
84
+ : messageToRetry.branchedFrom,
85
+ }),
86
+ },
87
+ newUserMessageId
88
+ );
89
+
90
+ if (messageToRetry.branchedFrom && !this.ctx.branchState) {
91
+ const persona = this.ctx.settings.personas?.find(
92
+ (p) => p.id === messageToRetry.branchedFrom?.personaId
93
+ );
94
+ this.ctx.updateBranchState({
95
+ messageId: messageToRetry.branchedFrom.messageId,
96
+ personaId: messageToRetry.branchedFrom.personaId,
97
+ personaName: persona?.name || messageToRetry.branchedFrom.personaId,
98
+ });
99
+ navigateToMessageId = newUserMessageId;
100
+ }
101
+ this.ctx.onMessageCreated?.(messageToWriteToId);
102
+ } else if (messageToRetry.from === MessageRole.User && prompt === undefined) {
103
+ messageToWriteToId = addChildren(
104
+ {
105
+ messages: this.ctx.messages,
106
+ rootMessageId: this.ctx.data.rootMessageId,
107
+ },
108
+ {
109
+ from: MessageRole.Assistant,
110
+ content: "",
111
+ personaResponses: [],
112
+ ...(this.ctx.branchState && {
113
+ branchedFrom: {
114
+ messageId: this.ctx.branchState.messageId,
115
+ personaId: this.ctx.branchState.personaId,
116
+ },
117
+ }),
118
+ },
119
+ messageId
120
+ );
121
+ navigateToMessageId = messageToWriteToId;
122
+ this.ctx.onMessageCreated?.(messageToWriteToId);
123
+ } else if (messageToRetry.from === MessageRole.Assistant) {
124
+ let initialPersonaResponses: Message["personaResponses"] = [];
125
+ if (personaId && messageToRetry.personaResponses) {
126
+ initialPersonaResponses = messageToRetry.personaResponses.map((p) => {
127
+ if (p.personaId === personaId) {
128
+ return {
129
+ ...p,
130
+ content: "",
131
+ interrupted: undefined,
132
+ reasoning: undefined,
133
+ updates: undefined,
134
+ routerMetadata: undefined,
135
+ };
136
+ }
137
+ // Defensive copy using structuredClone
138
+ return structuredClone(p);
139
+ });
140
+ }
141
+
142
+ messageToWriteToId = addSibling(
143
+ {
144
+ messages: this.ctx.messages,
145
+ rootMessageId: this.ctx.data.rootMessageId,
146
+ },
147
+ {
148
+ from: MessageRole.Assistant,
149
+ content: "",
150
+ personaResponses: initialPersonaResponses,
151
+ ...((this.ctx.branchState || messageToRetry.branchedFrom) && {
152
+ branchedFrom: this.ctx.branchState
153
+ ? {
154
+ messageId: this.ctx.branchState.messageId,
155
+ personaId: this.ctx.branchState.personaId,
156
+ }
157
+ : messageToRetry.branchedFrom,
158
+ }),
159
+ },
160
+ messageId
161
+ );
162
+
163
+ if (messageToRetry.branchedFrom && !this.ctx.branchState) {
164
+ const persona = this.ctx.settings.personas?.find(
165
+ (p) => p.id === messageToRetry.branchedFrom?.personaId
166
+ );
167
+ this.ctx.updateBranchState({
168
+ messageId: messageToRetry.branchedFrom.messageId,
169
+ personaId: messageToRetry.branchedFrom.personaId,
170
+ personaName: persona?.name || messageToRetry.branchedFrom.personaId,
171
+ });
172
+ navigateToMessageId = messageToWriteToId;
173
+ }
174
+ this.ctx.onMessageCreated?.(messageToWriteToId);
175
+ }
176
+ } else {
177
+ // New message
178
+ const newUserMessageId = addChildren(
179
+ {
180
+ messages: this.ctx.messages,
181
+ rootMessageId: this.ctx.data.rootMessageId,
182
+ },
183
+ {
184
+ from: MessageRole.User,
185
+ content: prompt ?? "",
186
+ files: base64Files,
187
+ ...(this.ctx.branchState && {
188
+ branchedFrom: {
189
+ messageId: this.ctx.branchState.messageId,
190
+ personaId: this.ctx.branchState.personaId,
191
+ },
192
+ }),
193
+ },
194
+ messageId
195
+ );
196
+
197
+ if (!this.ctx.data.rootMessageId) {
198
+ this.ctx.data.rootMessageId = newUserMessageId;
199
+ }
200
+
201
+ messageToWriteToId = addChildren(
202
+ {
203
+ messages: this.ctx.messages,
204
+ rootMessageId: this.ctx.data.rootMessageId,
205
+ },
206
+ {
207
+ from: MessageRole.Assistant,
208
+ content: "",
209
+ personaResponses: [],
210
+ ...(this.ctx.branchState && {
211
+ branchedFrom: {
212
+ messageId: this.ctx.branchState.messageId,
213
+ personaId: this.ctx.branchState.personaId,
214
+ },
215
+ }),
216
+ },
217
+ newUserMessageId
218
+ );
219
+
220
+ this.ctx.onMessageCreated?.(messageToWriteToId);
221
+ }
222
+
223
+ if (!messageToWriteToId) {
224
+ throw new Error("Failed to determine message ID to write to");
225
+ }
226
+
227
+ return { messageToWriteToId, navigateToMessageId };
228
+ }
229
+
230
+ /**
231
+ * Safely updates a message ID in the tree, ensuring parent linkage is maintained.
232
+ */
233
+ public syncMessageId(oldId: string, newId: string): void {
234
+ const message = this.ctx.messages.find((m) => m.id === oldId || m.id === newId);
235
+ if (!message) return;
236
+
237
+ // If ID is already updated, just return
238
+ if (message.id === newId) return;
239
+
240
+ message.id = newId;
241
+
242
+ if (message.ancestors && message.ancestors.length > 0) {
243
+ const parentId = message.ancestors[message.ancestors.length - 1];
244
+ const parent = this.ctx.messages.find((m) => m.id === parentId);
245
+
246
+ if (parent) {
247
+ if (!parent.children) parent.children = [];
248
+
249
+ const childIndex = parent.children.indexOf(oldId);
250
+ if (childIndex !== -1) {
251
+ parent.children[childIndex] = newId;
252
+ } else {
253
+ // Fallback: append if not found
254
+ console.warn(
255
+ `[TreeManager] Parent ${parentId} missing child ${oldId}, appending ${newId}`
256
+ );
257
+ parent.children.push(newId);
258
+ }
259
+ } else {
260
+ console.error(
261
+ `[TreeManager] Parent ${parentId} not found for message ${oldId} -> ${newId}`
262
+ );
263
+ }
264
+ }
265
+
266
+ console.debug(`[TreeManager] Synced message ID: ${oldId} -> ${newId}`);
267
+ }
268
+ }
src/lib/utils/message/MessageStreamHandler.ts ADDED
@@ -0,0 +1,228 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import {
2
+ MessageReasoningUpdateType,
3
+ MessageUpdateStatus,
4
+ MessageUpdateType,
5
+ } from "$lib/types/MessageUpdate";
6
+ import { fetchMessageUpdates } from "$lib/utils/messageUpdates";
7
+ import { updateDebouncer } from "$lib/utils/updates.js";
8
+ import type { WriteMessageContext } from "$lib/types/MessageContext";
9
+ import type { ConversationTreeManager } from "./ConversationTreeManager";
10
+
11
+ export class MessageStreamHandler {
12
+ constructor(
13
+ private ctx: WriteMessageContext,
14
+ private treeManager: ConversationTreeManager
15
+ ) {}
16
+
17
+ public async handleStream(
18
+ conversationId: string,
19
+ params: {
20
+ base: string;
21
+ prompt?: string;
22
+ messageId?: string;
23
+ isRetry: boolean;
24
+ isContinue: boolean;
25
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
26
+ files?: any[];
27
+ personaId?: string;
28
+ },
29
+ messageToWriteToId: string
30
+ ) {
31
+ const messageToWriteTo = this.ctx.messages.find((m) => m.id === messageToWriteToId);
32
+ if (!messageToWriteTo) {
33
+ throw new Error("Message to write to not found");
34
+ }
35
+
36
+ const messageUpdatesAbortController = new AbortController();
37
+
38
+ const messageUpdatesIterator = await fetchMessageUpdates(
39
+ conversationId,
40
+ {
41
+ base: params.base,
42
+ inputs: params.prompt,
43
+ messageId: params.messageId,
44
+ isRetry: params.isRetry,
45
+ isContinue: params.isContinue,
46
+ files: params.files,
47
+ personaId: params.personaId,
48
+ branchedFrom: this.ctx.branchState
49
+ ? {
50
+ messageId: this.ctx.branchState.messageId,
51
+ personaId: this.ctx.branchState.personaId,
52
+ }
53
+ : undefined,
54
+ },
55
+ messageUpdatesAbortController.signal
56
+ ).catch((err) => {
57
+ this.ctx.setError(err.message);
58
+ });
59
+
60
+ if (messageUpdatesIterator === undefined) return;
61
+
62
+ this.ctx.setFiles([]);
63
+ let buffer = "";
64
+ let lastUpdateTime = new Date();
65
+
66
+ let reasoningBuffer = "";
67
+ let reasoningLastUpdate = new Date();
68
+
69
+ const personaBuffers = new Map<string, string>();
70
+ const personaLastUpdateTimes = new Map<string, Date>();
71
+
72
+ for await (const update of messageUpdatesIterator) {
73
+ if (this.ctx.isAborted()) {
74
+ messageUpdatesAbortController.abort();
75
+ return;
76
+ }
77
+
78
+ if (update.type === MessageUpdateType.Stream) {
79
+ update.token = update.token.replaceAll("\0", "");
80
+ }
81
+
82
+ const isHighFrequencyUpdate =
83
+ (update.type === MessageUpdateType.Reasoning &&
84
+ update.subtype === MessageReasoningUpdateType.Stream) ||
85
+ update.type === MessageUpdateType.Stream ||
86
+ update.type === MessageUpdateType.Persona ||
87
+ (update.type === MessageUpdateType.Status &&
88
+ update.status === MessageUpdateStatus.KeepAlive);
89
+
90
+ if (!isHighFrequencyUpdate) {
91
+ messageToWriteTo.updates = [...(messageToWriteTo.updates ?? []), update];
92
+ }
93
+ const currentTime = new Date();
94
+
95
+ if (update.type === MessageUpdateType.PersonaInit) {
96
+ const newResponses = update.personas.map((p) => ({
97
+ personaId: p.personaId,
98
+ personaName: p.personaName,
99
+ personaOccupation: p.personaOccupation,
100
+ personaStance: p.personaStance,
101
+ content: "",
102
+ }));
103
+
104
+ if (!messageToWriteTo.personaResponses) {
105
+ messageToWriteTo.personaResponses = newResponses;
106
+ } else {
107
+ // Merge with existing personas (preserving those not in the update)
108
+ for (const newRes of newResponses) {
109
+ const existingIdx = messageToWriteTo.personaResponses.findIndex(
110
+ (p) => p.personaId === newRes.personaId
111
+ );
112
+ if (existingIdx !== -1) {
113
+ // Update existing persona in place
114
+ messageToWriteTo.personaResponses[existingIdx] = {
115
+ ...messageToWriteTo.personaResponses[existingIdx],
116
+ ...newRes,
117
+ content: messageToWriteTo.personaResponses[existingIdx].content || newRes.content,
118
+ };
119
+ } else {
120
+ messageToWriteTo.personaResponses.push(newRes);
121
+ }
122
+ }
123
+ }
124
+ } else if (update.type === MessageUpdateType.Persona) {
125
+ if (!messageToWriteTo.personaResponses) {
126
+ messageToWriteTo.personaResponses = [];
127
+ }
128
+
129
+ let personaResponse = messageToWriteTo.personaResponses.find(
130
+ (pr) => pr.personaId === update.personaId
131
+ );
132
+ if (!personaResponse) {
133
+ personaResponse = {
134
+ personaId: update.personaId,
135
+ personaName: update.personaName,
136
+ personaOccupation: update.personaOccupation,
137
+ personaStance: update.personaStance,
138
+ content: "",
139
+ };
140
+ messageToWriteTo.personaResponses.push(personaResponse);
141
+ }
142
+
143
+ if (update.updateType === "stream" && update.token && !this.ctx.settings.disableStream) {
144
+ const personaBuffer = personaBuffers.get(update.personaId) || "";
145
+ const newBuffer = personaBuffer + update.token;
146
+ personaBuffers.set(update.personaId, newBuffer);
147
+
148
+ const lastUpdate = personaLastUpdateTimes.get(update.personaId) || new Date(0);
149
+ if (currentTime.getTime() - lastUpdate.getTime() > updateDebouncer.maxUpdateTime) {
150
+ personaResponse.content += newBuffer;
151
+ personaBuffers.set(update.personaId, "");
152
+ personaLastUpdateTimes.set(update.personaId, currentTime);
153
+ }
154
+ this.ctx.setPending(false);
155
+ } else if (update.updateType === "finalAnswer" && update.text) {
156
+ personaResponse.content = update.text;
157
+ personaResponse.interrupted = update.interrupted;
158
+ } else if (update.updateType === "routerMetadata" && update.route && update.model) {
159
+ personaResponse.routerMetadata = {
160
+ route: update.route,
161
+ model: update.model,
162
+ };
163
+ } else if (update.updateType === "status" && update.error) {
164
+ personaResponse.interrupted = true;
165
+ personaResponse.content = personaResponse.content || `Error: ${update.error}`;
166
+ }
167
+ } else if (update.type === MessageUpdateType.Stream && !this.ctx.settings.disableStream) {
168
+ buffer += update.token;
169
+ if (currentTime.getTime() - lastUpdateTime.getTime() > updateDebouncer.maxUpdateTime) {
170
+ messageToWriteTo.content += buffer;
171
+ buffer = "";
172
+ lastUpdateTime = currentTime;
173
+ }
174
+ this.ctx.setPending(false);
175
+ } else if (
176
+ update.type === MessageUpdateType.Status &&
177
+ update.status === MessageUpdateStatus.Error
178
+ ) {
179
+ this.ctx.setError(update.message ?? "An error has occurred");
180
+ } else if (
181
+ update.type === MessageUpdateType.Status &&
182
+ update.status === MessageUpdateStatus.Started &&
183
+ update.messageId
184
+ ) {
185
+ if (messageToWriteTo.id !== update.messageId) {
186
+ // Use TreeManager to safely update the ID and parent links
187
+ const oldId = messageToWriteTo.id;
188
+ this.treeManager.syncMessageId(oldId, update.messageId);
189
+ }
190
+ } else if (update.type === MessageUpdateType.Title) {
191
+ this.ctx.setTitleUpdate({
192
+ title: update.title,
193
+ convId: conversationId,
194
+ });
195
+ this.ctx.onTitleUpdate?.(update.title);
196
+ } else if (update.type === MessageUpdateType.File) {
197
+ messageToWriteTo.files = [
198
+ ...(messageToWriteTo.files ?? []),
199
+ { type: "hash", value: update.sha, mime: update.mime, name: update.name },
200
+ ];
201
+ } else if (update.type === MessageUpdateType.Reasoning) {
202
+ if (!messageToWriteTo.reasoning) {
203
+ messageToWriteTo.reasoning = "";
204
+ }
205
+ if (update.subtype === MessageReasoningUpdateType.Stream) {
206
+ reasoningBuffer += update.token;
207
+ if (
208
+ currentTime.getTime() - reasoningLastUpdate.getTime() >
209
+ updateDebouncer.maxUpdateTime
210
+ ) {
211
+ messageToWriteTo.reasoning += reasoningBuffer;
212
+ reasoningBuffer = "";
213
+ reasoningLastUpdate = currentTime;
214
+ }
215
+ }
216
+ } else if (update.type === MessageUpdateType.RouterMetadata) {
217
+ messageToWriteTo.routerMetadata = {
218
+ route: update.route,
219
+ model: update.model,
220
+ };
221
+ } else if (update.type === MessageUpdateType.FinalAnswer) {
222
+ messageToWriteTo.content = update.text;
223
+ messageToWriteTo.interrupted = update.interrupted;
224
+ this.ctx.setPending(false);
225
+ }
226
+ }
227
+ }
228
+ }