File size: 6,152 Bytes
725337f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6600478
 
 
 
 
 
 
 
 
 
 
725337f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import {
	type MessageUpdate,
	MessageUpdateType,
	MessageUpdateStatus,
	PersonaUpdateType,
	type MessagePersonaUpdate,
} from "$lib/types/MessageUpdate";
import { generate } from "./generate";
import { mergeAsyncGenerators } from "$lib/utils/mergeAsyncGenerators";
import type { TextGenerationContext } from "./types";
import { type Persona, generatePersonaPrompt } from "$lib/types/Persona";
import { logger } from "$lib/server/logger";
import { preprocessMessages } from "../endpoints/preprocessMessages";

/**
 * Streams one answer per persona for the current conversation turn.
 *
 * Emission order:
 *  1. a `Started` status,
 *  2. a single `PersonaInit` update listing the personas in the order given,
 *  3. every update produced by the per-persona generators, combined into one
 *     stream by `mergeAsyncGenerators`,
 *  4. a `Finished` status.
 *
 * When `personas` is empty the problem is logged, a single `Error` status is
 * emitted, and the generator ends without emitting `Started`.
 *
 * @param ctx - Generation context; `conv._id` and `messages` are read here and
 *   the whole context is forwarded to each persona generator.
 * @param personas - Personas that should each produce a response.
 */
export async function* multiPersonaTextGeneration(
	ctx: TextGenerationContext,
	personas: Persona[]
): AsyncGenerator<MessageUpdate, undefined, undefined> {
	const hasPersonas = personas.length > 0;
	if (!hasPersonas) {
		logger.error("multiPersonaTextGeneration called with no personas");
		const noPersonas: MessageUpdate = {
			type: MessageUpdateType.Status,
			status: MessageUpdateStatus.Error,
			message: "No personas provided",
		};
		yield noPersonas;
		return;
	}

	// Tell the consumer that generation has begun.
	const started: MessageUpdate = {
		type: MessageUpdateType.Status,
		status: MessageUpdateStatus.Started,
	};
	yield started;

	// Announce the persona roster up front so the consumer knows the ordering
	// before any persona-tagged update arrives.
	const roster = personas.map(({ id, name, jobSector, stance }) => ({
		personaId: id,
		personaName: name,
		personaOccupation: jobSector,
		personaStance: stance,
	}));
	const init: MessageUpdate = {
		type: MessageUpdateType.PersonaInit,
		personas: roster,
	};
	yield init;

	// Preprocessing runs a single time; the result is shared by every persona
	// instead of being recomputed per persona.
	const sharedMessages = await preprocessMessages(ctx.messages, ctx.conv._id);

	// One generator per persona, all fed the same preprocessed messages, merged
	// into a single update stream.
	yield* mergeAsyncGenerators(
		personas.map((persona) => createPersonaGenerator(ctx, persona, sharedMessages))
	);

	// Every persona generator has completed.
	const finished: MessageUpdate = {
		type: MessageUpdateType.Status,
		status: MessageUpdateStatus.Finished,
	};
	yield finished;
}

/**
 * Produces the update stream for one persona.
 *
 * The persona's prompt is derived with `generatePersonaPrompt`, generation is
 * run against a copy of `ctx` whose `messages` are replaced by
 * `preprocessedMessages`, and every resulting update is passed through
 * `wrapWithPersona` before being yielded.
 *
 * Anything thrown while building the prompt or generating is caught here: it
 * is logged together with the persona id and reported as one persona `Status`
 * update carrying the error message. The generator then ends normally instead
 * of rethrowing.
 *
 * @param ctx - Base generation context shared by all personas.
 * @param persona - Persona whose response is being generated.
 * @param preprocessedMessages - Result of `preprocessMessages`, supplied by the caller.
 */
async function* createPersonaGenerator(
	ctx: TextGenerationContext,
	persona: Persona,
	preprocessedMessages: Awaited<ReturnType<typeof preprocessMessages>>
): AsyncGenerator<MessageUpdate, undefined, undefined> {
	try {
		// Prompt built from the persona's own fields.
		const personaPrompt = generatePersonaPrompt(persona);

		// Same context as the caller's, except for the message list.
		const personaCtx = { ...ctx, messages: preprocessedMessages };
		const updates = generate(personaCtx, personaPrompt);

		for await (const rawUpdate of updates) {
			// Tag the update so it can be attributed to this persona.
			yield wrapWithPersona(rawUpdate, persona);
		}
	} catch (err) {
		logger.error({ error: err, personaId: persona.id }, "Error in persona text generation");

		// Thrown values are not guaranteed to be Error instances.
		let reason = "Unknown error";
		if (err instanceof Error) {
			reason = err.message;
		}

		const failure = {
			type: MessageUpdateType.Persona,
			personaId: persona.id,
			personaName: persona.name,
			personaOccupation: persona.jobSector,
			personaStance: persona.stance,
			updateType: PersonaUpdateType.Status,
			error: reason,
		} as MessagePersonaUpdate;
		yield failure;
	}
}

/**
 * Converts a plain generation update into a persona-tagged update where that
 * applies.
 *
 * Mapping by `update.type`:
 *  - `Stream`         -> persona `Stream` carrying `token`
 *  - `FinalAnswer`    -> persona `FinalAnswer` carrying `text` and `interrupted`
 *  - `Reasoning`      -> persona `Reasoning`; `status` is set only for the
 *                        "status" subtype and `token` only for the "stream"
 *                        subtype (the other field is present but `undefined`)
 *  - `RouterMetadata` -> persona `RouterMetadata` carrying `route` and `model`
 *  - `Status`         -> persona `Status` carrying `error` when the status is
 *                        `Error`; any other status is returned unchanged
 *  - anything else    -> returned unchanged (same object reference)
 *
 * NOTE(review): non-error statuses are passed through, not dropped. An earlier
 * comment described them as "filtered out" — confirm which behavior is intended.
 *
 * @param update - Update produced by the underlying generator.
 * @param persona - Persona whose id, name, jobSector and stance tag the result.
 * @returns A persona update, or `update` itself for pass-through cases.
 */
function wrapWithPersona(update: MessageUpdate, persona: Persona): MessageUpdate {
	// Metadata common to every persona-tagged update. Built on demand so that
	// pass-through updates never read from `persona`.
	const personaFields = () => ({
		type: MessageUpdateType.Persona,
		personaId: persona.id,
		personaName: persona.name,
		personaOccupation: persona.jobSector,
		personaStance: persona.stance,
	});

	switch (update.type) {
		case MessageUpdateType.Stream:
			return {
				...personaFields(),
				updateType: PersonaUpdateType.Stream,
				token: update.token,
			} as MessagePersonaUpdate;

		case MessageUpdateType.FinalAnswer:
			return {
				...personaFields(),
				updateType: PersonaUpdateType.FinalAnswer,
				text: update.text,
				interrupted: update.interrupted,
			} as MessagePersonaUpdate;

		case MessageUpdateType.Reasoning:
			return {
				...personaFields(),
				updateType: PersonaUpdateType.Reasoning,
				status: update.subtype === "status" ? update.status : undefined,
				token: update.subtype === "stream" ? update.token : undefined,
			} as MessagePersonaUpdate;

		case MessageUpdateType.RouterMetadata:
			return {
				...personaFields(),
				updateType: PersonaUpdateType.RouterMetadata,
				route: update.route,
				model: update.model,
			} as MessagePersonaUpdate;

		case MessageUpdateType.Status:
			// Only errors are attributed to the persona; every other status is
			// handed back exactly as received.
			if (update.status !== MessageUpdateStatus.Error) {
				return update;
			}
			return {
				...personaFields(),
				updateType: PersonaUpdateType.Status,
				error: update.message,
			} as MessagePersonaUpdate;

		default:
			// Remaining update types are returned untouched.
			return update;
	}
}