proteinea / src /server /worker.ts
Mahmoud Eljendy
feat: Antibody Studio — AI-native antibody design workspace by Proteinea
30cc31a
/**
* Background task worker — survives tab close.
*
* Agent execution is decoupled from the HTTP request lifecycle. `startRun`
* spawns the agent call in a detached Promise that writes StreamEvents into
* event-log.ts. The POST and GET handlers in the stream route simply read
* from the event log, so client disconnects never kill the run.
*
* A Map<taskId, RunHandle> tracks active runs. Each handle carries an
* AbortController so the run can be cancelled on demand.
*/
import { appendEvent, markDone, clearLog } from "./event-log";
import type { StreamEvent } from "@/lib/types";
export interface RunHandle {
/** Resolves when the run finishes (success or error). */
promise: Promise<void>;
/** Signal abort to cancel the run. */
abortController: AbortController;
}
const runs = new Map<string, RunHandle>();
export interface RunOptions {
/** Phylo backend base URL (e.g. http://127.0.0.1:8601). */
backendUrl: string;
/** Selected knowledge base IDs (passed through to the backend). */
kbIds?: string[];
}
/**
* Start (or re-attach to) a background agent run for `taskId`.
*
* If a run is already active for this task the existing handle is returned —
* no duplicate runs are spawned.
*
* The run streams from the Phylo backend SSE endpoint and pipes every event
* into the event log. On completion (or error) the log is marked done and
* the handle is removed from the map.
*/
export function startRun(
taskId: string,
messages: Array<{ role: string; content: string }>,
options: RunOptions,
): RunHandle {
const existing = runs.get(taskId);
if (existing) return existing;
// Reset any stale event log from a previous run so follow-up turns on
// the same task can stream new output instead of replaying + closing.
clearLog(taskId);
const abortController = new AbortController();
const promise = executeRun(taskId, messages, options, abortController.signal)
.catch((err) => {
const msg = err instanceof Error ? err.message : "Unknown worker error";
appendEvent(taskId, { type: "error", error: msg });
})
.finally(() => {
markDone(taskId);
runs.delete(taskId);
});
const handle: RunHandle = { promise, abortController };
runs.set(taskId, handle);
return handle;
}
/** Whether a run is currently active for `taskId`. */
export function isTaskRunning(taskId: string): boolean {
return runs.has(taskId);
}
/** Signal abort for an active run. No-op if the task isn't running. */
export function abortRun(taskId: string): void {
const handle = runs.get(taskId);
if (handle) {
handle.abortController.abort();
}
}
// ---------------------------------------------------------------------------
// Internal: the actual SSE fetch → event-log pipeline
// ---------------------------------------------------------------------------
async function executeRun(
taskId: string,
messages: Array<{ role: string; content: string }>,
options: RunOptions,
signal: AbortSignal,
): Promise<void> {
const url = `${options.backendUrl}/api/tasks/${encodeURIComponent(taskId)}/messages/stream`;
const res = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
// Backend accepts { message, knowledge_bases } — send the latest user
// message, not the full array (which would 422).
body: JSON.stringify({
message: messages[messages.length - 1]?.content ?? "",
...(options.kbIds ? { knowledge_bases: options.kbIds } : {}),
}),
signal,
});
if (!res.ok) {
throw new Error(`Backend returned ${res.status}: ${await res.text().catch(() => "")}`);
}
if (!res.body) {
throw new Error("Backend response has no body");
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
for (;;) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse SSE frames: each event is "data: <json>\n\n"
const lines = buffer.split("\n");
buffer = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith(":")) continue; // comment / keep-alive
if (trimmed.startsWith("data: ")) {
const json = trimmed.slice(6);
if (json === "[DONE]") {
appendEvent(taskId, { type: "done" });
return;
}
try {
const event = JSON.parse(json) as StreamEvent;
appendEvent(taskId, event);
} catch {
// Malformed JSON — skip
}
}
}
}
// Stream ended without a [DONE] sentinel — emit done anyway
appendEvent(taskId, { type: "done" });
} finally {
reader.releaseLock();
}
}