File size: 4,845 Bytes
30cc31a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/**
 * Background task worker — survives tab close.
 *
 * Agent execution is decoupled from the HTTP request lifecycle.  `startRun`
 * spawns the agent call in a detached Promise that writes StreamEvents into
 * event-log.ts.  The POST and GET handlers in the stream route simply read
 * from the event log, so client disconnects never kill the run.
 *
 * A Map<taskId, RunHandle> tracks active runs.  Each handle carries an
 * AbortController so the run can be cancelled on demand.
 */

import { appendEvent, markDone, clearLog } from "./event-log";
import type { StreamEvent } from "@/lib/types";

export interface RunHandle {
  /** Resolves when the run finishes (success or error). */
  promise: Promise<void>;
  /** Signal abort to cancel the run. */
  abortController: AbortController;
}

const runs = new Map<string, RunHandle>();

export interface RunOptions {
  /** Phylo backend base URL (e.g. http://127.0.0.1:8601). */
  backendUrl: string;
  /** Selected knowledge base IDs (passed through to the backend). */
  kbIds?: string[];
}

/**
 * Start (or re-attach to) a background agent run for `taskId`.
 *
 * If a run is already active for this task the existing handle is returned —
 * no duplicate runs are spawned.
 *
 * The run streams from the Phylo backend SSE endpoint and pipes every event
 * into the event log.  On completion (or error) the log is marked done and
 * the handle is removed from the map.
 */
export function startRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
): RunHandle {
  const existing = runs.get(taskId);
  if (existing) return existing;

  // Reset any stale event log from a previous run so follow-up turns on
  // the same task can stream new output instead of replaying + closing.
  clearLog(taskId);

  const abortController = new AbortController();

  const promise = executeRun(taskId, messages, options, abortController.signal)
    .catch((err) => {
      const msg = err instanceof Error ? err.message : "Unknown worker error";
      appendEvent(taskId, { type: "error", error: msg });
    })
    .finally(() => {
      markDone(taskId);
      runs.delete(taskId);
    });

  const handle: RunHandle = { promise, abortController };
  runs.set(taskId, handle);
  return handle;
}

/** Whether a run is currently active for `taskId`. */
export function isTaskRunning(taskId: string): boolean {
  return runs.has(taskId);
}

/** Signal abort for an active run.  No-op if the task isn't running. */
export function abortRun(taskId: string): void {
  const handle = runs.get(taskId);
  if (handle) {
    handle.abortController.abort();
  }
}

// ---------------------------------------------------------------------------
// Internal: the actual SSE fetch → event-log pipeline
// ---------------------------------------------------------------------------

async function executeRun(
  taskId: string,
  messages: Array<{ role: string; content: string }>,
  options: RunOptions,
  signal: AbortSignal,
): Promise<void> {
  const url = `${options.backendUrl}/api/tasks/${encodeURIComponent(taskId)}/messages/stream`;

  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    // Backend accepts { message, knowledge_bases } — send the latest user
    // message, not the full array (which would 422).
    body: JSON.stringify({
      message: messages[messages.length - 1]?.content ?? "",
      ...(options.kbIds ? { knowledge_bases: options.kbIds } : {}),
    }),
    signal,
  });

  if (!res.ok) {
    throw new Error(`Backend returned ${res.status}: ${await res.text().catch(() => "")}`);
  }

  if (!res.body) {
    throw new Error("Backend response has no body");
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      // Parse SSE frames: each event is "data: <json>\n\n"
      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        const trimmed = line.trim();
        if (!trimmed || trimmed.startsWith(":")) continue; // comment / keep-alive
        if (trimmed.startsWith("data: ")) {
          const json = trimmed.slice(6);
          if (json === "[DONE]") {
            appendEvent(taskId, { type: "done" });
            return;
          }
          try {
            const event = JSON.parse(json) as StreamEvent;
            appendEvent(taskId, event);
          } catch {
            // Malformed JSON — skip
          }
        }
      }
    }

    // Stream ended without a [DONE] sentinel — emit done anyway
    appendEvent(taskId, { type: "done" });
  } finally {
    reader.releaseLock();
  }
}