Spaces:
Sleeping
Sleeping
File size: 38,261 Bytes
f6266b9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 
525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 | /**
* POST /v1/chat/completions — OpenAI-compatible chat completions.
* Routes to RawGetChatMessage (legacy) or Cascade (premium) based on model type.
*/
import { randomUUID } from 'crypto';
import { WindsurfClient } from '../client.js';
import { getApiKey, acquireAccountByKey, reportError, reportSuccess, markRateLimited, reportInternalError, updateCapability, getAccountList, isAllRateLimited } from '../auth.js';
import { resolveModel, getModelInfo } from '../models.js';
import { getLsFor, ensureLs } from '../langserver.js';
import { config, log } from '../config.js';
import { recordRequest } from '../dashboard/stats.js';
import { isModelAllowed } from '../dashboard/model-access.js';
import { cacheKey, cacheGet, cacheSet } from '../cache.js';
import { isExperimentalEnabled, getIdentityPromptFor } from '../runtime-config.js';
import { checkMessageRateLimit } from '../windsurf-api.js';
import { getEffectiveProxy } from '../dashboard/proxy-config.js';
import {
fingerprintBefore, fingerprintAfter, checkout as poolCheckout, checkin as poolCheckin,
} from '../conversation-pool.js';
import {
normalizeMessagesForCascade, ToolCallStreamParser, parseToolCallsFromText,
buildToolPreambleForProto,
} from './tool-emulation.js';
import { sanitizeText, PathSanitizeStream } from '../sanitize.js';
// Interval between SSE `: ping` comment lines written while a streaming
// response is open (keeps intermediaries from idling the connection out).
const HEARTBEAT_MS = 15 * 1000;
// How often waitForAccount() re-polls getApiKey() while queued.
const QUEUE_RETRY_MS = 1 * 1000;
// Default upper bound on how long waitForAccount() keeps a request queued.
const QUEUE_MAX_WAIT_MS = 30 * 1000;
// ── Model identity prompt ──────────────────────────────────
// Per-provider templates are stored in runtime-config and can be edited from
// the dashboard. Every `{model}` occurrence in the template is replaced by
// the display name of the requested model. Callers only use this when the
// experimental "modelIdentityPrompt" toggle is ON.
//
// Returns the filled-in system prompt text, or null when the provider has no
// (or an empty) template configured.
function buildIdentitySystemMessage(displayModel, provider) {
  const promptTemplate = getIdentityPromptFor(provider);
  return promptTemplate
    ? promptTemplate.replace(/\{model\}/g, displayModel)
    : null;
}
// OpenAI-style completion id: "chatcmpl-" followed by the first 29 hex
// digits of a random UUID (dashes removed).
function genId() {
  const hexDigits = randomUUID().split('-').join('');
  return `chatcmpl-${hexDigits.slice(0, 29)}`;
}
// Approximate token count for a chat transcript at ~4 characters per token.
// Counts string `content` and the `text` of array-form content parts; other
// parts (images, etc.) contribute nothing. Returns 0 for a non-array input
// and at least 1 otherwise. Only used to fill the OpenAI-compatible `usage`
// fields (e.g. prompt_tokens_details.cached_tokens) so upstream billing and
// dashboards (new-api) can recognise local cache hits.
function estimateTokens(messages) {
  if (!Array.isArray(messages)) return 0;
  const contentLength = (content) => {
    if (typeof content === 'string') return content.length;
    if (!Array.isArray(content)) return 0;
    return content.reduce(
      (sum, part) => sum + (typeof part?.text === 'string' ? part.text.length : 0),
      0,
    );
  };
  const totalChars = messages.reduce((sum, msg) => sum + contentLength(msg?.content), 0);
  return Math.max(1, Math.ceil(totalChars / 4));
}
// OpenAI-shaped `usage` object for a response replayed from the local cache.
// Both sides are chars/4 estimates; the whole prompt is reported as cached
// (prompt_tokens_details.cached_tokens) and `cached: true` marks the hit.
function cachedUsage(messages, completionText) {
  const promptTokens = estimateTokens(messages);
  const completionChars = (completionText || '').length;
  const completionTokens = Math.max(1, Math.ceil(completionChars / 4));
  const totalTokens = promptTokens + completionTokens;
  return {
    prompt_tokens: promptTokens,
    completion_tokens: completionTokens,
    total_tokens: totalTokens,
    input_tokens: promptTokens,
    output_tokens: completionTokens,
    prompt_tokens_details: { cached_tokens: promptTokens },
    completion_tokens_details: { reasoning_tokens: 0 },
    cached: true,
  };
}
/**
 * Build an OpenAI-shaped `usage` object, preferring server-reported token
 * counts from Cascade's CortexStepMetadata.model_usage when available, and
 * falling back to the local chars/4 estimate otherwise. Keeps the same shape
 * in both branches so downstream billing doesn't have to care which source
 * produced the numbers.
 *
 * The Cascade backend reports usage as {inputTokens, outputTokens,
 * cacheReadTokens, cacheWriteTokens}. We map them onto the OpenAI shape:
 *   prompt_tokens = inputTokens + cacheReadTokens + cacheWriteTokens
 *     (total input tokens the model processed, whether fresh,
 *     cache-read, or cache-written — matches the OpenAI
 *     convention where prompt_tokens is the grand total)
 *   completion_tokens = outputTokens
 *   prompt_tokens_details.cached_tokens = cacheReadTokens
 *   cache_creation_input_tokens (Anthropic ext) = cacheWriteTokens
 * The server numbers carry no reasoning breakdown, so reasoning_tokens is 0
 * in that branch.
 *
 * In the estimate branch, thinking text is counted into completion_tokens
 * (as OpenAI does for reasoning models) and its own chars/4 share is also
 * reported as completion_tokens_details.reasoning_tokens, so the breakdown
 * agrees with the total instead of always claiming zero reasoning.
 *
 * @param {object|null} serverUsage - Server-reported counts, or null.
 * @param {Array} messages - Request transcript (used only for the estimate).
 * @param {string} completionText - Visible assistant text.
 * @param {string} [thinkingText=''] - Reasoning/thinking text, if any.
 * @returns {object} OpenAI-compatible usage object.
 */
function buildUsageBody(serverUsage, messages, completionText, thinkingText = '') {
  if (serverUsage && (serverUsage.inputTokens || serverUsage.outputTokens)) {
    const inputTokens = serverUsage.inputTokens || 0;
    const outputTokens = serverUsage.outputTokens || 0;
    const cacheRead = serverUsage.cacheReadTokens || 0;
    const cacheWrite = serverUsage.cacheWriteTokens || 0;
    const promptTotal = inputTokens + cacheRead + cacheWrite;
    return {
      prompt_tokens: promptTotal,
      completion_tokens: outputTokens,
      total_tokens: promptTotal + outputTokens,
      input_tokens: promptTotal,
      output_tokens: outputTokens,
      prompt_tokens_details: { cached_tokens: cacheRead },
      completion_tokens_details: { reasoning_tokens: 0 },
      cache_creation_input_tokens: cacheWrite,
    };
  }
  const prompt = estimateTokens(messages);
  const textChars = (completionText || '').length;
  const thinkingChars = (thinkingText || '').length;
  const completion = Math.max(1, Math.ceil((textChars + thinkingChars) / 4));
  // ceil is monotone, so this never exceeds `completion`.
  const reasoning = Math.ceil(thinkingChars / 4);
  return {
    prompt_tokens: prompt,
    completion_tokens: completion,
    total_tokens: prompt + completion,
    input_tokens: prompt,
    output_tokens: completion,
    prompt_tokens_details: { cached_tokens: 0 },
    completion_tokens_details: { reasoning_tokens: reasoning },
  };
}
// Resolve after `ms` milliseconds, or as soon as `signal` aborts. Never
// rejects; the caller re-checks `signal.aborted` to decide what to do. The
// timer and the abort listener are both cleaned up on either path.
function delayUnlessAborted(ms, signal) {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    let timer = null;
    const onAbort = () => {
      clearTimeout(timer);
      resolve();
    };
    timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

// Wait until getApiKey returns a non-null account, until maxWaitMs expires,
// or until `signal` aborts (whichever comes first). Used when every account
// has momentarily exhausted its RPM budget so the client is queued instead
// of getting a 503.
//
// Polls every QUEUE_RETRY_MS, but each sleep is clamped to the remaining
// budget so the wait never overshoots maxWaitMs, and a client disconnect
// (abort) ends the wait immediately instead of after the current interval.
//
// @param {string[]} tried - API keys already attempted for this request.
// @param {AbortSignal|null} signal - Optional cancellation signal.
// @param {number} [maxWaitMs=QUEUE_MAX_WAIT_MS] - Total queueing budget.
// @param {string|null} [modelKey=null] - Model the account must serve.
// @returns {Promise<object|null>} The account, or null on timeout/abort.
async function waitForAccount(tried, signal, maxWaitMs = QUEUE_MAX_WAIT_MS, modelKey = null) {
  const deadline = Date.now() + maxWaitMs;
  let acct = getApiKey(tried, modelKey);
  while (!acct) {
    if (signal?.aborted) return null;
    const remainingMs = deadline - Date.now();
    if (remainingMs <= 0) return null;
    await delayUnlessAborted(Math.min(QUEUE_RETRY_MS, remainingMs), signal);
    // Don't hand out an account to a request whose client already left.
    if (signal?.aborted) return null;
    acct = getApiKey(tried, modelKey);
  }
  return acct;
}
/**
 * Handle POST /v1/chat/completions.
 *
 * The handler first rejects a missing or empty `messages` array with a 400.
 * It then resolves the requested model and decides between the Cascade flow
 * and the legacy RawGetChatMessage flow. When applicable it rewrites tool
 * turns for emulation and prepends the optional identity prompt. Model
 * access and per-account entitlement are then enforced.
 *
 * Streaming requests are handed to streamResponse(). Non-stream requests
 * are served here: local cache lookup, optional cascade reuse, and then a
 * bounded retry loop across accounts.
 *
 * @param {object} body - Parsed OpenAI-style request body.
 * @returns {Promise<object>} `{ status, body }` for non-stream results and
 *   errors, or the streaming descriptor returned by streamResponse().
 */
export async function handleChatCompletions(body) {
  // Reject malformed input with a clean 400. Without this guard a missing
  // or non-array `messages` throws a bare TypeError at `[...messages]`.
  if (!Array.isArray(body?.messages) || body.messages.length === 0) {
    return {
      status: 400,
      body: { error: { message: 'messages must be a non-empty array', type: 'invalid_request_error' } },
    };
  }
  const {
    model: reqModel,
    stream = false,
    tools,
    tool_choice,
  } = body;
  // `messages` is `let` not `const` so the identity-prompt injection below
  // can prepend a system turn for the legacy path too.
  let messages = body.messages;
  const modelKey = resolveModel(reqModel || config.defaultModel);
  const modelInfo = getModelInfo(modelKey);
  const displayModel = modelInfo?.name || reqModel || config.defaultModel;
  const modelEnum = modelInfo?.enumValue || 0;
  const modelUid = modelInfo?.modelUid || null;
  // Models with a modelUid use the Cascade flow (StartCascade → SendUserCascadeMessage).
  // Legacy RawGetChatMessage only for models with enumValue>0 and NO modelUid.
  // Newer models (gemini-3.0, gpt-5.2, etc.) have both enumValue AND modelUid but
  // their high enum values cause "cannot parse invalid wire-format data" in the
  // legacy proto endpoint. Cascade handles them correctly via uid string.
  const useCascade = !!modelUid;
  // Tool-call emulation: if the client passed OpenAI-style tools[], we rewrite
  // tool-result turns into synthetic user text and inject the tool protocol
  // at the system-prompt level via CascadeConversationalPlannerConfig's
  // tool_calling_section (SectionOverrideConfig, OVERRIDE mode). This is far
  // more reliable than user-message-level injection because NO_TOOL mode's
  // baked-in system prompt tells the model "you have no tools" — which
  // overpowers user-message preambles. The section override replaces that
  // section directly so the model sees our emulated tool definitions as
  // authoritative system instructions.
  const hasTools = Array.isArray(tools) && tools.length > 0;
  const hasToolHistory = messages.some(m => m?.role === 'tool' || (m?.role === 'assistant' && Array.isArray(m.tool_calls) && m.tool_calls.length));
  const emulateTools = useCascade && (hasTools || hasToolHistory);
  // Build proto-level preamble (goes into tool_calling_section override);
  // pass empty tools to normalizeMessagesForCascade so it only rewrites
  // role:tool / assistant.tool_calls messages without injecting a user-level
  // preamble (that's now handled at the proto layer).
  const toolPreamble = emulateTools ? buildToolPreambleForProto(tools || [], tool_choice) : '';
  let cascadeMessages = emulateTools
    ? normalizeMessagesForCascade(messages, [])
    : [...messages];
  // ── Model identity prompt injection ──
  // When enabled, prepend a system message so the model identifies itself as
  // the requested model instead of leaking the Cascade/Windsurf backend
  // identity. Inject into BOTH messages (legacy RawGetChatMessage path) and
  // cascadeMessages (Cascade path) — they diverge once tool emulation
  // rewrites the Cascade path, but the system identity should be identical.
  if (isExperimentalEnabled('modelIdentityPrompt') && modelInfo?.provider) {
    const identityText = buildIdentitySystemMessage(displayModel, modelInfo.provider);
    if (identityText) {
      const sysMsg = { role: 'system', content: identityText };
      cascadeMessages = [sysMsg, ...cascadeMessages];
      messages = [sysMsg, ...messages];
    }
  }
  // Global model access control (allowlist / blocklist from dashboard)
  const access = isModelAllowed(modelKey);
  if (!access.allowed) {
    return { status: 403, body: { error: { message: access.reason, type: 'model_blocked' } } };
  }
  // Per-account model routing preflight: if NO active account has this
  // model in its tier ∩ available list, fail fast instead of looping
  // through every account trying to find one. This surfaces tier
  // entitlement and blocklist errors as a clean 403 rather than a 30s
  // queue timeout → pool_exhausted.
  const anyEligible = getAccountList().some(a =>
    a.status === 'active' && (a.availableModels || []).includes(modelKey)
  );
  if (!anyEligible) {
    return {
      status: 403,
      body: {
        error: {
          message: `模型 ${displayModel} 在当前账号池中不可用(未订阅或已被封禁)`,
          type: 'model_not_entitled',
        },
      },
    };
  }
  const chatId = genId();
  const created = Math.floor(Date.now() / 1000);
  const ckey = cacheKey(body);
  if (stream) {
    return streamResponse(chatId, created, displayModel, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble);
  }
  // ── Local response cache (exact body match) ─────────────
  const cached = cacheGet(ckey);
  if (cached) {
    log.info(`Chat: cache HIT model=${displayModel} flow=non-stream`);
    recordRequest(displayModel, true, 0, null);
    const message = { role: 'assistant', content: cached.text || null };
    if (cached.thinking) message.reasoning_content = cached.thinking;
    return {
      status: 200,
      body: {
        id: chatId, object: 'chat.completion', created, model: displayModel,
        choices: [{ index: 0, message, finish_reason: 'stop' }],
        usage: cachedUsage(messages, cached.text),
      },
    };
  }
  // ── Cascade conversation pool (experimental) ──
  // If the client is continuing a prior conversation and we still hold the
  // cascade_id from last turn, pin this request to that exact (account, LS)
  // pair so the Windsurf backend serves from its hot per-cascade context
  // instead of replaying the whole history.
  //
  // Tool-emulation mode bypasses the reuse pool: fingerprint can't stably
  // collapse a conversation whose assistant turns contain synthesised
  // <tool_call> markup and whose user turns contain <tool_result> wrappers.
  const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
  const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
  let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
  if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… model=${displayModel}`);
  // Non-stream: retry with a different account on model-not-available errors
  const tried = [];
  let lastErr = null;
  // Dynamic: try every active account in the pool (capped at 10) so a
  // large pool with many rate-limited accounts can still fall through
  // to a free one. Was hardcoded 3 — in pools bigger than 3 with the
  // first accounts rate-limited, healthy accounts were never reached
  // even though they would have worked (issue #5).
  const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));
  // Transcript size for the per-attempt log line. `messages` does not change
  // inside the retry loop, so compute it once.
  const msgChars = messages.reduce((n, m) => {
    const c = m?.content;
    return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
  }, 0);
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    let acct = null;
    if (reuseEntry && attempt === 0) {
      // First attempt pins to the account that owns the cached cascade.
      acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
      if (!acct) {
        log.info('Chat: cascade reuse skipped — owning account not available, falling back to fresh cascade');
        reuseEntry = null;
      }
    }
    if (!acct) {
      acct = await waitForAccount(tried, null, QUEUE_MAX_WAIT_MS, modelKey);
      if (!acct) break;
    }
    tried.push(acct.apiKey);
    // Pre-flight rate limit check (experimental): ask server.codeium.com if
    // this account still has message capacity before burning an LS round trip.
    if (isExperimentalEnabled('preflightRateLimit')) {
      try {
        const px = getEffectiveProxy(acct.id) || null;
        const rl = await checkMessageRateLimit(acct.apiKey, px);
        if (!rl.hasCapacity) {
          log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
          // NOTE(review): called as (accountId, modelKey) here, while
          // nonStreamResponse calls markRateLimited(apiKey, durationMs,
          // modelKey). Confirm against auth.js which form is intended.
          markRateLimited(acct.id, modelKey);
          continue;
        }
      } catch (e) {
        log.debug(`Preflight check failed for ${acct.email}: ${e.message}`);
        // Fail open — proceed with the request
      }
    }
    // Mirror the streaming path: an LS startup failure becomes a structured
    // 503 instead of an unhandled rejection escaping this handler.
    try {
      await ensureLs(acct.proxy);
    } catch (e) {
      log.warn(`Chat: LS startup failed for ${acct.email}: ${e?.message ?? e}`);
      lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } };
      break;
    }
    const ls = getLsFor(acct.proxy);
    if (!ls) { lastErr = { status: 503, body: { error: { message: 'No LS instance available', type: 'ls_unavailable' } } }; break; }
    // Cascade pins cascade_id to a specific LS port too; if the LS it was
    // born on has been replaced, the cascade_id is dead.
    if (reuseEntry && reuseEntry.lsPort !== ls.port) {
      log.info('Chat: cascade reuse skipped — LS port changed');
      reuseEntry = null;
    }
    log.info(`Chat: model=${displayModel} flow=${useCascade ? 'cascade' : 'legacy'} attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${messages.length} chars=${msgChars}${reuseEntry ? ' reuse=1' : ''}${emulateTools ? ' tools=emu' : ''}`);
    const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
    const result = await nonStreamResponse(
      client, chatId, created, displayModel, modelKey, messages, cascadeMessages, modelEnum, modelUid,
      useCascade, acct.apiKey, ckey,
      reuseEnabled ? { reuseEntry, lsPort: ls.port, apiKey: acct.apiKey } : null,
      emulateTools, toolPreamble,
    );
    if (result.status === 200) return result;
    reuseEntry = null; // don't try to reuse on the retry
    lastErr = result;
    const errType = result.body?.error?.type;
    // Rate limit: this account is done for this model, try the next one
    if (errType === 'rate_limit_exceeded') {
      log.warn(`Account ${acct.email} rate-limited on ${displayModel}, trying next account`);
      continue;
    }
    // Model not available on this account (permission_denied, etc.)
    if (errType === 'model_not_available') {
      log.warn(`Account ${acct.email} cannot serve ${displayModel}, trying next account`);
      continue;
    }
    break; // other errors (502, transport) — don't retry
  }
  // If all accounts exhausted, check if it's because they're all rate-limited
  if (!lastErr || lastErr.status === 429) {
    const rl = isAllRateLimited(modelKey);
    if (rl.allLimited) {
      return { status: 429, body: { error: { message: `${displayModel} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs } } };
    }
  }
  return lastErr || { status: 503, body: { error: { message: 'No active accounts available', type: 'pool_exhausted' } } };
}
/**
 * Run one non-streaming completion attempt on a single account.
 *
 * Collects the full upstream reply (Cascade or legacy RawGetChatMessage) and
 * strips <tool_call>/<tool_result> markup from Cascade text. It scrubs
 * server-internal paths, optionally checks the cascade back into the reuse
 * pool, caches the result, and builds an OpenAI `chat.completion` body.
 *
 * Errors raised during the attempt are classified into
 * `{ status: 429 | 403 | 502, body: { error } }` so the caller can decide
 * whether to retry on another account.
 *
 * @param {object} client - WindsurfClient bound to this attempt's account/LS.
 * @param {string} id - Completion id returned to the caller.
 * @param {number} created - Unix timestamp (seconds) for the response.
 * @param {string} model - Display model name.
 * @param {string} modelKey - Resolved internal model key.
 * @param {Array} messages - Transcript for the legacy path / fingerprinting.
 * @param {Array} cascadeMessages - Transcript for the Cascade path.
 * @param {number} modelEnum - Legacy model enum value.
 * @param {string|null} modelUid - Cascade model uid.
 * @param {boolean} useCascade - Cascade flow vs legacy flow.
 * @param {string} apiKey - Account key used for accounting/reporting.
 * @param {string} ckey - Local cache key (falsy disables caching).
 * @param {object|null} poolCtx - `{ reuseEntry, lsPort, apiKey }` when cascade
 *   reuse is enabled, else null.
 * @param {boolean} emulateTools - Surface parsed <tool_call> blocks as tool_calls.
 * @param {string} toolPreamble - Proto-level tool section override.
 * @returns {Promise<{status: number, body: object}>}
 */
async function nonStreamResponse(client, id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, apiKey, ckey, poolCtx, emulateTools, toolPreamble) {
  const startTime = Date.now();
  try {
    let allText = '';
    let allThinking = '';
    let cascadeMeta = null;
    let toolCalls = [];
    // Server-reported token usage from CortexStepMetadata.model_usage, summed
    // across all trajectory steps. Preferred over the chars/4 estimate when
    // present so downstream billing (new-api, etc.) sees real Cascade numbers.
    let serverUsage = null;
    if (useCascade) {
      const chunks = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, { reuseEntry: poolCtx?.reuseEntry || null, toolPreamble });
      for (const c of chunks) {
        if (c.text) allText += c.text;
        if (c.thinking) allThinking += c.thinking;
      }
      cascadeMeta = { cascadeId: chunks.cascadeId, sessionId: chunks.sessionId };
      serverUsage = chunks.usage || null;
      // Always strip <tool_call>/<tool_result> blocks from Cascade text.
      // - emulateTools=true: parsed tool_calls become OpenAI-format tool_calls.
      // - emulateTools=false: blocks are silently discarded (defense-in-depth
      //   against Cascade's system prompt inducing tool markup even after we
      //   override tool_calling_section).
      const parsed = parseToolCallsFromText(allText);
      allText = parsed.text;
      if (emulateTools) toolCalls = parsed.toolCalls;
      // Built-in Cascade tool calls (chunks.toolCalls — edit_file, view_file,
      // list_directory, run_command, etc.) are intentionally DROPPED. Their
      // argumentsJson and result fields reference server-internal paths like
      // /tmp/windsurf-workspace/config.yaml and must never be exposed to an
      // API caller. Emulated tool calls (above) are safe because they
      // reference the caller's own tool schema.
    } else {
      const chunks = await client.rawGetChatMessage(messages, modelEnum, modelUid);
      for (const c of chunks) {
        if (c.text) allText += c.text;
      }
    }
    // Scrub server-internal filesystem paths from everything we're about to
    // return. See src/sanitize.js for the patterns and rationale.
    allText = sanitizeText(allText);
    allThinking = sanitizeText(allThinking);
    if (toolCalls.length) {
      toolCalls = toolCalls.map(tc => ({
        ...tc,
        argumentsJson: sanitizeText(tc.argumentsJson || ''),
      }));
    }
    // Check the cascade back into the pool under the *post-turn* fingerprint
    // so the next request in the same conversation can resume it.
    if (poolCtx && cascadeMeta?.cascadeId && allText) {
      const fpAfter = fingerprintAfter(messages, allText);
      poolCheckin(fpAfter, {
        cascadeId: cascadeMeta.cascadeId,
        sessionId: cascadeMeta.sessionId,
        lsPort: poolCtx.lsPort,
        apiKey: poolCtx.apiKey,
        createdAt: poolCtx.reuseEntry?.createdAt,
      });
    }
    reportSuccess(apiKey);
    updateCapability(apiKey, modelKey, true, 'success');
    recordRequest(model, true, Date.now() - startTime, apiKey);
    // Store in cache for next identical request. Skip caching:
    // - tool_call responses: they're inherently contextual and the cache
    //   doesn't preserve the tool_calls array, so a cache hit would return a
    //   content-only response with finish_reason:stop, breaking tool flow.
    // - completely empty replies (no text, no thinking): caching one would
    //   pin a transient empty upstream response for every identical request.
    const hasOutput = Boolean(allText || allThinking);
    if (ckey && !toolCalls.length && hasOutput) cacheSet(ckey, { text: allText, thinking: allThinking });
    const message = { role: 'assistant', content: allText || null };
    if (allThinking) message.reasoning_content = allThinking;
    if (toolCalls.length) {
      message.tool_calls = toolCalls.map((tc, i) => ({
        id: tc.id || `call_${i}_${Date.now().toString(36)}`,
        type: 'function',
        function: {
          name: tc.name || 'unknown',
          arguments: tc.argumentsJson || tc.arguments || '{}',
        },
      }));
      // OpenAI convention: content is null when finish_reason is tool_calls.
      // In text emulation the model often emits an inline answer alongside
      // the <tool_call> block (e.g., hallucinated weather data). Null it so
      // clients checking `content !== null` wait for the real tool result.
      message.content = null;
    }
    // Prefer server-reported usage; fall back to chars/4 estimate only when
    // the trajectory didn't include a ModelUsageStats field.
    const usage = buildUsageBody(serverUsage, messages, allText, allThinking);
    const finishReason = toolCalls.length ? 'tool_calls' : 'stop';
    return {
      status: 200,
      body: {
        id, object: 'chat.completion', created, model,
        choices: [{ index: 0, message, finish_reason: finishReason }],
        usage,
      },
    };
  } catch (err) {
    // Normalise the message so a non-Error rejection (e.g. a thrown string)
    // is classified instead of crashing here: this module is strict mode, so
    // setting a property on a primitive would throw.
    const errMessage = String(err?.message ?? err);
    // Only count true auth failures against the account. Workspace/cascade/model
    // errors and transport issues shouldn't disable the key.
    const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(errMessage);
    const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(errMessage);
    const isInternal = /internal error occurred.*error id/i.test(errMessage);
    // `isModelError` may already be set on the thrown object by the client layer.
    const flaggedModelError = Boolean(err?.isModelError);
    if (isAuthFail) reportError(apiKey);
    if (isRateLimit) markRateLimited(apiKey, 5 * 60 * 1000, modelKey);
    if (isInternal) reportInternalError(apiKey);
    if (flaggedModelError && !isRateLimit && !isInternal) {
      updateCapability(apiKey, modelKey, false, 'model_error');
    }
    recordRequest(model, false, Date.now() - startTime, apiKey);
    log.error('Chat error:', errMessage);
    // Rate limits → 429 with Retry-After; model errors → 403; others → 502
    if (isRateLimit) {
      const rl = isAllRateLimited(modelKey);
      return {
        status: 429,
        body: { error: { message: `${model} 已达速率限制,请稍后重试`, type: 'rate_limit_exceeded', retry_after_ms: rl.retryAfterMs || 60000 } },
      };
    }
    const isModelError = flaggedModelError || isInternal;
    return {
      status: isModelError ? 403 : 502,
      body: { error: { message: sanitizeText(errMessage), type: isModelError ? 'model_not_available' : 'upstream_error' } },
    };
  }
}
function streamResponse(id, created, model, modelKey, messages, cascadeMessages, modelEnum, modelUid, useCascade, ckey, emulateTools, toolPreamble) {
return {
status: 200,
stream: true,
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
},
async handler(res) {
const abortController = new AbortController();
res.on('close', () => {
if (!res.writableEnded) {
log.info('Client disconnected mid-stream, aborting upstream');
abortController.abort();
}
});
const send = (data) => {
if (!res.writableEnded) res.write(`data: ${JSON.stringify(data)}\n\n`);
};
// SSE heartbeat: keep the TCP/HTTP connection alive through any silent
// period (LS warmup, Cascade "thinking", queue wait). `:` prefix is a
// comment line per the SSE spec — clients ignore it, intermediaries see
// bytes flowing, idle timers get reset.
const heartbeat = setInterval(() => {
if (!res.writableEnded) res.write(': ping\n\n');
}, HEARTBEAT_MS);
const stopHeartbeat = () => clearInterval(heartbeat);
res.on('close', stopHeartbeat);
// ── Cache hit: replay stored response as a fake stream ──
const cached = cacheGet(ckey);
if (cached) {
log.info(`Chat: cache HIT model=${model} flow=stream`);
recordRequest(model, true, 0, null);
try {
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
if (cached.thinking) {
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { reasoning_content: cached.thinking }, finish_reason: null }] });
}
if (cached.text) {
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { content: cached.text }, finish_reason: null }] });
}
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: {}, finish_reason: 'stop' }],
usage: cachedUsage(messages, cached.text) });
if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); }
} finally {
stopHeartbeat();
}
return;
}
const startTime = Date.now();
const tried = [];
let hadSuccess = false;
let rolePrinted = false;
let currentApiKey = null;
let lastErr = null;
// Dynamic: try every active account in the pool (capped at 10) so a
// large pool with many rate-limited accounts can still fall through
// to a free one. Was hardcoded 3 — in pools bigger than 3 with the
// first accounts rate-limited, healthy accounts were never reached
// even though they would have worked (issue #5).
const maxAttempts = Math.min(10, Math.max(3, getAccountList().filter(a => a.status === 'active').length));
// Accumulate chunks so we can cache a successful response at the end.
let accText = '';
let accThinking = '';
// Cascade conversation pool (experimental, stream path) — bypassed in
// tool-emulation mode because the fingerprint can't collapse turns
// whose bodies carry <tool_call>/<tool_result> markup.
const reuseEnabled = useCascade && !emulateTools && isExperimentalEnabled('cascadeConversationReuse');
const fpBefore = reuseEnabled ? fingerprintBefore(messages) : null;
let reuseEntry = reuseEnabled ? poolCheckout(fpBefore) : null;
if (reuseEntry) log.info(`Chat: cascade reuse HIT cascadeId=${reuseEntry.cascadeId.slice(0, 8)}… stream model=${model}`);
// Always strip <tool_call>/<tool_result> blocks in Cascade mode.
// In emulation mode, parsed calls are emitted as OpenAI tool_calls.
// In non-emulation mode, blocks are silently stripped (defense-in-depth
// against Cascade's system prompt inducing tool markup).
const toolParser = useCascade ? new ToolCallStreamParser() : null;
const collectedToolCalls = [];
// Streaming path sanitizers. Every text/thinking delta flows through a
// PathSanitizeStream before leaving the server so /tmp/windsurf-workspace,
// /opt/windsurf and /root/WindsurfAPI literals can never slip out even
// if a path straddles a chunk boundary. See src/sanitize.js.
const pathStreamText = new PathSanitizeStream();
const pathStreamThinking = new PathSanitizeStream();
const emitContent = (clean) => {
if (!clean) return;
accText += clean;
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { content: clean }, finish_reason: null }] });
};
const emitThinking = (clean) => {
if (!clean) return;
accThinking += clean;
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { reasoning_content: clean }, finish_reason: null }] });
};
const emitToolCallDelta = (tc, idx) => {
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: {
tool_calls: [{
index: idx,
id: tc.id,
type: 'function',
function: { name: tc.name, arguments: sanitizeText(tc.argumentsJson || '{}') },
}],
}, finish_reason: null }] });
};
const onChunk = (chunk) => {
if (!rolePrinted) {
rolePrinted = true;
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
}
hadSuccess = true;
if (chunk.text) {
// Pipeline for text deltas:
// raw chunk → ToolCallStreamParser (strip <tool_call> blocks)
// → PathSanitizeStream (scrub server paths)
// → client
let safeText = chunk.text;
if (toolParser) {
const { text: safe, toolCalls: done } = toolParser.feed(chunk.text);
safeText = safe;
// Only emit tool_call deltas when emulating — otherwise the
// parsed calls came from Cascade's built-in tools and are
// silently discarded.
if (emulateTools) {
for (const tc of done) {
const idx = collectedToolCalls.length;
collectedToolCalls.push(tc);
emitToolCallDelta(tc, idx);
}
}
}
if (safeText) emitContent(pathStreamText.feed(safeText));
}
if (chunk.thinking) {
emitThinking(pathStreamThinking.feed(chunk.thinking));
}
};
try {
for (let attempt = 0; attempt < maxAttempts; attempt++) {
if (abortController.signal.aborted) return;
let acct = null;
if (reuseEntry && attempt === 0) {
acct = acquireAccountByKey(reuseEntry.apiKey, modelKey);
if (!acct) {
log.info('Chat: cascade reuse skipped — owning account not available');
reuseEntry = null;
}
}
if (!acct) {
acct = await waitForAccount(tried, abortController.signal, QUEUE_MAX_WAIT_MS, modelKey);
if (!acct) break;
}
tried.push(acct.apiKey);
currentApiKey = acct.apiKey;
// Pre-flight rate limit check (experimental)
if (isExperimentalEnabled('preflightRateLimit')) {
try {
const px = getEffectiveProxy(acct.id) || null;
const rl = await checkMessageRateLimit(acct.apiKey, px);
if (!rl.hasCapacity) {
log.warn(`Preflight: ${acct.email} has no capacity (remaining=${rl.messagesRemaining}), skipping`);
markRateLimited(acct.id, modelKey);
continue;
}
} catch (e) {
log.debug(`Preflight check failed for ${acct.email}: ${e.message}`);
}
}
try { await ensureLs(acct.proxy); } catch (e) { lastErr = e; break; }
const ls = getLsFor(acct.proxy);
if (!ls) { lastErr = new Error('No LS instance available'); break; }
if (reuseEntry && reuseEntry.lsPort !== ls.port) {
log.info('Chat: cascade reuse skipped — LS port changed');
reuseEntry = null;
}
const _msgCharsStream = (messages || []).reduce((n, m) => {
const c = m?.content;
return n + (typeof c === 'string' ? c.length : Array.isArray(c) ? c.reduce((k, p) => k + (typeof p?.text === 'string' ? p.text.length : 0), 0) : 0);
}, 0);
log.info(`Chat: model=${model} flow=${useCascade ? 'cascade' : 'legacy'} stream=true attempt=${attempt + 1} account=${acct.email} ls=${ls.port} turns=${(messages||[]).length} chars=${_msgCharsStream}${reuseEntry ? ' reuse=1' : ''}`);
const client = new WindsurfClient(acct.apiKey, ls.port, ls.csrfToken);
let cascadeResult = null;
try {
if (useCascade) {
cascadeResult = await client.cascadeChat(cascadeMessages, modelEnum, modelUid, {
onChunk, signal: abortController.signal, reuseEntry, toolPreamble,
});
} else {
await client.rawGetChatMessage(messages, modelEnum, modelUid, { onChunk });
}
// Flush order matters:
// 1. ToolCallStreamParser tail → may produce more text deltas
// (e.g., a dangling <tool_call> that never closed falls
// through as literal text)
// 2. PathSanitizeStream tail (text) → scrubs anything the tool
// parser held back AND anything we were holding ourselves
// 3. PathSanitizeStream tail (thinking)
if (toolParser) {
const tail = toolParser.flush();
if (tail.text) emitContent(pathStreamText.feed(tail.text));
if (emulateTools) {
for (const tc of tail.toolCalls) {
const idx = collectedToolCalls.length;
collectedToolCalls.push(tc);
emitToolCallDelta(tc, idx);
}
}
}
emitContent(pathStreamText.flush());
emitThinking(pathStreamThinking.flush());
// Pool check-in on success (cascade only)
if (reuseEnabled && cascadeResult?.cascadeId && accText) {
const fpAfter = fingerprintAfter(messages, accText);
poolCheckin(fpAfter, {
cascadeId: cascadeResult.cascadeId,
sessionId: cascadeResult.sessionId,
lsPort: ls.port,
apiKey: currentApiKey,
createdAt: reuseEntry?.createdAt,
});
}
// success
if (hadSuccess) reportSuccess(currentApiKey);
updateCapability(currentApiKey, modelKey, true, 'success');
recordRequest(model, true, Date.now() - startTime, currentApiKey);
if (!rolePrinted) {
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
}
const finalReason = collectedToolCalls.length ? 'tool_calls' : 'stop';
const finalUsage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking);
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: {}, finish_reason: finalReason }],
usage: finalUsage });
// OpenAI-compat: terminal usage chunk (stream_options.include_usage
// convention — empty choices[] + usage). Prefer Cascade's own
// CortexStepMetadata.model_usage numbers when present, fall back
// to the local chars/4 estimator. See buildUsageBody().
{
const usage = buildUsageBody(cascadeResult?.usage || null, messages, accText, accThinking);
send({ id, object: 'chat.completion.chunk', created, model,
choices: [], usage });
}
if (!res.writableEnded) { res.write('data: [DONE]\n\n'); res.end(); }
if (ckey && !collectedToolCalls.length && (accText || accThinking)) {
cacheSet(ckey, { text: accText, thinking: accThinking });
}
return;
} catch (err) {
lastErr = err;
reuseEntry = null; // don't try to reuse on retry
const isAuthFail = /unauthenticated|invalid api key|invalid_grant|permission_denied.*account/i.test(err.message);
const isRateLimit = /rate limit|rate_limit|too many requests|quota/i.test(err.message);
const isInternal = /internal error occurred.*error id/i.test(err.message);
if (isAuthFail) reportError(currentApiKey);
if (isRateLimit) { markRateLimited(currentApiKey, 5 * 60 * 1000, modelKey); err.isRateLimit = true; err.isModelError = true; }
if (isInternal) { reportInternalError(currentApiKey); err.isModelError = true; }
if (err.isModelError && !isRateLimit && !isInternal) {
updateCapability(currentApiKey, modelKey, false, 'model_error');
}
// Retry only if nothing has been streamed yet AND it's a retryable error
if (!hadSuccess && (err.isModelError || isRateLimit)) {
const tag = isRateLimit ? 'rate_limit' : isInternal ? 'internal_error' : 'model_error';
log.warn(`Account ${acct.email} failed (${tag}) on ${model}, trying next`);
continue;
}
break;
}
}
// All attempts failed
log.error('Stream error after retries:', lastErr?.message);
recordRequest(model, false, Date.now() - startTime, currentApiKey);
try {
if (!rolePrinted) {
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { role: 'assistant', content: '' }, finish_reason: null }] });
}
// Check if failure is due to all accounts being rate-limited
const rl = isAllRateLimited(modelKey);
const errMsg = rl.allLimited
? `${model} 所有账号均已达速率限制,请 ${Math.ceil(rl.retryAfterMs / 1000)} 秒后重试`
: sanitizeText(lastErr?.message || 'no accounts');
send({ id, object: 'chat.completion.chunk', created, model,
choices: [{ index: 0, delta: { content: `\n[Error: ${errMsg}]` }, finish_reason: 'stop' }] });
res.write('data: [DONE]\n\n');
} catch {}
if (!res.writableEnded) res.end();
} finally {
stopHeartbeat();
}
},
};
}
|