WindsurfAPI / src /handlers /chat.js
github-actions[bot]
Deploy from GitHub: 7495fde758f0be655f95e6331fec2898267f790c
f6266b9
/**
* POST /v1/chat/completions — OpenAI-compatible chat completions.
* Routes to RawGetChatMessage (legacy) or Cascade (premium) based on model type.
*/
import { randomUUID } from 'crypto';
import { WindsurfClient } from '../client.js';
import { getApiKey, acquireAccountByKey, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js';
import { resolveModel, getModelInfo } from '../models.js';
import { getLsFor, ensureLs } from '../langserver.js';
import { config, log } from '../config.js';
import { recordRequest } from '../dashboard/stats.js';
import { isModelAllowed } from '../dashboard/model-access.js';
import { cacheKey, cacheGet, cacheSet } from '../cache.js';
import { isExperimentalEnabled, getIdentityPromptFor } from '../runtime-config.js';
import { checkMessageRateLimit } from '../windsurf-api.js';
import { getEffectiveProxy } from '../dashboard/proxy-config.js';
import {
fingerprintBefore, fingerprintAfter, checkout as poolCheckout, checkin as poolCheckin,
} from '../conversation-pool.js';
import {
normalizeMessagesForCascade, ToolCallStreamParser, parseToolCallsFromText,
buildToolPreambleForProto,
} from './tool-emulation.js';
import { sanitizeText, PathSanitizeStream } from '../sanitize.js';
const HEARTBEAT_MS = 15_000;
const QUEUE_RETRY_MS = 1_000;
const QUEUE_MAX_WAIT_MS = 30_000;
// ── Model identity prompt ──────────────────────────────────
// Templates live in runtime-config (editable from the dashboard). Use {model}
// as a placeholder for the requested model name. Only applied when the
// experimental "modelIdentityPrompt" toggle is ON.
/**
 * Render the identity system prompt for a provider.
 *
 * @param {string} displayModel - Model name substituted for every `{model}` placeholder.
 * @param {string} provider - Provider key handed to getIdentityPromptFor().
 * @returns {string|null} The rendered prompt, or null when no template is configured.
 */
function buildIdentitySystemMessage(displayModel, provider) {
  const template = getIdentityPromptFor(provider);
  if (!template) return null;
  // A replacer function inserts the model name literally. Passing the name as
  // a replacement *string* would let sequences such as `$&`, `$1` or `$$`
  // inside it be interpreted as special replacement patterns.
  return template.replace(/\{model\}/g, () => displayModel);
}
/**
 * Create a fresh completion id: the `chatcmpl-` prefix followed by the first
 * 29 characters of a random UUID with its dashes removed.
 *
 * @returns {string}
 */
function genId() {
  const compactUuid = randomUUID().split('-').join('');
  return `chatcmpl-${compactUuid.slice(0, 29)}`;
}
// Rough token estimate (~4 chars/token). Used only to populate the
// OpenAI-compatible `usage.prompt_tokens_details.cached_tokens` field so
// upstream billing/dashboards (new-api) can recognise our local cache hits.
/**
 * Estimate the token count of a message list from its text length.
 *
 * String contents count in full; array contents count only the parts that
 * carry a string `text`. Anything else contributes nothing.
 *
 * @param {Array<object>} messages - Chat messages.
 * @returns {number} 0 when `messages` is not an array, otherwise at least 1.
 */
function estimateTokens(messages) {
  if (!Array.isArray(messages)) return 0;

  const partLength = (part) => (typeof part?.text === 'string' ? part.text.length : 0);
  const contentLength = (content) => {
    if (typeof content === 'string') return content.length;
    if (!Array.isArray(content)) return 0;
    return content.reduce((sum, part) => sum + partLength(part), 0);
  };

  const totalChars = messages.reduce((sum, msg) => sum + contentLength(msg?.content), 0);
  return Math.max(1, Math.ceil(totalChars / 4));
}
/**
 * Build the `usage` object attached to a response served from the local cache.
 *
 * The prompt size comes from estimateTokens(); the completion size is the
 * cached text length / 4, rounded up, never below 1. The whole prompt is
 * reported as cached tokens and the object is flagged with `cached: true`.
 *
 * @param {Array<object>} messages - Request messages.
 * @param {string} [completionText] - Cached completion text (may be empty/nullish).
 * @returns {object} OpenAI-shaped usage object.
 */
function cachedUsage(messages, completionText) {
  const promptTokens = estimateTokens(messages);
  const text = completionText || '';
  const completionTokens = Math.max(1, Math.ceil(text.length / 4));

  const usage = {
    prompt_tokens: promptTokens,
    completion_tokens: completionTokens,
    total_tokens: promptTokens + completionTokens,
    input_tokens: promptTokens,
    output_tokens: completionTokens,
    prompt_tokens_details: { cached_tokens: promptTokens },
    completion_tokens_details: { reasoning_tokens: 0 },
    cached: true,
  };
  return usage;
}
/**
 * Build an OpenAI-shaped `usage` object, preferring server-reported token
 * counts from Cascade's CortexStepMetadata.model_usage when available, and
 * falling back to the local chars/4 estimate otherwise. Both branches return
 * exactly the same set of keys so downstream billing doesn't have to care
 * which source produced the numbers.
 *
 * The Cascade backend reports usage as {inputTokens, outputTokens,
 * cacheReadTokens, cacheWriteTokens}. We map them onto the OpenAI shape:
 *   prompt_tokens     = inputTokens + cacheReadTokens + cacheWriteTokens
 *                       (total input tokens the model processed, whether
 *                       fresh, cache-read, or cache-written)
 *   completion_tokens = outputTokens
 *   prompt_tokens_details.cached_tokens = cacheReadTokens
 *   cache_creation_input_tokens (Anthropic ext) = cacheWriteTokens
 *
 * @param {object|null} serverUsage - Server-reported counters, or null/undefined.
 *   Used only when inputTokens or outputTokens is non-zero.
 * @param {Array<object>} messages - Request messages (estimate fallback).
 * @param {string} completionText - Completion text (estimate fallback).
 * @param {string} [thinkingText=''] - Reasoning text, counted towards the
 *   estimated completion size.
 * @returns {object} OpenAI-shaped usage object.
 */
function buildUsageBody(serverUsage, messages, completionText, thinkingText = '') {
  if (serverUsage && (serverUsage.inputTokens || serverUsage.outputTokens)) {
    const inputTokens = serverUsage.inputTokens || 0;
    const outputTokens = serverUsage.outputTokens || 0;
    const cacheRead = serverUsage.cacheReadTokens || 0;
    const cacheWrite = serverUsage.cacheWriteTokens || 0;
    const promptTotal = inputTokens + cacheRead + cacheWrite;
    return {
      prompt_tokens: promptTotal,
      completion_tokens: outputTokens,
      total_tokens: promptTotal + outputTokens,
      input_tokens: promptTotal,
      output_tokens: outputTokens,
      prompt_tokens_details: { cached_tokens: cacheRead },
      completion_tokens_details: { reasoning_tokens: 0 },
      cache_creation_input_tokens: cacheWrite,
    };
  }

  // Local estimate: ~4 characters per token, never below 1 for the completion.
  const prompt = estimateTokens(messages);
  const completionChars = (completionText || '').length + (thinkingText || '').length;
  const completion = Math.max(1, Math.ceil(completionChars / 4));
  return {
    prompt_tokens: prompt,
    completion_tokens: completion,
    total_tokens: prompt + completion,
    input_tokens: prompt,
    output_tokens: completion,
    prompt_tokens_details: { cached_tokens: 0 },
    completion_tokens_details: { reasoning_tokens: 0 },
    // Present (as 0) so the estimate branch has the same keys as the
    // server-reported branch.
    cache_creation_input_tokens: 0,
  };
}
// Wait until getApiKey returns a non-null account, or until maxWaitMs expires.
// Used when every account has momentarily exhausted its RPM budget so the
// client is queued instead of getting a 503.
/**
 * Poll getApiKey() every QUEUE_RETRY_MS until it yields an account.
 *
 * @param {Array<string>} tried - Passed through to getApiKey().
 * @param {{aborted: boolean}|null|undefined} signal - Optional abort signal; polling stops once it is aborted.
 * @param {number} [maxWaitMs=QUEUE_MAX_WAIT_MS] - Maximum time to keep polling.
 * @param {string|null} [modelKey=null] - Passed through to getApiKey().
 * @returns {Promise<object|null>} The account, or null on abort / timeout.
 */
async function waitForAccount(tried, signal, maxWaitMs = QUEUE_MAX_WAIT_MS, modelKey = null) {
  const giveUpAt = Date.now() + maxWaitMs;
  for (;;) {
    const account = getApiKey(tried, modelKey);
    if (account) return account;
    // Abort is checked before the deadline, and both are checked before sleeping.
    if (signal?.aborted || Date.now() >= giveUpAt) return null;
    await new Promise((resolve) => setTimeout(resolve, QUEUE_RETRY_MS));
  }
}
/**
 * Handle an OpenAI-compatible chat-completions request body.
 *
 * Resolves the model, decides between the Cascade and legacy flows, applies
 * access checks, then either hands off to streamResponse() or runs the
 * non-stream retry loop across accounts.
 *
 * @param {object} body - Parsed request body (`model`, `messages`, `stream`,
 *   `tools`, `tool_choice`).
 * @returns {Promise<object>} Either `{ status, body }` or, for streaming,
 *   the `{ status, stream, headers, handler }` object from streamResponse().
 */
export async function handleChatCompletions(body) {
  const {
    model: reqModel,
    stream = false,
    tools,
    tool_choice,
  } = body;

  // `messages` is `let` not `const` so the identity-prompt injection below
  // can prepend a system turn for the legacy path too.
  let messages = body.messages;
  // Everything below spreads / iterates `messages`; reject a malformed body
  // with a clean 400 instead of throwing a TypeError.
  if (!Array.isArray(messages)) {
    return {
      status: 400,
      body: { error: { message: '`messages` must be an array', type: 'invalid_request_error' } },
    };
  }

  const modelKey = resolveModel(reqModel || config.defaultModel);
  const modelInfo = getModelInfo(modelKey);
  const displayModel = modelInfo?.name || reqModel || config.defaultModel;
  const modelEnum = modelInfo?.enumValue || 0;
  const modelUid = modelInfo?.modelUid || null;

  // Models with a modelUid use the Cascade flow (StartCascade → SendUserCascadeMessage).
  // Legacy RawGetChatMessage only for models with enumValue>0 and NO modelUid.
  // Newer models (gemini-3.0, gpt-5.2, etc.) have both enumValue AND modelUid but
  // their high enum values cause "cannot parse invalid wire-format data" in the
  // legacy proto endpoint. Cascade handles them correctly via uid string.
  const useCascade = !!modelUid;

  // Tool-call emulation: if the client passed OpenAI-style tools[], we rewrite
  // tool-result turns into synthetic user text and inject the tool protocol
  // at the system-prompt level via CascadeConversationalPlannerConfig's
  // tool_calling_section (SectionOverrideConfig, OVERRIDE mode). This is far
  // more reliable than user-message-level injection because NO_TOOL mode's
  // baked-in system prompt tells the model "you have no tools" — which
  // overpowers user-message preambles. The section override replaces that
  // section directly so the model sees our emulated tool definitions as
  // authoritative system instructions.
  const hasTools = Array.isArray(tools) && tools.length > 0;
  const hasToolHistory = messages.some(m => m?.role === 'tool' || (m?.role === 'assistant' && Array.isArray(m.tool_calls) && m.tool_calls.length));
  const emulateTools = useCascade && (hasTools || hasToolHistory);

  // Build proto-level preamble (goes into tool_calling_section override);
  // pass empty tools to normalizeMessagesForCascade so it only rewrites
  // role:tool / assistant.tool_calls messages without injecting a user-level
  // preamble (that's now handled at the proto layer).
  const toolPreamble = emulateTools ? buildToolPreambleForProto(tools || [], tool_choice) : '';
  let cascadeMessages = emulateTools
    ? normalizeMessagesForCascade(messages, [])
    : [...messages];

  // ── Model identity prompt injection ──
  // When enabled, prepend a system message so the model identifies itself as
  // the requested model (e.g. "I am Claude Opus 4.6") instead of leaking the
  // Cascade/Windsurf backend identity. Inject into BOTH messages (for legacy
  // RawGetChatMessage path) and cascadeMessages (Cascade path) — they diverge
  // once tool-emulation rewrites the Cascade path, but the system identity
  // should be identical in both.
  if (isExperimentalEnabled('modelIdentityPrompt') && modelInfo?.provider) {
    const identityText = buildIdentitySystemMessage(displayModel, modelInfo.provider);
    if (identityText) {
      const sysMsg = { role: 'system', content: identityText };
      cascadeMessages = [sysMsg, ...cascadeMessages];
      messages = [sysMsg, ...messages];
    }
  }

  // Global model access control (allowlist / blocklist from dashboard)
  const access = isModelAllowed(modelKey);
  if (!access.allowed) {
    return { status: 403, body: { error: { message: access.reason, type: 'model_blocked' } } };
  }

  // Per-account model routing preflight: if NO active account has this
  // model in its tier ∩ available list, fail fast instead of looping
  // through every account trying to find one. This surfaces tier
  // entitlement and blocklist errors as a clean 403 rather than a 30s
  // queue timeout → pool_exhausted.
  const anyEligible = getAccountList().some(a =>
    a.status === 'active' && (a.availableModels || []).includes(modelKey)
  );
  if (!anyEligible) {
    return {
      status: 403,
      body: {
        error: {
          message: `模型 ${displayModel} 在当前账号池中不可用(未订阅或已被封禁)`,
          type: 'model_not_entitled',
        },
      },
    };
  }

  const chatId = genId();
  const created = Math.floor(Date.now() / 1000);
  const ckey = cacheKey(body);

  if (stream) {
    return streamResponse(chatId, created, displayModel, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble);
  }

  // ── Local response cache (exact body match) ─────────────
  const cached = cacheGet(ckey);
  if (cached) {
    log.info(`Chat: cache HIT model=${displayModel} flow=non-stream`);
    recordRequest(displayModel, true, 0, null);
    const message = { role: 'assistant', content: cached.text || null };
    if (cached.thinking) message.reasoning_content = cached.thinking;
    return {
      status: 200,
      body: {
        id: chatId, object: 'chat.completion', created, model: displayModel,
        choices: [{ index: 0, message, finish_reason: 'stop' }],
        usage: cachedUsage(messages, cached.text),
      },
    };
  }

  // ── Cascade conversation pool (experimental) ──
  // If the client is continuing a prior conversation and we still hold the
  // cascade_id from last turn, pin this request to that exact (account, LS)
  // pair so the Windsurf backend serves from its hot per-cascade context
  // instead of replaying the whole history.
  //
  // Tool-emulation mode bypasses the reuse pool: fingerprint can't stably
  // collapse a conversation whose assistant turns contain synthesised
  // <tool_call> markup and whose user turns contain <tool_result> wrappers.
  const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
  const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
  let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
  if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… model=${displayModel}`);

  // Non-stream: retry with a different account on model-not-available errors
  const tried = [];
  let lastErr = null;
  // Dynamic: try every active account in the pool (capped at 10) so a
  // large pool with many rate-limited accounts can still fall through
  // to a free one. Was hardcoded 3 — in pools bigger than 3 with the
  // first accounts rate-limited, healthy accounts were never reached
  // even though they would have worked (issue #5).
  const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    let acct = null;
    if (reuseEntry && attempt === 0) {
      // First attempt pins to the account that owns the cached cascade.
      acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
      if (!acct) {
        log.info('Chat: cascade reuse skipped — owning account not available, falling back to fresh cascade');
        reuseEntry = null;
      }
    }
    if (!acct) {
      acct = await waitForAccount(tried, null, QUEUE_MAX_WAIT_MS, modelKey);
      if (!acct) break;
    }
    tried.push(acct.apiKey);

    // Pre-flight rate limit check (experimental): ask server.codeium.com if
    // this account still has message capacity before burning an LS round trip.
    if (isExperimentalEnabled('preflightRateLimit')) {
      try {
        const px = getEffectiveProxy(acct.id) || null;
        const rl = await checkMessageRateLimit(acct.apiKey, px);
        if (!rl.hasCapacity) {
          log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
          // Same (apiKey, durationMs, modelKey) call shape as the error
          // handlers in nonStreamResponse()/streamResponse().
          markRateLimited(acct.apiKey, 5 * 60 * 1000, modelKey);
          // The pinned account is being skipped; a later attempt on another
          // account must not inherit its cascade.
          reuseEntry = null;
          continue;
        }
      } catch (e) {
        log.debug(`Preflight check failed for ${acct.email}: ${e.message}`);
        // Fail open — proceed with the request
      }
    }

    // Mirror the stream path: an LS startup failure ends the loop with a
    // structured error instead of rejecting the whole handler.
    try {
      await ensureLs(acct.proxy);
    } catch (e) {
      log.error('Chat: ensureLs failed:', e?.message);
      lastErr = { status: 503, body: { error: { message: sanitizeText(e?.message || 'No LS instance available'), type: 'ls_unavailable' } } };
      break;
    }
    const ls = getLsFor(acct.proxy);
    if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; }

    // Cascade pins cascade_id to a specific LS port too; if the LS it was
    // born on has been replaced, the cascade_id is dead.
    if (reuseEntry && reuseEntry.lsPort !== ls.port) {
      log.info('Chat: cascade reuse skipped — LS port changed');
      reuseEntry = null;
    }

    // Total text characters across all turns, for the log line only.
    const msgChars = messages.reduce((n, m) => {
      const c = m?.content;
      if (typeof c === 'string') return n + c.length;
      if (!Array.isArray(c)) return n;
      return n + c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0);
    }, 0);
    log.info(`Chat: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${messages.length} chars=${msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);

    const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
    const result = await nonStreamResponse(
      client, chatId, created, displayModel, modelKey, messages, cascadeMessages, modelEnum, modelUid,
      useCascade, acct.apiKey, ckey,
      reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey } : null,
      emulateTools, toolPreamble,
    );
    if (result.status === 200) return result;

    reuseEntry = null; // don't try to reuse on the retry
    lastErr = result;
    const errType = result.body?.error?.type;
    // Rate limit: this account is done for this model, try the next one
    if (errType === 'rate_limit_exceeded') {
      log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`);
      continue;
    }
    // Model not available on this account (permission_denied, etc.)
    if (errType === 'model_not_available') {
      log.warn(`Account ${acct.email} cannot serve ${displayModel}, trying next account`);
      continue;
    }
    break; // other errors (502, transport) — don't retry
  }

  // If all accounts exhausted, check if it's because they're all rate-limited
  if (!lastErr || lastErr.status === 429) {
    const rl = isAllRateLimited(modelKey);
    if (rl.allLimited) {
      return { status: 429, body: { error: { message: `${displayModel} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs } } };
    }
  }
  return lastErr || { status: 503, body: { error: { message: 'No active accounts available', type: 'pool_exhausted' } } };
}
/**
 * Run one non-streaming completion attempt against a single account.
 *
 * Collects all text/thinking chunks from the Cascade or legacy call, strips
 * tool-call markup, sanitises server paths, optionally checks the cascade
 * back into the conversation pool, updates account bookkeeping, and returns
 * an OpenAI-shaped response. Errors are never thrown to the caller: they are
 * classified and returned as `{ status, body: { error } }`.
 *
 * @param {object} client - Client exposing cascadeChat() / rawGetChatMessage().
 * @param {string} id - Completion id.
 * @param {number} created - Unix timestamp (seconds).
 * @param {string} model - Display model name used in the response and logs.
 * @param {string} modelKey - Internal model key for account bookkeeping.
 * @param {Array<object>} messages - Messages for the legacy path, usage and fingerprinting.
 * @param {Array<object>} cascadeMessages - Messages sent on the Cascade path.
 * @param {number} modelEnum - Legacy model enum value.
 * @param {string|null} modelUid - Cascade model uid.
 * @param {boolean} useCascade - Selects cascadeChat() over rawGetChatMessage().
 * @param {string} apiKey - API key of the account being used.
 * @param {string} ckey - Local response cache key.
 * @param {object|null} poolCtx - `{ reuseEntry, lsPort, apiKey }` when cascade reuse is enabled.
 * @param {boolean} emulateTools - Whether parsed tool calls are surfaced to the caller.
 * @param {string} toolPreamble - Tool preamble forwarded to cascadeChat().
 * @returns {Promise<{status: number, body: object}>}
 */
async function nonStreamResponse(client, id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, apiKey, ckey, poolCtx, emulateTools, toolPreamble) {
  const startTime = Date.now();
  try {
    let allText = '';
    let allThinking = '';
    let cascadeMeta = null;
    let toolCalls = [];
    // Server-reported token usage from CortexStepMetadata.model_usage, summed
    // across all trajectory steps. Preferred over the chars/4 estimate when
    // present so downstream billing (new-api, etc.) sees real Cascade numbers.
    let serverUsage = null;

    if (useCascade) {
      const chunks = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, { reuseEntry: poolCtx?.reuseEntry || null, toolPreamble });
      for (const c of chunks) {
        if (c.text) allText += c.text;
        if (c.thinking) allThinking += c.thinking;
      }
      cascadeMeta = { cascadeId: chunks.cascadeId, sessionId: chunks.sessionId };
      serverUsage = chunks.usage || null;

      // Always strip <tool_call>/<tool_result> blocks from Cascade text.
      //  - emulateTools=true: parsed tool_calls become OpenAI-format tool_calls.
      //  - emulateTools=false: blocks are silently discarded (defense-in-depth
      //    against Cascade's system prompt inducing tool markup even after we
      //    override tool_calling_section).
      const parsed = parseToolCallsFromText(allText);
      allText = parsed.text;
      if (emulateTools) toolCalls = parsed.toolCalls;

      // Built-in Cascade tool calls (chunks.toolCalls — edit_file, view_file,
      // list_directory, run_command, etc.) are intentionally DROPPED. Their
      // argumentsJson and result fields reference server-internal paths like
      // /tmp/windsurf-workspace/config.yaml and must never be exposed to an
      // API caller. Emulated tool calls (above) are safe because they
      // reference the caller's own tool schema.
    } else {
      const chunks = await client.rawGetChatMessage(messages, modelEnum, modelUid);
      for (const c of chunks) {
        if (c.text) allText += c.text;
      }
    }

    // Scrub server-internal filesystem paths from everything we're about to
    // return. See src/sanitize.js for the patterns and rationale.
    allText = sanitizeText(allText);
    allThinking = sanitizeText(allThinking);
    if (toolCalls.length) {
      toolCalls = toolCalls.map(tc => ({
        ...tc,
        argumentsJson: sanitizeText(tc.argumentsJson || ''),
      }));
    }

    // Check the cascade back into the pool under the *post-turn* fingerprint
    // so the next request in the same conversation can resume it.
    if (poolCtx && cascadeMeta?.cascadeId && allText) {
      const fpAfter = fingerprintAfter(messages, allText);
      poolCheckin(fpAfter, {
        cascadeId: cascadeMeta.cascadeId,
        sessionId: cascadeMeta.sessionId,
        lsPort: poolCtx.lsPort,
        apiKey: poolCtx.apiKey,
        createdAt: poolCtx.reuseEntry?.createdAt,
      });
    }

    reportSuccess(apiKey);
    updateCapability(apiKey, modelKey, true, 'success');
    recordRequest(model, true, Date.now() - startTime, apiKey);

    // Store in cache for next identical request. Skip caching tool_call
    // responses — they're inherently contextual and the cache doesn't
    // preserve the tool_calls array, so a cache hit would return a
    // content-only response with finish_reason:stop, breaking tool flow.
    // Also skip completely empty responses (same rule as the stream path):
    // caching one would replay an empty answer for every identical request.
    if (ckey && !toolCalls.length && (allText || allThinking)) {
      cacheSet(ckey, { text: allText, thinking: allThinking });
    }

    const message = { role: 'assistant', content: allText || null };
    if (allThinking) message.reasoning_content = allThinking;
    if (toolCalls.length) {
      message.tool_calls = toolCalls.map((tc, i) => ({
        id: tc.id || `call_${i}_${Date.now().toString(36)}`,
        type: 'function',
        function: {
          name: tc.name || 'unknown',
          arguments: tc.argumentsJson || tc.arguments || '{}',
        },
      }));
      // OpenAI convention: content is null when finish_reason is tool_calls.
      // In text emulation the model often emits an inline answer alongside the
      // <tool_call> block (e.g., hallucinated weather data). Set content to
      // null so clients that check `content !== null` behave correctly and the
      // caller waits for the real tool result rather than showing hallucinated
      // data.
      message.content = null;
    }

    // Prefer server-reported usage; fall back to chars/4 estimate only when
    // the trajectory didn't include a ModelUsageStats field.
    const usage = buildUsageBody(serverUsage, messages, allText, allThinking);
    const finishReason = toolCalls.length ? 'tool_calls' : 'stop';
    return {
      status: 200,
      body: {
        id, object: 'chat.completion', created, model,
        choices: [{ index: 0, message, finish_reason: finishReason }],
        usage,
      },
    };
  } catch (err) {
    // Only count true auth failures against the account. Workspace/cascade/model
    // errors and transport issues shouldn't disable the key.
    const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
    const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
    const isInternal = /internal error occurred.*error id/i.test(err.message);
    if (isAuthFail) reportError(apiKey);
    if (isRateLimit) { markRateLimited(apiKey, 5 * 60 * 1000, modelKey); err.isRateLimit = true; err.isModelError = true; }
    if (isInternal) { reportInternalError(apiKey); err.isModelError = true; }
    if (err.isModelError && !isRateLimit && !isInternal) {
      updateCapability(apiKey, modelKey, false, 'model_error');
    }
    recordRequest(model, false, Date.now() - startTime, apiKey);
    log.error('Chat error:', err.message);
    // Rate limits → 429 with Retry-After; model errors → 403; others → 502
    if (isRateLimit) {
      const rl = isAllRateLimited(modelKey);
      return {
        status: 429,
        body: { error: { message: `${model} 已达速率限制,请稍后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs || 60000 } },
      };
    }
    return {
      status: err.isModelError ? 403 : 502,
      body: { error: { message: sanitizeText(err.message), type: err.isModelError ? 'model_not_available' : 'upstream_error' } },
    };
  }
}
/**
 * Build the streaming (SSE) response descriptor for a chat completion.
 *
 * The returned object carries the SSE headers and an async `handler(res)`
 * that writes OpenAI-style `chat.completion.chunk` events to `res`. The
 * handler replays a cached response when one exists; otherwise it tries
 * accounts in turn, retrying on rate-limit/model errors only while nothing
 * has been streamed yet, and ends with either a final chunk plus usage or an
 * inline `[Error: …]` content chunk.
 *
 * @param {string} id - Completion id.
 * @param {number} created - Unix timestamp (seconds).
 * @param {string} model - Display model name.
 * @param {string} modelKey - Internal model key for account bookkeeping.
 * @param {Array<object>} messages - Messages for the legacy path, usage and fingerprinting.
 * @param {Array<object>} cascadeMessages - Messages sent on the Cascade path.
 * @param {number} modelEnum - Legacy model enum value.
 * @param {string|null} modelUid - Cascade model uid.
 * @param {boolean} useCascade - Selects cascadeChat() over rawGetChatMessage().
 * @param {string} ckey - Local response cache key.
 * @param {boolean} emulateTools - Whether parsed tool calls are surfaced to the caller.
 * @param {string} toolPreamble - Tool preamble forwarded to cascadeChat().
 * @returns {{status: number, stream: boolean, headers: object, handler: function}}
 */
function streamResponse(id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble) {
  return {
    status: 200,
    stream: true,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
    async handler(res) {
      const abortController = new AbortController();
      res.on('close', () => {
        if (!res.writableEnded) {
          log.info('Client disconnected mid-stream, aborting upstream');
          abortController.abort();
        }
      });

      const send = (data) => {
        if (!res.writableEnded) res.write(`data: ${JSON.stringify(data)}\n\n`);
      };
      // One single-choice chunk; `extra` is merged after `choices` (e.g. usage).
      const sendChoice = (delta, finishReason = null, extra = {}) => send({
        id, object: 'chat.completion.chunk', created, model,
        choices: [{ index: 0, delta, finish_reason: finishReason }],
        ...extra,
      });
      const sendRole = () => sendChoice({ role: 'assistant', content: '' });
      const sendDoneAndEnd = () => {
        if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); }
      };

      // SSE heartbeat: keep the TCP/HTTP connection alive through any silent
      // period (LS warmup, Cascade "thinking", queue wait). `:` prefix is a
      // comment line per the SSE spec — clients ignore it, intermediaries see
      // bytes flowing, idle timers get reset.
      const heartbeat = setInterval(() => {
        if (!res.writableEnded) res.write(': ping\n\n');
      }, HEARTBEAT_MS);
      const stopHeartbeat = () => clearInterval(heartbeat);
      res.on('close', stopHeartbeat);

      // ── Cache hit: replay stored response as a fake stream ──
      const cached = cacheGet(ckey);
      if (cached) {
        log.info(`Chat: cache HIT model=${model} flow=stream`);
        recordRequest(model, true, 0, null);
        try {
          sendRole();
          if (cached.thinking) sendChoice({ reasoning_content: cached.thinking });
          if (cached.text) sendChoice({ content: cached.text });
          sendChoice({}, 'stop', { usage: cachedUsage(messages, cached.text) });
          sendDoneAndEnd();
        } finally {
          stopHeartbeat();
        }
        return;
      }

      const startTime = Date.now();
      const tried = [];
      let hadSuccess = false;
      let rolePrinted = false;
      let currentApiKey = null;
      let lastErr = null;
      // Dynamic: try every active account in the pool (capped at 10) so a
      // large pool with many rate-limited accounts can still fall through
      // to a free one. Was hardcoded 3 — in pools bigger than 3 with the
      // first accounts rate-limited, healthy accounts were never reached
      // even though they would have worked (issue #5).
      const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));
      // Accumulate chunks so we can cache a successful response at the end.
      let accText = '';
      let accThinking = '';

      // Cascade conversation pool (experimental, stream path) — bypassed in
      // tool-emulation mode because the fingerprint can't collapse turns
      // whose bodies carry <tool_call>/<tool_result> markup.
      const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
      const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
      let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
      if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… stream model=${model}`);

      // Always strip <tool_call>/<tool_result> blocks in Cascade mode.
      // In emulation mode, parsed calls are emitted as OpenAI tool_calls.
      // In non-emulation mode, blocks are silently stripped (defense-in-depth
      // against Cascade's system prompt inducing tool markup).
      const toolParser = useCascade ? new ToolCallStreamParser() : null;
      const collectedToolCalls = [];

      // Streaming path sanitizers. Every text/thinking delta flows through a
      // PathSanitizeStream before leaving the server so /tmp/windsurf-workspace,
      // /opt/windsurf and /root/WindsurfAPI literals can never slip out even
      // if a path straddles a chunk boundary. See src/sanitize.js.
      const pathStreamText = new PathSanitizeStream();
      const pathStreamThinking = new PathSanitizeStream();

      const emitContent = (clean) => {
        if (!clean) return;
        accText += clean;
        sendChoice({ content: clean });
      };
      const emitThinking = (clean) => {
        if (!clean) return;
        accThinking += clean;
        sendChoice({ reasoning_content: clean });
      };
      const emitToolCallDelta = (tc, idx) => {
        sendChoice({
          tool_calls: [{
            index: idx,
            id: tc.id,
            type: 'function',
            function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') },
          }],
        });
      };
      // Only surface tool calls when emulating — otherwise the parsed calls
      // came from Cascade's built-in tools and are silently discarded.
      const emitParsedToolCalls = (calls) => {
        if (!emulateTools) return;
        for (const tc of calls) {
          const idx = collectedToolCalls.length;
          collectedToolCalls.push(tc);
          emitToolCallDelta(tc, idx);
        }
      };

      const onChunk = (chunk) => {
        if (!rolePrinted) {
          rolePrinted = true;
          sendRole();
        }
        hadSuccess = true;
        if (chunk.text) {
          // Pipeline for text deltas:
          //   raw chunk → ToolCallStreamParser (strip <tool_call> blocks)
          //             → PathSanitizeStream (scrub server paths)
          //             → client
          let safeText = chunk.text;
          if (toolParser) {
            const { text: safe, toolCalls: done } = toolParser.feed(chunk.text);
            safeText = safe;
            emitParsedToolCalls(done);
          }
          if (safeText) emitContent(pathStreamText.feed(safeText));
        }
        if (chunk.thinking) {
          emitThinking(pathStreamThinking.feed(chunk.thinking));
        }
      };

      try {
        for (let attempt = 0; attempt < maxAttempts; attempt++) {
          if (abortController.signal.aborted) return;
          let acct = null;
          if (reuseEntry && attempt === 0) {
            acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
            if (!acct) {
              log.info('Chat: cascade reuse skipped — owning account not available');
              reuseEntry = null;
            }
          }
          if (!acct) {
            acct = await waitForAccount(tried, abortController.signal, QUEUE_MAX_WAIT_MS, modelKey);
            if (!acct) break;
          }
          tried.push(acct.apiKey);
          currentApiKey = acct.apiKey;

          // Pre-flight rate limit check (experimental)
          if (isExperimentalEnabled('preflightRateLimit')) {
            try {
              const px = getEffectiveProxy(acct.id) || null;
              const rl = await checkMessageRateLimit(acct.apiKey, px);
              if (!rl.hasCapacity) {
                log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
                // Same (apiKey, durationMs, modelKey) call shape as the
                // error handler further down.
                markRateLimited(acct.apiKey, 5 * 60 * 1000, modelKey);
                // The pinned account is being skipped; a later attempt on
                // another account must not inherit its cascade.
                reuseEntry = null;
                continue;
              }
            } catch (e) {
              // Fail open — proceed with the request
              log.debug(`Preflight check failed for ${acct.email}: ${e.message}`);
            }
          }

          try { await ensureLs(acct.proxy); } catch (e) { lastErr = e; break; }
          const ls = getLsFor(acct.proxy);
          if (!ls) { lastErr = new Error('No LS instance available'); break; }
          if (reuseEntry && reuseEntry.lsPort !== ls.port) {
            log.info('Chat: cascade reuse skipped — LS port changed');
            reuseEntry = null;
          }

          // Total text characters across all turns, for the log line only.
          const msgChars = (messages || []).reduce((n, m) => {
            const c = m?.content;
            if (typeof c === 'string') return n + c.length;
            if (!Array.isArray(c)) return n;
            return n + c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0);
          }, 0);
          log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${(messages||[]).length} chars=${msgChars}${reuseEntry ? ' reuse=1' : ''}`);

          const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
          let cascadeResult = null;
          try {
            if (useCascade) {
              cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, {
                onChunk, signal: abortController.signal, reuseEntry, toolPreamble,
              });
            } else {
              await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk });
            }

            // Flush order matters:
            //   1. ToolCallStreamParser tail → may produce more text deltas
            //      (e.g., a dangling <tool_call> that never closed falls
            //      through as literal text)
            //   2. PathSanitizeStream tail (text) → scrubs anything the tool
            //      parser held back AND anything we were holding ourselves
            //   3. PathSanitizeStream tail (thinking)
            if (toolParser) {
              const tail = toolParser.flush();
              if (tail.text) emitContent(pathStreamText.feed(tail.text));
              emitParsedToolCalls(tail.toolCalls);
            }
            emitContent(pathStreamText.flush());
            emitThinking(pathStreamThinking.flush());

            // Pool check-in on success (cascade only)
            if (reuseEnabled && cascadeResult?.cascadeId && accText) {
              const fpAfter = fingerprintAfter(messages, accText);
              poolCheckin(fpAfter, {
                cascadeId: cascadeResult.cascadeId,
                sessionId: cascadeResult.sessionId,
                lsPort: ls.port,
                apiKey: currentApiKey,
                createdAt: reuseEntry?.createdAt,
              });
            }

            // success
            if (hadSuccess) reportSuccess(currentApiKey);
            updateCapability(currentApiKey, modelKey, true, 'success');
            recordRequest(model, true, Date.now() - startTime, currentApiKey);
            if (!rolePrinted) sendRole();

            // Prefer Cascade's own CortexStepMetadata.model_usage numbers
            // when present, fall back to the local chars/4 estimator (see
            // buildUsageBody()). Computed once and sent twice: on the final
            // choice chunk and on the terminal usage-only chunk.
            const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop';
            const finalUsage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking);
            sendChoice({}, finalReason, { usage: finalUsage });
            // OpenAI-compat: terminal usage chunk (stream_options.include_usage
            // convention — empty choices[] + usage).
            send({ id, object: 'chat.completion.chunk', created, model,
              choices: [], usage: finalUsage });
            sendDoneAndEnd();

            if (ckey && !collectedToolCalls.length && (accText || accThinking)) {
              cacheSet(ckey, { text: accText, thinking: accThinking });
            }
            return;
          } catch (err) {
            lastErr = err;
            reuseEntry = null; // don't try to reuse on retry
            const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
            const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
            const isInternal = /internal error occurred.*error id/i.test(err.message);
            if (isAuthFail) reportError(currentApiKey);
            if (isRateLimit) { markRateLimited(currentApiKey, 5 * 60 * 1000, modelKey); err.isRateLimit = true; err.isModelError = true; }
            if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; }
            if (err.isModelError && !isRateLimit && !isInternal) {
              updateCapability(currentApiKey, modelKey, false, 'model_error');
            }
            // Retry only if nothing has been streamed yet AND it's a retryable error
            if (!hadSuccess && (err.isModelError || isRateLimit)) {
              const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error';
              log.warn(`Account ${acct.email} failed (${tag}) on ${model}, trying next`);
              continue;
            }
            break;
          }
        }

        // The client went away (already logged by the 'close' handler): there
        // is nobody to report an error to, and a disconnect is not an
        // upstream failure, so skip the failure bookkeeping below.
        if (abortController.signal.aborted) return;

        // All attempts failed
        log.error('Stream error after retries:', lastErr?.message);
        recordRequest(model, false, Date.now() - startTime, currentApiKey);
        try {
          if (!rolePrinted) sendRole();
          // Check if failure is due to all accounts being rate-limited
          const rl = isAllRateLimited(modelKey);
          const errMsg = rl.allLimited
            ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`
            : sanitizeText(lastErr?.message || 'no accounts');
          sendChoice({ content: `\n[Error: ${errMsg}]` }, 'stop');
          if (!res.writableEnded) res.write('data: [DONE]\n\n');
        } catch {
          // Best effort: the response may already be unusable at this point.
        }
        if (!res.writableEnded) res.end();
      } finally {
        stopHeartbeat();
      }
    },
  };
}