File size: 6,850 Bytes
92c5df7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/**
 * Tests for CodexApi SSE parsing.
 *
 * parseStream() is the most fragile code path — it processes real-time
 * byte streams from curl where chunks can split at any boundary.
 */

import { describe, it, expect } from "vitest";
import { CodexApi, type CodexSSEEvent } from "../codex-api.js";

/** Create a Response whose body emits the given string chunks sequentially. */
function mockResponse(...chunks: string[]): Response {
  const encoder = new TextEncoder();
  let idx = 0;
  const stream = new ReadableStream<Uint8Array>({
    pull(controller) {
      if (idx < chunks.length) {
        controller.enqueue(encoder.encode(chunks[idx]));
        idx++;
      } else {
        controller.close();
      }
    },
  });
  return new Response(stream);
}

/** Collect all events from parseStream into an array. */
async function collectEvents(api: CodexApi, response: Response): Promise<CodexSSEEvent[]> {
  const events: CodexSSEEvent[] = [];
  for await (const evt of api.parseStream(response)) {
    events.push(evt);
  }
  return events;
}

// CodexApi constructor requires a token — value is irrelevant for parsing tests
function createApi(): CodexApi {
  return new CodexApi("test-token", null);
}

describe("CodexApi.parseStream", () => {
  it("parses a complete SSE event in a single chunk", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":"Hello"}\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    expect(events[0].event).toBe("response.output_text.delta");
    expect(events[0].data).toEqual({ delta: "Hello" });
  });

  it("handles multiple events in a single chunk", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.created\ndata: {"response":{"id":"resp_1"}}\n\n' +
      'event: response.output_text.delta\ndata: {"delta":"Hi"}\n\n' +
      'event: response.completed\ndata: {"response":{"id":"resp_1","usage":{"input_tokens":10,"output_tokens":5}}}\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(3);
    expect(events[0].event).toBe("response.created");
    expect(events[1].event).toBe("response.output_text.delta");
    expect(events[2].event).toBe("response.completed");
  });

  it("reassembles events split across chunk boundaries", async () => {
    const api = createApi();
    // Split in the middle of the JSON data
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"del',
      'ta":"world"}\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    expect(events[0].data).toEqual({ delta: "world" });
  });

  it("handles chunk split at \\n\\n boundary", async () => {
    const api = createApi();
    // First chunk ends with first \n, second starts with second \n
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":"a"}\n',
      '\nevent: response.output_text.delta\ndata: {"delta":"b"}\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(2);
    expect(events[0].data).toEqual({ delta: "a" });
    expect(events[1].data).toEqual({ delta: "b" });
  });

  it("handles many small single-character chunks", async () => {
    const api = createApi();
    const full = 'event: response.output_text.delta\ndata: {"delta":"x"}\n\n';
    // Split into individual characters
    const chunks = full.split("");
    const response = mockResponse(...chunks);

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    expect(events[0].data).toEqual({ delta: "x" });
  });

  it("skips [DONE] marker without crashing", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":"hi"}\n\n' +
      "data: [DONE]\n\n",
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    expect(events[0].data).toEqual({ delta: "hi" });
  });

  it("returns raw string when data is not valid JSON", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.output_text.delta\ndata: not-json-at-all\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    expect(events[0].data).toBe("not-json-at-all");
  });

  it("handles malformed JSON (unclosed brace) gracefully", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":"unclosed\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    // Should not throw — falls through to raw string
    expect(typeof events[0].data).toBe("string");
  });

  it("skips empty blocks between events", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":"a"}\n\n' +
      "\n\n" + // empty block
      'event: response.output_text.delta\ndata: {"delta":"b"}\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(2);
  });

  it("processes remaining buffer after stream ends", async () => {
    const api = createApi();
    // No trailing \n\n — the event is only in the residual buffer
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":"last"}',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    expect(events[0].data).toEqual({ delta: "last" });
  });

  it("handles multi-line data fields", async () => {
    const api = createApi();
    const response = mockResponse(
      'event: response.output_text.delta\ndata: {"delta":\n' +
      'data: "multi-line"}\n\n',
    );

    const events = await collectEvents(api, response);
    expect(events).toHaveLength(1);
    // data lines are joined with \n: '{"delta":\n"multi-line"}'
    expect(events[0].data).toEqual({ delta: "multi-line" });
  });

  it("returns null body error", async () => {
    const api = createApi();
    // Create a response with null body
    const response = new Response(null);

    await expect(async () => {
      await collectEvents(api, response);
    }).rejects.toThrow("Response body is null");
  });

  it("throws on buffer overflow (>10MB)", async () => {
    const api = createApi();
    // Create a chunk that exceeds the 10MB SSE buffer limit
    const hugeData = "x".repeat(11 * 1024 * 1024);
    const response = mockResponse(hugeData);

    await expect(async () => {
      await collectEvents(api, response);
    }).rejects.toThrow("SSE buffer exceeded");
  });
});