W / test /client-panel-retry.test.js
Ac66's picture
Upload folder using huggingface_hub
2b64d42 verified
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import http2 from 'http2';
import { endOfStreamEnvelope, unwrapRequest, wrapEnvelope } from '../src/connect.js';
import { getField, parseFields, writeMessageField, writeStringField, writeVarintField } from '../src/proto.js';
function grpcFrame(payload) {
const buf = Buffer.isBuffer(payload) ? payload : Buffer.from(payload);
const frame = Buffer.alloc(5 + buf.length);
frame[0] = 0;
frame.writeUInt32BE(buf.length, 1);
buf.copy(frame, 5);
return frame;
}
function stripGrpcFrame(buf) {
if (buf.length >= 5 && buf[0] === 0) {
const msgLen = buf.readUInt32BE(1);
if (buf.length >= 5 + msgLen) return buf.subarray(5, 5 + msgLen);
}
return buf;
}
function extractGrpcFrames(buf) {
const frames = [];
let offset = 0;
while (offset + 5 <= buf.length) {
const compressed = buf[offset];
const msgLen = buf.readUInt32BE(offset + 1);
if (compressed !== 0 || offset + 5 + msgLen > buf.length) break;
frames.push(buf.subarray(offset + 5, offset + 5 + msgLen));
offset += 5 + msgLen;
}
return frames;
}
function responseBody(payload, headers) {
const contentType = String(headers['content-type'] || '');
if (contentType.includes('application/connect+proto')) {
return Buffer.concat([wrapEnvelope(payload, { compress: false }), endOfStreamEnvelope()]);
}
return grpcFrame(payload);
}
function errorBody(message, headers) {
const contentType = String(headers['content-type'] || '');
if (contentType.includes('application/connect+proto')) {
const body = Buffer.from(JSON.stringify({ error: { message } }));
const frame = Buffer.alloc(5 + body.length);
frame[0] = 0x02;
frame.writeUInt32BE(body.length, 1);
body.copy(frame, 5);
return { body: frame, trailers: null };
}
return { body: Buffer.alloc(0), trailers: { 'grpc-status': '5', 'grpc-message': encodeURIComponent(message) } };
}
function requestPayload(body, headers) {
const contentType = String(headers['content-type'] || '');
if (contentType.includes('application/connect+proto')) {
return unwrapRequest(body, headers);
}
const frames = extractGrpcFrames(body);
return frames.length ? Buffer.concat(frames) : stripGrpcFrame(body);
}
function readStepOffset(payload) {
const fields = parseFields(payload);
const field = getField(fields, 2, 0);
return field ? Number(field.value) : 0;
}
function startCascadeResponse(cascadeId) {
return writeStringField(1, cascadeId);
}
function trajectoryStatusResponse(status) {
return writeVarintField(2, status);
}
function trajectoryStepsResponse(text) {
if (!text) return Buffer.alloc(0);
const planner = writeStringField(1, text);
const step = Buffer.concat([
writeVarintField(1, 15),
writeVarintField(4, 3),
writeMessageField(20, planner),
]);
return writeMessageField(1, step);
}
async function withFakeLanguageServer(handler, fn) {
const server = http2.createServer();
server.on('stream', handler);
await new Promise(resolve => server.listen(0, '127.0.0.1', resolve));
const port = server.address().port;
try {
return await fn(port);
} finally {
await new Promise(resolve => server.close(resolve));
}
}
describe('WindsurfClient cascade panel retry', () => {
it('resets trajectory offsets after re-warming to a fresh cascade', async () => {
process.env.CASCADE_POLL_INTERVAL_MS = '10';
process.env.CASCADE_IDLE_GRACE_MS = '1';
process.env.CASCADE_MAX_WAIT_MS = '500';
process.env.CASCADE_COLD_STALL_BASE_MS = '500';
process.env.CASCADE_WARM_STALL_MS = '500';
process.env.GRPC_PROTOCOL = 'connect';
const observedStepOffsets = [];
let sendCount = 0;
await withFakeLanguageServer((stream, headers) => {
const chunks = [];
stream.on('data', chunk => chunks.push(chunk));
stream.on('end', () => {
const path = String(headers[':path'] || '');
const payload = requestPayload(Buffer.concat(chunks), headers);
const method = path.split('/').pop();
if (method === 'StartCascade') {
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(startCascadeResponse('fresh-cascade'), headers));
return;
}
if (method === 'SendUserCascadeMessage') {
sendCount++;
if (sendCount === 1) {
const err = errorBody('panel state not found', headers);
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
if (err.trailers) stream.additionalHeaders(err.trailers);
stream.end(err.body);
return;
}
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(Buffer.alloc(0), headers));
return;
}
if (method === 'GetCascadeTrajectorySteps') {
const offset = readStepOffset(payload);
observedStepOffsets.push(offset);
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(trajectoryStepsResponse(offset === 0 ? 'fresh-output' : ''), headers));
return;
}
if (method === 'GetCascadeTrajectory') {
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(trajectoryStatusResponse(1), headers));
return;
}
if (method === 'GetCascadeTrajectoryGeneratorMetadata') {
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(Buffer.alloc(0), headers));
return;
}
stream.respond({ ':status': 404 });
stream.end();
});
}, async (port) => {
const { WindsurfClient } = await import('../src/client.js');
const client = new WindsurfClient('test-api-key', port, 'csrf-token');
const chunks = await client.cascadeChat([
{ role: 'user', content: 'old' },
{ role: 'assistant', content: 'ok' },
{ role: 'user', content: 'continue' },
], 0, 'claude-sonnet-4-6', {
reuseEntry: {
cascadeId: 'expired-cascade',
sessionId: 'expired-session',
lsPort: port,
apiKey: 'test-api-key',
stepOffset: 5,
generatorOffset: 5,
},
});
const text = chunks.map(c => c.text || '').join('');
assert.equal(sendCount, 2);
assert.ok(observedStepOffsets.length > 0);
assert.equal(observedStepOffsets[0], 0);
assert.match(text, /fresh-output/);
});
});
it('v2.0.25 HIGH-2: cascade not_found triggers fresh fallback and marks reuse entry invalidated', async () => {
process.env.CASCADE_POLL_INTERVAL_MS = '10';
process.env.CASCADE_IDLE_GRACE_MS = '1';
process.env.CASCADE_MAX_WAIT_MS = '500';
process.env.CASCADE_COLD_STALL_BASE_MS = '500';
process.env.CASCADE_WARM_STALL_MS = '500';
process.env.GRPC_PROTOCOL = 'connect';
let startCount = 0;
let sendCount = 0;
let observedFreshCascadeId = null;
await withFakeLanguageServer((stream, headers) => {
const chunks = [];
stream.on('data', chunk => chunks.push(chunk));
stream.on('end', () => {
const path = String(headers[':path'] || '');
const payload = requestPayload(Buffer.concat(chunks), headers);
const method = path.split('/').pop();
if (method === 'StartCascade') {
startCount++;
observedFreshCascadeId = 'fresh-' + startCount;
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(startCascadeResponse(observedFreshCascadeId), headers));
return;
}
if (method === 'SendUserCascadeMessage') {
sendCount++;
if (sendCount === 1) {
// Simulate the upstream telling us the cascade we tried to
// resume is gone — different message text from "panel state
// not found" so we can prove isExpiredCascade triggers the
// same recovery and not isPanelMissing.
const err = errorBody('not_found: cascade trajectory has been expired by ttl', headers);
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
if (err.trailers) stream.additionalHeaders(err.trailers);
stream.end(err.body);
return;
}
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(Buffer.alloc(0), headers));
return;
}
if (method === 'GetCascadeTrajectorySteps') {
const offset = readStepOffset(payload);
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(trajectoryStepsResponse(offset === 0 ? 'recovered-output' : ''), headers));
return;
}
if (method === 'GetCascadeTrajectory') {
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(trajectoryStatusResponse(1), headers));
return;
}
if (method === 'GetCascadeTrajectoryGeneratorMetadata') {
stream.respond({ ':status': 200, 'content-type': headers['content-type'] || 'application/grpc' });
stream.end(responseBody(Buffer.alloc(0), headers));
return;
}
stream.respond({ ':status': 404 });
stream.end();
});
}, async (port) => {
const { WindsurfClient } = await import('../src/client.js');
const client = new WindsurfClient('test-api-key', port, 'csrf-token');
const chunks = await client.cascadeChat([
{ role: 'user', content: 'turn1' },
{ role: 'assistant', content: 'reply1' },
{ role: 'user', content: 'turn2' },
], 0, 'claude-sonnet-4-6', {
reuseEntry: {
cascadeId: 'long-dead-cascade',
sessionId: 'long-dead-session',
lsPort: port,
apiKey: 'test-api-key',
stepOffset: 5,
generatorOffset: 5,
},
});
// Recovery path actually fired: client called StartCascade after the
// expired-send and got a fresh cascadeId on the trailing send.
assert.equal(sendCount, 2, 'should retry send once after recovery');
assert.equal(startCount, 1, 'recovery path should issue exactly one fresh StartCascade');
assert.equal(chunks.cascadeId, observedFreshCascadeId, 'final cascadeId must be the fresh one, not the dead reuseEntry.cascadeId');
assert.notEqual(chunks.cascadeId, 'long-dead-cascade');
assert.equal(chunks.reuseEntryInvalidated, true, 'should signal the caller to skip restoring the dead entry');
assert.match(chunks.map(c => c.text || '').join(''), /recovered-output/);
});
});
});