victor HF Staff committed on
Commit
4e8a811
·
unverified ·
1 Parent(s): cede9a2

Abort (#1924)

Browse files

* Add AbortRegistry for user-initiated generation cancellation

Introduces an AbortRegistry singleton to track and manage AbortControllers for active conversation generations, enabling user-initiated cancellation of ongoing requests. Updates text generation and endpoint logic to support abort signals, and ensures proper cleanup and partial response handling when a generation is aborted. The stop-generating endpoint now triggers aborts via the registry.

* Remove 'continue message' feature from chat flow

Eliminated the 'continue message' functionality and related code, including the ContinueBtn component, associated props, and logic in both frontend and backend. This simplifies the message handling flow and removes unused parameters and UI elements.

src/lib/buildPrompt.ts CHANGED
@@ -1,7 +1,7 @@
1
  import type { EndpointParameters } from "./server/endpoints/endpoints";
2
  import type { BackendModel } from "./server/models";
3
 
4
- type buildPromptOptions = Pick<EndpointParameters, "messages" | "preprompt" | "continueMessage"> & {
5
  model: BackendModel;
6
  };
7
 
@@ -9,7 +9,6 @@ export async function buildPrompt({
9
  messages,
10
  model,
11
  preprompt,
12
- continueMessage,
13
  }: buildPromptOptions): Promise<string> {
14
  const filteredMessages = messages;
15
 
@@ -17,36 +16,18 @@ export async function buildPrompt({
17
  filteredMessages[0].content = preprompt;
18
  }
19
 
20
- let prompt = model
21
  .chatPromptRender({
22
  messages: filteredMessages.map((m) => ({
23
  ...m,
24
  role: m.from,
25
  })),
26
  preprompt,
27
- continueMessage,
28
  })
29
  // Not super precise, but it's truncated in the model's backend anyway
30
  .split(" ")
31
  .slice(-(model.parameters?.truncate ?? 0))
32
  .join(" ");
33
 
34
- if (continueMessage && model.parameters?.stop) {
35
- let trimmedPrompt = prompt.trimEnd();
36
- let hasRemovedStop = true;
37
- while (hasRemovedStop) {
38
- hasRemovedStop = false;
39
- for (const stopToken of model.parameters.stop) {
40
- if (trimmedPrompt.endsWith(stopToken)) {
41
- trimmedPrompt = trimmedPrompt.slice(0, -stopToken.length);
42
- hasRemovedStop = true;
43
- break;
44
- }
45
- }
46
- trimmedPrompt = trimmedPrompt.trimEnd();
47
- }
48
- prompt = trimmedPrompt;
49
- }
50
-
51
  return prompt;
52
  }
 
1
  import type { EndpointParameters } from "./server/endpoints/endpoints";
2
  import type { BackendModel } from "./server/models";
3
 
4
+ type buildPromptOptions = Pick<EndpointParameters, "messages" | "preprompt"> & {
5
  model: BackendModel;
6
  };
7
 
 
9
  messages,
10
  model,
11
  preprompt,
 
12
  }: buildPromptOptions): Promise<string> {
13
  const filteredMessages = messages;
14
 
 
16
  filteredMessages[0].content = preprompt;
17
  }
18
 
19
+ const prompt = model
20
  .chatPromptRender({
21
  messages: filteredMessages.map((m) => ({
22
  ...m,
23
  role: m.from,
24
  })),
25
  preprompt,
 
26
  })
27
  // Not super precise, but it's truncated in the model's backend anyway
28
  .split(" ")
29
  .slice(-(model.parameters?.truncate ?? 0))
30
  .join(" ");
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  return prompt;
33
  }
src/lib/components/ContinueBtn.svelte DELETED
@@ -1,18 +0,0 @@
1
- <script lang="ts">
2
- import CarbonContinue from "~icons/carbon/continue";
3
-
4
- interface Props {
5
- classNames?: string;
6
- onClick?: () => void;
7
- }
8
-
9
- let { classNames = "", onClick }: Props = $props();
10
- </script>
11
-
12
- <button
13
- type="button"
14
- onclick={onClick}
15
- class="btn flex h-8 rounded-lg border bg-white px-3 py-1 text-gray-500 shadow-sm transition-all hover:bg-gray-100 dark:border-gray-600 dark:bg-gray-700 dark:text-gray-300 dark:hover:bg-gray-600 {classNames}"
16
- >
17
- <CarbonContinue class="mr-2 text-xs " /> Continue
18
- </button>
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/lib/components/chat/ChatWindow.svelte CHANGED
@@ -14,7 +14,6 @@
14
  import RetryBtn from "../RetryBtn.svelte";
15
  import file2base64 from "$lib/utils/file2base64";
16
  import { base } from "$app/paths";
17
- import ContinueBtn from "../ContinueBtn.svelte";
18
  import ChatMessage from "./ChatMessage.svelte";
19
  import ScrollToBottomBtn from "../ScrollToBottomBtn.svelte";
20
  import ScrollToPreviousBtn from "../ScrollToPreviousBtn.svelte";
@@ -49,7 +48,6 @@
49
  onmessage?: (content: string) => void;
50
  onstop?: () => void;
51
  onretry?: (payload: { id: Message["id"]; content?: string }) => void;
52
- oncontinue?: (payload: { id: Message["id"] }) => void;
53
  onshowAlternateMsg?: (payload: { id: Message["id"] }) => void;
54
  }
55
 
@@ -66,7 +64,6 @@
66
  onmessage,
67
  onstop,
68
  onretry,
69
- oncontinue,
70
  onshowAlternateMsg,
71
  }: Props = $props();
72
 
@@ -461,18 +458,6 @@
461
  }
462
  }}
463
  />
464
- {:else if messages && lastMessage && lastMessage.interrupted && !isReadOnly}
465
- <div class="ml-auto gap-2">
466
- <ContinueBtn
467
- onClick={() => {
468
- if (lastMessage && lastMessage.ancestors) {
469
- oncontinue?.({
470
- id: lastMessage?.id,
471
- });
472
- }
473
- }}
474
- />
475
- </div>
476
  {/if}
477
  {/if}
478
  </div>
 
14
  import RetryBtn from "../RetryBtn.svelte";
15
  import file2base64 from "$lib/utils/file2base64";
16
  import { base } from "$app/paths";
 
17
  import ChatMessage from "./ChatMessage.svelte";
18
  import ScrollToBottomBtn from "../ScrollToBottomBtn.svelte";
19
  import ScrollToPreviousBtn from "../ScrollToPreviousBtn.svelte";
 
48
  onmessage?: (content: string) => void;
49
  onstop?: () => void;
50
  onretry?: (payload: { id: Message["id"]; content?: string }) => void;
 
51
  onshowAlternateMsg?: (payload: { id: Message["id"] }) => void;
52
  }
53
 
 
64
  onmessage,
65
  onstop,
66
  onretry,
 
67
  onshowAlternateMsg,
68
  }: Props = $props();
69
 
 
458
  }
459
  }}
460
  />
 
 
 
 
 
 
 
 
 
 
 
 
461
  {/if}
462
  {/if}
463
  </div>
src/lib/server/abortRegistry.ts ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { logger } from "$lib/server/logger";
2
+
3
+ /**
4
+ * Tracks active upstream generation requests so they can be cancelled on demand.
5
+ * Multiple controllers can be registered per conversation (for threaded/background runs).
6
+ */
7
+ export class AbortRegistry {
8
+ private static instance: AbortRegistry;
9
+
10
+ private controllers = new Map<string, Set<AbortController>>();
11
+
12
+ public static getInstance(): AbortRegistry {
13
+ if (!AbortRegistry.instance) {
14
+ AbortRegistry.instance = new AbortRegistry();
15
+ }
16
+ return AbortRegistry.instance;
17
+ }
18
+
19
+ public register(conversationId: string, controller: AbortController) {
20
+ const key = conversationId.toString();
21
+ let set = this.controllers.get(key);
22
+ if (!set) {
23
+ set = new Set();
24
+ this.controllers.set(key, set);
25
+ }
26
+ set.add(controller);
27
+ controller.signal.addEventListener(
28
+ "abort",
29
+ () => {
30
+ this.unregister(key, controller);
31
+ },
32
+ { once: true }
33
+ );
34
+ }
35
+
36
+ public abort(conversationId: string) {
37
+ const set = this.controllers.get(conversationId);
38
+ if (!set?.size) return;
39
+
40
+ logger.debug({ conversationId }, "Aborting active generation via AbortRegistry");
41
+ for (const controller of set) {
42
+ if (!controller.signal.aborted) {
43
+ controller.abort();
44
+ }
45
+ }
46
+ this.controllers.delete(conversationId);
47
+ }
48
+
49
+ public unregister(conversationId: string, controller: AbortController) {
50
+ const set = this.controllers.get(conversationId);
51
+ if (!set) return;
52
+ set.delete(controller);
53
+ if (set.size === 0) {
54
+ this.controllers.delete(conversationId);
55
+ }
56
+ }
57
+ }
src/lib/server/endpoints/endpoints.ts CHANGED
@@ -12,11 +12,11 @@ export type EndpointMessage = Omit<Message, "id">;
12
  export interface EndpointParameters {
13
  messages: EndpointMessage[];
14
  preprompt?: Conversation["preprompt"];
15
- continueMessage?: boolean; // used to signal that the last message will be extended
16
  generateSettings?: Partial<Model["parameters"]>;
17
  isMultimodal?: boolean;
18
  conversationId?: ObjectId;
19
  locals: App.Locals | undefined;
 
20
  }
21
 
22
  export type TextGenerationStreamOutputSimplified = TextGenerationStreamOutput & {
 
12
  export interface EndpointParameters {
13
  messages: EndpointMessage[];
14
  preprompt?: Conversation["preprompt"];
 
15
  generateSettings?: Partial<Model["parameters"]>;
16
  isMultimodal?: boolean;
17
  conversationId?: ObjectId;
18
  locals: App.Locals | undefined;
19
+ abortSignal?: AbortSignal;
20
  }
21
 
22
  export type TextGenerationStreamOutputSimplified = TextGenerationStreamOutput & {
src/lib/server/endpoints/openai/endpointOai.ts CHANGED
@@ -109,14 +109,13 @@ export async function endpointOai(
109
  return async ({
110
  messages,
111
  preprompt,
112
- continueMessage,
113
  generateSettings,
114
  conversationId,
115
  locals,
 
116
  }) => {
117
  const prompt = await buildPrompt({
118
  messages,
119
- continueMessage,
120
  preprompt,
121
  model,
122
  });
@@ -141,6 +140,7 @@ export async function endpointOai(
141
  "X-use-cache": "false",
142
  ...(locals?.token ? { Authorization: `Bearer ${locals.token}` } : {}),
143
  },
 
144
  });
145
 
146
  return openAICompletionToTextGenerationStream(openAICompletion);
@@ -153,6 +153,7 @@ export async function endpointOai(
153
  conversationId,
154
  isMultimodal,
155
  locals,
 
156
  }) => {
157
  // Format messages for the chat API, handling multimodal content if supported
158
  let messagesOpenAI: OpenAI.Chat.Completions.ChatCompletionMessageParam[] =
@@ -203,6 +204,7 @@ export async function endpointOai(
203
  "X-use-cache": "false",
204
  ...(locals?.token ? { Authorization: `Bearer ${locals.token}` } : {}),
205
  },
 
206
  }
207
  );
208
  return openAIChatToTextGenerationStream(openChatAICompletion, () => routerMetadata);
@@ -216,6 +218,7 @@ export async function endpointOai(
216
  "X-use-cache": "false",
217
  ...(locals?.token ? { Authorization: `Bearer ${locals.token}` } : {}),
218
  },
 
219
  }
220
  );
221
  return openAIChatToTextGenerationSingle(openChatAICompletion, () => routerMetadata);
 
109
  return async ({
110
  messages,
111
  preprompt,
 
112
  generateSettings,
113
  conversationId,
114
  locals,
115
+ abortSignal,
116
  }) => {
117
  const prompt = await buildPrompt({
118
  messages,
 
119
  preprompt,
120
  model,
121
  });
 
140
  "X-use-cache": "false",
141
  ...(locals?.token ? { Authorization: `Bearer ${locals.token}` } : {}),
142
  },
143
+ signal: abortSignal,
144
  });
145
 
146
  return openAICompletionToTextGenerationStream(openAICompletion);
 
153
  conversationId,
154
  isMultimodal,
155
  locals,
156
+ abortSignal,
157
  }) => {
158
  // Format messages for the chat API, handling multimodal content if supported
159
  let messagesOpenAI: OpenAI.Chat.Completions.ChatCompletionMessageParam[] =
 
204
  "X-use-cache": "false",
205
  ...(locals?.token ? { Authorization: `Bearer ${locals.token}` } : {}),
206
  },
207
+ signal: abortSignal,
208
  }
209
  );
210
  return openAIChatToTextGenerationStream(openChatAICompletion, () => routerMetadata);
 
218
  "X-use-cache": "false",
219
  ...(locals?.token ? { Authorization: `Bearer ${locals.token}` } : {}),
220
  },
221
+ signal: abortSignal,
222
  }
223
  );
224
  return openAIChatToTextGenerationSingle(openChatAICompletion, () => routerMetadata);
src/lib/server/textGeneration/generate.ts CHANGED
@@ -20,10 +20,10 @@ export async function* generate(
20
  conv,
21
  messages,
22
  assistant,
23
- isContinue,
24
  promptedAt,
25
  forceMultimodal,
26
  locals,
 
27
  }: GenerateContext,
28
  preprompt?: string
29
  ): AsyncIterable<MessageUpdate> {
@@ -50,16 +50,18 @@ export async function* generate(
50
  };
51
  }
52
 
53
- for await (const output of await endpoint({
54
  messages,
55
  preprompt,
56
- continueMessage: isContinue,
57
  generateSettings: assistant?.generateSettings,
58
  // Allow user-level override to force multimodal
59
  isMultimodal: (forceMultimodal ?? false) || model.multimodal,
60
  conversationId: conv._id,
61
  locals,
62
- })) {
 
 
 
63
  // Check if this output contains router metadata
64
  if (
65
  "routerMetadata" in output &&
@@ -248,6 +250,9 @@ Do not use prefixes such as Response: or Answer: when answering to the user.`,
248
 
249
  if (date && date > promptedAt) {
250
  logger.info(`Aborting generation for conversation ${conv._id}`);
 
 
 
251
  break;
252
  }
253
 
 
20
  conv,
21
  messages,
22
  assistant,
 
23
  promptedAt,
24
  forceMultimodal,
25
  locals,
26
+ abortController,
27
  }: GenerateContext,
28
  preprompt?: string
29
  ): AsyncIterable<MessageUpdate> {
 
50
  };
51
  }
52
 
53
+ const stream = await endpoint({
54
  messages,
55
  preprompt,
 
56
  generateSettings: assistant?.generateSettings,
57
  // Allow user-level override to force multimodal
58
  isMultimodal: (forceMultimodal ?? false) || model.multimodal,
59
  conversationId: conv._id,
60
  locals,
61
+ abortSignal: abortController.signal,
62
+ });
63
+
64
+ for await (const output of stream) {
65
  // Check if this output contains router metadata
66
  if (
67
  "routerMetadata" in output &&
 
250
 
251
  if (date && date > promptedAt) {
252
  logger.info(`Aborting generation for conversation ${conv._id}`);
253
+ if (!abortController.signal.aborted) {
254
+ abortController.abort();
255
+ }
256
  break;
257
  }
258
 
src/lib/server/textGeneration/types.ts CHANGED
@@ -10,11 +10,11 @@ export interface TextGenerationContext {
10
  conv: Conversation;
11
  messages: Message[];
12
  assistant?: Pick<Assistant, "dynamicPrompt" | "generateSettings">;
13
- isContinue: boolean;
14
  promptedAt: Date;
15
  ip: string;
16
  username?: string;
17
  /** Force-enable multimodal handling for endpoints that support it */
18
  forceMultimodal?: boolean;
19
  locals: App.Locals | undefined;
 
20
  }
 
10
  conv: Conversation;
11
  messages: Message[];
12
  assistant?: Pick<Assistant, "dynamicPrompt" | "generateSettings">;
 
13
  promptedAt: Date;
14
  ip: string;
15
  username?: string;
16
  /** Force-enable multimodal handling for endpoints that support it */
17
  forceMultimodal?: boolean;
18
  locals: App.Locals | undefined;
19
+ abortController: AbortController;
20
  }
src/lib/types/Template.ts CHANGED
@@ -3,5 +3,4 @@ import type { Message } from "./Message";
3
  export type ChatTemplateInput = {
4
  messages: Pick<Message, "from" | "content" | "files">[];
5
  preprompt?: string;
6
- continueMessage?: boolean;
7
  };
 
3
  export type ChatTemplateInput = {
4
  messages: Pick<Message, "from" | "content" | "files">[];
5
  preprompt?: string;
 
6
  };
src/lib/utils/messageUpdates.ts CHANGED
@@ -12,7 +12,6 @@ type MessageUpdateRequestOptions = {
12
  inputs?: string;
13
  messageId?: string;
14
  isRetry: boolean;
15
- isContinue: boolean;
16
  files?: MessageFile[];
17
  };
18
  export async function fetchMessageUpdates(
@@ -29,7 +28,6 @@ export async function fetchMessageUpdates(
29
  inputs: opts.inputs,
30
  id: opts.messageId,
31
  is_retry: opts.isRetry,
32
- is_continue: opts.isContinue,
33
  });
34
 
35
  opts.files?.forEach((file) => {
 
12
  inputs?: string;
13
  messageId?: string;
14
  isRetry: boolean;
 
15
  files?: MessageFile[];
16
  };
17
  export async function fetchMessageUpdates(
 
28
  inputs: opts.inputs,
29
  id: opts.messageId,
30
  is_retry: opts.isRetry,
 
31
  });
32
 
33
  opts.files?.forEach((file) => {
src/routes/conversation/[id]/+page.svelte CHANGED
@@ -131,12 +131,10 @@
131
  prompt,
132
  messageId = messagesPath.at(-1)?.id ?? undefined,
133
  isRetry = false,
134
- isContinue = false,
135
  }: {
136
  prompt?: string;
137
  messageId?: ReturnType<typeof v4>;
138
  isRetry?: boolean;
139
- isContinue?: boolean;
140
  }): Promise<void> {
141
  try {
142
  $isAborted = false;
@@ -156,13 +154,7 @@
156
  let messageToWriteToId: Message["id"] | undefined = undefined;
157
  // used for building the prompt, subtree of the conversation that goes from the latest message to the root
158
 
159
- if (isContinue && messageId) {
160
- if ((messages.find((msg) => msg.id === messageId)?.children?.length ?? 0) > 0) {
161
- $error = "Can only continue the last message";
162
- } else {
163
- messageToWriteToId = messageId;
164
- }
165
- } else if (isRetry && messageId) {
166
  // two cases, if we're retrying a user message with a newPrompt set,
167
  // it means we're editing a user message
168
  // if we're retrying on an assistant message, newPrompt cannot be set
@@ -257,7 +249,6 @@
257
  inputs: prompt,
258
  messageId,
259
  isRetry,
260
- isContinue,
261
  files: isRetry ? userMessage?.files : base64Files,
262
  },
263
  messageUpdatesAbortController.signal
@@ -422,25 +413,6 @@
422
  messagesPath = createMessagesPath(messages, msgId);
423
  }
424
 
425
- async function onContinue(payload: { id: Message["id"] }) {
426
- if (!data.shared) {
427
- await writeMessage({ messageId: payload.id, isContinue: true });
428
- } else {
429
- await convFromShared()
430
- .then(async (convId) => {
431
- await goto(`${base}/conversation/${convId}`, { invalidateAll: true });
432
- })
433
- .then(
434
- async () =>
435
- await writeMessage({
436
- messageId: payload.id,
437
- isContinue: true,
438
- })
439
- )
440
- .finally(() => (loading = false));
441
- }
442
- }
443
-
444
  const settings = useSettingsStore();
445
  let messages = $state(data.messages);
446
  $effect(() => {
@@ -527,7 +499,6 @@
527
  bind:files
528
  onmessage={onMessage}
529
  onretry={onRetry}
530
- oncontinue={onContinue}
531
  onshowAlternateMsg={onShowAlternateMsg}
532
  onstop={async () => {
533
  await fetch(`${base}/conversation/${page.params.id}/stop-generating`, {
 
131
  prompt,
132
  messageId = messagesPath.at(-1)?.id ?? undefined,
133
  isRetry = false,
 
134
  }: {
135
  prompt?: string;
136
  messageId?: ReturnType<typeof v4>;
137
  isRetry?: boolean;
 
138
  }): Promise<void> {
139
  try {
140
  $isAborted = false;
 
154
  let messageToWriteToId: Message["id"] | undefined = undefined;
155
  // used for building the prompt, subtree of the conversation that goes from the latest message to the root
156
 
157
+ if (isRetry && messageId) {
 
 
 
 
 
 
158
  // two cases, if we're retrying a user message with a newPrompt set,
159
  // it means we're editing a user message
160
  // if we're retrying on an assistant message, newPrompt cannot be set
 
249
  inputs: prompt,
250
  messageId,
251
  isRetry,
 
252
  files: isRetry ? userMessage?.files : base64Files,
253
  },
254
  messageUpdatesAbortController.signal
 
413
  messagesPath = createMessagesPath(messages, msgId);
414
  }
415
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
416
  const settings = useSettingsStore();
417
  let messages = $state(data.messages);
418
  $effect(() => {
 
499
  bind:files
500
  onmessage={onMessage}
501
  onretry={onRetry}
 
502
  onshowAlternateMsg={onShowAlternateMsg}
503
  onstop={async () => {
504
  await fetch(`${base}/conversation/${page.params.id}/stop-generating`, {
src/routes/conversation/[id]/+server.ts CHANGED
@@ -23,6 +23,7 @@ import { usageLimits } from "$lib/server/usageLimits";
23
  import { textGeneration } from "$lib/server/textGeneration";
24
  import type { TextGenerationContext } from "$lib/server/textGeneration/types";
25
  import { logger } from "$lib/server/logger.js";
 
26
 
27
  export async function POST({ request, locals, params, getClientAddress }) {
28
  const id = z.string().parse(params.id);
@@ -148,7 +149,6 @@ export async function POST({ request, locals, params, getClientAddress }) {
148
  inputs: newPrompt,
149
  id: messageId,
150
  is_retry: isRetry,
151
- is_continue: isContinue,
152
  } = z
153
  .object({
154
  id: z.string().uuid().refine(isMessageId).optional(), // parent message id to append to for a normal message, or the message id for a retry/continue
@@ -159,7 +159,6 @@ export async function POST({ request, locals, params, getClientAddress }) {
159
  .transform((s) => s.replace(/\r\n/g, "\n"))
160
  ),
161
  is_retry: z.optional(z.boolean()),
162
- is_continue: z.optional(z.boolean()),
163
  files: z.optional(
164
  z.array(
165
  z.object({
@@ -220,15 +219,7 @@ export async function POST({ request, locals, params, getClientAddress }) {
220
  // used for building the prompt, subtree of the conversation that goes from the latest message to the root
221
  let messagesForPrompt: Message[] = [];
222
 
223
- if (isContinue && messageId) {
224
- // if it's the last message and we continue then we build the prompt up to the last message
225
- // we will strip the end tokens afterwards when the prompt is built
226
- if ((conv.messages.find((msg) => msg.id === messageId)?.children?.length ?? 0) > 0) {
227
- error(400, "Can only continue the last message");
228
- }
229
- messageToWriteToId = messageId;
230
- messagesForPrompt = buildSubtree(conv, messageId);
231
- } else if (isRetry && messageId) {
232
  // two cases, if we're retrying a user message with a newPrompt set,
233
  // it means we're editing a user message
234
  // if we're retrying on an assistant message, newPrompt cannot be set
@@ -331,9 +322,18 @@ export async function POST({ request, locals, params, getClientAddress }) {
331
  );
332
  };
333
 
 
 
334
  // we now build the stream
335
  const stream = new ReadableStream({
336
  async start(controller) {
 
 
 
 
 
 
 
337
  messageToWriteTo.updates ??= [];
338
  async function update(event: MessageUpdate) {
339
  if (!messageToWriteTo || !conv) {
@@ -372,6 +372,7 @@ export async function POST({ request, locals, params, getClientAddress }) {
372
  else if (event.type === MessageUpdateType.FinalAnswer) {
373
  messageToWriteTo.interrupted = event.interrupted;
374
  messageToWriteTo.content = initialMessageContent + event.text;
 
375
  }
376
 
377
  // Add file
@@ -449,7 +450,6 @@ export async function POST({ request, locals, params, getClientAddress }) {
449
  conv,
450
  messages: messagesForPrompt,
451
  assistant: undefined,
452
- isContinue: isContinue ?? false,
453
  promptedAt,
454
  ip: getClientAddress(),
455
  username: locals.user?.username,
@@ -460,20 +460,50 @@ export async function POST({ request, locals, params, getClientAddress }) {
460
  ]
461
  ),
462
  locals,
 
463
  };
464
  // run the text generation and send updates to the client
465
  for await (const event of textGeneration(ctx)) await update(event);
 
 
 
 
 
 
 
 
 
 
 
466
  } catch (e) {
467
- hasError = true;
468
- await update({
469
- type: MessageUpdateType.Status,
470
- status: MessageUpdateStatus.Error,
471
- message: (e as Error).message,
472
- });
473
- logger.error(e);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
474
  } finally {
475
  // check if no output was generated
476
- if (!hasError && messageToWriteTo.content === initialMessageContent) {
477
  await update({
478
  type: MessageUpdateType.Status,
479
  status: MessageUpdateStatus.Error,
@@ -483,6 +513,7 @@ export async function POST({ request, locals, params, getClientAddress }) {
483
  }
484
 
485
  await persistConversation();
 
486
 
487
  // used to detect if cancel() is called bc of interrupt or just because the connection closes
488
  doneStreaming = true;
 
23
  import { textGeneration } from "$lib/server/textGeneration";
24
  import type { TextGenerationContext } from "$lib/server/textGeneration/types";
25
  import { logger } from "$lib/server/logger.js";
26
+ import { AbortRegistry } from "$lib/server/abortRegistry";
27
 
28
  export async function POST({ request, locals, params, getClientAddress }) {
29
  const id = z.string().parse(params.id);
 
149
  inputs: newPrompt,
150
  id: messageId,
151
  is_retry: isRetry,
 
152
  } = z
153
  .object({
154
  id: z.string().uuid().refine(isMessageId).optional(), // parent message id to append to for a normal message, or the message id for a retry/continue
 
159
  .transform((s) => s.replace(/\r\n/g, "\n"))
160
  ),
161
  is_retry: z.optional(z.boolean()),
 
162
  files: z.optional(
163
  z.array(
164
  z.object({
 
219
  // used for building the prompt, subtree of the conversation that goes from the latest message to the root
220
  let messagesForPrompt: Message[] = [];
221
 
222
+ if (isRetry && messageId) {
 
 
 
 
 
 
 
 
223
  // two cases, if we're retrying a user message with a newPrompt set,
224
  // it means we're editing a user message
225
  // if we're retrying on an assistant message, newPrompt cannot be set
 
322
  );
323
  };
324
 
325
+ const abortRegistry = AbortRegistry.getInstance();
326
+
327
  // we now build the stream
328
  const stream = new ReadableStream({
329
  async start(controller) {
330
+ const conversationKey = convId.toString();
331
+ const ctrl = new AbortController();
332
+ abortRegistry.register(conversationKey, ctrl);
333
+
334
+ let finalAnswerReceived = false;
335
+ let abortedByUser = false;
336
+
337
  messageToWriteTo.updates ??= [];
338
  async function update(event: MessageUpdate) {
339
  if (!messageToWriteTo || !conv) {
 
372
  else if (event.type === MessageUpdateType.FinalAnswer) {
373
  messageToWriteTo.interrupted = event.interrupted;
374
  messageToWriteTo.content = initialMessageContent + event.text;
375
+ finalAnswerReceived = true;
376
  }
377
 
378
  // Add file
 
450
  conv,
451
  messages: messagesForPrompt,
452
  assistant: undefined,
 
453
  promptedAt,
454
  ip: getClientAddress(),
455
  username: locals.user?.username,
 
460
  ]
461
  ),
462
  locals,
463
+ abortController: ctrl,
464
  };
465
  // run the text generation and send updates to the client
466
  for await (const event of textGeneration(ctx)) await update(event);
467
+ if (ctrl.signal.aborted) {
468
+ abortedByUser = true;
469
+ }
470
+ if (abortedByUser && !finalAnswerReceived) {
471
+ const partialText = messageToWriteTo.content.slice(initialMessageContent.length);
472
+ await update({
473
+ type: MessageUpdateType.FinalAnswer,
474
+ text: partialText,
475
+ interrupted: true,
476
+ });
477
+ }
478
  } catch (e) {
479
+ const err = e as Error;
480
+ const isAbortError =
481
+ err?.name === "AbortError" ||
482
+ err?.name === "APIUserAbortError" ||
483
+ err?.message === "Request was aborted.";
484
+ if (isAbortError || ctrl.signal.aborted) {
485
+ abortedByUser = true;
486
+ logger.info({ conversationId: conversationKey }, "Generation aborted by user");
487
+ if (!finalAnswerReceived) {
488
+ const partialText = messageToWriteTo.content.slice(initialMessageContent.length);
489
+ await update({
490
+ type: MessageUpdateType.FinalAnswer,
491
+ text: partialText,
492
+ interrupted: true,
493
+ });
494
+ }
495
+ } else {
496
+ hasError = true;
497
+ await update({
498
+ type: MessageUpdateType.Status,
499
+ status: MessageUpdateStatus.Error,
500
+ message: err.message,
501
+ });
502
+ logger.error(err);
503
+ }
504
  } finally {
505
  // check if no output was generated
506
+ if (!hasError && !abortedByUser && messageToWriteTo.content === initialMessageContent) {
507
  await update({
508
  type: MessageUpdateType.Status,
509
  status: MessageUpdateStatus.Error,
 
513
  }
514
 
515
  await persistConversation();
516
+ abortRegistry.unregister(conversationKey, ctrl);
517
 
518
  // used to detect if cancel() is called bc of interrupt or just because the connection closes
519
  doneStreaming = true;
src/routes/conversation/[id]/stop-generating/+server.ts CHANGED
@@ -1,5 +1,6 @@
1
  import { authCondition } from "$lib/server/auth";
2
  import { collections } from "$lib/server/database";
 
3
  import { error } from "@sveltejs/kit";
4
  import { ObjectId } from "mongodb";
5
 
@@ -18,6 +19,8 @@ export async function POST({ params, locals }) {
18
  error(404, "Conversation not found");
19
  }
20
 
 
 
21
  await collections.abortedGenerations.updateOne(
22
  { conversationId },
23
  { $set: { updatedAt: new Date() }, $setOnInsert: { createdAt: new Date() } },
 
1
  import { authCondition } from "$lib/server/auth";
2
  import { collections } from "$lib/server/database";
3
+ import { AbortRegistry } from "$lib/server/abortRegistry";
4
  import { error } from "@sveltejs/kit";
5
  import { ObjectId } from "mongodb";
6
 
 
19
  error(404, "Conversation not found");
20
  }
21
 
22
+ AbortRegistry.getInstance().abort(conversationId.toString());
23
+
24
  await collections.abortedGenerations.updateOne(
25
  { conversationId },
26
  { $set: { updatedAt: new Date() }, $setOnInsert: { createdAt: new Date() } },