/**
* SSE stream parser for Codex Responses API.
* Pure functions — no side effects or external dependencies.
*/
import type { CodexSSEEvent } from "./codex-types.js";
/**
 * Parses one SSE block (the lines between two blank-line delimiters) into an event.
 *
 * Only `event:` and `data:` fields are interpreted; every other line (comments,
 * `id:`, `retry:`, unknown fields) is ignored. Multiple `data:` lines are joined
 * with "\n" before decoding.
 *
 * @param block - Raw text of a single SSE block. Lines may end in LF, CRLF or CR.
 * @returns The parsed event, with `data` JSON-decoded when it is valid JSON and
 *   left as the raw string otherwise. Returns `null` when the block contains no
 *   `event:`/`data:` field, or when its payload is the `[DONE]` sentinel.
 */
export function parseSSEBlock(block: string): CodexSSEEvent | null {
  let event = "";
  const dataLines: string[] = [];

  // SSE permits LF, CRLF and CR line endings. Splitting on "\n" alone would leave
  // a trailing "\r" on every CRLF line, which defeats the "[DONE]" comparison
  // below and leaks into non-JSON payloads.
  for (const line of block.split(/\r\n|\r|\n/)) {
    if (line.startsWith("event:")) {
      event = line.slice(6).trim();
    } else if (line.startsWith("data:")) {
      dataLines.push(line.slice(5).trimStart());
    }
  }

  if (!event && dataLines.length === 0) return null;

  const raw = dataLines.join("\n");
  // End-of-stream sentinel: carries no payload, so it is not surfaced as an event.
  if (raw === "[DONE]") return null;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    // Not JSON (or empty): hand the raw text back to the caller unchanged.
    data = raw;
  }
  return { event, data };
}
/**
 * Upper bound on the text buffer accumulated by parseSSEStream.
 * It is compared against the buffer's string `length`, so the limit is counted
 * in UTF-16 code units rather than in encoded bytes.
 */
const MAX_SSE_BUFFER = 10 * 1024 * 1024; // 10MB
/** Blank line separating two SSE blocks; accepts both LF and CRLF line endings. */
const SSE_BLOCK_DELIMITER = /\r?\n\r?\n/;

/** Matches text containing at least one SSE field line or comment line. */
const SSE_FIELD_LINE = /^(?:event|data|id|retry)?:/m;

/**
 * Decodes a streaming HTTP response as Server-Sent Events and yields one event
 * per SSE block, in arrival order.
 *
 * If the stream ends without producing any event and the body contained text
 * that is not SSE at all (for example a JSON or plain-text error body), a single
 * synthetic `error` event with code `non_sse_response` is yielded. Its message
 * is taken from `detail` or `error.message` when the body is JSON, otherwise
 * from the raw body text.
 *
 * @param response - Fetch response whose body is an SSE stream.
 * @returns An async generator of parsed events.
 * @throws Error when `response.body` is null, or when the buffered text grows
 *   beyond `MAX_SSE_BUFFER` without being consumed.
 */
export async function* parseSSEStream(
  response: Response,
): AsyncGenerator<CodexSSEEvent> {
  if (!response.body) {
    throw new Error("Response body is null — cannot stream");
  }

  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();

  let buffer = "";
  let yieldedAny = false;
  let streamEnded = false;
  // Text of blocks that carry no SSE field at all; used for the error report.
  let nonSSEText = "";

  try {
    while (!streamEnded) {
      const { done, value } = await reader.read();
      if (done) {
        streamEnded = true;
      } else {
        buffer += value;
        if (buffer.length > MAX_SSE_BUFFER) {
          throw new Error(`SSE buffer exceeded ${MAX_SSE_BUFFER} bytes — aborting stream`);
        }
      }

      const blocks = buffer.split(SSE_BLOCK_DELIMITER);
      // While the stream is open the last piece may be an incomplete block, so it
      // is kept for the next chunk. Once the stream has ended it is final and is
      // processed together with the others.
      buffer = streamEnded ? "" : (blocks.pop() ?? "");

      for (const block of blocks) {
        if (!block.trim()) continue;
        const evt = parseSSEBlock(block);
        if (evt) {
          yieldedAny = true;
          yield evt;
        } else if (
          !yieldedAny
          && !SSE_FIELD_LINE.test(block)
          // Bound the diagnostic text so a huge non-SSE body cannot grow it forever.
          && nonSSEText.length < MAX_SSE_BUFFER
        ) {
          // Blocks such as "data: [DONE]" or ": comment" are valid SSE that simply
          // produce no event; only text with no SSE field counts as non-SSE.
          nonSSEText += nonSSEText ? `\n\n${block}` : block;
        }
      }
    }

    // Non-SSE response detection
    if (!yieldedAny && nonSSEText.trim()) {
      let errorMessage = nonSSEText.trim();
      try {
        const parsed = JSON.parse(errorMessage) as Record<string, unknown>;
        const errObj = typeof parsed.error === "object" && parsed.error !== null
          ? (parsed.error as Record<string, unknown>)
          : undefined;
        errorMessage =
          (typeof parsed.detail === "string" ? parsed.detail : null)
          ?? (typeof errObj?.message === "string" ? errObj.message : null)
          ?? errorMessage;
      } catch { /* use raw text */ }
      yield {
        event: "error",
        data: { error: { type: "error", code: "non_sse_response", message: errorMessage } },
      };
    }
  } finally {
    if (!streamEnded) {
      // The consumer stopped early or an error was thrown mid-stream. Cancel the
      // body so the underlying connection is released instead of left half-read.
      // A cancel failure (e.g. the stream already errored) must not mask the
      // original error, so it is deliberately ignored.
      await reader.cancel().catch(() => undefined);
    }
    reader.releaseLock();
  }
}
|