import { type MessageUpdate, MessageUpdateType, MessageUpdateStatus, PersonaUpdateType, type MessagePersonaUpdate, } from "$lib/types/MessageUpdate"; import { generate } from "./generate"; import { mergeAsyncGenerators } from "$lib/utils/mergeAsyncGenerators"; import type { TextGenerationContext } from "./types"; import { type Persona, generatePersonaPrompt } from "$lib/types/Persona"; import { logger } from "$lib/server/logger"; import { preprocessMessages } from "../endpoints/preprocessMessages"; /** * Generate responses from multiple personas in parallel * Each persona gets its own text generation stream * Updates are tagged with personaId so the client can route them correctly */ export async function* multiPersonaTextGeneration( ctx: TextGenerationContext, personas: Persona[] ): AsyncGenerator { if (personas.length === 0) { logger.error("multiPersonaTextGeneration called with no personas"); yield { type: MessageUpdateType.Status, status: MessageUpdateStatus.Error, message: "No personas provided", }; return; } if (personas.length === 1) { // If only one persona, use the standard text generation // (this shouldn't happen, but handle it gracefully) logger.warn("multiPersonaTextGeneration called with single persona, using standard flow"); const { textGeneration } = await import("./index"); yield* textGeneration(ctx); return; } const { conv, messages } = ctx; const convId = conv._id; // Notify start yield { type: MessageUpdateType.Status, status: MessageUpdateStatus.Started, }; // Preprocess messages ONCE for all personas (performance optimization) // This downloads files and prepares messages for the model const preprocessedMessages = await preprocessMessages(messages, convId); // Create a generator for each persona with preprocessed messages const personaGenerators = personas.map((persona) => createPersonaGenerator(ctx, persona, preprocessedMessages) ); // Merge all generators and stream their updates yield* mergeAsyncGenerators(personaGenerators); // All done yield { type: MessageUpdateType.Status, status: MessageUpdateStatus.Finished, }; } /** * Create a text generation stream for a single persona * Wraps all updates with persona metadata * Each persona sees ALL previous messages (user + all persona responses from past turns) */ async function* createPersonaGenerator( ctx: TextGenerationContext, persona: Persona, preprocessedMessages: Awaited> ): AsyncGenerator { try { // Messages are already preprocessed and filtered (system messages removed) from the caller // Each persona sees ALL previous messages (user + all persona responses) // This allows personas to build on each other's responses from previous turns // Generate the persona's prompt from their fields const preprompt = generatePersonaPrompt(persona); // Generate text for this persona using preprocessed messages // Type assertion is safe here because we know messages are preprocessed const generateCtx = { ...ctx, messages: preprocessedMessages }; for await (const update of generate(generateCtx, preprompt)) { // Wrap each update with persona information yield wrapWithPersona(update, persona); } } catch (error) { logger.error({ error, personaId: persona.id }, "Error in persona text generation"); yield { type: MessageUpdateType.Persona, personaId: persona.id, personaName: persona.name, personaOccupation: persona.jobSector, personaStance: persona.stance, updateType: PersonaUpdateType.Status, error: error instanceof Error ? error.message : "Unknown error", } as MessagePersonaUpdate; } } /** * Wraps a standard MessageUpdate with persona metadata */ function wrapWithPersona(update: MessageUpdate, persona: Persona): MessageUpdate { // Handle different update types and wrap them as persona updates if (update.type === MessageUpdateType.Stream) { return { type: MessageUpdateType.Persona, personaId: persona.id, personaName: persona.name, personaOccupation: persona.jobSector, personaStance: persona.stance, updateType: PersonaUpdateType.Stream, token: update.token, } as MessagePersonaUpdate; } else if (update.type === MessageUpdateType.FinalAnswer) { return { type: MessageUpdateType.Persona, personaId: persona.id, personaName: persona.name, personaOccupation: persona.jobSector, personaStance: persona.stance, updateType: PersonaUpdateType.FinalAnswer, text: update.text, interrupted: update.interrupted, } as MessagePersonaUpdate; } else if (update.type === MessageUpdateType.Reasoning) { return { type: MessageUpdateType.Persona, personaId: persona.id, personaName: persona.name, personaOccupation: persona.jobSector, personaStance: persona.stance, updateType: PersonaUpdateType.Reasoning, status: update.subtype === "status" ? update.status : undefined, token: update.subtype === "stream" ? update.token : undefined, } as MessagePersonaUpdate; } else if (update.type === MessageUpdateType.RouterMetadata) { return { type: MessageUpdateType.Persona, personaId: persona.id, personaName: persona.name, personaOccupation: persona.jobSector, personaStance: persona.stance, updateType: PersonaUpdateType.RouterMetadata, route: update.route, model: update.model, } as MessagePersonaUpdate; } else if (update.type === MessageUpdateType.Status) { // Only pass through error status for individual personas if (update.status === MessageUpdateStatus.Error) { return { type: MessageUpdateType.Persona, personaId: persona.id, personaName: persona.name, personaOccupation: persona.jobSector, personaStance: persona.stance, updateType: PersonaUpdateType.Status, error: update.message, } as MessagePersonaUpdate; } // Filter out other status updates (started, finished, keep-alive) // since we handle those at the multi-persona level return update; } // For other update types (Title, File), pass through as-is // These are conversation-level, not persona-specific return update; }