beacon / frontend /src /lib /api.js
kiyer's picture
fix: parse CRLF-framed SSE events from sse-starlette
8b26281
Raw
History Blame Contribute Delete
1.6 kB
/**
 * Split buffered Server-Sent Events text into complete events plus the
 * not-yet-terminated tail.
 *
 * An event ends at a blank line. Within an event only `event:` and `data:`
 * lines are interpreted; every other line is ignored. Multiple `data:` lines
 * are trimmed and concatenated without a separator, and the result is decoded
 * with `JSON.parse`. Events that carry no data are skipped.
 *
 * @param {string} buffer - Accumulated stream text that has not been consumed yet.
 * @returns {{ events: Array<{ event: string, data: * }>, rest: string }}
 *   The parsed events in stream order, and the trailing text after the last
 *   event boundary, which the caller should prepend to the next chunk.
 * @throws {SyntaxError} If the data of a complete event is not valid JSON.
 */
export function parseSseChunk(buffer) {
  // SSE allows CRLF, LF, or bare CR line endings; sse-starlette emits CRLF.
  // `\r(?!\n)` accepts a lone CR without letting the CR of a CRLF pair count
  // as a line ending of its own (which would turn every CRLF into a blank line).
  const eventBoundary = /(?:\r\n|\n|\r(?!\n)){2}/;
  const lineBreak = /\r\n|\n|\r/;

  const parts = buffer.split(eventBoundary);
  // The last piece has not been terminated by a blank line yet.
  const rest = parts.pop();
  const events = [];
  for (const part of parts) {
    let event = 'message';
    let data = '';
    for (const line of part.split(lineBreak)) {
      if (line.startsWith('event:')) {
        event = line.slice(6).trim();
      } else if (line.startsWith('data:')) {
        data += line.slice(5).trim();
      }
    }
    if (data) events.push({ event, data: JSON.parse(data) });
  }
  return { events, rest };
}
/**
 * Fetch `url` and yield the Server-Sent Events found in the response body.
 *
 * The body is decoded incrementally as text and handed to `parseSseChunk`;
 * whatever that returns as `rest` is carried over to the next read.
 *
 * @param {string} url - Request URL, passed straight to `fetch`.
 * @param {object} [options] - Request options, passed straight to `fetch`.
 * @yields {{ event: string, data: * }} Each event produced by `parseSseChunk`.
 * @throws {Error} If the response is not OK; the message holds the status and
 *   the first 300 characters of the response body.
 */
async function* sseFetch(url, options) {
  const res = await fetch(url, options);
  if (!res.ok) throw new Error(`${res.status}: ${(await res.text()).slice(0, 300)}`);
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  let buf = '';
  let streamEnded = false;
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) {
        streamEnded = true;
        break;
      }
      buf += decoder.decode(value, { stream: true });
      const { events, rest } = parseSseChunk(buf);
      buf = rest;
      yield* events;
    }
  } finally {
    // Reached early when the consumer stops iterating or when reading/parsing
    // throws. Cancel so the underlying HTTP stream is closed instead of leaking.
    if (!streamEnded) {
      // Best-effort cleanup: a cancel failure must not mask the original error.
      await reader.cancel().catch(() => {});
    }
  }
}
/**
 * Upload a file to the parse endpoint and return the id of the created job.
 *
 * @param {*} file - Value sent as the `file` field of a multipart form.
 * @returns {Promise<*>} The `jobId` property of the JSON response.
 * @throws {Error} `parse failed: <status>` when the response is not OK.
 */
export async function startParse(file) {
  const payload = new FormData();
  payload.append('file', file);

  const response = await fetch('/api/parse', { method: 'POST', body: payload });
  if (response.ok) {
    const parsed = await response.json();
    return parsed.jobId;
  }
  throw new Error(`parse failed: ${response.status}`);
}
/**
 * Open the event stream of a job.
 *
 * @param {*} jobId - Job identifier, interpolated into the events URL as-is.
 * @returns {*} Whatever `sseFetch` returns for `/api/jobs/<jobId>/events`.
 */
export function jobEvents(jobId) {
  const eventsUrl = `/api/jobs/${jobId}/events`;
  return sseFetch(eventsUrl);
}
/**
 * POST a JSON payload to the annotate endpoint and stream back its events.
 *
 * @param {*} body - Value serialised with `JSON.stringify` as the request body.
 * @returns {*} Whatever `sseFetch` returns for the `/api/annotate` request.
 */
export function annotateStream(body) {
  const payload = JSON.stringify(body);
  const requestInit = {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: payload,
  };
  return sseFetch('/api/annotate', requestInit);
}