/**
 * POST /v1/chat/completions — OpenAI-compatible chat completions.
 * Routes to RawGetChatMessage (legacy) or Cascade (premium) based on model type.
 */

import { randomUUID } from 'crypto';
import { WindsurfClient } from '../client.js';
import { getApiKey, acquireAccountByKey, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js';
import { resolveModel, getModelInfo } from '../models.js';
import { getLsFor, ensureLs } from '../langserver.js';
import { config, log } from '../config.js';
import { recordRequest } from '../dashboard/stats.js';
import { isModelAllowed } from '../dashboard/model-access.js';
import { cacheKey, cacheGet, cacheSet } from '../cache.js';
import { isExperimentalEnabled, getIdentityPromptFor } from '../runtime-config.js';
import { checkMessageRateLimit } from '../windsurf-api.js';
import { getEffectiveProxy } from '../dashboard/proxy-config.js';
import {
  fingerprintBefore,
  fingerprintAfter,
  checkout as poolCheckout,
  checkin as poolCheckin,
} from '../conversation-pool.js';
import {
  normalizeMessagesForCascade,
  ToolCallStreamParser,
  parseToolCallsFromText,
  buildToolPreambleForProto,
} from './tool-emulation.js';
import { sanitizeText, PathSanitizeStream } from '../sanitize.js';

const HEARTBEAT_MS = 15_000;
const QUEUE_RETRY_MS = 1_000;
const QUEUE_MAX_WAIT_MS = 30_000;

// ── Model identity prompt ──────────────────────────────────
// Templates live in runtime-config (editable from the dashboard). Use {model}
// as a placeholder for the requested model name. Only applied when the
// experimental "modelIdentityPrompt" toggle is ON.
/**
 * Render the identity system prompt for a provider.
 *
 * @param {string} displayModel - Model name substituted for every `{model}`.
 * @param {string} provider - Provider key used to look up the template.
 * @returns {string|null} The rendered prompt, or null when no template exists.
 */
function buildIdentitySystemMessage(displayModel, provider) {
  const template = getIdentityPromptFor(provider);
  if (!template) return null;
  // Function replacer on purpose: with a plain string replacement, sequences
  // such as `$&`, `$1` or `$$` inside displayModel (which may echo the
  // client-supplied `model` field) would be expanded as special patterns.
  return template.replace(/\{model\}/g, () => displayModel);
}

/** OpenAI-style completion id: `chatcmpl-` + 29 hex chars from a random UUID. */
function genId() {
  return 'chatcmpl-' + randomUUID().replace(/-/g, '').slice(0, 29);
}

// Rough token estimate (~4 chars/token). Used only to populate the
// OpenAI-compatible `usage.prompt_tokens_details.cached_tokens` field so
// upstream billing/dashboards (new-api) can recognise our local cache hits.
const CHARS_PER_TOKEN = 4;

/** Convert a character count to an estimated token count (never below 1). */
function charsToTokens(chars) {
  return Math.max(1, Math.ceil(chars / CHARS_PER_TOKEN));
}

/**
 * Estimate prompt tokens for an OpenAI-style messages array.
 * Counts string `content` and the `text` of array-form content parts.
 *
 * @param {Array|undefined} messages
 * @returns {number} 0 for a non-array input, otherwise at least 1.
 */
function estimateTokens(messages) {
  if (!Array.isArray(messages)) return 0;
  let chars = 0;
  for (const m of messages) {
    if (typeof m?.content === 'string') {
      chars += m.content.length;
    } else if (Array.isArray(m?.content)) {
      for (const p of m.content) {
        if (typeof p?.text === 'string') chars += p.text.length;
      }
    }
  }
  return charsToTokens(chars);
}

/**
 * Usage block for a response served from the local cache: the whole prompt is
 * reported as cached and the object is tagged `cached: true`.
 *
 * @param {Array} messages - Prompt messages (estimated via estimateTokens).
 * @param {string} completionText - Cached assistant text.
 * @param {string} [thinkingText=''] - Cached reasoning text; counted into
 *   completion tokens the same way buildUsageBody counts it.
 */
function cachedUsage(messages, completionText, thinkingText = '') {
  const prompt = estimateTokens(messages);
  const completion = charsToTokens((completionText || '').length + (thinkingText || '').length);
  return {
    prompt_tokens: prompt,
    completion_tokens: completion,
    total_tokens: prompt + completion,
    input_tokens: prompt,
    output_tokens: completion,
    prompt_tokens_details: { cached_tokens: prompt },
    completion_tokens_details: { reasoning_tokens: 0 },
    cached: true,
  };
}

/**
 * Build an OpenAI-shaped `usage` object, preferring server-reported token
 * counts from Cascade's CortexStepMetadata.model_usage when available, and
 * falling back to the local chars/4 estimate otherwise. Keeps the same shape
 * in both branches so downstream billing doesn't have to care which source
 * produced the numbers.
 *
 * The Cascade backend reports usage as {inputTokens, outputTokens,
 * cacheReadTokens, cacheWriteTokens}. We map them onto the OpenAI shape:
 *   prompt_tokens  = inputTokens + cacheReadTokens + cacheWriteTokens
 *                    (total input tokens the model processed, whether fresh,
 *                    cache-read, or cache-written — matches the OpenAI
 *                    convention where prompt_tokens is the grand total)
 *   completion_tokens = outputTokens
 *   prompt_tokens_details.cached_tokens = cacheReadTokens
 *   cache_creation_input_tokens (Anthropic ext) = cacheWriteTokens
 *
 * The server numbers are used only when inputTokens or outputTokens is
 * non-zero; otherwise the estimate (including thinkingText) is returned.
 */
function buildUsageBody(serverUsage, messages, completionText, thinkingText = '') {
  if (serverUsage && (serverUsage.inputTokens || serverUsage.outputTokens)) {
    const inputTokens = serverUsage.inputTokens || 0;
    const outputTokens = serverUsage.outputTokens || 0;
    const cacheRead = serverUsage.cacheReadTokens || 0;
    const cacheWrite = serverUsage.cacheWriteTokens || 0;
    const promptTotal = inputTokens + cacheRead + cacheWrite;
    return {
      prompt_tokens: promptTotal,
      completion_tokens: outputTokens,
      total_tokens: promptTotal + outputTokens,
      input_tokens: promptTotal,
      output_tokens: outputTokens,
      prompt_tokens_details: { cached_tokens: cacheRead },
      completion_tokens_details: { reasoning_tokens: 0 },
      cache_creation_input_tokens: cacheWrite,
    };
  }
  const prompt = estimateTokens(messages);
  const completion = charsToTokens((completionText || '').length + (thinkingText || '').length);
  return {
    prompt_tokens: prompt,
    completion_tokens: completion,
    total_tokens: prompt + completion,
    input_tokens: prompt,
    output_tokens: completion,
    prompt_tokens_details: { cached_tokens: 0 },
    completion_tokens_details: { reasoning_tokens: 0 },
  };
}

// Wait until getApiKey returns a non-null account, or until maxWaitMs expires.
// Used when every account has momentarily exhausted its RPM budget so the
// client is queued instead of getting a 503.
/**
 * Poll the account pool until an account becomes available.
 *
 * @param {string[]} tried - API keys already attempted for this request
 *   (forwarded to getApiKey; presumably excluded from selection).
 * @param {AbortSignal|null} signal - Stops waiting early once aborted.
 * @param {number} [maxWaitMs=QUEUE_MAX_WAIT_MS] - Upper bound on total wait.
 * @param {string|null} [modelKey=null] - Resolved model key forwarded to getApiKey.
 * @returns {Promise<object|null>} The account, or null on timeout/abort.
 */
async function waitForAccount(tried, signal, maxWaitMs = QUEUE_MAX_WAIT_MS, modelKey = null) {
  const deadline = Date.now() + maxWaitMs;
  let acct = getApiKey(tried, modelKey);
  while (!acct) {
    if (signal?.aborted) return null;
    if (Date.now() >= deadline) return null;
    await new Promise(r => setTimeout(r, QUEUE_RETRY_MS));
    acct = getApiKey(tried, modelKey);
  }
  return acct;
}

// Cooldown applied to an account that hit a rate limit, either reported by
// the upstream error or detected by the experimental preflight check.
const RATE_LIMIT_COOLDOWN_MS = 5 * 60 * 1000;
// Upper bound on accounts tried per request (see maxAttemptsForPool).
const MAX_ATTEMPTS_CAP = 10;

/**
 * Number of accounts to try for one request: every active account in the
 * pool, at least 3 and at most MAX_ATTEMPTS_CAP. Previously hard-coded to 3,
 * which meant healthy accounts in larger pools were never reached when the
 * first ones were rate-limited (issue #5).
 */
function maxAttemptsForPool() {
  const activeCount = getAccountList().filter(a => a.status === 'active').length;
  return Math.min(MAX_ATTEMPTS_CAP, Math.max(3, activeCount));
}

/** Total characters of string / text-part content across messages (log line only). */
function countMessageChars(messages) {
  return (messages || []).reduce((n, m) => {
    const c = m?.content;
    if (typeof c === 'string') return n + c.length;
    if (Array.isArray(c)) {
      return n + c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0);
    }
    return n;
  }, 0);
}

/**
 * A pooled cascade_id is only valid for the account that created it. Drop the
 * reuse entry whenever the account about to serve the request is a different
 * one — e.g. after the owning account was skipped by the preflight check.
 */
function dropForeignReuse(reuseEntry, acct) {
  if (reuseEntry && reuseEntry.apiKey !== acct.apiKey) {
    log.info('Chat: cascade reuse skipped — serving account does not own the cascade');
    return null;
  }
  return reuseEntry;
}

/**
 * Experimental preflight: ask the rate-limit endpoint whether this account
 * still has message capacity before spending an LS round trip.
 * Fails open — any error from the check lets the request proceed.
 *
 * @returns {Promise<boolean>} false only when the account reported no capacity.
 */
async function preflightHasCapacity(acct, modelKey) {
  if (!isExperimentalEnabled('preflightRateLimit')) return true;
  try {
    const px = getEffectiveProxy(acct.id) || null;
    const rl = await checkMessageRateLimit(acct.apiKey, px);
    if (!rl.hasCapacity) {
      log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
      // Same argument order as the upstream-error handler below:
      // (apiKey, cooldownMs, modelKey).
      markRateLimited(acct.apiKey, RATE_LIMIT_COOLDOWN_MS, modelKey);
      return false;
    }
  } catch (e) {
    log.debug(`Preflight check failed for ${acct.email}: ${e.message}`);
  }
  return true;
}

/**
 * Classify an upstream failure and apply account-health side effects.
 * Only true auth failures count against the account; workspace/cascade/model
 * errors and transport issues don't disable the key.
 * Mutates `err`: sets isRateLimit / isModelError where applicable.
 *
 * @returns {{isRateLimit: boolean, isInternal: boolean}}
 */
function classifyUpstreamError(err, apiKey, modelKey) {
  const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
  const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
  const isInternal = /internal error occurred.*error id/i.test(err.message);

  if (isAuthFail) reportError(apiKey);
  if (isRateLimit) {
    markRateLimited(apiKey, RATE_LIMIT_COOLDOWN_MS, modelKey);
    err.isRateLimit = true;
    err.isModelError = true;
  }
  if (isInternal) {
    reportInternalError(apiKey);
    err.isModelError = true;
  }
  // isModelError may also have been set by the client before throwing.
  if (err.isModelError && !isRateLimit && !isInternal) {
    updateCapability(apiKey, modelKey, false, 'model_error');
  }
  return { isRateLimit, isInternal };
}

/**
 * Entry point for POST /v1/chat/completions.
 *
 * @param {object} body - Parsed OpenAI-style request body.
 * @returns {Promise<{status: number, body?: object, stream?: boolean, headers?: object, handler?: Function}>}
 *   A JSON response descriptor, or a streaming descriptor when `body.stream`.
 */
export async function handleChatCompletions(body) {
  // Everything below (spread, .some, fingerprinting) assumes an array.
  if (!Array.isArray(body?.messages)) {
    return {
      status: 400,
      body: { error: { message: '`messages` must be an array', type: 'invalid_request_error' } },
    };
  }

  const { model: reqModel, stream = false, tools, tool_choice } = body;
  // `messages` is `let` not `const` so the identity-prompt injection below
  // can prepend a system turn for the legacy path too.
  let messages = body.messages;

  const modelKey = resolveModel(reqModel || config.defaultModel);
  const modelInfo = getModelInfo(modelKey);
  const displayModel = modelInfo?.name || reqModel || config.defaultModel;
  const modelEnum = modelInfo?.enumValue || 0;
  const modelUid = modelInfo?.modelUid || null;

  // Models with a modelUid use the Cascade flow (StartCascade → SendUserCascadeMessage).
  // Legacy RawGetChatMessage only for models with enumValue>0 and NO modelUid.
  // Newer models (gemini-3.0, gpt-5.2, etc.) have both enumValue AND modelUid but
  // their high enum values cause "cannot parse invalid wire-format data" in the
  // legacy proto endpoint. Cascade handles them correctly via uid string.
  const useCascade = !!modelUid;

  // Tool-call emulation: if the client passed OpenAI-style tools[], we rewrite
  // tool-result turns into synthetic user text and inject the tool protocol
  // at the system-prompt level via CascadeConversationalPlannerConfig's
  // tool_calling_section (SectionOverrideConfig, OVERRIDE mode). This is far
  // more reliable than user-message-level injection because NO_TOOL mode's
  // baked-in system prompt tells the model "you have no tools" — which
  // overpowers user-message preambles. The section override replaces that
  // section directly so the model sees our emulated tool definitions as
  // authoritative system instructions.
  const hasTools = Array.isArray(tools) && tools.length > 0;
  const hasToolHistory = messages.some(m =>
    m?.role === 'tool' ||
    (m?.role === 'assistant' && Array.isArray(m.tool_calls) && m.tool_calls.length));
  const emulateTools = useCascade && (hasTools || hasToolHistory);

  // Build proto-level preamble (goes into tool_calling_section override);
  // pass empty tools to normalizeMessagesForCascade so it only rewrites
  // role:tool / assistant.tool_calls messages without injecting a user-level
  // preamble (that's now handled at the proto layer).
  const toolPreamble = emulateTools ? buildToolPreambleForProto(tools || [], tool_choice) : '';
  let cascadeMessages = emulateTools ? normalizeMessagesForCascade(messages, []) : [...messages];

  // ── Model identity prompt injection ──
  // When enabled, prepend a system message so the model identifies itself as
  // the requested model instead of leaking the Cascade/Windsurf backend
  // identity. Inject into BOTH messages (legacy RawGetChatMessage path) and
  // cascadeMessages (Cascade path) — they diverge once tool-emulation rewrites
  // the Cascade path, but the system identity should be identical in both.
  if (isExperimentalEnabled('modelIdentityPrompt') && modelInfo?.provider) {
    const identityText = buildIdentitySystemMessage(displayModel, modelInfo.provider);
    if (identityText) {
      const sysMsg = { role: 'system', content: identityText };
      cascadeMessages = [sysMsg, ...cascadeMessages];
      messages = [sysMsg, ...messages];
    }
  }

  // Global model access control (allowlist / blocklist from dashboard)
  const access = isModelAllowed(modelKey);
  if (!access.allowed) {
    return { status: 403, body: { error: { message: access.reason, type: 'model_blocked' } } };
  }

  // Per-account model routing preflight: if NO active account has this
  // model in its tier ∩ available list, fail fast instead of looping
  // through every account trying to find one. This surfaces tier
  // entitlement and blocklist errors as a clean 403 rather than a 30s
  // queue timeout → pool_exhausted.
  const anyEligible = getAccountList().some(a =>
    a.status === 'active' && (a.availableModels || []).includes(modelKey));
  if (!anyEligible) {
    return {
      status: 403,
      body: {
        error: {
          message: `模型 ${displayModel} 在当前账号池中不可用(未订阅或已被封禁)`,
          type: 'model_not_entitled',
        },
      },
    };
  }

  const chatId = genId();
  const created = Math.floor(Date.now() / 1000);
  const ckey = cacheKey(body);

  if (stream) {
    return streamResponse(chatId, created, displayModel, modelKey, messages, cascadeMessages,
      modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble);
  }

  // ── Local response cache (exact body match) ─────────────
  const cached = cacheGet(ckey);
  if (cached) {
    log.info(`Chat: cache HIT model=${displayModel} flow=non-stream`);
    recordRequest(displayModel, true, 0, null);
    const message = { role: 'assistant', content: cached.text || null };
    if (cached.thinking) message.reasoning_content = cached.thinking;
    return {
      status: 200,
      body: {
        id: chatId,
        object: 'chat.completion',
        created,
        model: displayModel,
        choices: [{ index: 0, message, finish_reason: 'stop' }],
        usage: cachedUsage(messages, cached.text, cached.thinking),
      },
    };
  }

  // ── Cascade conversation pool (experimental) ──
  // If the client is continuing a prior conversation and we still hold the
  // cascade_id from last turn, pin this request to that exact (account, LS)
  // pair so the Windsurf backend serves from its hot per-cascade context
  // instead of replaying the whole history.
  //
  // Tool-emulation mode bypasses the reuse pool: fingerprint can't stably
  // collapse a conversation whose assistant turns contain synthesised
  // markup and whose user turns contain wrappers.
  const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
  const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
  let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
  if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… model=${displayModel}`);

  // Non-stream: retry with a different account on model-not-available /
  // rate-limit errors.
  const tried = [];
  let lastErr = null;
  const maxAttempts = maxAttemptsForPool();

  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    let acct = null;
    if (reuseEntry && attempt === 0) {
      // First attempt pins to the account that owns the cached cascade.
      acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
      if (!acct) {
        log.info('Chat: cascade reuse skipped — owning account not available, falling back to fresh cascade');
        reuseEntry = null;
      }
    }
    if (!acct) {
      acct = await waitForAccount(tried, null, QUEUE_MAX_WAIT_MS, modelKey);
      if (!acct) break;
    }
    tried.push(acct.apiKey);
    reuseEntry = dropForeignReuse(reuseEntry, acct);

    if (!(await preflightHasCapacity(acct, modelKey))) continue;

    // LS startup failures become a structured 503 instead of rejecting the
    // whole handler (the streaming path already catches this).
    let ls = null;
    try {
      await ensureLs(acct.proxy);
      ls = getLsFor(acct.proxy);
    } catch (e) {
      log.error(`Chat: LS unavailable for ${acct.email}: ${e.message}`);
    }
    if (!ls) {
      lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } };
      break;
    }

    // Cascade pins cascade_id to a specific LS port too; if the LS it was
    // born on has been replaced, the cascade_id is dead.
    if (reuseEntry && reuseEntry.lsPort !== ls.port) {
      log.info('Chat: cascade reuse skipped — LS port changed');
      reuseEntry = null;
    }

    log.info(`Chat: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${messages.length} chars=${countMessageChars(messages)}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);

    const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
    const result = await nonStreamResponse(
      client, chatId, created, displayModel, modelKey, messages, cascadeMessages,
      modelEnum, modelUid, useCascade, acct.apiKey, ckey,
      reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey } : null,
      emulateTools, toolPreamble,
    );
    if (result.status === 200) return result;

    reuseEntry = null; // don't try to reuse on the retry
    lastErr = result;
    const errType = result.body?.error?.type;
    if (errType === 'rate_limit_exceeded') {
      // This account is done for this model; try the next one.
      log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`);
      continue;
    }
    if (errType === 'model_not_available') {
      // permission_denied etc. on this account.
      log.warn(`Account ${acct.email} cannot serve ${displayModel}, trying next account`);
      continue;
    }
    break; // other errors (502, transport) — don't retry
  }

  // If all accounts were exhausted, report a pool-wide rate limit when that's the cause.
  if (!lastErr || lastErr.status === 429) {
    const rl = isAllRateLimited(modelKey);
    if (rl.allLimited) {
      return {
        status: 429,
        body: {
          error: {
            message: `${displayModel} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`,
            type: 'rate_limit_exceeded',
            retry_after_ms: rl.retryAfterMs,
          },
        },
      };
    }
  }
  return lastErr || { status: 503, body: { error: { message: 'No active accounts available', type: 'pool_exhausted' } } };
}

/**
 * Run one non-streaming completion against a single account and build the
 * OpenAI `chat.completion` response (or an error descriptor).
 *
 * @param {object|null} poolCtx - {reuseEntry, lsPort, apiKey} when cascade
 *   reuse is enabled; used to resume a cascade and to check it back in.
 * @returns {Promise<{status: number, body: object}>} 200 on success; 429 for
 *   rate limits, 403 for model errors, 502 otherwise.
 */
async function nonStreamResponse(client, id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, apiKey, ckey, poolCtx, emulateTools, toolPreamble) {
  const startTime = Date.now();
  try {
    let allText = '';
    let allThinking = '';
    let cascadeMeta = null;
    let toolCalls = [];
    // Server-reported token usage from CortexStepMetadata.model_usage, summed
    // across all trajectory steps. Preferred over the chars/4 estimate when
    // present so downstream billing (new-api, etc.) sees real Cascade numbers.
    let serverUsage = null;

    if (useCascade) {
      const chunks = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, {
        reuseEntry: poolCtx?.reuseEntry || null,
        toolPreamble,
      });
      for (const c of chunks) {
        if (c.text) allText += c.text;
        if (c.thinking) allThinking += c.thinking;
      }
      cascadeMeta = { cascadeId: chunks.cascadeId, sessionId: chunks.sessionId };
      serverUsage = chunks.usage || null;

      // Always strip tool-call markup blocks from Cascade text.
      // - emulateTools=true: parsed tool_calls become OpenAI-format tool_calls.
      // - emulateTools=false: blocks are silently discarded (defense-in-depth
      //   against Cascade's system prompt inducing tool markup even after we
      //   override tool_calling_section).
      const parsed = parseToolCallsFromText(allText);
      allText = parsed.text;
      if (emulateTools) toolCalls = parsed.toolCalls;
      // Built-in Cascade tool calls (chunks.toolCalls — edit_file, view_file,
      // list_directory, run_command, etc.) are intentionally DROPPED. Their
      // argumentsJson and result fields reference server-internal paths and
      // must never be exposed to an API caller. Emulated tool calls (above)
      // are safe because they reference the caller's own tool schema.
    } else {
      const chunks = await client.rawGetChatMessage(messages, modelEnum, modelUid);
      for (const c of chunks) {
        if (c.text) allText += c.text;
      }
    }

    // Scrub server-internal filesystem paths from everything we're about to
    // return. See src/sanitize.js for the patterns and rationale.
    allText = sanitizeText(allText);
    allThinking = sanitizeText(allThinking);
    toolCalls = toolCalls.map(tc => ({ ...tc, argumentsJson: sanitizeText(tc.argumentsJson || '') }));

    // Check the cascade back into the pool under the *post-turn* fingerprint
    // so the next request in the same conversation can resume it.
    if (poolCtx && cascadeMeta?.cascadeId && allText) {
      const fpAfter = fingerprintAfter(messages, allText);
      poolCheckin(fpAfter, {
        cascadeId: cascadeMeta.cascadeId,
        sessionId: cascadeMeta.sessionId,
        lsPort: poolCtx.lsPort,
        apiKey: poolCtx.apiKey,
        createdAt: poolCtx.reuseEntry?.createdAt,
      });
    }

    reportSuccess(apiKey);
    updateCapability(apiKey, modelKey, true, 'success');
    recordRequest(model, true, Date.now() - startTime, apiKey);

    // Skip caching tool_call responses — the cache doesn't preserve the
    // tool_calls array, so a hit would return a content-only response with
    // finish_reason:stop and break the tool flow.
    if (ckey && !toolCalls.length) cacheSet(ckey, { text: allText, thinking: allThinking });

    const message = { role: 'assistant', content: allText || null };
    if (allThinking) message.reasoning_content = allThinking;
    if (toolCalls.length) {
      message.tool_calls = toolCalls.map((tc, i) => ({
        id: tc.id || `call_${i}_${Date.now().toString(36)}`,
        type: 'function',
        function: {
          name: tc.name || 'unknown',
          arguments: tc.argumentsJson || tc.arguments || '{}',
        },
      }));
      // OpenAI convention: content is null when finish_reason is tool_calls.
      // In text emulation the model often emits an inline (possibly
      // hallucinated) answer alongside the call; null it so the caller waits
      // for the real tool result.
      message.content = null;
    }

    return {
      status: 200,
      body: {
        id,
        object: 'chat.completion',
        created,
        model,
        choices: [{ index: 0, message, finish_reason: toolCalls.length ? 'tool_calls' : 'stop' }],
        // Prefer server-reported usage; fall back to the chars/4 estimate.
        usage: buildUsageBody(serverUsage, messages, allText, allThinking),
      },
    };
  } catch (err) {
    const { isRateLimit } = classifyUpstreamError(err, apiKey, modelKey);
    recordRequest(model, false, Date.now() - startTime, apiKey);
    log.error('Chat error:', err.message);

    // Rate limits → 429 with retry hint; model errors → 403; others → 502
    if (isRateLimit) {
      const rl = isAllRateLimited(modelKey);
      return {
        status: 429,
        body: { error: { message: `${model} 已达速率限制,请稍后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs || 60000 } },
      };
    }
    return {
      status: err.isModelError ? 403 : 502,
      body: { error: { message: sanitizeText(err.message), type: err.isModelError ? 'model_not_available' : 'upstream_error' } },
    };
  }
}

/**
 * Build the SSE streaming response descriptor. `handler(res)` writes
 * OpenAI `chat.completion.chunk` events, retrying on other accounts only
 * while nothing has been streamed to the client yet.
 */
function streamResponse(id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble) {
  return {
    status: 200,
    stream: true,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
    async handler(res) {
      const abortController = new AbortController();
      res.on('close', () => {
        if (!res.writableEnded) {
          log.info('Client disconnected mid-stream, aborting upstream');
          abortController.abort();
        }
      });

      const send = (data) => {
        if (!res.writableEnded) res.write(`data: ${JSON.stringify(data)}\n\n`);
      };
      const sendRoleChunk = () => {
        send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
      };

      // SSE heartbeat: keep the connection alive through silent periods
      // (LS warmup, Cascade "thinking", queue wait). A `:` line is an SSE
      // comment — clients ignore it, intermediaries see bytes flowing.
      const heartbeat = setInterval(() => {
        if (!res.writableEnded) res.write(': ping\n\n');
      }, HEARTBEAT_MS);
      const stopHeartbeat = () => clearInterval(heartbeat);
      res.on('close', stopHeartbeat);

      // ── Cache hit: replay stored response as a fake stream ──
      const cached = cacheGet(ckey);
      if (cached) {
        log.info(`Chat: cache HIT model=${model} flow=stream`);
        recordRequest(model, true, 0, null);
        try {
          sendRoleChunk();
          if (cached.thinking) {
            send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }] });
          }
          if (cached.text) {
            send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }] });
          }
          send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: {}, finish_reason: 'stop' }], usage: cachedUsage(messages, cached.text, cached.thinking) });
          if (!res.writableEnded) {
            res.write('data: [DONE]\n\n');
            res.end();
          }
        } finally {
          stopHeartbeat();
        }
        return;
      }

      const startTime = Date.now();
      const tried = [];
      let hadSuccess = false;
      let rolePrinted = false;
      let currentApiKey = null;
      let lastErr = null;
      const maxAttempts = maxAttemptsForPool();
      // Accumulate output so a successful response can be cached at the end.
      let accText = '';
      let accThinking = '';

      // Cascade conversation pool (experimental, stream path) — bypassed in
      // tool-emulation mode because the fingerprint can't collapse turns
      // whose bodies carry tool-call markup.
      const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
      const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
      let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
      if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… stream model=${model}`);

      // Always strip tool-call markup in Cascade mode. In emulation mode the
      // parsed calls are emitted as OpenAI tool_calls; otherwise they are
      // silently dropped (defense-in-depth against Cascade's system prompt).
      const toolParser = useCascade ? new ToolCallStreamParser() : null;
      const collectedToolCalls = [];

      // Every text/thinking delta flows through a PathSanitizeStream so
      // server-internal paths can't slip out even if a path straddles a
      // chunk boundary. See src/sanitize.js.
      const pathStreamText = new PathSanitizeStream();
      const pathStreamThinking = new PathSanitizeStream();

      const emitContent = (clean) => {
        if (!clean) return;
        accText += clean;
        send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: { content: clean }, finish_reason: null }] });
      };
      const emitThinking = (clean) => {
        if (!clean) return;
        accThinking += clean;
        send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }] });
      };
      // Record a completed emulated tool call and stream it as a tool_calls delta.
      const emitToolCall = (tc) => {
        const idx = collectedToolCalls.length;
        collectedToolCalls.push(tc);
        send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: {
          tool_calls: [{
            index: idx,
            id: tc.id,
            type: 'function',
            function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') },
          }],
        }, finish_reason: null }] });
      };

      const onChunk = (chunk) => {
        if (!rolePrinted) {
          rolePrinted = true;
          sendRoleChunk();
        }
        hadSuccess = true;
        if (chunk.text) {
          // raw chunk → ToolCallStreamParser (strip tool blocks)
          //           → PathSanitizeStream (scrub server paths) → client
          let safeText = chunk.text;
          if (toolParser) {
            const { text: safe, toolCalls: done } = toolParser.feed(chunk.text);
            safeText = safe;
            // Only surface parsed calls when emulating; otherwise they came
            // from Cascade's built-in tools and are discarded.
            if (emulateTools) done.forEach(emitToolCall);
          }
          if (safeText) emitContent(pathStreamText.feed(safeText));
        }
        if (chunk.thinking) {
          emitThinking(pathStreamThinking.feed(chunk.thinking));
        }
      };

      try {
        for (let attempt = 0; attempt < maxAttempts; attempt++) {
          if (abortController.signal.aborted) return;

          let acct = null;
          if (reuseEntry && attempt === 0) {
            acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
            if (!acct) {
              log.info('Chat: cascade reuse skipped — owning account not available');
              reuseEntry = null;
            }
          }
          if (!acct) {
            acct = await waitForAccount(tried, abortController.signal, QUEUE_MAX_WAIT_MS, modelKey);
            if (!acct) break;
          }
          tried.push(acct.apiKey);
          currentApiKey = acct.apiKey;
          reuseEntry = dropForeignReuse(reuseEntry, acct);

          if (!(await preflightHasCapacity(acct, modelKey))) continue;

          try {
            await ensureLs(acct.proxy);
          } catch (e) {
            lastErr = e;
            break;
          }
          const ls = getLsFor(acct.proxy);
          if (!ls) {
            lastErr = new Error('No LS instance available');
            break;
          }
          if (reuseEntry && reuseEntry.lsPort !== ls.port) {
            log.info('Chat: cascade reuse skipped — LS port changed');
            reuseEntry = null;
          }

          log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${(messages || []).length} chars=${countMessageChars(messages)}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);

          const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
          let cascadeResult = null;
          try {
            if (useCascade) {
              cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, {
                onChunk,
                signal: abortController.signal,
                reuseEntry,
                toolPreamble,
              });
            } else {
              await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk });
            }

            // Flush order matters:
            //   1. ToolCallStreamParser tail → may produce more text (e.g. an
            //      unclosed tool block falls through as literal text)
            //   2. PathSanitizeStream tail (text) → scrubs anything held back
            //   3. PathSanitizeStream tail (thinking)
            if (toolParser) {
              const tail = toolParser.flush();
              if (tail.text) emitContent(pathStreamText.feed(tail.text));
              if (emulateTools) tail.toolCalls.forEach(emitToolCall);
            }
            emitContent(pathStreamText.flush());
            emitThinking(pathStreamThinking.flush());

            // Pool check-in on success (cascade only)
            if (reuseEnabled && cascadeResult?.cascadeId && accText) {
              const fpAfter = fingerprintAfter(messages, accText);
              poolCheckin(fpAfter, {
                cascadeId: cascadeResult.cascadeId,
                sessionId: cascadeResult.sessionId,
                lsPort: ls.port,
                apiKey: currentApiKey,
                createdAt: reuseEntry?.createdAt,
              });
            }

            if (hadSuccess) reportSuccess(currentApiKey);
            updateCapability(currentApiKey, modelKey, true, 'success');
            recordRequest(model, true, Date.now() - startTime, currentApiKey);

            if (!rolePrinted) sendRoleChunk();
            const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop';
            // Prefer Cascade's own model_usage numbers, falling back to the
            // chars/4 estimate. Computed once and sent on both the finish
            // chunk and the terminal usage chunk (empty choices[] — the
            // stream_options.include_usage convention).
            const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking);
            send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: {}, finish_reason: finalReason }], usage });
            send({ id, object: 'chat.completion.chunk', created, model, choices: [], usage });
            if (!res.writableEnded) {
              res.write('data: [DONE]\n\n');
              res.end();
            }
            if (ckey && !collectedToolCalls.length && (accText || accThinking)) {
              cacheSet(ckey, { text: accText, thinking: accThinking });
            }
            return;
          } catch (err) {
            lastErr = err;
            reuseEntry = null; // don't try to reuse on retry
            const { isRateLimit, isInternal } = classifyUpstreamError(err, currentApiKey, modelKey);
            // Retry only if nothing has been streamed yet AND the error is retryable.
            if (!hadSuccess && (err.isModelError || isRateLimit)) {
              const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error';
              log.warn(`Account ${acct.email} failed (${tag}) on ${model}, trying next`);
              continue;
            }
            break;
          }
        }

        // All attempts failed
        log.error('Stream error after retries:', lastErr?.message);
        recordRequest(model, false, Date.now() - startTime, currentApiKey);
        try {
          if (!rolePrinted) sendRoleChunk();
          // Report a pool-wide rate limit when that is the cause.
          const rl = isAllRateLimited(modelKey);
          const errMsg = rl.allLimited
            ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`
            : sanitizeText(lastErr?.message || 'no accounts');
          send({ id, object: 'chat.completion.chunk', created, model, choices: [{ index: 0, delta: { content: `\n[Error: ${errMsg}]` }, finish_reason: 'stop' }] });
          res.write('data: [DONE]\n\n');
        } catch (writeErr) {
          // Best effort: the client may already be gone.
          log.debug(`Stream error reporting failed: ${writeErr?.message}`);
        }
        if (!res.writableEnded) res.end();
      } finally {
        stopHeartbeat();
      }
    },
  };
}