Spaces:
Sleeping
Sleeping
| /** | |
| * POST /v1/chat/completions — OpenAI-compatible chat completions. | |
| * Routes to RawGetChatMessage (legacy) or Cascade (premium) based on model type. | |
| */ | |
| import { randomUUID } from 'crypto'; | |
| import { WindsurfClient } from '../client.js'; | |
| import { getApiKey, acquireAccountByKey, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js'; | |
| import { resolveModel, getModelInfo } from '../models.js'; | |
| import { getLsFor, ensureLs } from '../langserver.js'; | |
| import { config, log } from '../config.js'; | |
| import { recordRequest } from '../dashboard/stats.js'; | |
| import { isModelAllowed } from '../dashboard/model-access.js'; | |
| import { cacheKey, cacheGet, cacheSet } from '../cache.js'; | |
| import { isExperimentalEnabled, getIdentityPromptFor } from '../runtime-config.js'; | |
| import { checkMessageRateLimit } from '../windsurf-api.js'; | |
| import { getEffectiveProxy } from '../dashboard/proxy-config.js'; | |
| import { | |
| fingerprintBefore, fingerprintAfter, checkout as poolCheckout, checkin as poolCheckin, | |
| } from '../conversation-pool.js'; | |
| import { | |
| normalizeMessagesForCascade, ToolCallStreamParser, parseToolCallsFromText, | |
| buildToolPreambleForProto, | |
| } from './tool-emulation.js'; | |
| import { sanitizeText, PathSanitizeStream } from '../sanitize.js'; | |
// Timing knobs, all expressed in milliseconds.
// Interval between SSE `: ping` comment frames on a streaming response.
const HEARTBEAT_MS = 15 * 1000;
// Pause between account-pool polls while a request is queued.
const QUEUE_RETRY_MS = 1000;
// Default ceiling on how long a request may stay queued for an account.
const QUEUE_MAX_WAIT_MS = 30 * 1000;
// ── Model identity prompt ──────────────────────────────────
// Templates live in runtime-config (editable from the dashboard). Use {model}
// as a placeholder for the requested model name. Only applied when the
// experimental "modelIdentityPrompt" toggle is ON.
/**
 * Render the identity system prompt for a provider.
 *
 * @param {string} displayModel - Model name substituted for every `{model}`
 *   placeholder in the template.
 * @param {string} provider - Provider key passed to `getIdentityPromptFor`.
 * @returns {string|null} The rendered prompt, or `null` when no (or an empty)
 *   template is configured for the provider.
 */
function buildIdentitySystemMessage(displayModel, provider) {
  const template = getIdentityPromptFor(provider);
  if (!template) return null;
  // Use a function replacer so the model name is inserted verbatim. A plain
  // replacement string would have `$&`, `$$`, `$1`, … expanded by
  // String.prototype.replace, corrupting names that contain a `$`.
  return template.replace(/\{model\}/g, () => displayModel);
}
/**
 * Create a fresh completion id: the `chatcmpl-` prefix followed by the first
 * 29 hex characters of a dash-stripped random UUID.
 *
 * @returns {string}
 */
function genId() {
  const hex = randomUUID().split('-').join('');
  return `chatcmpl-${hex.substring(0, 29)}`;
}
/**
 * Rough prompt-size estimate at ~4 characters per token. Feeds the
 * OpenAI-compatible `usage.prompt_tokens_details.cached_tokens` field so
 * upstream billing/dashboards (new-api) can recognise local cache hits.
 *
 * Counts string `content` directly; for array `content` it counts only parts
 * that carry a string `text`. Anything else contributes nothing.
 *
 * @param {Array<object>} messages - Chat messages.
 * @returns {number} 0 when `messages` is not an array, otherwise at least 1.
 */
function estimateTokens(messages) {
  if (!Array.isArray(messages)) return 0;
  const totalChars = messages.reduce((sum, msg) => {
    const content = msg?.content;
    if (typeof content === 'string') return sum + content.length;
    if (!Array.isArray(content)) return sum;
    return content.reduce(
      (inner, part) => (typeof part?.text === 'string' ? inner + part.text.length : inner),
      sum,
    );
  }, 0);
  return Math.max(1, Math.ceil(totalChars / 4));
}
/**
 * Build the `usage` object reported for a response served from the local
 * cache. Both sides are chars/4 estimates, and the whole prompt is reported
 * as cached tokens.
 *
 * @param {Array<object>} messages - Request messages (sizes the prompt).
 * @param {string} [completionText] - Cached completion text.
 * @returns {object} OpenAI-shaped usage object flagged with `cached: true`.
 */
function cachedUsage(messages, completionText) {
  const promptTokens = estimateTokens(messages);
  const completionChars = (completionText || '').length;
  const completionTokens = Math.max(1, Math.ceil(completionChars / 4));
  return {
    prompt_tokens: promptTokens,
    completion_tokens: completionTokens,
    total_tokens: promptTokens + completionTokens,
    input_tokens: promptTokens,
    output_tokens: completionTokens,
    prompt_tokens_details: { cached_tokens: promptTokens },
    completion_tokens_details: { reasoning_tokens: 0 },
    cached: true,
  };
}
/**
 * Produce an OpenAI-shaped `usage` object.
 *
 * Server-reported counts (Cascade's CortexStepMetadata.model_usage) win when
 * `serverUsage` carries a non-zero `inputTokens` or `outputTokens`; otherwise
 * the numbers are a local chars/4 estimate. Both branches share the same base
 * fields so downstream billing need not care where the numbers came from.
 *
 * Mapping of the Cascade fields {inputTokens, outputTokens, cacheReadTokens,
 * cacheWriteTokens} onto the OpenAI shape:
 *   prompt_tokens     = inputTokens + cacheReadTokens + cacheWriteTokens
 *                       (grand total of input the model processed)
 *   completion_tokens = outputTokens
 *   prompt_tokens_details.cached_tokens      = cacheReadTokens
 *   cache_creation_input_tokens (Anthropic)  = cacheWriteTokens
 *
 * @param {object|null} serverUsage - Usage reported by the backend, if any.
 * @param {Array<object>} messages - Request messages (estimate fallback only).
 * @param {string} completionText - Visible completion text (estimate fallback only).
 * @param {string} [thinkingText=''] - Reasoning text, counted as completion
 *   output in the estimate fallback.
 * @returns {object} Usage object.
 */
function buildUsageBody(serverUsage, messages, completionText, thinkingText = '') {
  const hasServerCounts = Boolean(
    serverUsage && (serverUsage.inputTokens || serverUsage.outputTokens),
  );

  if (!hasServerCounts) {
    // Estimate path: ~4 characters per token, never below one completion token.
    const promptTokens = estimateTokens(messages);
    const outputChars = (completionText || '').length + (thinkingText || '').length;
    const completionTokens = Math.max(1, Math.ceil(outputChars / 4));
    return {
      prompt_tokens: promptTokens,
      completion_tokens: completionTokens,
      total_tokens: promptTokens + completionTokens,
      input_tokens: promptTokens,
      output_tokens: completionTokens,
      prompt_tokens_details: { cached_tokens: 0 },
      completion_tokens_details: { reasoning_tokens: 0 },
    };
  }

  const fresh = serverUsage.inputTokens || 0;
  const produced = serverUsage.outputTokens || 0;
  const cacheHits = serverUsage.cacheReadTokens || 0;
  const cacheWrites = serverUsage.cacheWriteTokens || 0;
  const promptTokens = fresh + cacheHits + cacheWrites;
  return {
    prompt_tokens: promptTokens,
    completion_tokens: produced,
    total_tokens: promptTokens + produced,
    input_tokens: promptTokens,
    output_tokens: produced,
    prompt_tokens_details: { cached_tokens: cacheHits },
    completion_tokens_details: { reasoning_tokens: 0 },
    cache_creation_input_tokens: cacheWrites,
  };
}
/**
 * Wait until `getApiKey` returns a non-null account, or until `maxWaitMs`
 * expires. Used when every account has momentarily exhausted its RPM budget
 * so the client is queued instead of getting a 503.
 *
 * @param {string[]} tried - API keys already attempted; forwarded to `getApiKey`.
 * @param {{aborted: boolean}|null} signal - Optional abort signal; polling
 *   stops (returning `null`) once it reports `aborted`.
 * @param {number} [maxWaitMs=QUEUE_MAX_WAIT_MS] - Upper bound on the wait.
 * @param {string|null} [modelKey=null] - Model key forwarded to `getApiKey`.
 * @returns {Promise<object|null>} The acquired account, or `null` on
 *   abort/timeout.
 */
async function waitForAccount(tried, signal, maxWaitMs = QUEUE_MAX_WAIT_MS, modelKey = null) {
  const deadline = Date.now() + maxWaitMs;
  let acct = getApiKey(tried, modelKey);
  while (!acct) {
    if (signal?.aborted) return null;
    const remainingMs = deadline - Date.now();
    // `!(x > 0)` also bails out on a NaN budget instead of polling forever.
    if (!(remainingMs > 0)) return null;
    // Clamp the sleep to the time left so the total wait cannot overshoot
    // maxWaitMs by up to a full retry interval.
    const delayMs = Math.min(QUEUE_RETRY_MS, remainingMs);
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    acct = getApiKey(tried, modelKey);
  }
  return acct;
}
/**
 * Handle an OpenAI-compatible chat-completions request body.
 *
 * Resolves the model, applies access control and the account-eligibility
 * preflight, then either hands off to `streamResponse` (when `stream` is set)
 * or serves the request non-streaming: local cache first, then up to
 * `maxAttempts` accounts via `nonStreamResponse`.
 *
 * @param {object} body - Parsed request body (`model`, `messages`, `stream`,
 *   `tools`, `tool_choice`).
 * @returns {Promise<object>} `{ status, body }` for non-stream results and
 *   errors, or whatever `streamResponse` returns for streaming requests.
 */
export async function handleChatCompletions(body) {
  const {
    model: reqModel,
    stream = false,
    tools,
    tool_choice,
  } = body ?? {};
  // `messages` is `let` not `const` so the identity-prompt injection below
  // can prepend a system turn for the legacy path too.
  let messages = body?.messages;
  // Reject malformed bodies up front: spreading a missing/non-array
  // `messages` further down would otherwise throw a TypeError.
  if (!Array.isArray(messages)) {
    return {
      status: 400,
      body: { error: { message: '`messages` must be an array', type: 'invalid_request_error' } },
    };
  }
  const modelKey = resolveModel(reqModel || config.defaultModel);
  const modelInfo = getModelInfo(modelKey);
  const displayModel = modelInfo?.name || reqModel || config.defaultModel;
  const modelEnum = modelInfo?.enumValue || 0;
  const modelUid = modelInfo?.modelUid || null;
  // Models with a modelUid use the Cascade flow (StartCascade → SendUserCascadeMessage).
  // Legacy RawGetChatMessage only for models with enumValue>0 and NO modelUid.
  // Newer models (gemini-3.0, gpt-5.2, etc.) have both enumValue AND modelUid but
  // their high enum values cause "cannot parse invalid wire-format data" in the
  // legacy proto endpoint. Cascade handles them correctly via uid string.
  const useCascade = !!modelUid;
  // Tool-call emulation: if the client passed OpenAI-style tools[], we rewrite
  // tool-result turns into synthetic user text and inject the tool protocol
  // at the system-prompt level via CascadeConversationalPlannerConfig's
  // tool_calling_section (SectionOverrideConfig, OVERRIDE mode). This is far
  // more reliable than user-message-level injection because NO_TOOL mode's
  // baked-in system prompt tells the model "you have no tools" — which
  // overpowers user-message preambles. The section override replaces that
  // section directly so the model sees our emulated tool definitions as
  // authoritative system instructions.
  const hasTools = Array.isArray(tools) && tools.length > 0;
  const hasToolHistory = messages.some(m => m?.role === 'tool' || (m?.role === 'assistant' && Array.isArray(m.tool_calls) && m.tool_calls.length));
  const emulateTools = useCascade && (hasTools || hasToolHistory);
  // Build proto-level preamble (goes into tool_calling_section override);
  // pass empty tools to normalizeMessagesForCascade so it only rewrites
  // role:tool / assistant.tool_calls messages without injecting a user-level
  // preamble (that's now handled at the proto layer).
  const toolPreamble = emulateTools ? buildToolPreambleForProto(tools || [], tool_choice) : '';
  let cascadeMessages = emulateTools
    ? normalizeMessagesForCascade(messages, [])
    : [...messages];
  // ── Model identity prompt injection ──
  // When enabled, prepend a system message so the model identifies itself as
  // the requested model (e.g. "I am Claude Opus 4.6") instead of leaking the
  // Cascade/Windsurf backend identity. Inject into BOTH messages (for legacy
  // RawGetChatMessage path) and cascadeMessages (Cascade path) — they diverge
  // once tool-emulation rewrites the Cascade path, but the system identity
  // should be identical in both.
  if (isExperimentalEnabled('modelIdentityPrompt') && modelInfo?.provider) {
    const identityText = buildIdentitySystemMessage(displayModel, modelInfo.provider);
    if (identityText) {
      const sysMsg = { role: 'system', content: identityText };
      cascadeMessages = [sysMsg, ...cascadeMessages];
      messages = [sysMsg, ...messages];
    }
  }
  // Global model access control (allowlist / blocklist from dashboard)
  const access = isModelAllowed(modelKey);
  if (!access.allowed) {
    return { status: 403, body: { error: { message: access.reason, type: 'model_blocked' } } };
  }
  // Per-account model routing preflight: if NO active account has this
  // model in its tier ∩ available list, fail fast instead of looping
  // through every account trying to find one. This surfaces tier
  // entitlement and blocklist errors as a clean 403 rather than a 30s
  // queue timeout → pool_exhausted.
  const anyEligible = getAccountList().some(a =>
    a.status === 'active' && (a.availableModels || []).includes(modelKey)
  );
  if (!anyEligible) {
    return {
      status: 403,
      body: {
        error: {
          message: `模型 ${displayModel} 在当前账号池中不可用(未订阅或已被封禁)`,
          type: 'model_not_entitled',
        },
      },
    };
  }
  const chatId = genId();
  const created = Math.floor(Date.now() / 1000);
  const ckey = cacheKey(body);
  if (stream) {
    return streamResponse(chatId, created, displayModel, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble);
  }
  // ── Local response cache (exact body match) ─────────────
  const cached = cacheGet(ckey);
  if (cached) {
    log.info(`Chat: cache HIT model=${displayModel} flow=non-stream`);
    recordRequest(displayModel, true, 0, null);
    const message = { role: 'assistant', content: cached.text || null };
    if (cached.thinking) message.reasoning_content = cached.thinking;
    return {
      status: 200,
      body: {
        id: chatId, object: 'chat.completion', created, model: displayModel,
        choices: [{ index: 0, message, finish_reason: 'stop' }],
        usage: cachedUsage(messages, cached.text),
      },
    };
  }
  // ── Cascade conversation pool (experimental) ──
  // If the client is continuing a prior conversation and we still hold the
  // cascade_id from last turn, pin this request to that exact (account, LS)
  // pair so the Windsurf backend serves from its hot per-cascade context
  // instead of replaying the whole history.
  //
  // Tool-emulation mode bypasses the reuse pool: fingerprint can't stably
  // collapse a conversation whose assistant turns contain synthesised
  // <tool_call> markup and whose user turns contain <tool_result> wrappers.
  const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
  const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
  let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
  if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… model=${displayModel}`);
  // Non-stream: retry with a different account on model-not-available errors
  const tried = [];
  let lastErr = null;
  // Dynamic: try every active account in the pool (capped at 10) so a
  // large pool with many rate-limited accounts can still fall through
  // to a free one. Was hardcoded 3 — in pools bigger than 3 with the
  // first accounts rate-limited, healthy accounts were never reached
  // even though they would have worked (issue #5).
  const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    let acct = null;
    if (reuseEntry && attempt === 0) {
      // First attempt pins to the account that owns the cached cascade.
      acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
      if (!acct) {
        log.info('Chat: cascade reuse skipped — owning account not available, falling back to fresh cascade');
        reuseEntry = null;
      }
    }
    if (!acct) {
      acct = await waitForAccount(tried, null, QUEUE_MAX_WAIT_MS, modelKey);
      if (!acct) break;
    }
    tried.push(acct.apiKey);
    // Pre-flight rate limit check (experimental): ask server.codeium.com if
    // this account still has message capacity before burning an LS round trip.
    if (isExperimentalEnabled('preflightRateLimit')) {
      try {
        const px = getEffectiveProxy(acct.id) || null;
        const rl = await checkMessageRateLimit(acct.apiKey, px);
        if (!rl.hasCapacity) {
          log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
          // Same (apiKey, cooldown ms, modelKey) argument order as the
          // markRateLimited call in nonStreamResponse; the previous
          // (acct.id, modelKey) form put the model key in the duration slot.
          // NOTE(review): confirm against the markRateLimited signature in auth.js.
          markRateLimited(acct.apiKey, 5 * 60 * 1000, modelKey);
          continue;
        }
      } catch (e) {
        log.debug(`Preflight check failed for ${acct.email}: ${e.message}`);
        // Fail open — proceed with the request
      }
    }
    // An LS start-up failure becomes a clean 503 instead of an unhandled
    // rejection, matching how the streaming path guards ensureLs.
    try {
      await ensureLs(acct.proxy);
    } catch (e) {
      log.error('Chat: ensureLs failed:', e?.message);
      lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } };
      break;
    }
    const ls = getLsFor(acct.proxy);
    if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; }
    // Cascade pins cascade_id to a specific LS port too; if the LS it was
    // born on has been replaced, the cascade_id is dead.
    if (reuseEntry && reuseEntry.lsPort !== ls.port) {
      log.info('Chat: cascade reuse skipped — LS port changed');
      reuseEntry = null;
    }
    const msgChars = messages.reduce((n, m) => {
      const c = m?.content;
      return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
    }, 0);
    log.info(`Chat: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${messages.length} chars=${msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);
    const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
    const result = await nonStreamResponse(
      client, chatId, created, displayModel, modelKey, messages, cascadeMessages, modelEnum, modelUid,
      useCascade, acct.apiKey, ckey,
      reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey } : null,
      emulateTools, toolPreamble,
    );
    if (result.status === 200) return result;
    reuseEntry = null; // don't try to reuse on the retry
    lastErr = result;
    const errType = result.body?.error?.type;
    // Rate limit: this account is done for this model, try the next one
    if (errType === 'rate_limit_exceeded') {
      log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`);
      continue;
    }
    // Model not available on this account (permission_denied, etc.)
    if (errType === 'model_not_available') {
      log.warn(`Account ${acct.email} cannot serve ${displayModel}, trying next account`);
      continue;
    }
    break; // other errors (502, transport) — don't retry
  }
  // If all accounts exhausted, check if it's because they're all rate-limited
  if (!lastErr || lastErr.status === 429) {
    const rl = isAllRateLimited(modelKey);
    if (rl.allLimited) {
      return { status: 429, body: { error: { message: `${displayModel} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs } } };
    }
  }
  return lastErr || { status: 503, body: { error: { message: 'No active accounts available', type: 'pool_exhausted' } } };
}
/**
 * Run one non-streaming completion attempt against a single account and
 * translate the outcome into an OpenAI-shaped `{ status, body }` result.
 *
 * Success: collects text/thinking from the Cascade or legacy call, strips
 * tool markup (Cascade only), sanitizes server paths, checks the cascade back
 * into the reuse pool, records account/stat bookkeeping, caches the response
 * (unless it carries tool calls) and returns status 200.
 *
 * Failure: classifies the error by message and returns 429 (rate limit),
 * 403 (`err.isModelError`) or 502 (anything else). Never throws for errors
 * raised inside the attempt.
 *
 * @param {object} client - Client exposing `cascadeChat` / `rawGetChatMessage`.
 * @param {string} id - Completion id.
 * @param {number} created - Creation timestamp placed in the response.
 * @param {string} model - Display model name.
 * @param {string} modelKey - Resolved model key.
 * @param {Array<object>} messages - Messages for the legacy path, usage
 *   estimate and pool fingerprint.
 * @param {Array<object>} cascadeMessages - Messages sent on the Cascade path.
 * @param {number} modelEnum - Model enum value.
 * @param {string|null} modelUid - Model uid.
 * @param {boolean} useCascade - Selects Cascade vs legacy flow.
 * @param {string} apiKey - Account key used for bookkeeping calls.
 * @param {string} ckey - Response-cache key.
 * @param {object|null} poolCtx - `{ reuseEntry, lsPort, apiKey }` when
 *   conversation reuse is enabled, else null.
 * @param {boolean} emulateTools - Whether parsed tool calls are returned.
 * @param {string} toolPreamble - Tool preamble forwarded to `cascadeChat`.
 * @returns {Promise<{status: number, body: object}>}
 */
async function nonStreamResponse(client, id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, apiKey, ckey, poolCtx, emulateTools, toolPreamble) {
  const startedAt = Date.now();
  try {
    let text = '';
    let thinking = '';
    let cascadeMeta = null;
    let toolCalls = [];
    // Server-reported token usage from CortexStepMetadata.model_usage.
    // Preferred over the chars/4 estimate when present so downstream billing
    // sees real Cascade numbers.
    let serverUsage = null;

    if (!useCascade) {
      const legacyChunks = await client.rawGetChatMessage(messages, modelEnum, modelUid);
      for (const piece of legacyChunks) {
        if (piece.text) text += piece.text;
      }
    } else {
      const cascadeOpts = { reuseEntry: poolCtx?.reuseEntry || null, toolPreamble };
      const cascadeChunks = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, cascadeOpts);
      for (const piece of cascadeChunks) {
        if (piece.text) text += piece.text;
        if (piece.thinking) thinking += piece.thinking;
      }
      cascadeMeta = { cascadeId: cascadeChunks.cascadeId, sessionId: cascadeChunks.sessionId };
      serverUsage = cascadeChunks.usage || null;
      // Always strip <tool_call>/<tool_result> blocks from Cascade text.
      // With emulateTools the parsed calls become OpenAI-format tool_calls;
      // without it they are discarded (defense-in-depth against Cascade's
      // system prompt inducing tool markup).
      //
      // Built-in Cascade tool calls (chunks.toolCalls) are intentionally
      // DROPPED: their arguments/results reference server-internal paths and
      // must never be exposed to an API caller.
      const stripped = parseToolCallsFromText(text);
      text = stripped.text;
      if (emulateTools) toolCalls = stripped.toolCalls;
    }

    // Scrub server-internal filesystem paths from everything we return.
    text = sanitizeText(text);
    thinking = sanitizeText(thinking);
    if (toolCalls.length) {
      toolCalls = toolCalls.map((call) => ({
        ...call,
        argumentsJson: sanitizeText(call.argumentsJson || ''),
      }));
    }

    // Check the cascade back in under the *post-turn* fingerprint so the next
    // request in the same conversation can resume it.
    if (poolCtx && cascadeMeta?.cascadeId && text) {
      const nextFingerprint = fingerprintAfter(messages, text);
      poolCheckin(nextFingerprint, {
        cascadeId: cascadeMeta.cascadeId,
        sessionId: cascadeMeta.sessionId,
        lsPort: poolCtx.lsPort,
        apiKey: poolCtx.apiKey,
        createdAt: poolCtx.reuseEntry?.createdAt,
      });
    }

    reportSuccess(apiKey);
    updateCapability(apiKey, modelKey, true, 'success');
    recordRequest(model, true, Date.now() - startedAt, apiKey);

    const hasToolCalls = toolCalls.length > 0;
    // Tool-call responses are not cached: the cache keeps only text/thinking,
    // so a hit would replay a content-only answer and break the tool flow.
    if (ckey && !hasToolCalls) cacheSet(ckey, { text, thinking });

    // OpenAI convention: content is null when finish_reason is tool_calls.
    // This also hides any inline answer the model emitted next to the
    // <tool_call> block, so the caller waits for the real tool result.
    const message = { role: 'assistant', content: hasToolCalls ? null : (text || null) };
    if (thinking) message.reasoning_content = thinking;
    if (hasToolCalls) {
      message.tool_calls = toolCalls.map((call, position) => ({
        id: call.id || `call_${position}_${Date.now().toString(36)}`,
        type: 'function',
        function: {
          name: call.name || 'unknown',
          arguments: call.argumentsJson || call.arguments || '{}',
        },
      }));
    }

    // Prefer server-reported usage; buildUsageBody falls back to the chars/4
    // estimate when none was reported.
    const usage = buildUsageBody(serverUsage, messages, text, thinking);
    return {
      status: 200,
      body: {
        id, object: 'chat.completion', created, model,
        choices: [{ index: 0, message, finish_reason: hasToolCalls ? 'tool_calls' : 'stop' }],
        usage,
      },
    };
  } catch (err) {
    // Only true auth failures count against the account. Workspace/cascade/
    // model errors and transport issues must not disable the key.
    const errText = err.message;
    const authFailed = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(errText);
    const rateLimited = /rate limit|rate_limit|too many requests|quota/i.test(errText);
    const internalFailure = /internal error occurred.*error id/i.test(errText);

    if (authFailed) reportError(apiKey);
    if (rateLimited) {
      markRateLimited(apiKey, 5 * 60 * 1000, modelKey);
      err.isRateLimit = true;
      err.isModelError = true;
    }
    if (internalFailure) {
      reportInternalError(apiKey);
      err.isModelError = true;
    }
    if (err.isModelError && !rateLimited && !internalFailure) {
      updateCapability(apiKey, modelKey, false, 'model_error');
    }
    recordRequest(model, false, Date.now() - startedAt, apiKey);
    log.error('Chat error:', errText);

    // Rate limits → 429 with a retry hint; model errors → 403; others → 502.
    if (rateLimited) {
      const poolState = isAllRateLimited(modelKey);
      return {
        status: 429,
        body: { error: { message: `${model} 已达速率限制,请稍后重试`, type: 'rate_limit_exceeded', retry_after_ms: poolState.retryAfterMs || 60000 } },
      };
    }
    const modelError = err.isModelError;
    return {
      status: modelError ? 403 : 502,
      body: { error: { message: sanitizeText(errText), type: modelError ? 'model_not_available' : 'upstream_error' } },
    };
  }
}
| function streamResponse(id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble) { | |
| return { | |
| status: 200, | |
| stream: true, | |
| headers: { | |
| 'Content-Type': 'text/event-stream', | |
| 'Cache-Control': 'no-cache', | |
| 'Connection': 'keep-alive', | |
| 'X-Accel-Buffering': 'no', | |
| }, | |
| async handler(res) { | |
| const abortController = new AbortController(); | |
| res.on('close', () => { | |
| if (!res.writableEnded) { | |
| log.info('Client disconnected mid-stream, aborting upstream'); | |
| abortController.abort(); | |
| } | |
| }); | |
| const send = (data) => { | |
| if (!res.writableEnded) res.write(`data: ${JSON.stringify(data)}\n\n`); | |
| }; | |
| // SSE heartbeat: keep the TCP/HTTP connection alive through any silent | |
| // period (LS warmup, Cascade "thinking", queue wait). `:` prefix is a | |
| // comment line per the SSE spec — clients ignore it, intermediaries see | |
| // bytes flowing, idle timers get reset. | |
| const heartbeat = setInterval(() => { | |
| if (!res.writableEnded) res.write(': ping\n\n'); | |
| }, HEARTBEAT_MS); | |
| const stopHeartbeat = () => clearInterval(heartbeat); | |
| res.on('close', stopHeartbeat); | |
| // ── Cache hit: replay stored response as a fake stream ── | |
| const cached = cacheGet(ckey); | |
| if (cached) { | |
| log.info(`Chat: cache HIT model=${model} flow=stream`); | |
| recordRequest(model, true, 0, null); | |
| try { | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); | |
| if (cached.thinking) { | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }] }); | |
| } | |
| if (cached.text) { | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }] }); | |
| } | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: {}, finish_reason: 'stop' }], | |
| usage: cachedUsage(messages, cached.text) }); | |
| if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); } | |
| } finally { | |
| stopHeartbeat(); | |
| } | |
| return; | |
| } | |
| const startTime = Date.now(); | |
| const tried = []; | |
| let hadSuccess = false; | |
| let rolePrinted = false; | |
| let currentApiKey = null; | |
| let lastErr = null; | |
| // Dynamic: try every active account in the pool (capped at 10) so a | |
| // large pool with many rate-limited accounts can still fall through | |
| // to a free one. Was hardcoded 3 — in pools bigger than 3 with the | |
| // first accounts rate-limited, healthy accounts were never reached | |
| // even though they would have worked (issue #5). | |
| const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length)); | |
| // Accumulate chunks so we can cache a successful response at the end. | |
| let accText = ''; | |
| let accThinking = ''; | |
| // Cascade conversation pool (experimental, stream path) — bypassed in | |
| // tool-emulation mode because the fingerprint can't collapse turns | |
| // whose bodies carry <tool_call>/<tool_result> markup. | |
| const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse'); | |
| const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null; | |
| let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null; | |
| if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… stream model=${model}`); | |
| // Always strip <tool_call>/<tool_result> blocks in Cascade mode. | |
| // In emulation mode, parsed calls are emitted as OpenAI tool_calls. | |
| // In non-emulation mode, blocks are silently stripped (defense-in-depth | |
| // against Cascade's system prompt inducing tool markup). | |
| const toolParser = useCascade ? new ToolCallStreamParser() : null; | |
| const collectedToolCalls = []; | |
| // Streaming path sanitizers. Every text/thinking delta flows through a | |
| // PathSanitizeStream before leaving the server so /tmp/windsurf-workspace, | |
| // /opt/windsurf and /root/WindsurfAPI literals can never slip out even | |
| // if a path straddles a chunk boundary. See src/sanitize.js. | |
| const pathStreamText = new PathSanitizeStream(); | |
| const pathStreamThinking = new PathSanitizeStream(); | |
| const emitContent = (clean) => { | |
| if (!clean) return; | |
| accText += clean; | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { content: clean }, finish_reason: null }] }); | |
| }; | |
| const emitThinking = (clean) => { | |
| if (!clean) return; | |
| accThinking += clean; | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }] }); | |
| }; | |
| const emitToolCallDelta = (tc, idx) => { | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { | |
| tool_calls: [{ | |
| index: idx, | |
| id: tc.id, | |
| type: 'function', | |
| function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') }, | |
| }], | |
| }, finish_reason: null }] }); | |
| }; | |
| const onChunk = (chunk) => { | |
| if (!rolePrinted) { | |
| rolePrinted = true; | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); | |
| } | |
| hadSuccess = true; | |
| if (chunk.text) { | |
| // Pipeline for text deltas: | |
| // raw chunk → ToolCallStreamParser (strip <tool_call> blocks) | |
| // → PathSanitizeStream (scrub server paths) | |
| // → client | |
| let safeText = chunk.text; | |
| if (toolParser) { | |
| const { text: safe, toolCalls: done } = toolParser.feed(chunk.text); | |
| safeText = safe; | |
| // Only emit tool_call deltas when emulating — otherwise the | |
| // parsed calls came from Cascade's built-in tools and are | |
| // silently discarded. | |
| if (emulateTools) { | |
| for (const tc of done) { | |
| const idx = collectedToolCalls.length; | |
| collectedToolCalls.push(tc); | |
| emitToolCallDelta(tc, idx); | |
| } | |
| } | |
| } | |
| if (safeText) emitContent(pathStreamText.feed(safeText)); | |
| } | |
| if (chunk.thinking) { | |
| emitThinking(pathStreamThinking.feed(chunk.thinking)); | |
| } | |
| }; | |
| try { | |
| for (let attempt = 0; attempt < maxAttempts; attempt++) { | |
| if (abortController.signal.aborted) return; | |
| let acct = null; | |
| if (reuseEntry && attempt === 0) { | |
| acct = acquireAccountByKey(reuseEntry.apiKey, modelKey); | |
| if (!acct) { | |
| log.info('Chat: cascade reuse skipped — owning account not available'); | |
| reuseEntry = null; | |
| } | |
| } | |
| if (!acct) { | |
| acct = await waitForAccount(tried, abortController.signal, QUEUE_MAX_WAIT_MS, modelKey); | |
| if (!acct) break; | |
| } | |
| tried.push(acct.apiKey); | |
| currentApiKey = acct.apiKey; | |
| // Pre-flight rate limit check (experimental) | |
| if (isExperimentalEnabled('preflightRateLimit')) { | |
| try { | |
| const px = getEffectiveProxy(acct.id) || null; | |
| const rl = await checkMessageRateLimit(acct.apiKey, px); | |
| if (!rl.hasCapacity) { | |
| log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`); | |
| markRateLimited(acct.id, modelKey); | |
| continue; | |
| } | |
| } catch (e) { | |
| log.debug(`Preflight check failed for ${acct.email}: ${e.message}`); | |
| } | |
| } | |
| try { await ensureLs(acct.proxy); } catch (e) { lastErr = e; break; } | |
| const ls = getLsFor(acct.proxy); | |
| if (!ls) { lastErr = new Error('No LS instance available'); break; } | |
| if (reuseEntry && reuseEntry.lsPort !== ls.port) { | |
| log.info('Chat: cascade reuse skipped — LS port changed'); | |
| reuseEntry = null; | |
| } | |
| const _msgCharsStream = (messages || []).reduce((n, m) => { | |
| const c = m?.content; | |
| return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0); | |
| }, 0); | |
| log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${(messages||[]).length} chars=${_msgCharsStream}${reuseEntry ? ' reuse=1' : ''}`); | |
| const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken); | |
| let cascadeResult = null; | |
| try { | |
| if (useCascade) { | |
| cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, { | |
| onChunk, signal: abortController.signal, reuseEntry, toolPreamble, | |
| }); | |
| } else { | |
| await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk }); | |
| } | |
| // Flush order matters: | |
| // 1. ToolCallStreamParser tail → may produce more text deltas | |
| // (e.g., a dangling <tool_call> that never closed falls | |
| // through as literal text) | |
| // 2. PathSanitizeStream tail (text) → scrubs anything the tool | |
| // parser held back AND anything we were holding ourselves | |
| // 3. PathSanitizeStream tail (thinking) | |
| if (toolParser) { | |
| const tail = toolParser.flush(); | |
| if (tail.text) emitContent(pathStreamText.feed(tail.text)); | |
| if (emulateTools) { | |
| for (const tc of tail.toolCalls) { | |
| const idx = collectedToolCalls.length; | |
| collectedToolCalls.push(tc); | |
| emitToolCallDelta(tc, idx); | |
| } | |
| } | |
| } | |
| emitContent(pathStreamText.flush()); | |
| emitThinking(pathStreamThinking.flush()); | |
| // Pool check-in on success (cascade only) | |
| if (reuseEnabled && cascadeResult?.cascadeId && accText) { | |
| const fpAfter = fingerprintAfter(messages, accText); | |
| poolCheckin(fpAfter, { | |
| cascadeId: cascadeResult.cascadeId, | |
| sessionId: cascadeResult.sessionId, | |
| lsPort: ls.port, | |
| apiKey: currentApiKey, | |
| createdAt: reuseEntry?.createdAt, | |
| }); | |
| } | |
| // success | |
| if (hadSuccess) reportSuccess(currentApiKey); | |
| updateCapability(currentApiKey, modelKey, true, 'success'); | |
| recordRequest(model, true, Date.now() - startTime, currentApiKey); | |
| if (!rolePrinted) { | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); | |
| } | |
| const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop'; | |
| const finalUsage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking); | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: {}, finish_reason: finalReason }], | |
| usage: finalUsage }); | |
| // OpenAI-compat: terminal usage chunk (stream_options.include_usage | |
| // convention — empty choices[] + usage). Prefer Cascade's own | |
| // CortexStepMetadata.model_usage numbers when present, fall back | |
| // to the local chars/4 estimator. See buildUsageBody(). | |
| { | |
| const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking); | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [], usage }); | |
| } | |
| if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); } | |
| if (ckey && !collectedToolCalls.length && (accText || accThinking)) { | |
| cacheSet(ckey, { text: accText, thinking: accThinking }); | |
| } | |
| return; | |
| } catch (err) { | |
| lastErr = err; | |
| reuseEntry = null; // don't try to reuse on retry | |
| const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message); | |
| const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message); | |
| const isInternal = /internal error occurred.*error id/i.test(err.message); | |
| if (isAuthFail) reportError(currentApiKey); | |
| if (isRateLimit) { markRateLimited(currentApiKey, 5 * 60 * 1000, modelKey); err.isRateLimit = true; err.isModelError = true; } | |
| if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; } | |
| if (err.isModelError && !isRateLimit && !isInternal) { | |
| updateCapability(currentApiKey, modelKey, false, 'model_error'); | |
| } | |
| // Retry only if nothing has been streamed yet AND it's a retryable error | |
| if (!hadSuccess && (err.isModelError || isRateLimit)) { | |
| const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error'; | |
| log.warn(`Account ${acct.email} failed (${tag}) on ${model}, trying next`); | |
| continue; | |
| } | |
| break; | |
| } | |
| } | |
| // All attempts failed | |
| log.error('Stream error after retries:', lastErr?.message); | |
| recordRequest(model, false, Date.now() - startTime, currentApiKey); | |
| try { | |
| if (!rolePrinted) { | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] }); | |
| } | |
| // Check if failure is due to all accounts being rate-limited | |
| const rl = isAllRateLimited(modelKey); | |
| const errMsg = rl.allLimited | |
| ? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试` | |
| : sanitizeText(lastErr?.message || 'no accounts'); | |
| send({ id, object: 'chat.completion.chunk', created, model, | |
| choices: [{ index: 0, delta: { content: `\n[Error: ${errMsg}]` }, finish_reason: 'stop' }] }); | |
| res.write('data: [DONE]\n\n'); | |
| } catch {} | |
| if (!res.writableEnded) res.end(); | |
| } finally { | |
| stopHeartbeat(); | |
| } | |
| }, | |
| }; | |
| } | |