File size: 2,643 Bytes
0c8b3c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
 * SSE stream parser for Codex Responses API.
 * Pure functions — no side effects or external dependencies.
 */

import type { CodexSSEEvent } from "./codex-types.js";

/**
 * Parses a single Server-Sent Events block (the text between two blank lines)
 * into an event.
 *
 * Only `event:` and `data:` fields are read; every other line (comments,
 * `id:`, `retry:`, unknown fields) is ignored. Multiple `data:` lines are
 * joined with `\n`. The joined payload is decoded as JSON when possible and
 * returned as the raw string otherwise.
 *
 * @param block - Text of one SSE block; lines may end in LF, CRLF or CR.
 * @returns The parsed event, or `null` when the block carries neither an
 *   `event:` nor a `data:` field, or when its payload is the `[DONE]` sentinel.
 */
export function parseSSEBlock(block: string): CodexSSEEvent | null {
  let event = "";
  const dataLines: string[] = [];

  // SSE permits LF, CRLF and bare CR as line terminators. Splitting on "\n"
  // alone would leave a trailing "\r" on every CRLF line, which breaks the
  // "[DONE]" comparison below and pollutes non-JSON payloads.
  for (const line of block.split(/\r\n|\r|\n/)) {
    if (line.startsWith("event:")) {
      event = line.slice(6).trim();
    } else if (line.startsWith("data:")) {
      dataLines.push(line.slice(5).trimStart());
    }
  }

  if (!event && dataLines.length === 0) return null;

  const raw = dataLines.join("\n");
  // End-of-stream sentinel: not a real event, so nothing is surfaced.
  if (raw === "[DONE]") return null;

  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    // Payload is not JSON (or is empty) — hand back the text unchanged.
    data = raw;
  }

  return { event, data };
}

/**
 * Upper bound on how much not-yet-dispatched stream text may accumulate
 * before `parseSSEStream` aborts. The limit is compared against the buffer's
 * `String.length`, so it is counted in UTF-16 code units (10 * 1024 * 1024
 * of them) rather than raw bytes.
 */
const MAX_SSE_BUFFER = 10 * 1024 * 1024;

/**
 * Incrementally parses a streaming HTTP response as Server-Sent Events.
 *
 * The body is decoded as text, cut into blocks at blank lines, and each block
 * is passed to `parseSSEBlock`; blocks that parse to `null` are skipped.
 * CRLF line endings are normalised to LF so that both terminators produce the
 * same block boundaries.
 *
 * If the stream ends without any event having been yielded and unparsed text
 * is left over, the body is treated as a non-SSE response: a single synthetic
 * `error` event with code `non_sse_response` is yielded. Its message is the
 * JSON body's `detail` string, else `error.message`, else the raw text.
 *
 * @param response - Response whose body is consumed. The body is cancelled
 *   when the generator finishes, throws, or is abandoned early.
 * @returns An async generator of parsed events, in arrival order.
 * @throws Error if `response.body` is null, or if the buffered text grows
 *   beyond `MAX_SSE_BUFFER` without a block boundary.
 */
export async function* parseSSEStream(
  response: Response,
): AsyncGenerator<CodexSSEEvent> {
  if (!response.body) {
    throw new Error("Response body is null — cannot stream");
  }

  const reader = response.body
    .pipeThrough(new TextDecoderStream())
    .getReader();

  let buffer = "";
  let yieldedAny = false;
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += value;
      if (buffer.length > MAX_SSE_BUFFER) {
        throw new Error(`SSE buffer exceeded ${MAX_SSE_BUFFER} bytes — aborting stream`);
      }

      // Normalise CRLF so "\r\n\r\n" is recognised as a block boundary. A
      // "\r" at the very end of a chunk stays in the buffer and is collapsed
      // once its "\n" arrives with the next chunk.
      buffer = buffer.replace(/\r\n/g, "\n");

      const parts = buffer.split("\n\n");
      // The last piece is an incomplete block (possibly empty); keep it for
      // the next chunk. `split` always returns at least one element.
      buffer = parts.pop() ?? "";

      for (const part of parts) {
        if (!part.trim()) continue;
        const evt = parseSSEBlock(part);
        if (evt) {
          yieldedAny = true;
          yield evt;
        }
      }
    }

    // Process remaining buffer (stream ended without a trailing blank line).
    if (buffer.trim()) {
      const evt = parseSSEBlock(buffer);
      if (evt) {
        yieldedAny = true;
        yield evt;
      }
    }

    // Non-SSE response detection: nothing parsed as an event, yet text is
    // left over — surface it as an error event instead of dropping it.
    if (!yieldedAny && buffer.trim()) {
      let errorMessage = buffer.trim();
      try {
        const parsed = JSON.parse(errorMessage) as Record<string, unknown>;
        const errObj = typeof parsed.error === "object" && parsed.error !== null
          ? (parsed.error as Record<string, unknown>)
          : undefined;
        errorMessage =
          (typeof parsed.detail === "string" ? parsed.detail : null)
          ?? (typeof errObj?.message === "string" ? errObj.message : null)
          ?? errorMessage;
      } catch { /* use raw text */ }
      yield {
        event: "error",
        data: { error: { type: "error", code: "non_sse_response", message: errorMessage } },
      };
    }
  } finally {
    // Cancel before releasing: if the consumer stopped early or an error was
    // thrown, releasing the lock alone would leave the body (and its
    // connection) open. Cancelling an already-closed stream is a no-op.
    try {
      await reader.cancel();
    } catch {
      // The stream had already errored; there is nothing left to release and
      // the original error (if any) keeps propagating.
    }
    reader.releaseLock();
  }
}