W
File size: 11,239 Bytes
2b64d42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import http2 from 'http2';
import { endOfStreamEnvelope, unwrapRequest, wrapEnvelope } from '../src/connect.js';
import { getField, parseFields, writeMessageField, writeStringField, writeVarintField } from '../src/proto.js';

/**
 * Wraps a payload in a length-prefixed frame: one flag byte (0, i.e. not
 * compressed) followed by the payload length as a 4-byte big-endian integer,
 * then the payload bytes.
 *
 * @param {Buffer|string|Uint8Array|number[]} payload - Message bytes; anything
 *   that is not already a Buffer is converted with `Buffer.from`.
 * @returns {Buffer} A new buffer of `5 + payload length` bytes.
 */
function grpcFrame(payload) {
  const message = Buffer.isBuffer(payload) ? payload : Buffer.from(payload);
  // Buffer.alloc zero-fills, so byte 0 (the compression flag) is already 0.
  const prefix = Buffer.alloc(5);
  prefix.writeUInt32BE(message.length, 1);
  return Buffer.concat([prefix, message]);
}

/**
 * Removes a single leading 5-byte frame prefix when the buffer starts with a
 * complete, uncompressed frame; otherwise hands the buffer back untouched.
 *
 * @param {Buffer} buf - Possibly framed bytes.
 * @returns {Buffer} The first frame's message bytes (a view onto `buf`), or
 *   `buf` itself when it is too short, flagged non-zero, or truncated.
 */
function stripGrpcFrame(buf) {
  const PREFIX_SIZE = 5;
  if (buf.length < PREFIX_SIZE || buf[0] !== 0) return buf;

  const messageEnd = PREFIX_SIZE + buf.readUInt32BE(1);
  // A declared length that runs past the buffer means the frame is incomplete.
  if (messageEnd > buf.length) return buf;

  return buf.subarray(PREFIX_SIZE, messageEnd);
}

/**
 * Splits a buffer of back-to-back length-prefixed frames into their message
 * bodies. Parsing stops at the first frame whose flag byte is non-zero or
 * whose declared length runs past the end of the buffer; frames read before
 * that point are still returned.
 *
 * @param {Buffer} buf - Concatenated frames.
 * @returns {Buffer[]} Message bodies, each a view onto `buf`.
 */
function extractGrpcFrames(buf) {
  const PREFIX_SIZE = 5;
  const messages = [];
  let cursor = 0;

  while (buf.length - cursor >= PREFIX_SIZE) {
    const flag = buf[cursor];
    const bodyStart = cursor + PREFIX_SIZE;
    const bodyEnd = bodyStart + buf.readUInt32BE(cursor + 1);
    if (flag !== 0 || bodyEnd > buf.length) {
      break;
    }
    messages.push(buf.subarray(bodyStart, bodyEnd));
    cursor = bodyEnd;
  }

  return messages;
}

/**
 * Encodes a successful response body in the wire format the request used.
 *
 * When the request content-type contains `application/connect+proto`, the
 * payload is wrapped with `wrapEnvelope` (compression off) and followed by
 * `endOfStreamEnvelope()`. Any other content-type gets a single frame from
 * `grpcFrame`.
 *
 * @param {Buffer} payload - Encoded response message.
 * @param {object} headers - Incoming request headers.
 * @returns {Buffer} Bytes to write to the response stream.
 */
function responseBody(payload, headers) {
  const isConnect = String(headers['content-type'] || '').includes('application/connect+proto');
  if (!isConnect) {
    return grpcFrame(payload);
  }

  const messageEnvelope = wrapEnvelope(payload, { compress: false });
  const terminator = endOfStreamEnvelope();
  return Buffer.concat([messageEnvelope, terminator]);
}

/**
 * Builds the error reply for a request in the wire format the request used.
 *
 * - Content-type containing `application/connect+proto`: a single envelope
 *   with flag byte 0x02 whose body is the JSON `{"error":{"message":...}}`,
 *   and no trailers.
 * - Anything else: an empty body plus `grpc-status: 5` and a percent-encoded
 *   `grpc-message`, to be delivered by the caller as trailers.
 *
 * @param {string} message - Human-readable error text.
 * @param {object} headers - Incoming request headers.
 * @returns {{ body: Buffer, trailers: (object|null) }}
 */
function errorBody(message, headers) {
  const isConnect = String(headers['content-type'] || '').includes('application/connect+proto');

  if (!isConnect) {
    return {
      body: Buffer.alloc(0),
      trailers: { 'grpc-status': '5', 'grpc-message': encodeURIComponent(message) },
    };
  }

  const json = Buffer.from(JSON.stringify({ error: { message } }));
  const prefix = Buffer.alloc(5);
  prefix[0] = 0x02;
  prefix.writeUInt32BE(json.length, 1);
  return { body: Buffer.concat([prefix, json]), trailers: null };
}

/**
 * Recovers the request message bytes from a raw request body.
 *
 * Requests whose content-type contains `application/connect+proto` are
 * delegated to `unwrapRequest`. For everything else the body is parsed as
 * length-prefixed frames and their bodies are concatenated; if no complete
 * frame is found, the body goes through `stripGrpcFrame` instead.
 *
 * @param {Buffer} body - Full request body as received.
 * @param {object} headers - Incoming request headers.
 * @returns {Buffer} The request message bytes. For the Connect branch this is
 *   whatever `unwrapRequest` returns.
 */
function requestPayload(body, headers) {
  const mediaType = String(headers['content-type'] || '');
  if (mediaType.includes('application/connect+proto')) {
    return unwrapRequest(body, headers);
  }

  const messages = extractGrpcFrames(body);
  if (messages.length === 0) {
    return stripGrpcFrame(body);
  }
  return Buffer.concat(messages);
}

/**
 * Reads the step offset from an encoded request message.
 *
 * Looks up field number 2 via `getField(fields, 2, 0)`; the trailing `0` is
 * presumably the varint wire type (confirm against `../src/proto.js`).
 *
 * @param {Buffer} payload - Encoded request message.
 * @returns {number} The field's value as a Number, or 0 when the field is absent.
 */
function readStepOffset(payload) {
  const offsetField = getField(parseFields(payload), 2, 0);
  if (!offsetField) {
    return 0;
  }
  return Number(offsetField.value);
}

/**
 * Encodes the fake StartCascade reply: the cascade id as string field 1.
 *
 * @param {string} cascadeId - Id the fake server hands back.
 * @returns {Buffer} Encoded message (whatever `writeStringField` produces).
 */
function startCascadeResponse(cascadeId) {
  const CASCADE_ID_FIELD = 1;
  return writeStringField(CASCADE_ID_FIELD, cascadeId);
}

/**
 * Encodes the fake GetCascadeTrajectory reply: the status as varint field 2.
 *
 * @param {number} status - Status value to report (the tests pass 1).
 * @returns {Buffer} Encoded message (whatever `writeVarintField` produces).
 */
function trajectoryStatusResponse(status) {
  const STATUS_FIELD = 2;
  return writeVarintField(STATUS_FIELD, status);
}

/**
 * Encodes the fake GetCascadeTrajectorySteps reply.
 *
 * With falsy `text` the reply is an empty message. Otherwise it holds one
 * step (message field 1) made of varint field 1 = 15, varint field 4 = 3 and
 * message field 20 wrapping `text` as string field 1.
 * NOTE(review): 15 and 3 look like step-type and step-status enum values;
 * confirm against the upstream schema.
 *
 * @param {string} text - Text to embed in the step; falsy means "no steps".
 * @returns {Buffer} Encoded response message.
 */
function trajectoryStepsResponse(text) {
  if (!text) {
    return Buffer.alloc(0);
  }

  const plannerMessage = writeStringField(1, text);
  const stepParts = [
    writeVarintField(1, 15),
    writeVarintField(4, 3),
    writeMessageField(20, plannerMessage),
  ];
  return writeMessageField(1, Buffer.concat(stepParts));
}

/**
 * Runs `fn` against a throwaway cleartext HTTP/2 server bound to an ephemeral
 * port on 127.0.0.1, and closes the server afterwards whether `fn` resolves
 * or throws.
 *
 * @param {Function} handler - Listener for the server's 'stream' event,
 *   called as `(stream, headers)`.
 * @param {Function} fn - Async callback receiving the port the server is
 *   listening on.
 * @returns {Promise<*>} Whatever `fn` resolves to.
 */
async function withFakeLanguageServer(handler, fn) {
  const server = http2.createServer();
  server.on('stream', handler);

  // Port 0 asks the OS for any free port; read the real one back once bound.
  await new Promise((onListening) => {
    server.listen(0, '127.0.0.1', onListening);
  });
  const { port } = server.address();

  try {
    return await fn(port);
  } finally {
    await new Promise((onClosed) => {
      server.close(onClosed);
    });
  }
}

describe('WindsurfClient cascade panel retry', () => {
  // Shrinks the client's polling and stall timers so the tests finish quickly,
  // and forces the Connect wire protocol. Must run before '../src/client.js'
  // is imported for the first time.
  const configureFastCascadeEnv = () => {
    process.env.CASCADE_POLL_INTERVAL_MS = '10';
    process.env.CASCADE_IDLE_GRACE_MS = '1';
    process.env.CASCADE_MAX_WAIT_MS = '500';
    process.env.CASCADE_COLD_STALL_BASE_MS = '500';
    process.env.CASCADE_WARM_STALL_MS = '500';
    process.env.GRPC_PROTOCOL = 'connect';
  };

  // Builds 200 response headers that echo the request content-type (gRPC by default).
  const okHeaders = (headers) => ({
    ':status': 200,
    'content-type': headers['content-type'] || 'application/grpc',
  });

  // Sends a successful reply carrying `payload` in the request's wire format.
  const replyOk = (stream, headers, payload) => {
    stream.respond(okHeaders(headers));
    stream.end(responseBody(payload, headers));
  };

  // Sends an error reply; gRPC-style status is delivered as real HTTP/2 trailers.
  const replyError = (stream, headers, message) => {
    const err = errorBody(message, headers);
    if (err.trailers) {
      // Trailers have to be announced up front and sent on 'wantTrailers'.
      // additionalHeaders() only sends informational headers and throws once
      // respond() has been called.
      stream.respond(okHeaders(headers), { waitForTrailers: true });
      stream.once('wantTrailers', () => stream.sendTrailers(err.trailers));
    } else {
      stream.respond(okHeaders(headers));
    }
    stream.end(err.body);
  };

  // Replies 404 with no body to any RPC the fake server does not implement.
  const replyNotFound = (stream) => {
    stream.respond({ ':status': 404 });
    stream.end();
  };

  // Buffers the whole request, then calls `route` with the decoded method name and payload.
  const onUnaryRequest = (route) => (stream, headers) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('end', () => {
      const path = String(headers[':path'] || '');
      const payload = requestPayload(Buffer.concat(chunks), headers);
      const method = path.split('/').pop();
      route({ stream, headers, method, payload });
    });
  };

  it('resets trajectory offsets after re-warming to a fresh cascade', async () => {
    configureFastCascadeEnv();

    const observedStepOffsets = [];
    let sendCount = 0;

    const handler = onUnaryRequest(({ stream, headers, method, payload }) => {
      if (method === 'StartCascade') {
        replyOk(stream, headers, startCascadeResponse('fresh-cascade'));
        return;
      }

      if (method === 'SendUserCascadeMessage') {
        sendCount++;
        if (sendCount === 1) {
          // First send fails so the client has to re-warm and retry.
          replyError(stream, headers, 'panel state not found');
          return;
        }
        replyOk(stream, headers, Buffer.alloc(0));
        return;
      }

      if (method === 'GetCascadeTrajectorySteps') {
        const offset = readStepOffset(payload);
        observedStepOffsets.push(offset);
        // Only a poll from offset 0 sees output; a stale offset sees nothing.
        replyOk(stream, headers, trajectoryStepsResponse(offset === 0 ? 'fresh-output' : ''));
        return;
      }

      if (method === 'GetCascadeTrajectory') {
        replyOk(stream, headers, trajectoryStatusResponse(1));
        return;
      }

      if (method === 'GetCascadeTrajectoryGeneratorMetadata') {
        replyOk(stream, headers, Buffer.alloc(0));
        return;
      }

      replyNotFound(stream);
    });

    await withFakeLanguageServer(handler, async (port) => {
      const { WindsurfClient } = await import('../src/client.js');
      const client = new WindsurfClient('test-api-key', port, 'csrf-token');
      const chunks = await client.cascadeChat([
        { role: 'user', content: 'old' },
        { role: 'assistant', content: 'ok' },
        { role: 'user', content: 'continue' },
      ], 0, 'claude-sonnet-4-6', {
        reuseEntry: {
          cascadeId: 'expired-cascade',
          sessionId: 'expired-session',
          lsPort: port,
          apiKey: 'test-api-key',
          stepOffset: 5,
          generatorOffset: 5,
        },
      });

      const text = chunks.map(c => c.text || '').join('');
      assert.equal(sendCount, 2);
      assert.ok(observedStepOffsets.length > 0);
      assert.equal(observedStepOffsets[0], 0);
      assert.match(text, /fresh-output/);
    });
  });

  it('v2.0.25 HIGH-2: cascade not_found triggers fresh fallback and marks reuse entry invalidated', async () => {
    configureFastCascadeEnv();

    let startCount = 0;
    let sendCount = 0;
    let observedFreshCascadeId = null;

    const handler = onUnaryRequest(({ stream, headers, method, payload }) => {
      if (method === 'StartCascade') {
        startCount++;
        observedFreshCascadeId = 'fresh-' + startCount;
        replyOk(stream, headers, startCascadeResponse(observedFreshCascadeId));
        return;
      }

      if (method === 'SendUserCascadeMessage') {
        sendCount++;
        if (sendCount === 1) {
          // Simulate the upstream telling us the cascade we tried to
          // resume is gone — different message text from "panel state
          // not found" so we can prove isExpiredCascade triggers the
          // same recovery and not isPanelMissing.
          replyError(stream, headers, 'not_found: cascade trajectory has been expired by ttl');
          return;
        }
        replyOk(stream, headers, Buffer.alloc(0));
        return;
      }

      if (method === 'GetCascadeTrajectorySteps') {
        const offset = readStepOffset(payload);
        replyOk(stream, headers, trajectoryStepsResponse(offset === 0 ? 'recovered-output' : ''));
        return;
      }

      if (method === 'GetCascadeTrajectory') {
        replyOk(stream, headers, trajectoryStatusResponse(1));
        return;
      }

      if (method === 'GetCascadeTrajectoryGeneratorMetadata') {
        replyOk(stream, headers, Buffer.alloc(0));
        return;
      }

      replyNotFound(stream);
    });

    await withFakeLanguageServer(handler, async (port) => {
      const { WindsurfClient } = await import('../src/client.js');
      const client = new WindsurfClient('test-api-key', port, 'csrf-token');
      const chunks = await client.cascadeChat([
        { role: 'user', content: 'turn1' },
        { role: 'assistant', content: 'reply1' },
        { role: 'user', content: 'turn2' },
      ], 0, 'claude-sonnet-4-6', {
        reuseEntry: {
          cascadeId: 'long-dead-cascade',
          sessionId: 'long-dead-session',
          lsPort: port,
          apiKey: 'test-api-key',
          stepOffset: 5,
          generatorOffset: 5,
        },
      });

      // Recovery path actually fired: client called StartCascade after the
      // expired-send and got a fresh cascadeId on the trailing send.
      assert.equal(sendCount, 2, 'should retry send once after recovery');
      assert.equal(startCount, 1, 'recovery path should issue exactly one fresh StartCascade');
      assert.equal(chunks.cascadeId, observedFreshCascadeId, 'final cascadeId must be the fresh one, not the dead reuseEntry.cascadeId');
      assert.notEqual(chunks.cascadeId, 'long-dead-cascade');
      assert.equal(chunks.reuseEntryInvalidated, true, 'should signal the caller to skip restoring the dead entry');
      assert.match(chunks.map(c => c.text || '').join(''), /recovered-output/);
    });
  });
});