| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import { appendEvent, markDone, clearLog } from "./event-log"; |
| import type { StreamEvent } from "@/lib/types"; |
|
|
/**
 * Handle describing one in-flight run.
 */
export interface RunHandle {
  /** Controller whose signal is handed to the run; aborting it cancels the run's request. */
  abortController: AbortController;

  /** Promise tracking the run's lifecycle, as created by `startRun`. */
  promise: Promise<void>;
}
|
|
/** Registry of in-flight runs keyed by task id; an entry is removed once its run settles. */
const runs: Map<string, RunHandle> = new Map();
|
|
/**
 * Settings for a single run.
 */
export interface RunOptions {
  /** Optional knowledge-base ids; when provided they are sent as `knowledge_bases` in the request body. */
  kbIds?: string[];

  /** Base URL of the backend; the task's streaming endpoint path is appended to it. */
  backendUrl: string;
}
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Starts the run for `taskId`, or returns the handle of the run that is
 * already in flight for that task.
 *
 * For a new run the task's event log is cleared first. A failure of the run
 * is recorded as an `error` event; in every case the log is marked done and
 * the task is removed from the registry once the run settles.
 *
 * @param taskId - Identifier of the task to run.
 * @param messages - Conversation messages forwarded to the run.
 * @param options - Backend settings for the run.
 * @returns The handle (promise and abort controller) of the active run.
 */
export function startRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
): RunHandle {
  // Only one run per task: hand back the in-flight handle instead of starting another.
  const inFlight = runs.get(taskId);
  if (inFlight !== undefined) {
    return inFlight;
  }

  clearLog(taskId);

  const controller = new AbortController();

  // The async body runs synchronously up to its first await, so the request is
  // issued right away; cleanup always happens after this function has returned.
  const lifecycle = async (): Promise<void> => {
    try {
      await executeRun(taskId, messages, options, controller.signal);
    } catch (failure) {
      const text = failure instanceof Error ? failure.message : "Unknown worker error";
      appendEvent(taskId, { type: "error", error: text });
    } finally {
      markDone(taskId);
      runs.delete(taskId);
    }
  };

  const created: RunHandle = { promise: lifecycle(), abortController: controller };
  runs.set(taskId, created);
  return created;
}
|
|
| |
/**
 * Reports whether a run is currently registered for `taskId`.
 *
 * @param taskId - Identifier of the task to look up.
 * @returns `true` while the task has an entry in the run registry.
 */
export function isTaskRunning(taskId: string): boolean {
  const active = runs.get(taskId);
  return active !== undefined;
}
|
|
| |
/**
 * Signals the run registered for `taskId` to abort.
 * Does nothing when no run is registered for that task.
 *
 * @param taskId - Identifier of the task whose run should be aborted.
 */
export function abortRun(taskId: string): void {
  runs.get(taskId)?.abortController.abort();
}
|
|
| |
| |
| |
|
|
/**
 * Streams one run from the backend and records its events in the task's log.
 *
 * POSTs the content of the last message (plus optional knowledge-base ids) to
 * the task's streaming endpoint, reads the response as server-sent events and
 * appends every JSON `data:` payload to the event log. A `[DONE]` payload or
 * the end of the stream appends a final `done` event.
 *
 * @param taskId - Task whose endpoint is called and whose log receives the events.
 * @param messages - Conversation; only the content of the last entry is sent.
 * @param options - Backend base URL and optional knowledge-base ids.
 * @param signal - Abort signal passed to `fetch`.
 * @throws Error when the backend answers with a non-OK status or without a body;
 *   errors raised by `fetch` or by reading the stream propagate as well.
 */
async function executeRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
  signal: AbortSignal,
): Promise<void> {
  const url = `${options.backendUrl}/api/tasks/${encodeURIComponent(taskId)}/messages/stream`;

  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      // Only the latest message is sent; an empty conversation sends "".
      message: messages[messages.length - 1]?.content ?? "",
      ...(options.kbIds ? { knowledge_bases: options.kbIds } : {}),
    }),
    signal,
  });

  if (!res.ok) {
    throw new Error(`Backend returned ${res.status}: ${await res.text().catch(() => "")}`);
  }

  if (!res.body) {
    throw new Error("Backend response has no body");
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let streamEnded = false;

  // Handles one SSE line; returns true once the `[DONE]` sentinel was seen.
  const handleLine = (line: string): boolean => {
    const trimmed = line.trim();
    // Blank lines separate events; lines starting with ":" are SSE comments.
    if (!trimmed || trimmed.startsWith(":")) return false;
    // The space after "data:" is optional in the SSE format, so accept both forms.
    if (!trimmed.startsWith("data:")) return false;

    const payload = trimmed.slice(5).trimStart();
    if (payload === "[DONE]") {
      appendEvent(taskId, { type: "done" });
      return true;
    }
    try {
      // NOTE(review): the payload is trusted to match StreamEvent; it is not validated.
      const event = JSON.parse(payload) as StreamEvent;
      appendEvent(taskId, event);
    } catch {
      // Deliberate best effort: a malformed payload is skipped so that one
      // bad line does not end the whole run.
    }
    return false;
  };

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) {
        streamEnded = true;
        break;
      }

      buffer += decoder.decode(value, { stream: true });

      // Keep the trailing partial line in the buffer until its newline arrives.
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        if (handleLine(line)) return;
      }
    }

    // Flush bytes still held by the decoder and process a final line that was
    // not terminated by "\n"; otherwise the last event would be dropped.
    buffer += decoder.decode();
    if (buffer && handleLine(buffer)) return;

    // The stream ended without an explicit `[DONE]` sentinel.
    appendEvent(taskId, { type: "done" });
  } finally {
    if (!streamEnded) {
      // Early exit (`[DONE]` or an error): cancel so the response body is not
      // left open. Not awaited, and rejections from an already errored stream
      // are ignored, so cleanup can neither hang nor mask the original error.
      void reader.cancel().catch(() => undefined);
    }
    reader.releaseLock();
  }
}
|
|