/**
 * Background task worker — survives tab close.
 *
 * Agent execution is decoupled from the HTTP request lifecycle. `startRun`
 * spawns the agent call in a detached Promise that writes StreamEvents into
 * event-log.ts. The POST and GET handlers in the stream route simply read
 * from the event log, so client disconnects never kill the run.
 *
 * A Map keyed by task ID tracks active runs. Each handle carries an
 * AbortController so the run can be cancelled on demand.
 */

import { appendEvent, markDone, clearLog } from "./event-log";
import type { StreamEvent } from "@/lib/types";

export interface RunHandle {
  /** Resolves when the run finishes (success or error). */
  promise: Promise<void>;
  /** Signal abort to cancel the run. */
  abortController: AbortController;
}

/** Active runs keyed by task ID. An entry exists only while its run is in flight. */
const runs = new Map<string, RunHandle>();

export interface RunOptions {
  /** Phylo backend base URL (e.g. http://127.0.0.1:8601). */
  backendUrl: string;
  /** Selected knowledge base IDs (passed through to the backend). */
  kbIds?: string[];
}

/**
 * Start (or re-attach to) a background agent run for `taskId`.
 *
 * If a run is already active for this task the existing handle is returned —
 * no duplicate runs are spawned.
 *
 * The run streams from the Phylo backend SSE endpoint and pipes every event
 * into the event log. On completion (or error) the log is marked done and
 * the handle is removed from the map.
 *
 * @param taskId - Task whose run should be started or re-attached to.
 * @param messages - Conversation so far; only the last entry is sent upstream.
 * @param options - Backend location and optional knowledge base selection.
 * @returns The handle of the active run. Its `promise` never rejects: any
 *   failure (including the rejection caused by an abort) is recorded in the
 *   event log as an `error` event instead.
 */
export function startRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
): RunHandle {
  const existing = runs.get(taskId);
  if (existing) return existing;

  // Reset any stale event log from a previous run so follow-up turns on
  // the same task can stream new output instead of replaying + closing.
  clearLog(taskId);

  const abortController = new AbortController();

  const promise = executeRun(taskId, messages, options, abortController.signal)
    .catch((err: unknown) => {
      const msg = err instanceof Error ? err.message : "Unknown worker error";
      appendEvent(taskId, { type: "error", error: msg });
    })
    .finally(() => {
      markDone(taskId);
      runs.delete(taskId);
    });

  // The handlers above only run asynchronously, so the handle is always
  // registered before `finally` can remove it.
  const handle: RunHandle = { promise, abortController };
  runs.set(taskId, handle);
  return handle;
}

/** Whether a run is currently active for `taskId`. */
export function isTaskRunning(taskId: string): boolean {
  return runs.has(taskId);
}

/** Signal abort for an active run. No-op if the task isn't running. */
export function abortRun(taskId: string): void {
  const handle = runs.get(taskId);
  if (handle) {
    handle.abortController.abort();
  }
}

// ---------------------------------------------------------------------------
// Internal: the actual SSE fetch → event-log pipeline
// ---------------------------------------------------------------------------

/**
 * Interpret one line of the SSE stream and forward its payload to the event log.
 *
 * Blank lines, comment / keep-alive lines (leading `:`) and fields other than
 * `data` are ignored. A `[DONE]` payload appends a `done` event.
 *
 * @param taskId - Task whose event log receives the event.
 * @param line - One raw line of the stream, without its trailing `\n`.
 * @returns `true` when the line carried the `[DONE]` sentinel, i.e. the
 *   caller should stop reading; `false` otherwise.
 */
function handleSseLine(taskId: string, line: string): boolean {
  const trimmed = line.trim();
  if (!trimmed || trimmed.startsWith(":")) return false; // comment / keep-alive
  if (!trimmed.startsWith("data:")) return false;

  // The space after "data:" is optional in the SSE format, so accept both forms.
  const payload = trimmed.slice("data:".length).trimStart();
  if (payload === "[DONE]") {
    appendEvent(taskId, { type: "done" });
    return true;
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(payload);
  } catch {
    // Malformed JSON — skip
    return false;
  }

  // Valid JSON that is not an object (null, number, string, ...) cannot be a
  // StreamEvent; skip it like malformed JSON instead of logging garbage.
  if (typeof parsed !== "object" || parsed === null) return false;

  // NOTE(review): the payload is trusted to match StreamEvent beyond being an
  // object; no schema validation is performed here.
  appendEvent(taskId, parsed as StreamEvent);
  return false;
}

/**
 * POST the latest user message to the backend and pipe the SSE response into
 * the event log for `taskId`.
 *
 * Always appends exactly one `done` event on success, whether or not the
 * backend sent a `[DONE]` sentinel.
 *
 * @param taskId - Task ID; URL-encoded into the request path.
 * @param messages - Conversation so far; only the last entry's content is sent.
 * @param options - Backend base URL and optional knowledge base IDs.
 * @param signal - Abort signal passed to `fetch`; aborting rejects this call.
 * @throws Error when the backend answers with a non-2xx status or without a
 *   body, and whatever `fetch` / the body reader reject with.
 */
async function executeRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
  signal: AbortSignal,
): Promise<void> {
  const url = `${options.backendUrl}/api/tasks/${encodeURIComponent(taskId)}/messages/stream`;

  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    // Backend accepts { message, knowledge_bases } — send the latest user
    // message, not the full array (which would 422).
    body: JSON.stringify({
      message: messages[messages.length - 1]?.content ?? "",
      ...(options.kbIds ? { knowledge_bases: options.kbIds } : {}),
    }),
    signal,
  });

  if (!res.ok) {
    throw new Error(`Backend returned ${res.status}: ${await res.text().catch(() => "")}`);
  }
  if (!res.body) {
    throw new Error("Backend response has no body");
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      // Parse SSE frames: each event arrives as a "data: {json}" line followed
      // by a blank line. The last element of the split is an incomplete line
      // (or ""), so it is kept in the buffer for the next chunk.
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        if (handleSseLine(taskId, line)) return;
      }
    }

    // Flush bytes still held by the streaming decoder and parse a final line
    // that was not newline-terminated, so the last event is not dropped.
    buffer += decoder.decode();
    if (handleSseLine(taskId, buffer)) return;

    // Stream ended without a [DONE] sentinel — emit done anyway
    appendEvent(taskId, { type: "done" });
  } finally {
    // Cancel so the response body is released when we stop early ([DONE] or
    // an exception). Not awaited, and a rejection (errored stream) is ignored
    // on purpose so it cannot mask the original outcome.
    void reader.cancel().catch(() => undefined);
    reader.releaseLock();
  }
}