Spaces:
Paused
Paused
| import { preprocessMessages } from "../endpoints/preprocessMessages"; | |
| import { generateTitleForConversation } from "./title"; | |
| import { | |
| type MessageUpdate, | |
| MessageUpdateType, | |
| MessageUpdateStatus, | |
| } from "$lib/types/MessageUpdate"; | |
| import { generate } from "./generate"; | |
| import { runMcpFlow } from "./mcp/runMcpFlow"; | |
| import { mergeAsyncGenerators } from "$lib/utils/mergeAsyncGenerators"; | |
| import type { TextGenerationContext } from "./types"; | |
| import { getLangfuseClient } from "$lib/server/tracing/langfuse"; | |
/**
 * Emits a `KeepAlive` status update roughly every `intervalMs` until `done` is aborted.
 *
 * The wait between updates is abort-aware: aborting `done` resolves the pending sleep
 * immediately, so the generator finishes without waiting out the rest of the interval
 * (which would otherwise hold the merged output stream open after generation ended).
 *
 * @param done - Signal that ends the stream once aborted.
 * @param intervalMs - Delay between keep-alive updates, in milliseconds (default 100).
 */
async function* keepAlive(
	done: AbortSignal,
	intervalMs = 100
): AsyncGenerator<MessageUpdate, undefined, undefined> {
	while (!done.aborted) {
		yield {
			type: MessageUpdateType.Status,
			status: MessageUpdateStatus.KeepAlive,
		};
		await sleepUnlessAborted(intervalMs, done);
	}
}

/**
 * Resolves after `ms` milliseconds, or as soon as `signal` is aborted, whichever comes first.
 * Cleans up the timer / listener that did not fire so nothing is left dangling.
 */
function sleepUnlessAborted(ms: number, signal: AbortSignal): Promise<void> {
	return new Promise((resolve) => {
		if (signal.aborted) {
			resolve();
			return;
		}
		const onAbort = () => {
			clearTimeout(timer);
			resolve();
		};
		const timer = setTimeout(() => {
			signal.removeEventListener("abort", onAbort);
			resolve();
		}, ms);
		signal.addEventListener("abort", onAbort, { once: true });
	});
}
/**
 * Best-effort creation of the root Langfuse trace for one conversation turn.
 *
 * Returns `undefined` when `getLangfuseClient()` yields no client or when creating the trace
 * throws. Tracing errors are deliberately swallowed so they can never disrupt generation.
 */
function startConversationTrace(
	ctx: TextGenerationContext
): TextGenerationContext["langfuseTrace"] {
	try {
		const lf = getLangfuseClient();
		if (!lf) return undefined;
		return lf.trace({
			name: "conversation-turn",
			metadata: {
				conversationId: ctx.conv._id.toString(),
				model: ctx.model.id ?? ctx.model.name,
				promptedAt: ctx.promptedAt.toISOString(),
			},
			input: ctx.messages.map((m) => ({ role: m.from, content: m.content })),
			tags: ["chat-ui"],
		});
	} catch {
		// Tracing must never disrupt generation
		return undefined;
	}
}

/**
 * Streams every update for one conversation turn. The output interleaves three sources:
 * - title generation,
 * - the answer itself (MCP tool flow, or plain generation as fallback),
 * - periodic keep-alive status updates.
 *
 * A root Langfuse trace is created here (unless `ctx` already carries one) so the title and
 * text generators share it.
 *
 * The keep-alive source runs until `done` is aborted. The text generator normally aborts it
 * when it finishes. The `finally` below also aborts it when the merged stream throws or the
 * consumer stops iterating early, so the keep-alive generator is never left running.
 */
export async function* textGeneration(ctx: TextGenerationContext) {
	const done = new AbortController();

	// Create the root Langfuse trace here so both title and text generators share it
	const langfuseTrace = ctx.langfuseTrace ?? startConversationTrace(ctx);
	const ctxWithTrace: TextGenerationContext = langfuseTrace ? { ...ctx, langfuseTrace } : ctx;

	const titleGen = generateTitleForConversation(
		ctxWithTrace.conv,
		ctxWithTrace.locals,
		langfuseTrace
	);
	const textGen = textGenerationWithoutTitle(ctxWithTrace, done);
	const keepAliveGen = keepAlive(done.signal);

	try {
		// keep alive until textGen is done
		yield* mergeAsyncGenerators([titleGen, textGen, keepAliveGen]);
	} finally {
		// Idempotent: also covers errors and early consumer exit (e.g. client disconnect).
		done.abort();
	}
}
/**
 * True when a failure should be treated as an intentional user stop rather than an error.
 *
 * That is the case when the request's abort signal has already fired, or when `err` looks like
 * an abort: a known abort error name, or an "aborted" message. After a user stop, no fallback
 * generation should be attempted.
 */
function isUserAbort(err: unknown, signal: AbortSignal): boolean {
	if (signal.aborted) return true;
	if (!(err instanceof Error)) return false;
	return (
		err.name === "AbortError" ||
		err.name === "APIUserAbortError" ||
		err.message.includes("Request was aborted")
	);
}

/**
 * Generates the assistant's answer (everything except the conversation title).
 *
 * Tries the MCP tool flow first. It falls back to plain `generate()` exactly once, and only
 * in two cases:
 * - the MCP flow reports "not_applicable";
 * - the MCP flow throws a non-abort error.
 *
 * Errors thrown by the fallback itself are not retried. Abort errors from either path are
 * swallowed and the Langfuse trace is marked interrupted. Other fallback errors propagate.
 *
 * @param ctx - Generation context; `ctx.langfuseTrace`, when present, receives the final
 *   answer or the interrupted flag.
 * @param done - Aborted when this generator finishes for any reason (normal completion,
 *   error, or early consumer exit) so the companion keep-alive stream stops.
 */
async function* textGenerationWithoutTitle(
	ctx: TextGenerationContext,
	done: AbortController
): AsyncGenerator<MessageUpdate, undefined, undefined> {
	try {
		yield {
			type: MessageUpdateType.Status,
			status: MessageUpdateStatus.Started,
		};

		const { conv, messages, langfuseTrace } = ctx;
		const preprompt = conv.preprompt;
		const processedMessages = await preprocessMessages(messages, conv._id);

		// Finalise the trace with the generated answer (best effort: tracing never breaks generation)
		const traceFinalAnswer = (update: MessageUpdate) => {
			if (update.type !== MessageUpdateType.FinalAnswer) return;
			try {
				langfuseTrace?.update({
					output: { text: update.text, interrupted: update.interrupted ?? false },
				});
			} catch {
				// Tracing must never disrupt generation
			}
		};
		// Aborted – mark trace accordingly (best effort)
		const markTraceInterrupted = () => {
			try {
				langfuseTrace?.update({ output: { interrupted: true } });
			} catch {
				// Tracing must never disrupt generation
			}
		};

		// Try MCP tool flow first; fall back to default generation if not selected/available
		let shouldFallback = false;
		try {
			const mcpGen = runMcpFlow({
				model: ctx.model,
				conv,
				messages: processedMessages,
				assistant: ctx.assistant,
				forceMultimodal: ctx.forceMultimodal,
				forceTools: ctx.forceTools,
				provider: ctx.provider,
				locals: ctx.locals,
				preprompt,
				abortSignal: ctx.abortController.signal,
				abortController: ctx.abortController,
				promptedAt: ctx.promptedAt,
				langfuseTrace,
			});
			// Manual iteration (rather than for-await) so the flow's return value is available
			let step = await mcpGen.next();
			while (!step.done) {
				traceFinalAnswer(step.value);
				yield step.value;
				step = await mcpGen.next();
			}
			// If the result is "completed" or "aborted", don't fall back
			shouldFallback = step.value === "not_applicable";
		} catch (err) {
			// Don't fall back on abort errors - user intentionally stopped
			if (isUserAbort(err, ctx.abortController.signal)) {
				markTraceInterrupted();
				return;
			}
			// On non-abort MCP error, fall back to normal generation
			shouldFallback = true;
		}

		if (!shouldFallback) return;

		// Run the fallback outside the MCP try so its own failures are never retried.
		try {
			for await (const update of generate({ ...ctx, messages: processedMessages }, preprompt)) {
				traceFinalAnswer(update);
				yield update;
			}
		} catch (err) {
			if (isUserAbort(err, ctx.abortController.signal)) {
				markTraceInterrupted();
				return;
			}
			throw err;
		}
	} finally {
		// Always stop the keep-alive stream, including on errors and early consumer exit.
		done.abort();
	}
}