File size: 4,845 Bytes
30cc31a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 | /**
* Background task worker — survives tab close.
*
* Agent execution is decoupled from the HTTP request lifecycle. `startRun`
* spawns the agent call in a detached Promise that writes StreamEvents into
* event-log.ts. The POST and GET handlers in the stream route simply read
* from the event log, so client disconnects never kill the run.
*
* A Map<taskId, RunHandle> tracks active runs. Each handle carries an
* AbortController so the run can be cancelled on demand.
*/
import { appendEvent, markDone, clearLog } from "./event-log";
import type { StreamEvent } from "@/lib/types";
/**
 * Handle for one background run, as stored in the `runs` map and returned
 * by `startRun`.
 */
export interface RunHandle {
/**
 * Settles when the run finishes. `startRun` turns a failed run into an
 * "error" event in the log, so a failure is normally reported there rather
 * than as a rejection of this promise.
 */
promise: Promise<void>;
/** Controller whose signal is handed to the run; `abortRun` calls `abort()` on it. */
abortController: AbortController;
}
/**
 * Active runs keyed by task ID. `startRun` adds an entry, and the entry is
 * removed again once that run's promise settles.
 */
const runs = new Map<string, RunHandle>();
/** Settings for a single run, used to build the backend request. */
export interface RunOptions {
/**
 * Phylo backend base URL (e.g. http://127.0.0.1:8601). The request path is
 * appended to it as `/api/...`, so do not include a trailing slash.
 */
backendUrl: string;
/**
 * Selected knowledge base IDs. When provided they are sent to the backend as
 * `knowledge_bases`; when omitted the field is left out of the request body.
 */
kbIds?: string[];
}
/**
 * Launch a background agent run for `taskId`, or hand back the one that is
 * already in flight.
 *
 * At most one run exists per task: a second call while the first is still
 * active returns the same handle and starts nothing new.
 *
 * The run is not awaited here. Its output is written to the event log; when
 * it settles — successfully or not — the log is marked done and the handle
 * is dropped from the active-run map. A failure is recorded in the log as an
 * "error" event.
 *
 * @param taskId - Task to run; also the key for its event log.
 * @param messages - Conversation passed through to the run.
 * @param options - Backend URL and optional knowledge-base selection.
 * @returns The handle of the active run for this task.
 */
export function startRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
): RunHandle {
  const active = runs.get(taskId);
  if (active !== undefined) {
    return active;
  }

  // Record a failed run in the event log so readers of the log see it.
  const reportFailure = (reason: unknown): void => {
    const text = reason instanceof Error ? reason.message : "Unknown worker error";
    appendEvent(taskId, { type: "error", error: text });
  };

  // Runs on every outcome: close the log and forget the handle.
  const finalize = (): void => {
    markDone(taskId);
    runs.delete(taskId);
  };

  // A previous run on this task may have left a finished log behind. Wipe it
  // so a follow-up turn streams fresh output instead of replaying + closing.
  clearLog(taskId);

  const controller = new AbortController();
  const settled = executeRun(taskId, messages, options, controller.signal)
    .catch(reportFailure)
    .finally(finalize);

  const entry: RunHandle = { promise: settled, abortController: controller };
  runs.set(taskId, entry);
  return entry;
}
/**
 * Report whether `taskId` currently has an entry in the active-run map.
 *
 * @param taskId - Task to look up.
 * @returns `true` while a run for the task is registered, otherwise `false`.
 */
export function isTaskRunning(taskId: string): boolean {
  const active = runs.get(taskId);
  return active !== undefined;
}
/**
 * Request cancellation of the active run for `taskId` by aborting its
 * controller. Does nothing when no run is registered for the task.
 *
 * @param taskId - Task whose run should be cancelled.
 */
export function abortRun(taskId: string): void {
  runs.get(taskId)?.abortController.abort();
}
// ---------------------------------------------------------------------------
// Internal: the actual SSE fetch → event-log pipeline
// ---------------------------------------------------------------------------

/**
 * Return the payload of an SSE `data` line, or `null` for any other line
 * (blank lines, `:` comments / keep-alives, other fields).
 *
 * The space after the colon is optional in the SSE format, so both
 * `data: x` and `data:x` yield `x`.
 */
function sseDataPayload(line: string): string | null {
  const trimmed = line.trim();
  if (!trimmed.startsWith("data:")) return null;
  const rest = trimmed.slice(5);
  return rest.startsWith(" ") ? rest.slice(1) : rest;
}

/**
 * Send the latest message for `taskId` to the backend's streaming endpoint
 * and copy every event it emits into the event log.
 *
 * Each `data` line of the response is parsed as JSON and appended as-is;
 * payloads that fail to parse are skipped. Reading stops at a `[DONE]`
 * payload or at end of stream, and a `{ type: "done" }` event is appended in
 * either case.
 *
 * @param taskId - Task whose event log receives the output; also part of the
 *   request URL.
 * @param messages - Conversation so far; only the last entry's `content` is
 *   sent (an empty string when the array is empty).
 * @param options - Backend base URL and optional knowledge-base IDs.
 * @param signal - Abort signal handed to `fetch`; aborting makes the pending
 *   request or read reject.
 * @throws Error when the backend answers with a non-OK status or without a
 *   body. Network, stream and abort failures propagate unchanged.
 */
async function executeRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
  signal: AbortSignal,
): Promise<void> {
  const url = `${options.backendUrl}/api/tasks/${encodeURIComponent(taskId)}/messages/stream`;
  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    // Backend accepts { message, knowledge_bases } — send the latest user
    // message, not the full array (which would 422).
    body: JSON.stringify({
      message: messages[messages.length - 1]?.content ?? "",
      ...(options.kbIds ? { knowledge_bases: options.kbIds } : {}),
    }),
    signal,
  });
  if (!res.ok) {
    throw new Error(`Backend returned ${res.status}: ${await res.text().catch(() => "")}`);
  }
  if (!res.body) {
    throw new Error("Backend response has no body");
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder();

  // Append every data payload found in `lines` to the event log.
  // Returns true as soon as the [DONE] sentinel is seen; later lines are ignored.
  const logLines = (lines: string[]): boolean => {
    for (const line of lines) {
      const payload = sseDataPayload(line);
      if (payload === null) continue;
      if (payload === "[DONE]") return true;
      try {
        appendEvent(taskId, JSON.parse(payload) as StreamEvent);
      } catch {
        // Malformed JSON — skip
      }
    }
    return false;
  };

  let buffer = "";
  let sawSentinel = false;
  try {
    while (!sawSentinel) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      // The last piece may be an incomplete line; keep it for the next chunk.
      buffer = lines.pop() ?? "";
      sawSentinel = logLines(lines);
    }
    if (!sawSentinel) {
      // End of stream: flush bytes still held by the decoder and handle a
      // final line that arrived without a trailing newline. A truncated
      // payload fails JSON.parse and is skipped like any malformed line.
      logLines([buffer + decoder.decode()]);
    }
    // Emitted for both the [DONE] sentinel and a plain end of stream.
    appendEvent(taskId, { type: "done" });
  } finally {
    // Cancel so the backend connection is closed when we stop early (after
    // [DONE] or on an error). On an already-closed stream this is a no-op; on
    // an errored stream it rejects, which is ignored so the original error
    // is the one that propagates.
    await reader.cancel().catch(() => undefined);
    reader.releaseLock();
  }
}
|