// WindsurfAPI / src/handlers/messages.js
// github-actions[bot]
// Deploy from GitHub: 7495fde758f0be655f95e6331fec2898267f790c
// f6266b9
/**
 * POST /v1/messages — Anthropic Messages API compatibility layer.
 *
 * Translates Anthropic request/response format to/from the internal OpenAI
 * format so Claude Code and any Anthropic SDK client can connect directly.
 *
 * The streaming path is a real-time translator: it pipes the OpenAI SSE stream
 * from handleChatCompletions through a response shim that parses each
 * chat.completion.chunk and emits the equivalent Anthropic message_start /
 * content_block_* / message_delta / message_stop events as bytes arrive.
 * There is no buffering, so first-token latency matches the upstream Cascade stream.
 */
import { randomUUID } from 'crypto';
import { handleChatCompletions } from './chat.js';
import { log } from '../config.js';
/**
 * Create an Anthropic-style message id: the prefix `msg_` followed by the
 * first 24 hex characters of a random v4 UUID (dashes stripped).
 *
 * @returns {string}
 */
function genMsgId() {
  const hexDigits = randomUUID().split('-').join('');
  return `msg_${hexDigits.slice(0, 24)}`;
}
// ─── Anthropic → OpenAI request translation ──────────────────
/**
 * Join the `text` of each Anthropic content block with newlines. Blocks that
 * carry no `text` (e.g. images) contribute an empty string.
 *
 * @param {Array<object>} blocks
 * @returns {string}
 */
function joinBlockText(blocks) {
  return blocks.map(b => b.text || '').join('\n');
}

/**
 * Normalise the `content` of an Anthropic tool_result block into the string
 * used as an OpenAI `tool` message body.
 *
 * @param {string|Array<object>|object|undefined|null} content
 * @returns {string}
 */
function toolResultText(content) {
  if (typeof content === 'string') return content;
  if (Array.isArray(content)) return joinBlockText(content);
  // `content` may be omitted on a tool_result; JSON.stringify(undefined) would
  // return undefined and yield a tool message with no content at all.
  if (content == null) return '';
  return JSON.stringify(content);
}

/**
 * Translate an Anthropic Messages API request body into the internal
 * OpenAI Chat Completions request shape consumed by handleChatCompletions.
 *
 * - `system` (string or array of text blocks) becomes a leading system message.
 * - Assistant `tool_use` blocks become `tool_calls`; `tool_result` blocks become
 *   `tool` messages.
 * - `thinking` blocks and other untranslated block types (e.g. images) are dropped.
 *
 * @param {object} body - Anthropic request body.
 * @returns {object} OpenAI-style request body.
 */
function anthropicToOpenAI(body) {
  const messages = [];

  if (body.system) {
    let sysText = '';
    if (typeof body.system === 'string') sysText = body.system;
    else if (Array.isArray(body.system)) sysText = joinBlockText(body.system);
    if (sysText) messages.push({ role: 'system', content: sysText });
  }

  for (const m of (body.messages || [])) {
    const role = m.role === 'assistant' ? 'assistant' : 'user';

    if (typeof m.content === 'string') {
      messages.push({ role, content: m.content });
      continue;
    }
    if (!Array.isArray(m.content)) continue;

    const textParts = [];
    const toolCalls = [];
    const toolResults = [];
    for (const block of m.content) {
      switch (block.type) {
        case 'text':
          textParts.push(block.text || '');
          break;
        case 'tool_use':
          // Only assistant turns may issue tool calls.
          if (role === 'assistant') {
            toolCalls.push({
              id: block.id || `call_${randomUUID().slice(0, 8)}`,
              type: 'function',
              function: { name: block.name, arguments: JSON.stringify(block.input || {}) },
            });
          }
          break;
        case 'tool_result':
          toolResults.push({
            role: 'tool',
            tool_call_id: block.tool_use_id,
            content: toolResultText(block.content),
          });
          break;
        default:
          // 'thinking' blocks from assistant history are skipped (the model
          // regenerates its reasoning); other block types are not translated.
          break;
      }
    }

    const text = textParts.join('\n');
    if (toolCalls.length) {
      messages.push({
        role: 'assistant',
        content: textParts.length ? text : null,
        tool_calls: toolCalls,
      });
      messages.push(...toolResults);
    } else {
      // OpenAI requires `tool` messages to directly follow the assistant message
      // carrying the matching tool_calls. Anthropic user turns often put text next
      // to tool_result blocks, so the results go first and the text after them.
      messages.push(...toolResults);
      if (textParts.length) messages.push({ role, content: text });
    }
  }

  const tools = (body.tools || []).map(t => ({
    type: 'function',
    function: {
      name: t.name,
      description: t.description || '',
      parameters: t.input_schema || {},
    },
  }));

  return {
    model: body.model || 'claude-sonnet-4.6',
    messages,
    max_tokens: body.max_tokens || 8192,
    stream: !!body.stream,
    ...(tools.length ? { tools } : {}),
    ...(body.temperature != null ? { temperature: body.temperature } : {}),
    ...(body.top_p != null ? { top_p: body.top_p } : {}),
    ...(body.stop_sequences ? { stop: body.stop_sequences } : {}),
  };
}
// ─── OpenAI → Anthropic non-stream response translation ──────
/**
 * Convert a non-streaming OpenAI chat.completion result into an Anthropic
 * Messages API `message` object.
 *
 * @param {object} result - completion with `choices[0].message` and optional `usage`.
 * @param {string} [model] - model name to report; falls back to `result.model`.
 * @param {string} msgId - Anthropic message id to report.
 * @returns {object} Anthropic message (content blocks, stop_reason, usage).
 */
function openAIToAnthropic(result, model, msgId) {
  const choice = result.choices?.[0];
  const message = choice?.message;
  const usage = result.usage || {};
  const content = [];

  if (message?.reasoning_content) {
    content.push({ type: 'thinking', thinking: message.reasoning_content });
  }

  if (message?.tool_calls?.length) {
    if (message.content) content.push({ type: 'text', text: message.content });
    for (const tc of message.tool_calls) {
      let input = {};
      try {
        input = JSON.parse(tc.function?.arguments || '{}');
      } catch {
        // Malformed arguments from upstream: report an empty input instead of
        // failing the whole response.
      }
      content.push({
        type: 'tool_use',
        id: tc.id,
        name: tc.function?.name || 'unknown',
        input,
      });
    }
  } else {
    content.push({ type: 'text', text: message?.content || '' });
  }

  const stopMap = { stop: 'end_turn', length: 'max_tokens', tool_calls: 'tool_use' };
  return {
    id: msgId,
    type: 'message',
    role: 'assistant',
    content,
    model: model || result.model,
    stop_reason: stopMap[choice?.finish_reason] || 'end_turn',
    stop_sequence: null,
    usage: {
      input_tokens: usage.prompt_tokens || usage.input_tokens || 0,
      output_tokens: usage.completion_tokens || usage.output_tokens || 0,
      cache_creation_input_tokens: usage.cache_creation_input_tokens || 0,
      // Accept both the OpenAI shape (prompt_tokens_details.cached_tokens) and the
      // Anthropic shape (cache_read_input_tokens), as the fields above already do.
      cache_read_input_tokens:
        usage.prompt_tokens_details?.cached_tokens || usage.cache_read_input_tokens || 0,
    },
  };
}
// ─── Streaming translator: intercepts OpenAI SSE, emits Anthropic SSE ──
/**
 * Real-time translator from an OpenAI chat.completion.chunk SSE stream to the
 * Anthropic Messages SSE event sequence: message_start, then
 * content_block_start / content_block_delta / content_block_stop per block,
 * then message_delta and message_stop.
 *
 * Raw bytes are pushed in through feed(). Translated events are written to
 * `res` immediately by send(). finish() closes any open block and terminates
 * the message, and it is idempotent.
 */
class AnthropicStreamTranslator {
  /**
   * @param {object} res - destination; only `write()` and `writableEnded` are used.
   * @param {string} msgId - id reported in the message_start event.
   * @param {string} model - model name reported in the message_start event.
   */
  constructor(res, msgId, model) {
    this.res = res;
    this.msgId = msgId;
    this.model = model;
    // Currently open content block: null | { type, index }, where type is
    // 'text' | 'thinking' | 'tool_use'.
    this.current = null;
    // Index that the next opened content block will receive.
    this.blockIndex = 0;
    // OpenAI tool_call index → { id, name, blockIndex, argsBuffered }
    this.toolCallBufs = new Map();
    // Last `usage` object seen on any chunk; reported in message_delta.
    this.finalUsage = null;
    this.stopReason = 'end_turn';
    this.messageStarted = false;
    this.messageStopped = false;
    // Decoded text that does not yet form a complete `\n\n`-terminated SSE frame.
    this.pendingSseBuf = '';
    // Streaming UTF-8 decoder, so a multi-byte character split across two Buffer
    // writes is reassembled instead of decoding to replacement characters.
    this.utf8Decoder = new TextDecoder('utf-8');
  }

  /** Write one Anthropic SSE event, unless the destination has already ended. */
  send(event, data) {
    if (!this.res.writableEnded) {
      this.res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
    }
  }

  /** Emit message_start once; later calls are no-ops. */
  startMessage() {
    if (this.messageStarted) return;
    this.messageStarted = true;
    this.send('message_start', {
      type: 'message_start',
      message: {
        id: this.msgId,
        type: 'message',
        role: 'assistant',
        content: [],
        model: this.model,
        stop_reason: null,
        stop_sequence: null,
        usage: { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
      },
    });
  }

  /**
   * Close the open block (if any) and open a new one at the next index.
   * @param {'text'|'thinking'|'tool_use'} type
   * @param {{id?: string, name?: string}} [extra] - tool_use id/name.
   */
  startBlock(type, extra = {}) {
    this.closeCurrentBlock();
    this.current = { type, index: this.blockIndex };
    let content_block;
    if (type === 'text') content_block = { type: 'text', text: '' };
    else if (type === 'thinking') content_block = { type: 'thinking', thinking: '' };
    else if (type === 'tool_use') content_block = { type: 'tool_use', id: extra.id, name: extra.name, input: {} };
    this.send('content_block_start', {
      type: 'content_block_start',
      index: this.blockIndex,
      content_block,
    });
  }

  /** Emit content_block_stop for the open block and advance the block index. */
  closeCurrentBlock() {
    if (!this.current) return;
    this.send('content_block_stop', { type: 'content_block_stop', index: this.current.index });
    this.blockIndex++;
    this.current = null;
  }

  /** Append visible text, opening a text block if a different block is open. */
  emitTextDelta(text) {
    if (!text) return;
    if (this.current?.type !== 'text') this.startBlock('text');
    this.send('content_block_delta', {
      type: 'content_block_delta',
      index: this.current.index,
      delta: { type: 'text_delta', text },
    });
  }

  /** Append reasoning text, opening a thinking block if a different block is open. */
  emitThinkingDelta(text) {
    if (!text) return;
    if (this.current?.type !== 'thinking') this.startBlock('thinking');
    this.send('content_block_delta', {
      type: 'content_block_delta',
      index: this.current.index,
      delta: { type: 'thinking_delta', thinking: text },
    });
  }

  /**
   * Translate one OpenAI `delta.tool_calls[]` entry. The first fragment for a
   * given tool_call index opens a tool_use block; argument fragments are
   * forwarded as input_json_delta events to that block's index.
   * NOTE(review): assumes upstream streams each tool call's fragments
   * contiguously — fragments arriving after the block was closed are still
   * sent to its original index.
   */
  emitToolCallDelta(toolCall) {
    const idx = toolCall.index ?? 0;
    const existing = this.toolCallBufs.get(idx);
    const id = toolCall.id || existing?.id;
    const name = toolCall.function?.name || existing?.name;
    const argsChunk = toolCall.function?.arguments || '';
    if (!existing) {
      this.startBlock('tool_use', { id, name });
      this.toolCallBufs.set(idx, { id, name, blockIndex: this.current.index, argsBuffered: '' });
    }
    const buf = this.toolCallBufs.get(idx);
    if (argsChunk) {
      buf.argsBuffered += argsChunk;
      this.send('content_block_delta', {
        type: 'content_block_delta',
        index: buf.blockIndex,
        delta: { type: 'input_json_delta', partial_json: argsChunk },
      });
    }
  }

  /** Translate one parsed chat.completion.chunk object. */
  processChunk(chunk) {
    this.startMessage();
    const choice = chunk.choices?.[0];
    if (choice) {
      const delta = choice.delta || {};
      if (delta.reasoning_content) this.emitThinkingDelta(delta.reasoning_content);
      if (delta.content) this.emitTextDelta(delta.content);
      if (Array.isArray(delta.tool_calls)) {
        for (const tc of delta.tool_calls) this.emitToolCallDelta(tc);
      }
      if (choice.finish_reason) {
        const stopMap = { stop: 'end_turn', length: 'max_tokens', tool_calls: 'tool_use' };
        this.stopReason = stopMap[choice.finish_reason] || 'end_turn';
      }
    }
    if (chunk.usage) this.finalUsage = chunk.usage;
  }

  /**
   * Terminate the Anthropic message: close the open block, then emit
   * message_delta (stop reason + usage) and message_stop. Idempotent.
   */
  finish() {
    if (this.messageStopped) return;
    // Drain decoder state and a final frame that arrived without its
    // terminating blank line; otherwise that last chunk would be dropped.
    const tail = this.pendingSseBuf + this.utf8Decoder.decode();
    this.pendingSseBuf = '';
    if (tail) this.processFrame(tail);
    this.messageStopped = true;
    // Clients expect message_start before message_delta / message_stop, even
    // when the upstream produced no chunks at all.
    this.startMessage();
    this.closeCurrentBlock();
    const u = this.finalUsage || {};
    this.send('message_delta', {
      type: 'message_delta',
      delta: { stop_reason: this.stopReason, stop_sequence: null },
      usage: {
        input_tokens: u.prompt_tokens || u.input_tokens || 0,
        output_tokens: u.completion_tokens || u.output_tokens || 0,
        cache_creation_input_tokens: u.cache_creation_input_tokens || 0,
        cache_read_input_tokens: u.prompt_tokens_details?.cached_tokens || u.cache_read_input_tokens || 0,
      },
    });
    this.send('message_stop', { type: 'message_stop' });
  }

  /**
   * Accept raw bytes/text written by handleChatCompletions (`data: {...}\n\n`
   * frames), buffer partial frames, and translate every complete frame.
   * @param {string|Uint8Array} rawChunk
   */
  feed(rawChunk) {
    this.pendingSseBuf += ArrayBuffer.isView(rawChunk)
      ? this.utf8Decoder.decode(rawChunk, { stream: true })
      : String(rawChunk);
    let idx;
    while ((idx = this.pendingSseBuf.indexOf('\n\n')) !== -1) {
      const frame = this.pendingSseBuf.slice(0, idx);
      this.pendingSseBuf = this.pendingSseBuf.slice(idx + 2);
      this.processFrame(frame);
    }
  }

  /** Parse the `data:` lines of one SSE frame and translate each JSON payload. */
  processFrame(frame) {
    for (const line of frame.split('\n')) {
      if (!line.startsWith('data:')) continue;
      // Per the SSE format, the single space after `data:` is optional.
      let payload = line.slice(5);
      if (payload.startsWith(' ')) payload = payload.slice(1);
      if (!payload || payload === '[DONE]') continue;
      try {
        this.processChunk(JSON.parse(payload));
      } catch (e) {
        log.warn(`Messages SSE parse error: ${e.message}`);
      }
    }
  }
}
// ─── Fake ServerResponse that pipes writes into the translator ──
/**
 * Build a minimal ServerResponse stand-in that pipes everything written to it
 * into `translator` instead of a socket.
 *
 * - write()/end() forward chunks to translator.feed(); end() also calls
 *   translator.finish() and marks the response ended.
 * - on/once/off/removeListener implement a small listener registry. Only the
 *   'close' event is ever fired, and at most once (like a real response).
 * - _clientDisconnected() fires 'close' WITHOUT marking writableEnded, so a
 *   close handler in chat.js sees an un-ended stream and takes its abort path.
 *
 * @param {{feed(chunk: any): void, finish(): void}} translator
 * @returns {object} response-like object
 */
function createCaptureRes(translator) {
  const listeners = new Map();
  let closeFired = false;

  const fire = (event) => {
    // Iterate over a snapshot: once() wrappers remove themselves during
    // dispatch, which would otherwise shift the live array and skip the
    // listener that follows them.
    const cbs = [...(listeners.get(event) || [])];
    for (const cb of cbs) {
      try {
        cb();
      } catch (e) {
        // Best-effort dispatch: one failing listener must not block the rest.
        log.warn(`Messages capture '${event}' listener error: ${e.message}`);
      }
    }
  };

  const fireClose = () => {
    if (closeFired) return;
    closeFired = true;
    fire('close');
  };

  return {
    writableEnded: false,
    headersSent: false,
    writeHead() { this.headersSent = true; },
    write(chunk) {
      translator.feed(chunk);
      return true;
    },
    end(chunk) {
      if (this.writableEnded) return;
      if (chunk) translator.feed(chunk);
      translator.finish();
      this.writableEnded = true;
      fireClose();
    },
    _clientDisconnected() { fireClose(); },
    on(event, cb) {
      if (!listeners.has(event)) listeners.set(event, []);
      listeners.get(event).push(cb);
      return this;
    },
    once(event, cb) {
      const wrapped = (...args) => {
        this.off(event, wrapped);
        cb.apply(this, args);
      };
      return this.on(event, wrapped);
    },
    off(event, cb) {
      const arr = listeners.get(event);
      if (arr) {
        const idx = arr.indexOf(cb);
        if (idx !== -1) arr.splice(idx, 1);
      }
      return this;
    },
    removeListener(event, cb) { return this.off(event, cb); },
    // Not a real emitter: events passed to emit() are not dispatched.
    emit() { return true; },
  };
}
// ─── Main entry ───────────────────────────────────────────────
/**
 * Wrap an upstream (OpenAI-path) error body in the Anthropic error envelope.
 *
 * @param {number} status - HTTP status to return.
 * @param {object} [upstreamBody] - body whose `error.type` / `error.message` are reused.
 * @param {string} fallbackMessage - message used when the upstream body has none.
 * @returns {{status: number, body: object}}
 */
function anthropicErrorResponse(status, upstreamBody, fallbackMessage) {
  const upstreamError = upstreamBody?.error;
  return {
    status,
    body: {
      type: 'error',
      error: {
        type: upstreamError?.type || 'api_error',
        message: upstreamError?.message || fallbackMessage,
      },
    },
  };
}

/**
 * Entry point for POST /v1/messages.
 *
 * Non-streaming: runs handleChatCompletions and converts its result (or error)
 * into the Anthropic shape.
 * Streaming: returns a `{ status, stream, headers, handler }` descriptor whose
 * handler drives chat.js's streaming handler against a capture response, so
 * upstream chunks are translated to Anthropic SSE on the real response as they
 * arrive.
 *
 * @param {object} body - Anthropic Messages request body.
 * @returns {Promise<object>}
 */
export async function handleMessages(body) {
  const msgId = genMsgId();
  const requestedModel = body.model || 'claude-sonnet-4.6';
  const openaiBody = anthropicToOpenAI(body);

  if (!body.stream) {
    const result = await handleChatCompletions({ ...openaiBody, stream: false });
    return result.status === 200
      ? { status: 200, body: openAIToAnthropic(result.body, requestedModel, msgId) }
      : anthropicErrorResponse(result.status, result.body, 'Unknown error');
  }

  // Ask chat.js for its streaming handler; its writes land in the translator
  // shim, so the upstream poll loop feeds the downstream SSE directly.
  const streamResult = await handleChatCompletions({ ...openaiBody, stream: true });
  if (!streamResult.stream) {
    // chat.js answered with a plain (non-stream) error, e.g. 403 model_not_entitled.
    return anthropicErrorResponse(streamResult.status || 502, streamResult.body, 'Upstream error');
  }

  return {
    status: 200,
    stream: true,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
    async handler(realRes) {
      const translator = new AnthropicStreamTranslator(realRes, msgId, requestedModel);
      const captureRes = createCaptureRes(translator);
      // Propagate a client disconnect without ending captureRes: ending it would
      // set writableEnded and hide the disconnect from chat.js's abort logic.
      realRes.on('close', () => {
        if (!captureRes.writableEnded) captureRes._clientDisconnected();
      });
      try {
        await streamResult.handler(captureRes);
      } catch (e) {
        log.error(`Messages stream error: ${e.message}`);
        // startMessage() is a no-op once the message has already started.
        translator.startMessage();
        translator.finish();
      }
      if (!realRes.writableEnded) realRes.end();
    },
  };
}