import { createLogger } from '../../utils/logger' import { createStudioToolPart } from '../domain/factories' import type { StudioAssistantMessage, StudioEventBus, StudioMessageStore, StudioPartStore, StudioProcessorStreamEvent, StudioRun, StudioSession, StudioToolPart, StudioToolResult } from '../domain/types' import { isDoomLoop } from './doom-loop' import { StudioPartSynchronizer } from './part-synchronizer' import { StudioTextStreamAccumulator } from './text-stream-accumulator' import { getToolInput, getToolTimeStart, mergeToolMetadata, mergeToolStateMetadata } from './tool-state' const logger = createLogger('StudioRunProcessor') export type StudioProcessorOutcome = 'continue' | 'stop' | 'compact' interface StudioRunProcessorOptions { messageStore: StudioMessageStore partStore: StudioPartStore } export class StudioRunProcessor { private readonly partStore: StudioPartStore private readonly sync: StudioPartSynchronizer private readonly textStream: StudioTextStreamAccumulator constructor(options: StudioRunProcessorOptions) { this.partStore = options.partStore this.sync = new StudioPartSynchronizer(options.messageStore, options.partStore) this.textStream = new StudioTextStreamAccumulator(options.partStore, this.sync) } async processStream(input: { session: StudioSession run: StudioRun assistantMessage: StudioAssistantMessage events: AsyncIterable eventBus?: StudioEventBus shouldCompact?: (usage?: { tokens?: number }, assistantMessage?: StudioAssistantMessage) => Promise onDoomLoop?: (toolName: string, toolInput: Record) => Promise }): Promise { const toolCalls = new Map() let currentAssistantMessage = input.assistantMessage let activeTextPartId: string | null = null let activeTextValue = '' let activeReasoningPartId: string | null = null let blocked = false let needsCompaction = false for await (const event of input.events) { switch (event.type) { case 'assistant-message-start': { currentAssistantMessage = event.message activeTextPartId = null activeTextValue = '' activeReasoningPartId = null break } case 'tool-input-start': { input.eventBus?.publish({ type: 'tool_input_start', sessionId: input.session.id, runId: input.run.id, toolName: event.toolName, callId: event.id, raw: event.raw }) const part = createStudioToolPart({ messageId: currentAssistantMessage.id, sessionId: currentAssistantMessage.sessionId, tool: event.toolName, callId: event.id, raw: event.raw }) await this.sync.appendPart(currentAssistantMessage, part) toolCalls.set(event.id, part) break } case 'tool-call': { input.eventBus?.publish({ type: 'tool_call', sessionId: input.session.id, runId: input.run.id, toolName: event.toolName, callId: event.toolCallId, input: event.input }) const match = toolCalls.get(event.toolCallId) if (!match) { break } const allowed = await this.allowToolCall({ assistantMessage: currentAssistantMessage, toolName: event.toolName, toolInput: event.input, onDoomLoop: input.onDoomLoop }) if (!allowed) { blocked = true await this.updateToolState(match.id, { status: 'error', input: event.input, error: `Doom loop rejected for tool "${event.toolName}"`, time: { start: Date.now(), end: Date.now() } }) toolCalls.delete(event.toolCallId) break } await this.updateToolState(match.id, { status: 'running', input: event.input, title: undefined, metadata: undefined, time: { start: Date.now() } }) break } case 'tool-result': { input.eventBus?.publish({ type: 'tool_result', sessionId: input.session.id, runId: input.run.id, toolName: toolCalls.get(event.toolCallId)?.tool ?? 'unknown', callId: event.toolCallId, status: 'completed', title: event.title, output: event.output, metadata: event.metadata, attachments: event.attachments }) await this.completeToolCall(toolCalls, event) break } case 'tool-error': { input.eventBus?.publish({ type: 'tool_result', sessionId: input.session.id, runId: input.run.id, toolName: toolCalls.get(event.toolCallId)?.tool ?? 'unknown', callId: event.toolCallId, status: 'failed', error: event.error, metadata: event.metadata }) blocked = await this.failToolCall(toolCalls, event) break } case 'text-start': { activeTextValue = '' activeTextPartId = await this.textStream.startPart(currentAssistantMessage, 'text') break } case 'text-delta': { activeTextValue += event.text await this.textStream.appendDelta(activeTextPartId, event.text, 'text') break } case 'text-end': { const text = activeTextValue.trim() if (text) { input.eventBus?.publish({ type: 'assistant_text', sessionId: input.session.id, runId: input.run.id, text }) } activeTextPartId = null activeTextValue = '' break } case 'reasoning-start': { activeReasoningPartId = await this.textStream.startPart(currentAssistantMessage, 'reasoning') break } case 'reasoning-delta': { await this.textStream.appendDelta(activeReasoningPartId, event.text, 'reasoning') break } case 'reasoning-end': { activeReasoningPartId = null break } case 'finish-step': { if (!currentAssistantMessage.summary && input.shouldCompact) { needsCompaction = await input.shouldCompact(event.usage, currentAssistantMessage) } break } } } if (blocked) { return 'stop' } if (needsCompaction) { return 'compact' } return 'continue' } async applyToolMetadata(input: { assistantMessage: StudioAssistantMessage callId: string title?: string metadata?: Record }): Promise { const part = await this.findToolPart(input.assistantMessage, input.callId) if (!part) { return } await this.sync.updatePart(part.id, { ...part, metadata: { ...(part.metadata ?? {}), ...(input.metadata ?? {}) }, state: mergeToolStateMetadata(part.state, input.title, input.metadata) }) } async materializeToolResult(input: { session: StudioSession run: StudioRun assistantMessage: StudioAssistantMessage callId: string toolName: string toolInput: Record result: StudioToolResult }): Promise { const part = createStudioToolPart({ messageId: input.assistantMessage.id, sessionId: input.assistantMessage.sessionId, tool: input.toolName, callId: input.callId }) const created = await this.sync.appendPart(input.assistantMessage, part) await this.updateToolState(created.id, { status: 'completed', input: input.toolInput, output: input.result.output, title: input.result.title, metadata: input.result.metadata, attachments: input.result.attachments, time: { start: Date.now(), end: Date.now() } }) logger.info('Materialized direct tool result into assistant message', { sessionId: input.session.id, runId: input.run.id, toolName: input.toolName, callId: input.callId }) } private async allowToolCall(input: { assistantMessage: StudioAssistantMessage toolName: string toolInput: Record onDoomLoop?: (toolName: string, toolInput: Record) => Promise }): Promise { if (!input.onDoomLoop) { return true } const doomLoop = await isDoomLoop({ assistantMessage: input.assistantMessage, partStore: this.partStore, toolName: input.toolName, toolInput: input.toolInput }) return doomLoop ? input.onDoomLoop(input.toolName, input.toolInput) : true } private async completeToolCall( toolCalls: Map, event: Extract ): Promise { const match = toolCalls.get(event.toolCallId) if (!match) { return } const runningState = await this.partStore.getById(match.id) await this.updateToolState(match.id, { status: 'completed', input: getToolInput(runningState), output: event.output, title: event.title ?? `Completed ${match.tool}`, metadata: mergeToolMetadata(runningState, event.metadata), attachments: event.attachments, time: { start: getToolTimeStart(runningState), end: Date.now() } }) toolCalls.delete(event.toolCallId) } private async failToolCall( toolCalls: Map, event: Extract ): Promise { const match = toolCalls.get(event.toolCallId) if (!match) { return false } const runningState = await this.partStore.getById(match.id) await this.updateToolState(match.id, { status: 'error', input: getToolInput(runningState), error: event.error, metadata: mergeToolMetadata(runningState, event.metadata), time: { start: getToolTimeStart(runningState), end: Date.now() } }) toolCalls.delete(event.toolCallId) return event.metadata?.recoverable !== true } private async findToolPart( assistantMessage: StudioAssistantMessage, callId: string ): Promise { const parts = await this.partStore.listByMessageId(assistantMessage.id) const part = [...parts] .reverse() .find((candidate) => candidate.type === 'tool' && candidate.callId === callId) return part?.type === 'tool' ? part : null } private async updateToolState(partId: string, state: StudioToolPart['state']): Promise { const current = await this.partStore.getById(partId) if (!current || current.type !== 'tool') { return } await this.sync.updatePart(partId, { ...current, state, metadata: mergeToolMetadata(current, 'metadata' in state ? state.metadata : undefined) }) } }