d2l-ui / lib /api.ts
Berkkirik's picture
fix: streaming race β€” keep Thinking until first token, fall back to JSON when backend doesn't honour stream=true
a686c43
/**
* Browser-side API client.
* All calls go through /api/proxy/* β€” token added server-side.
*/
import type {
AskRequest,
AskResponse,
AskSmartRequest,
AskSmartResponse,
AskSmartStreamEvent,
CreateDocumentRequest,
DocumentContentResponse,
DocumentMeta,
DocumentsListResponse,
HealthResponse,
PingResponse,
ReindexResponse,
} from "./types";
const PROXY = "/api/proxy";
class ApiError extends Error {
status: number;
body: unknown;
constructor(status: number, message: string, body?: unknown) {
super(message);
this.status = status;
this.body = body;
}
}
async function request<T>(path: string, init?: RequestInit): Promise<T> {
const url = path ? `${PROXY}/${path}` : PROXY;
const res = await fetch(url, {
...init,
headers: {
"Content-Type": "application/json",
...(init?.headers || {}),
},
});
const text = await res.text();
let data: unknown;
try {
data = text ? JSON.parse(text) : null;
} catch {
data = text;
}
if (!res.ok) {
const msg =
typeof data === "object" && data && "detail" in data
? String((data as { detail: unknown }).detail)
: `HTTP ${res.status}`;
throw new ApiError(res.status, msg, data);
}
return data as T;
}
export { ApiError };
// ─── Health & Ops ─────────────────────────────────────────────────
export const api = {
health: () => request<HealthResponse>(""),
ping: () => request<PingResponse>("ping"),
reindex: (force_full = false, rebuild_anchors = false) =>
request<ReindexResponse>("reindex", {
method: "POST",
body: JSON.stringify({ force_full, rebuild_anchors }),
}),
// ─── Documents ───────────────────────────────────────────────────
listDocuments: () => request<DocumentsListResponse>("documents"),
getDocument: (doc_id: string) => request<DocumentMeta>(`documents/${doc_id}`),
getDocumentContent: (doc_id: string) =>
request<DocumentContentResponse>(`documents/${doc_id}/content`),
createDocument: (req: CreateDocumentRequest) =>
request<DocumentMeta>("documents", {
method: "POST",
body: JSON.stringify(req),
}),
deleteDocument: (doc_id: string) =>
request<{ status: string; doc_id: string }>(`documents/${doc_id}`, {
method: "DELETE",
}),
// ─── Inference ───────────────────────────────────────────────────
askSmart: (req: AskSmartRequest, signal?: AbortSignal) =>
request<AskSmartResponse>("ask_smart", {
method: "POST",
body: JSON.stringify(req),
signal,
}),
/**
* Doc-scoped inference (retrieval bypass) β€” used when the user pins a
* conversation to a single doc.
*/
ask: (req: AskRequest, signal?: AbortSignal) =>
request<AskResponse>("ask", {
method: "POST",
body: JSON.stringify(req),
signal,
}),
/**
* Streaming variant of /ask_smart. Emits SSE events; caller receives a
* sequence of typed events via the `onEvent` callback. Resolves when the
* stream closes (after `done`/`rejected`/`error`).
*/
askSmartStream: async (
req: AskSmartRequest,
onEvent: (e: AskSmartStreamEvent) => void,
signal?: AbortSignal
): Promise<void> => {
const res = await fetch(`${PROXY}/ask_smart`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "text/event-stream",
},
body: JSON.stringify({ ...req, stream: true }),
signal,
});
if (!res.ok || !res.body) {
const text = await res.text().catch(() => "");
throw new ApiError(res.status, `stream HTTP ${res.status}`, text);
}
// Fallback: backend didn't honour stream=true (old deploy / proxy buffering).
// Parse the body as JSON and emit a single equivalent event.
const ct = res.headers.get("content-type") || "";
if (!ct.includes("text/event-stream")) {
const data = await res.json().catch(() => null);
if (!data) {
throw new ApiError(res.status, "non-SSE response could not be parsed");
}
if (data._grounding_status === "rejected_low_similarity") {
onEvent({ event: "rejected", data });
} else {
onEvent({ event: "done", data });
}
return;
}
const reader = res.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
let sawAny = false;
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let sep: number;
while ((sep = buffer.indexOf("\n\n")) !== -1) {
const frame = buffer.slice(0, sep);
buffer = buffer.slice(sep + 2);
const parsed = parseSSE(frame);
if (parsed) {
sawAny = true;
onEvent(parsed);
}
}
}
if (!sawAny) {
throw new ApiError(0, "stream closed with no events received");
}
},
};
function parseSSE(frame: string): AskSmartStreamEvent | null {
let event = "message";
let dataLines: string[] = [];
for (const line of frame.split("\n")) {
if (line.startsWith("event:")) event = line.slice(6).trim();
else if (line.startsWith("data:")) dataLines.push(line.slice(5).trim());
}
if (dataLines.length === 0) return null;
try {
const data = JSON.parse(dataLines.join("\n"));
return { event, data } as AskSmartStreamEvent;
} catch {
return null;
}
}