DreamyDetective's picture
added app details
ded72f6
import { preprocessMessages } from "../endpoints/preprocessMessages";
import { generateTitleForConversation } from "./title";
import {
type MessageUpdate,
MessageUpdateType,
MessageUpdateStatus,
} from "$lib/types/MessageUpdate";
import { generate } from "./generate";
import { runMcpFlow } from "./mcp/runMcpFlow";
import { mergeAsyncGenerators } from "$lib/utils/mergeAsyncGenerators";
import type { TextGenerationContext } from "./types";
import { getLangfuseClient } from "$lib/server/tracing/langfuse";
async function* keepAlive(done: AbortSignal): AsyncGenerator<MessageUpdate, undefined, undefined> {
while (!done.aborted) {
yield {
type: MessageUpdateType.Status,
status: MessageUpdateStatus.KeepAlive,
};
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
export async function* textGeneration(ctx: TextGenerationContext) {
const done = new AbortController();
// Create the root Langfuse trace here so both title and text generators share it
let langfuseTrace = ctx.langfuseTrace;
if (!langfuseTrace) {
try {
const lf = getLangfuseClient();
if (lf) {
langfuseTrace = lf.trace({
name: "conversation-turn",
metadata: {
conversationId: ctx.conv._id.toString(),
model: ctx.model.id ?? ctx.model.name,
promptedAt: ctx.promptedAt.toISOString(),
},
input: ctx.messages.map((m) => ({ role: m.from, content: m.content })),
tags: ["chat-ui"],
});
}
} catch {
// Tracing must never disrupt generation
}
}
const ctxWithTrace: TextGenerationContext = langfuseTrace ? { ...ctx, langfuseTrace } : ctx;
const titleGen = generateTitleForConversation(
ctxWithTrace.conv,
ctxWithTrace.locals,
langfuseTrace
);
const textGen = textGenerationWithoutTitle(ctxWithTrace, done);
const keepAliveGen = keepAlive(done.signal);
// keep alive until textGen is done
yield* mergeAsyncGenerators([titleGen, textGen, keepAliveGen]);
}
async function* textGenerationWithoutTitle(
ctx: TextGenerationContext,
done: AbortController
): AsyncGenerator<MessageUpdate, undefined, undefined> {
yield {
type: MessageUpdateType.Status,
status: MessageUpdateStatus.Started,
};
const { conv, messages, langfuseTrace } = ctx;
const convId = conv._id;
const preprompt = conv.preprompt;
const processedMessages = await preprocessMessages(messages, convId);
// Helper to finalise the trace with the generated answer
const endTrace = (output: string, interrupted?: boolean) => {
try {
langfuseTrace?.update({ output: { text: output, interrupted: interrupted ?? false } });
} catch {}
};
// Try MCP tool flow first; fall back to default generation if not selected/available
try {
const mcpGen = runMcpFlow({
model: ctx.model,
conv,
messages: processedMessages,
assistant: ctx.assistant,
forceMultimodal: ctx.forceMultimodal,
forceTools: ctx.forceTools,
provider: ctx.provider,
locals: ctx.locals,
preprompt,
abortSignal: ctx.abortController.signal,
abortController: ctx.abortController,
promptedAt: ctx.promptedAt,
langfuseTrace,
});
let step = await mcpGen.next();
while (!step.done) {
const update = step.value;
if (update.type === MessageUpdateType.FinalAnswer) {
endTrace(update.text, update.interrupted);
}
yield update;
step = await mcpGen.next();
}
const mcpResult = step.value;
if (mcpResult === "not_applicable") {
// fallback to normal text generation
for await (const update of generate({ ...ctx, messages: processedMessages }, preprompt)) {
if (update.type === MessageUpdateType.FinalAnswer) {
endTrace(update.text, update.interrupted);
}
yield update;
}
}
// If mcpResult is "completed" or "aborted", don't fall back
} catch (err) {
// Don't fall back on abort errors - user intentionally stopped
const isAbort =
ctx.abortController.signal.aborted ||
(err instanceof Error &&
(err.name === "AbortError" ||
err.name === "APIUserAbortError" ||
err.message.includes("Request was aborted")));
if (!isAbort) {
// On non-abort MCP error, fall back to normal generation
for await (const update of generate({ ...ctx, messages: processedMessages }, preprompt)) {
if (update.type === MessageUpdateType.FinalAnswer) {
endTrace(update.text, update.interrupted);
}
yield update;
}
} else {
// Aborted – mark trace accordingly
try {
langfuseTrace?.update({ output: { interrupted: true } });
} catch {}
}
}
done.abort();
}