| import { appendFile, mkdir } from "node:fs/promises"; |
| import path from "node:path"; |
| import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; |
| import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js"; |
| import { onAgentEvent } from "../infra/agent-events.js"; |
| import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; |
| import { enqueueSystemEvent } from "../infra/system-events.js"; |
| import { scopedHeartbeatWakeOptions } from "../routing/session-key.js"; |
|
|
| const DEFAULT_STREAM_FLUSH_MS = 2_500; |
| const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000; |
| const DEFAULT_NO_OUTPUT_POLL_MS = 15_000; |
| const DEFAULT_MAX_RELAY_LIFETIME_MS = 6 * 60 * 60 * 1000; |
| const STREAM_BUFFER_MAX_CHARS = 4_000; |
| const STREAM_SNIPPET_MAX_CHARS = 220; |
|
|
| function compactWhitespace(value: string): string { |
| return value.replace(/\s+/g, " ").trim(); |
| } |
|
|
| function truncate(value: string, maxChars: number): string { |
| if (value.length <= maxChars) { |
| return value; |
| } |
| if (maxChars <= 1) { |
| return value.slice(0, maxChars); |
| } |
| return `${value.slice(0, maxChars - 1)}…`; |
| } |
|
|
| function toTrimmedString(value: unknown): string | undefined { |
| if (typeof value !== "string") { |
| return undefined; |
| } |
| const trimmed = value.trim(); |
| return trimmed || undefined; |
| } |
|
|
| function toFiniteNumber(value: unknown): number | undefined { |
| return typeof value === "number" && Number.isFinite(value) ? value : undefined; |
| } |
|
|
| function resolveAcpStreamLogPathFromSessionFile(sessionFile: string, sessionId: string): string { |
| const baseDir = path.dirname(path.resolve(sessionFile)); |
| return path.join(baseDir, `${sessionId}.acp-stream.jsonl`); |
| } |
|
|
| export function resolveAcpSpawnStreamLogPath(params: { |
| childSessionKey: string; |
| }): string | undefined { |
| const childSessionKey = params.childSessionKey.trim(); |
| if (!childSessionKey) { |
| return undefined; |
| } |
| const storeEntry = readAcpSessionEntry({ |
| sessionKey: childSessionKey, |
| }); |
| const sessionId = storeEntry?.entry?.sessionId?.trim(); |
| if (!storeEntry || !sessionId) { |
| return undefined; |
| } |
| try { |
| const sessionFile = resolveSessionFilePath( |
| sessionId, |
| storeEntry.entry, |
| resolveSessionFilePathOptions({ |
| storePath: storeEntry.storePath, |
| }), |
| ); |
| return resolveAcpStreamLogPathFromSessionFile(sessionFile, sessionId); |
| } catch { |
| return undefined; |
| } |
| } |
|
|
| export function startAcpSpawnParentStreamRelay(params: { |
| runId: string; |
| parentSessionKey: string; |
| childSessionKey: string; |
| agentId: string; |
| logPath?: string; |
| streamFlushMs?: number; |
| noOutputNoticeMs?: number; |
| noOutputPollMs?: number; |
| maxRelayLifetimeMs?: number; |
| emitStartNotice?: boolean; |
| }): AcpSpawnParentRelayHandle { |
| const runId = params.runId.trim(); |
| const parentSessionKey = params.parentSessionKey.trim(); |
| if (!runId || !parentSessionKey) { |
| return { |
| dispose: () => {}, |
| notifyStarted: () => {}, |
| }; |
| } |
|
|
| const streamFlushMs = |
| typeof params.streamFlushMs === "number" && Number.isFinite(params.streamFlushMs) |
| ? Math.max(0, Math.floor(params.streamFlushMs)) |
| : DEFAULT_STREAM_FLUSH_MS; |
| const noOutputNoticeMs = |
| typeof params.noOutputNoticeMs === "number" && Number.isFinite(params.noOutputNoticeMs) |
| ? Math.max(0, Math.floor(params.noOutputNoticeMs)) |
| : DEFAULT_NO_OUTPUT_NOTICE_MS; |
| const noOutputPollMs = |
| typeof params.noOutputPollMs === "number" && Number.isFinite(params.noOutputPollMs) |
| ? Math.max(250, Math.floor(params.noOutputPollMs)) |
| : DEFAULT_NO_OUTPUT_POLL_MS; |
| const maxRelayLifetimeMs = |
| typeof params.maxRelayLifetimeMs === "number" && Number.isFinite(params.maxRelayLifetimeMs) |
| ? Math.max(1_000, Math.floor(params.maxRelayLifetimeMs)) |
| : DEFAULT_MAX_RELAY_LIFETIME_MS; |
|
|
| const relayLabel = truncate(compactWhitespace(params.agentId), 40) || "ACP child"; |
| const contextPrefix = `acp-spawn:${runId}`; |
| const logPath = toTrimmedString(params.logPath); |
| let logDirReady = false; |
| let pendingLogLines = ""; |
| let logFlushScheduled = false; |
| let logWriteChain: Promise<void> = Promise.resolve(); |
| const flushLogBuffer = () => { |
| if (!logPath || !pendingLogLines) { |
| return; |
| } |
| const chunk = pendingLogLines; |
| pendingLogLines = ""; |
| logWriteChain = logWriteChain |
| .then(async () => { |
| if (!logDirReady) { |
| await mkdir(path.dirname(logPath), { |
| recursive: true, |
| }); |
| logDirReady = true; |
| } |
| await appendFile(logPath, chunk, { |
| encoding: "utf-8", |
| mode: 0o600, |
| }); |
| }) |
| .catch(() => { |
| |
| }); |
| }; |
| const scheduleLogFlush = () => { |
| if (!logPath || logFlushScheduled) { |
| return; |
| } |
| logFlushScheduled = true; |
| queueMicrotask(() => { |
| logFlushScheduled = false; |
| flushLogBuffer(); |
| }); |
| }; |
| const writeLogLine = (entry: Record<string, unknown>) => { |
| if (!logPath) { |
| return; |
| } |
| try { |
| pendingLogLines += `${JSON.stringify(entry)}\n`; |
| if (pendingLogLines.length >= 16_384) { |
| flushLogBuffer(); |
| return; |
| } |
| scheduleLogFlush(); |
| } catch { |
| |
| } |
| }; |
| const logEvent = (kind: string, fields?: Record<string, unknown>) => { |
| writeLogLine({ |
| ts: new Date().toISOString(), |
| epochMs: Date.now(), |
| runId, |
| parentSessionKey, |
| childSessionKey: params.childSessionKey, |
| agentId: params.agentId, |
| kind, |
| ...fields, |
| }); |
| }; |
| const wake = () => { |
| requestHeartbeatNow( |
| scopedHeartbeatWakeOptions(parentSessionKey, { |
| reason: "acp:spawn:stream", |
| }), |
| ); |
| }; |
| const emit = (text: string, contextKey: string) => { |
| const cleaned = text.trim(); |
| if (!cleaned) { |
| return; |
| } |
| logEvent("system_event", { contextKey, text: cleaned }); |
| enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey }); |
| wake(); |
| }; |
| const emitStartNotice = () => { |
| emit( |
| `Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`, |
| `${contextPrefix}:start`, |
| ); |
| }; |
|
|
| let disposed = false; |
| let pendingText = ""; |
| let lastProgressAt = Date.now(); |
| let stallNotified = false; |
| let flushTimer: NodeJS.Timeout | undefined; |
| let relayLifetimeTimer: NodeJS.Timeout | undefined; |
|
|
| const clearFlushTimer = () => { |
| if (!flushTimer) { |
| return; |
| } |
| clearTimeout(flushTimer); |
| flushTimer = undefined; |
| }; |
| const clearRelayLifetimeTimer = () => { |
| if (!relayLifetimeTimer) { |
| return; |
| } |
| clearTimeout(relayLifetimeTimer); |
| relayLifetimeTimer = undefined; |
| }; |
|
|
| const flushPending = () => { |
| clearFlushTimer(); |
| if (!pendingText) { |
| return; |
| } |
| const snippet = truncate(compactWhitespace(pendingText), STREAM_SNIPPET_MAX_CHARS); |
| pendingText = ""; |
| if (!snippet) { |
| return; |
| } |
| emit(`${relayLabel}: ${snippet}`, `${contextPrefix}:progress`); |
| }; |
|
|
| const scheduleFlush = () => { |
| if (disposed || flushTimer || streamFlushMs <= 0) { |
| return; |
| } |
| flushTimer = setTimeout(() => { |
| flushPending(); |
| }, streamFlushMs); |
| flushTimer.unref?.(); |
| }; |
|
|
| const noOutputWatcherTimer = setInterval(() => { |
| if (disposed || noOutputNoticeMs <= 0) { |
| return; |
| } |
| if (stallNotified) { |
| return; |
| } |
| if (Date.now() - lastProgressAt < noOutputNoticeMs) { |
| return; |
| } |
| stallNotified = true; |
| emit( |
| `${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`, |
| `${contextPrefix}:stall`, |
| ); |
| }, noOutputPollMs); |
| noOutputWatcherTimer.unref?.(); |
|
|
| relayLifetimeTimer = setTimeout(() => { |
| if (disposed) { |
| return; |
| } |
| emit( |
| `${relayLabel} stream relay timed out after ${Math.max(1, Math.round(maxRelayLifetimeMs / 1000))}s without completion.`, |
| `${contextPrefix}:timeout`, |
| ); |
| dispose(); |
| }, maxRelayLifetimeMs); |
| relayLifetimeTimer.unref?.(); |
|
|
| if (params.emitStartNotice !== false) { |
| emitStartNotice(); |
| } |
|
|
| const unsubscribe = onAgentEvent((event) => { |
| if (disposed || event.runId !== runId) { |
| return; |
| } |
|
|
| if (event.stream === "assistant") { |
| const data = event.data; |
| const deltaCandidate = |
| (data as { delta?: unknown } | undefined)?.delta ?? |
| (data as { text?: unknown } | undefined)?.text; |
| const delta = typeof deltaCandidate === "string" ? deltaCandidate : undefined; |
| if (!delta || !delta.trim()) { |
| return; |
| } |
| logEvent("assistant_delta", { delta }); |
|
|
| if (stallNotified) { |
| stallNotified = false; |
| emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`); |
| } |
|
|
| lastProgressAt = Date.now(); |
| pendingText += delta; |
| if (pendingText.length > STREAM_BUFFER_MAX_CHARS) { |
| pendingText = pendingText.slice(-STREAM_BUFFER_MAX_CHARS); |
| } |
| if (pendingText.length >= STREAM_SNIPPET_MAX_CHARS || delta.includes("\n\n")) { |
| flushPending(); |
| return; |
| } |
| scheduleFlush(); |
| return; |
| } |
|
|
| if (event.stream !== "lifecycle") { |
| return; |
| } |
|
|
| const phase = toTrimmedString((event.data as { phase?: unknown } | undefined)?.phase); |
| logEvent("lifecycle", { phase: phase ?? "unknown", data: event.data }); |
| if (phase === "end") { |
| flushPending(); |
| const startedAt = toFiniteNumber( |
| (event.data as { startedAt?: unknown } | undefined)?.startedAt, |
| ); |
| const endedAt = toFiniteNumber((event.data as { endedAt?: unknown } | undefined)?.endedAt); |
| const durationMs = |
| startedAt != null && endedAt != null && endedAt >= startedAt |
| ? endedAt - startedAt |
| : undefined; |
| if (durationMs != null) { |
| emit( |
| `${relayLabel} run completed in ${Math.max(1, Math.round(durationMs / 1000))}s.`, |
| `${contextPrefix}:done`, |
| ); |
| } else { |
| emit(`${relayLabel} run completed.`, `${contextPrefix}:done`); |
| } |
| dispose(); |
| return; |
| } |
|
|
| if (phase === "error") { |
| flushPending(); |
| const errorText = toTrimmedString((event.data as { error?: unknown } | undefined)?.error); |
| if (errorText) { |
| emit(`${relayLabel} run failed: ${errorText}`, `${contextPrefix}:error`); |
| } else { |
| emit(`${relayLabel} run failed.`, `${contextPrefix}:error`); |
| } |
| dispose(); |
| } |
| }); |
|
|
| const dispose = () => { |
| if (disposed) { |
| return; |
| } |
| disposed = true; |
| clearFlushTimer(); |
| clearRelayLifetimeTimer(); |
| flushLogBuffer(); |
| clearInterval(noOutputWatcherTimer); |
| unsubscribe(); |
| }; |
|
|
| return { |
| dispose, |
| notifyStarted: emitStartNotice, |
| }; |
| } |
|
|
| export type AcpSpawnParentRelayHandle = { |
| dispose: () => void; |
| notifyStarted: () => void; |
| }; |
|
|