// arena-learning / studyArena / lib/orchestration/stateless-generate.ts
// Nitish kumar
// Upload folder using huggingface_hub
// c20f20c verified
/**
* Stateless Multi-Agent Generation
*
* Single-pass generation with structured JSON Array output format:
* [{"type":"action","name":"...","params":{...}},{"type":"text","content":"natural speech"},...]
*
* Key design decisions:
* - Backend is stateless (all state in request/response)
 * - Single generation pass (no generate → tool call → generate loop)
* - Text is natural teacher speech, NOT meta-commentary
* - Tool calls are silent actions - students see results only
* - Action and text objects can freely interleave in the array
* - Uses partial-json for robust streaming of incomplete JSON
*
* Multi-agent orchestration:
* - When multiple agents are configured, a director agent decides who speaks
* - Uses LangGraph StateGraph for the orchestration loop
* - Events are streamed via LangGraph's custom stream mode
*/
import type { LanguageModel } from 'ai';
import type { StatelessChatRequest, StatelessEvent, ParsedAction } from '@/lib/types/chat';
import type { ThinkingConfig } from '@/lib/types/provider';
import type { WhiteboardActionRecord } from './director-prompt';
import { createOrchestrationGraph, buildInitialState } from './director-graph';
import { parse as parsePartialJson, Allow } from 'partial-json';
import { jsonrepair } from 'jsonrepair';
import { createLogger } from '@/lib/logger';
/** Module logger, created via the project's `createLogger` under the name 'StatelessGenerate'. */
const log = createLogger('StatelessGenerate');
// ==================== Structured Output Parser ====================
/**
 * Mutable bookkeeping for incrementally parsing the model's JSON array output.
 *
 * Raw stream text is appended to `buffer`. After the first `[` has been seen,
 * the buffer is re-parsed on every chunk: finished array items are emitted
 * exactly once (counted by `lastParsedItemCount`), while a trailing text item
 * that is still growing is streamed as content deltas whose progress is
 * tracked by `lastPartialTextLength`.
 */
interface ParserState {
  // ---- input ----
  /** Raw model output accumulated so far (cut to start at `[` once it is found) */
  buffer: string;
  /** True once the opening `[` of the array has been located in `buffer` */
  jsonStarted: boolean;
  /** True once parsing has finished; further input is ignored */
  isDone: boolean;

  // ---- progress ----
  /** How many leading array items have already been emitted */
  lastParsedItemCount: number;
  /** Characters of the trailing partial text item's content already emitted */
  lastPartialTextLength: number;
}
/**
 * Build a fresh parser state for a new model response stream.
 *
 * @returns A state with an empty buffer, no `[` seen yet, and no emission progress
 */
export function createParserState(): ParserState {
  const fresh: ParserState = {
    buffer: '',
    jsonStarted: false,
    lastParsedItemCount: 0,
    lastPartialTextLength: 0,
    isDone: false,
  };
  return fresh;
}
/**
 * Output of one parser call (`parseStructuredChunk` or `finalizeParser`).
 */
export interface ParseResult {
  /** Whether the model's JSON array has been fully consumed */
  isDone: boolean;
  /** Text produced by this call, in emission order */
  textChunks: string[];
  /** Actions produced by this call, in emission order */
  actions: ParsedAction[];
  /**
   * Original interleaving of text and action segments. Each `index` points into
   * this call's `textChunks` or `actions` array, depending on `type`. Content
   * deltas streamed for a still-growing trailing text item are pushed onto
   * `textChunks` without a matching entry here.
   */
  ordered: Array<{ type: 'text'; index: number } | { type: 'action'; index: number }>;
}
/**
 * Append one complete array item from the model output to `result`.
 *
 * - `{"type":"text","content":"..."}` → pushed onto `result.textChunks`
 *   (items whose content is empty or not a string are ignored).
 * - `{"type":"action","name":"...","params":{...}}`, or the legacy
 *   `tool_name`/`parameters` spelling → pushed onto `result.actions`.
 *   Actions without a non-empty string name are ignored. They cannot be
 *   dispatched, and an `undefined` actionName breaks downstream code that calls
 *   string methods on it.
 *
 * Every emitted item also gets a `result.ordered` entry whose `index` is its
 * position in this call's `textChunks`/`actions` array (not a cumulative
 * segment number), so consumers can read `result.textChunks[entry.index]` or
 * `result.actions[entry.index]` directly.
 *
 * @param item - A complete array item parsed from the model output
 * @param result - Accumulator for the current parse call (mutated)
 * @param textSegmentIndex - Running text-segment counter; only incremented and returned
 * @param actionSegmentIndex - Running action-segment counter; only incremented and returned
 * @returns The counters, incremented for whichever kind of segment was emitted
 */
function emitItem(
  item: Record<string, unknown>,
  result: ParseResult,
  textSegmentIndex: number,
  actionSegmentIndex: number,
): { textSegmentIndex: number; actionSegmentIndex: number } {
  const unchanged = { textSegmentIndex, actionSegmentIndex };

  if (item.type === 'text') {
    const content = typeof item.content === 'string' ? item.content : '';
    if (!content) {
      return unchanged;
    }
    result.textChunks.push(content);
    result.ordered.push({ type: 'text', index: result.textChunks.length - 1 });
    return { textSegmentIndex: textSegmentIndex + 1, actionSegmentIndex };
  }

  if (item.type === 'action') {
    // Support both the new format (name/params) and the legacy one (tool_name/parameters)
    const actionName = item.name || item.tool_name;
    if (typeof actionName !== 'string' || !actionName) {
      return unchanged;
    }
    const action: ParsedAction = {
      actionId:
        (item.action_id as string) || `action-${Date.now()}-${Math.random().toString(36).slice(2)}`,
      actionName,
      params: (item.params || item.parameters || {}) as Record<string, unknown>,
    };
    result.actions.push(action);
    result.ordered.push({ type: 'action', index: result.actions.length - 1 });
    return { textSegmentIndex, actionSegmentIndex: actionSegmentIndex + 1 };
  }

  return unchanged;
}
/**
 * Scan the JSON array text at the start of `buffer` (which begins with `[`).
 *
 * Tracks bracket/brace nesting and skips over string literals, honouring
 * backslash escapes. A `]` that belongs to a nested array (e.g.
 * `"params":{"ids":["a","b"]`) or sits inside a string (e.g.
 * `"content":"see [1]`) is therefore never mistaken for the end of the array.
 * Model output with unescaped quotes can confuse the string tracking; the
 * array is then simply treated as still open until the stream ends.
 *
 * @param buffer - Raw model output starting at the opening `[`
 * @returns `closeIndex`: index of the `]` closing the top-level array, or -1 if
 *   it has not arrived yet. `depth` / `inString`: nesting depth and string state
 *   at the end of the scan (1 = directly inside the top-level array, between items).
 */
function scanTopLevelArray(buffer: string): {
  closeIndex: number;
  depth: number;
  inString: boolean;
} {
  let depth = 0;
  let inString = false;
  let escaped = false;
  for (let i = 0; i < buffer.length; i++) {
    const ch = buffer[i];
    if (inString) {
      if (escaped) {
        escaped = false;
      } else if (ch === '\\') {
        escaped = true;
      } else if (ch === '"') {
        inString = false;
      }
    } else if (ch === '"') {
      inString = true;
    } else if (ch === '[' || ch === '{') {
      depth++;
    } else if (ch === ']' || ch === '}') {
      depth--;
      if (depth === 0 && ch === ']') {
        return { closeIndex: i, depth, inString };
      }
    }
  }
  return { closeIndex: -1, depth, inString };
}

/** Narrow a parsed array element to an object so its `type`/`content` fields can be read. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

/** The item's `content` when it is a string, otherwise an empty string. */
function textContentOf(item: Record<string, unknown>): string {
  return typeof item.content === 'string' ? item.content : '';
}

/**
 * Parse the (possibly truncated) JSON array held in `buffer`.
 *
 * `jsonrepair` is tried first because it also fixes common model mistakes such
 * as unescaped quotes. `partial-json` is the fallback for input jsonrepair rejects.
 *
 * @returns The parsed array, or `null` when neither parser yields an array
 */
function parseBufferedArray(buffer: string): unknown[] | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(jsonrepair(buffer));
  } catch {
    try {
      parsed = parsePartialJson(
        buffer,
        Allow.ARR | Allow.OBJ | Allow.STR | Allow.NUM | Allow.BOOL | Allow.NULL,
      );
    } catch {
      return null;
    }
  }
  return Array.isArray(parsed) ? parsed : null;
}

/**
 * Emit the items of `parsed` that have become final since the previous call.
 *
 * @param parsed - Latest parse of the whole buffer
 * @param state - Parser state; `lastParsedItemCount` / `lastPartialTextLength` are advanced
 * @param result - Accumulator for the current call (mutated)
 * @param allItemsComplete - When true the last item is final too. Otherwise it
 *   may still be growing, and only its new text (if it is a text item) is streamed.
 */
function drainParsedItems(
  parsed: unknown[],
  state: ParserState,
  result: ParseResult,
  allItemsComplete: boolean,
): void {
  // While streaming, items [0..N-2] are final; item N-1 may still be partial.
  const completeUpTo = allItemsComplete ? parsed.length : Math.max(0, parsed.length - 1);
  const firstNew = state.lastParsedItemCount;

  for (let i = firstNew; i < completeUpTo; i++) {
    const item = parsed[i];
    if (!isRecord(item)) continue;

    // The first newly finished item may be the former trailing text item, whose
    // content was already streamed as deltas: emit only the unseen tail. With no
    // tail there is no textChunks slot to point at, so no ordered entry either.
    if (i === firstNew && state.lastPartialTextLength > 0 && item.type === 'text') {
      const remaining = textContentOf(item).slice(state.lastPartialTextLength);
      if (remaining) {
        result.textChunks.push(remaining);
        result.ordered.push({ type: 'text', index: result.textChunks.length - 1 });
      }
      continue;
    }

    // Segment counters are unused here: `ordered` relies on per-call array indices.
    emitItem(item, result, 0, 0);
  }

  // Never move the cursor backwards: a re-parse that yields fewer items must not
  // make already-emitted items be emitted again. Once the cursor moves, any
  // partial-text progress belonged to an item that is now finished.
  if (completeUpTo > firstNew) {
    state.lastParsedItemCount = completeUpTo;
    state.lastPartialTextLength = 0;
  }

  // Stream the growing tail of a trailing text item that is still open.
  if (!allItemsComplete && parsed.length - 1 === state.lastParsedItemCount) {
    const tail = parsed[parsed.length - 1];
    if (isRecord(tail) && tail.type === 'text') {
      const content = textContentOf(tail);
      if (content.length > state.lastPartialTextLength) {
        result.textChunks.push(content.slice(state.lastPartialTextLength));
        state.lastPartialTextLength = content.length;
      }
    }
  }
}

/**
 * Parse streaming chunks of structured JSON Array output.
 *
 * The LLM is expected to produce a JSON array like:
 * [{"type":"action","name":"spotlight","params":{"elementId":"img_1"}},
 * {"type":"text","content":"Hello students..."},...]
 *
 * This parser:
 * 1. Accumulates chunks into a buffer
 * 2. Skips any prefix before `[` (e.g. ```json\n, explanatory text)
 * 3. Detects the `]` that closes the top-level array with a string-aware
 *    bracket scan (a `]` from a nested array or inside a string does not
 *    count), and drops anything after it (e.g. a closing ``` fence)
 * 4. Re-parses the buffer (jsonrepair first, partial-json as fallback)
 * 5. Emits newly completed items (action → actions, text → textChunks)
 * 6. For the trailing incomplete text item, emits content deltas for streaming
 * 7. Marks done once the top-level array is closed
 *
 * @param chunk - New chunk of text to parse
 * @param state - Current parser state (mutated in place)
 * @returns Parsed text chunks and actions from this chunk
 */
export function parseStructuredChunk(chunk: string, state: ParserState): ParseResult {
  const result: ParseResult = {
    textChunks: [],
    actions: [],
    isDone: false,
    ordered: [],
  };
  if (state.isDone) {
    return result;
  }
  state.buffer += chunk;

  // Step 1: Find the opening `[` if not yet found; drop everything before it
  if (!state.jsonStarted) {
    const bracketIndex = state.buffer.indexOf('[');
    if (bracketIndex === -1) {
      return result;
    }
    state.buffer = state.buffer.slice(bracketIndex);
    state.jsonStarted = true;
  }

  // Step 2: Has the top-level array been closed? If so, cut off any trailing
  // text (markdown fence, commentary) so it cannot confuse the parsers.
  const { closeIndex } = scanTopLevelArray(state.buffer);
  const isArrayClosed = closeIndex !== -1;
  if (isArrayClosed) {
    state.buffer = state.buffer.slice(0, closeIndex + 1);
  }

  // Step 3: Parse whatever has arrived so far
  const parsed = parseBufferedArray(state.buffer);
  if (!parsed) {
    return result;
  }

  // Steps 4-5: Emit finished items and stream the trailing text delta
  drainParsedItems(parsed, state, result, isArrayClosed);

  // Step 6: Mark done if the array is closed
  if (isArrayClosed) {
    state.isDone = true;
    result.isDone = true;
    state.lastParsedItemCount = parsed.length;
    state.lastPartialTextLength = 0;
  }
  return result;
}

/**
 * Finalize parsing after the stream ends.
 *
 * - If the model never output `[`, the whole buffer is emitted as a single text item.
 * - If the array was started but never closed (truncated stream, trailing text
 *   after `]`, malformed output), one last parse runs. Remaining items are
 *   emitted; the last item counts as complete only when the buffer does not end
 *   inside it, so a half-streamed action is not dispatched.
 * - Only if nothing at all was ever produced from the JSON is the raw text
 *   after `[` emitted as a fallback. Re-emitting it after content was already
 *   streamed would show duplicated or raw JSON to students.
 */
export function finalizeParser(state: ParserState): ParseResult {
  const result: ParseResult = {
    textChunks: [],
    actions: [],
    isDone: true,
    ordered: [],
  };
  if (state.isDone) {
    return result;
  }
  const content = state.buffer.trim();
  if (!content) {
    return result;
  }

  if (!state.jsonStarted) {
    // Model never output `[` — treat entire buffer as plain text
    result.textChunks.push(content);
    result.ordered.push({ type: 'text', index: 0 });
  } else {
    const emittedBefore = state.lastParsedItemCount > 0 || state.lastPartialTextLength > 0;

    // No more input will arrive. The trailing item is final if the buffer ends
    // between items (depth 1, outside any string) rather than inside one.
    const scan = scanTopLevelArray(state.buffer);
    const trailingItemComplete = !scan.inString && scan.depth <= 1;
    const parsed = parseBufferedArray(state.buffer);
    if (parsed) {
      drainParsedItems(parsed, state, result, trailingItemComplete);
    }

    if (!emittedBefore && result.textChunks.length === 0 && result.actions.length === 0) {
      const raw = content.slice(content.indexOf('[') + 1).trim();
      if (raw) {
        result.textChunks.push(raw);
        result.ordered.push({ type: 'text', index: 0 });
      }
    }
  }
  state.isDone = true;
  return result;
}
// ==================== Main Generation Function ====================
/**
 * Stateless generation with streaming via LangGraph orchestration.
 *
 * Runs the orchestration graph for one request and re-yields every event it
 * streams (custom stream mode). It then yields a `done` event carrying run
 * totals and a director state built from `request.directorState` plus this
 * turn's data. Each call is expected to cover a single agent turn; the client
 * loops externally.
 *
 * @param request - The chat request with full state
 * @param abortSignal - Cancellation signal, forwarded to the graph run
 * @param languageModel - Model handed to the graph's initial state
 * @param thinkingConfig - Optional thinking configuration handed to the graph's initial state
 * @yields StatelessEvent objects for streaming. On failure a single `error`
 *   event is yielded instead of `done`.
 */
export async function* statelessGenerate(
  request: StatelessChatRequest,
  abortSignal: AbortSignal,
  languageModel: LanguageModel,
  thinkingConfig?: ThinkingConfig,
): AsyncGenerator<StatelessEvent> {
  log.info(
    `[StatelessGenerate] Starting orchestration for agents: ${request.config.agentIds.join(', ')}`,
  );
  log.info(
    `[StatelessGenerate] Message count: ${request.messages.length}, turnCount: ${request.directorState?.turnCount ?? 0}`,
  );
  try {
    const graph = createOrchestrationGraph();
    const initialState = buildInitialState(request, languageModel, thinkingConfig);
    const stream = await graph.stream(initialState, {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      streamMode: 'custom' as any,
      signal: abortSignal,
    });

    let totalActions = 0;
    let totalAgents = 0;
    // Whether the agent dispatched in this turn produced any text or actions.
    let agentHadContent = false;
    // Per-agent data used to build the updated directorState
    let currentAgentId: string | null = null;
    let currentAgentName: string | null = null;
    let contentPreview = '';
    let agentActionCount = 0;
    const agentWbActions: WhiteboardActionRecord[] = [];

    for await (const chunk of stream) {
      const event = chunk as StatelessEvent;
      if (event.type === 'agent_start') {
        totalAgents++;
        currentAgentId = event.data.agentId;
        currentAgentName = event.data.agentName;
        contentPreview = '';
        agentActionCount = 0;
        agentWbActions.length = 0;
      }
      if (event.type === 'text_delta') {
        agentHadContent = true;
        // Keep only the first 100 characters as a preview for the director
        if (contentPreview.length < 100) {
          contentPreview = (contentPreview + event.data.content).slice(0, 100);
        }
      }
      if (event.type === 'action') {
        totalActions++;
        agentActionCount++;
        agentHadContent = true;
        const { actionName } = event.data;
        // Guard against a malformed action without a name rather than crashing the turn
        if (typeof actionName === 'string' && actionName.startsWith('wb_')) {
          agentWbActions.push({
            actionName: actionName as WhiteboardActionRecord['actionName'],
            agentId: event.data.agentId,
            agentName: currentAgentName || event.data.agentId,
            params: event.data.params,
          });
        }
      }
      yield event;
    }

    // Build updated directorState from incoming state + this turn's data
    const incoming = request.directorState;
    const prevResponses = incoming?.agentResponses ?? [];
    const prevLedger = incoming?.whiteboardLedger ?? [];
    const prevTurnCount = incoming?.turnCount ?? 0;
    const directorState =
      totalAgents > 0 && currentAgentId !== null
        ? {
            turnCount: prevTurnCount + 1,
            agentResponses: [
              ...prevResponses,
              {
                agentId: currentAgentId,
                agentName: currentAgentName || currentAgentId,
                contentPreview,
                actionCount: agentActionCount,
                whiteboardActions: [...agentWbActions],
              },
            ],
            whiteboardLedger: [...prevLedger, ...agentWbActions],
          }
        : {
            turnCount: prevTurnCount,
            agentResponses: prevResponses,
            whiteboardLedger: prevLedger,
          };

    yield {
      type: 'done',
      data: { totalActions, totalAgents, agentHadContent, directorState },
    };
    log.info(
      `[StatelessGenerate] Completed. Agents: ${totalAgents}, Actions: ${totalActions}, hadContent: ${agentHadContent}, turnCount: ${directorState.turnCount}`,
    );
  } catch (error) {
    // Cancellation is not guaranteed to surface as an error named 'AbortError'
    // (that depends on where the signal is observed), so also consult the signal.
    if (abortSignal.aborted || (error instanceof Error && error.name === 'AbortError')) {
      yield { type: 'error', data: { message: 'Request interrupted' } };
    } else {
      log.error('[StatelessGenerate] Error:', error);
      yield {
        type: 'error',
        data: {
          message: error instanceof Error ? error.message : String(error),
        },
      };
    }
  }
}