import { preprocessMessages } from "../endpoints/preprocessMessages";
import { generateTitleForConversation } from "./title";
import {
	type MessageUpdate,
	MessageUpdateType,
	MessageUpdateStatus,
} from "$lib/types/MessageUpdate";
import { generate } from "./generate";
import { runMcpFlow } from "./mcp/runMcpFlow";
import { mergeAsyncGenerators } from "$lib/utils/mergeAsyncGenerators";
import type { TextGenerationContext } from "./types";
import { getLangfuseClient } from "$lib/server/tracing/langfuse";

/** Pause between two consecutive keep-alive status updates, in milliseconds. */
const KEEP_ALIVE_INTERVAL_MS = 100;

/**
 * Emits a `KeepAlive` status update, then sleeps for
 * {@link KEEP_ALIVE_INTERVAL_MS}, repeating until `done` is aborted.
 *
 * @param done - Signal whose `aborted` flag stops the loop. The flag is only
 *   checked between sleeps, so the generator finishes at most one interval
 *   after the abort.
 */
async function* keepAlive(done: AbortSignal): AsyncGenerator<MessageUpdate, undefined, undefined> {
	while (!done.aborted) {
		yield {
			type: MessageUpdateType.Status,
			status: MessageUpdateStatus.KeepAlive,
		};
		await new Promise((resolve) => setTimeout(resolve, KEEP_ALIVE_INTERVAL_MS));
	}
}

/**
 * Decides whether a failure should be treated as a user-initiated abort.
 *
 * It is an abort when the request's signal is already aborted, or when the
 * thrown value is an `Error` whose name is `AbortError` / `APIUserAbortError`
 * or whose message contains "Request was aborted".
 *
 * @param err - Value caught from the generation pipeline.
 * @param signal - Abort signal of the current request.
 */
function isAbortError(err: unknown, signal: AbortSignal): boolean {
	if (signal.aborted) {
		return true;
	}
	return (
		err instanceof Error &&
		(err.name === "AbortError" ||
			err.name === "APIUserAbortError" ||
			err.message.includes("Request was aborted"))
	);
}

/**
 * Entry point for one conversation turn: merges the title generator, the text
 * generator and a keep-alive generator into a single stream of updates.
 *
 * When `ctx.langfuseTrace` is not set and a Langfuse client is available, a
 * root "conversation-turn" trace is created here so that the title and text
 * generators both receive the same trace.
 *
 * @param ctx - Generation context for the current turn.
 */
export async function* textGeneration(ctx: TextGenerationContext) {
	const done = new AbortController();

	// Create the root Langfuse trace here so both title and text generators share it
	let langfuseTrace = ctx.langfuseTrace;
	if (!langfuseTrace) {
		try {
			const lf = getLangfuseClient();
			if (lf) {
				langfuseTrace = lf.trace({
					name: "conversation-turn",
					metadata: {
						conversationId: ctx.conv._id.toString(),
						model: ctx.model.id ?? ctx.model.name,
						promptedAt: ctx.promptedAt.toISOString(),
					},
					input: ctx.messages.map((m) => ({ role: m.from, content: m.content })),
					tags: ["chat-ui"],
				});
			}
		} catch {
			// Tracing must never disrupt generation
		}
	}

	const ctxWithTrace: TextGenerationContext = langfuseTrace ? { ...ctx, langfuseTrace } : ctx;

	const titleGen = generateTitleForConversation(
		ctxWithTrace.conv,
		ctxWithTrace.locals,
		langfuseTrace
	);
	const textGen = textGenerationWithoutTitle(ctxWithTrace, done);
	const keepAliveGen = keepAlive(done.signal);

	// keep alive until textGen is done
	yield* mergeAsyncGenerators([titleGen, textGen, keepAliveGen]);
}

/**
 * Produces the answer for one turn, without the conversation title.
 *
 * Flow:
 * 1. Yield a `Started` status update.
 * 2. Preprocess the messages, then drive `runMcpFlow`.
 * 3. If the MCP flow returns `"not_applicable"`, or fails with a non-abort
 *    error, stream the answer from the default `generate` instead.
 * 4. If the failure is an abort, swallow it and mark the trace as interrupted.
 *
 * Every `FinalAnswer` update is also recorded on the Langfuse trace (when one
 * is present).
 *
 * @param ctx - Generation context for the current turn.
 * @param done - Controller that is aborted once this generator finishes, for
 *   any reason, so that the keep-alive generator stops.
 */
async function* textGenerationWithoutTitle(
	ctx: TextGenerationContext,
	done: AbortController
): AsyncGenerator<MessageUpdate, undefined, undefined> {
	try {
		yield {
			type: MessageUpdateType.Status,
			status: MessageUpdateStatus.Started,
		};

		const { conv, messages, langfuseTrace } = ctx;
		const convId = conv._id;
		const preprompt = conv.preprompt;

		const processedMessages = await preprocessMessages(messages, convId);

		// Helper to finalise the trace with the generated answer
		const endTrace = (output: string, interrupted?: boolean) => {
			try {
				langfuseTrace?.update({ output: { text: output, interrupted: interrupted ?? false } });
			} catch {
				// Best effort: a tracing failure must not break generation
			}
		};

		// Default (non-MCP) generation, shared by the "not_applicable" and the
		// MCP-error paths so both stream and trace the answer the same way.
		async function* fallbackGeneration(): AsyncGenerator<MessageUpdate, undefined, undefined> {
			for await (const update of generate({ ...ctx, messages: processedMessages }, preprompt)) {
				if (update.type === MessageUpdateType.FinalAnswer) {
					endTrace(update.text, update.interrupted);
				}
				yield update;
			}
		}

		// Try MCP tool flow first; fall back to default generation if not selected/available
		try {
			const mcpGen = runMcpFlow({
				model: ctx.model,
				conv,
				messages: processedMessages,
				assistant: ctx.assistant,
				forceMultimodal: ctx.forceMultimodal,
				forceTools: ctx.forceTools,
				provider: ctx.provider,
				locals: ctx.locals,
				preprompt,
				abortSignal: ctx.abortController.signal,
				abortController: ctx.abortController,
				promptedAt: ctx.promptedAt,
				langfuseTrace,
			});

			// Driven manually (instead of `for await`) because the generator's
			// return value is needed once it completes.
			let step = await mcpGen.next();
			while (!step.done) {
				const update = step.value;
				if (update.type === MessageUpdateType.FinalAnswer) {
					endTrace(update.text, update.interrupted);
				}
				yield update;
				step = await mcpGen.next();
			}

			const mcpResult = step.value;
			if (mcpResult === "not_applicable") {
				// fallback to normal text generation
				// NOTE(review): an error thrown by this fallback is also handled by the
				// catch below, so a non-abort failure here starts `generate` a second
				// time — confirm whether that retry is intended.
				yield* fallbackGeneration();
			}
			// If mcpResult is "completed" or "aborted", don't fall back
		} catch (err) {
			// Don't fall back on abort errors - user intentionally stopped
			if (!isAbortError(err, ctx.abortController.signal)) {
				// On non-abort MCP error, fall back to normal generation
				yield* fallbackGeneration();
			} else {
				// Aborted – mark trace accordingly
				try {
					langfuseTrace?.update({ output: { interrupted: true } });
				} catch {
					// Best effort: a tracing failure must not break generation
				}
			}
		}
	} finally {
		// Always release the keep-alive generator, including when generation
		// throws or the consumer stops iterating early.
		done.abort();
	}
}