Spaces:
Sleeping
Sleeping
/**
 * POST /v1/messages - Anthropic Messages API compatibility layer.
 *
 * Translates Anthropic request/response format to/from the internal OpenAI
 * format so Claude Code and any Anthropic SDK client can connect directly.
 *
 * The streaming path is a real-time translator: it pipes the OpenAI SSE stream
 * from handleChatCompletions through a response shim that parses each
 * chat.completion.chunk and emits the equivalent Anthropic message_start /
 * content_block_* / message_delta / message_stop events as bytes arrive.
 * There is no buffering, so first-token latency matches the upstream Cascade stream.
 */
| import { randomUUID } from 'crypto'; | |
| import { handleChatCompletions } from './chat.js'; | |
| import { log } from '../config.js'; | |
/**
 * Create an Anthropic-style message id.
 * The result is the `msg_` prefix followed by the first 24 hex characters
 * of a random UUID with its dashes stripped.
 * @returns {string}
 */
function genMsgId() {
  const hex = randomUUID().split('-').join('');
  return `msg_${hex.substring(0, 24)}`;
}
// --- Anthropic -> OpenAI request translation ------------------------------

/**
 * Flatten an Anthropic `tool_result.content` value into the string that an
 * OpenAI `tool` message expects.
 * Arrays keep only their blocks' `text` fields. A missing value becomes ''
 * rather than `undefined`.
 * @param {unknown} content
 * @returns {string}
 */
function flattenToolResultContent(content) {
  if (content == null) return '';
  if (typeof content === 'string') return content;
  if (Array.isArray(content)) return content.map((b) => b?.text || '').join('\n');
  return JSON.stringify(content);
}

/**
 * Convert an Anthropic Messages request body into an OpenAI chat-completions
 * request body for handleChatCompletions.
 *
 * - `system` (a string or an array of text blocks) becomes a leading system message.
 * - Assistant `tool_use` blocks become `tool_calls`, with the input JSON-encoded.
 * - User `tool_result` blocks become `role: 'tool'` messages. They are emitted
 *   BEFORE any text from the same user turn, so they directly follow the
 *   assistant message whose tool_calls they answer. OpenAI-style backends
 *   require that ordering.
 * - `thinking` blocks are dropped. Other block types (e.g. images) are ignored.
 *
 * @param {object} body - Anthropic request body.
 * @returns {object} OpenAI-format request body.
 */
function anthropicToOpenAI(body) {
  const messages = [];

  if (body.system) {
    const sysText = typeof body.system === 'string'
      ? body.system
      : Array.isArray(body.system)
        ? body.system.map((b) => b.text || '').join('\n')
        : '';
    if (sysText) messages.push({ role: 'system', content: sysText });
  }

  for (const m of (body.messages || [])) {
    const role = m.role === 'assistant' ? 'assistant' : 'user';

    if (typeof m.content === 'string') {
      messages.push({ role, content: m.content });
      continue;
    }
    if (!Array.isArray(m.content)) continue;

    const textParts = [];
    const toolCalls = [];
    const toolResults = [];
    for (const block of m.content) {
      if (block.type === 'text') {
        textParts.push(block.text || '');
      } else if (block.type === 'thinking') {
        // Thinking blocks from assistant history are skipped; the model regenerates them.
      } else if (block.type === 'tool_use' && role === 'assistant') {
        toolCalls.push({
          id: block.id || `call_${randomUUID().slice(0, 8)}`,
          type: 'function',
          function: { name: block.name, arguments: JSON.stringify(block.input || {}) },
        });
      } else if (block.type === 'tool_result') {
        toolResults.push({
          role: 'tool',
          tool_call_id: block.tool_use_id,
          content: flattenToolResultContent(block.content),
        });
      }
    }

    // Tool results must come first so they sit right after the assistant
    // tool_calls message. Any user text from this turn follows them.
    for (const tr of toolResults) messages.push(tr);

    if (toolCalls.length) {
      messages.push({
        role: 'assistant',
        content: textParts.length ? textParts.join('\n') : null,
        tool_calls: toolCalls,
      });
    } else if (textParts.length) {
      messages.push({ role, content: textParts.join('\n') });
    }
  }

  const tools = (body.tools || []).map((t) => ({
    type: 'function',
    function: {
      name: t.name,
      description: t.description || '',
      parameters: t.input_schema || {},
    },
  }));

  return {
    model: body.model || 'claude-sonnet-4.6',
    messages,
    max_tokens: body.max_tokens || 8192,
    stream: !!body.stream,
    ...(tools.length ? { tools } : {}),
    ...(body.temperature != null ? { temperature: body.temperature } : {}),
    ...(body.top_p != null ? { top_p: body.top_p } : {}),
    ...(body.stop_sequences ? { stop: body.stop_sequences } : {}),
  };
}
// --- OpenAI -> Anthropic non-stream response translation ------------------

/**
 * Convert a non-streaming OpenAI chat-completion result into an Anthropic
 * Messages response.
 *
 * Content order is thinking (from `reasoning_content`), then text, then one
 * `tool_use` block per tool call. Without tool calls, a single text block is
 * always emitted (possibly empty).
 *
 * @param {object} result - OpenAI chat.completion body.
 * @param {string} [model] - Model name to report; falls back to `result.model`.
 * @param {string} msgId - Anthropic message id.
 * @returns {object} Anthropic `message` object.
 */
function openAIToAnthropic(result, model, msgId) {
  const choice = result.choices?.[0];
  const message = choice?.message;
  const usage = result.usage || {};
  const content = [];

  if (message?.reasoning_content) {
    content.push({ type: 'thinking', thinking: message.reasoning_content });
  }

  if (message?.tool_calls?.length) {
    if (message.content) content.push({ type: 'text', text: message.content });
    for (const tc of message.tool_calls) {
      let input = {};
      try {
        input = JSON.parse(tc.function?.arguments || '{}');
      } catch (e) {
        // Best effort: keep the tool call but report the unparseable arguments.
        log.warn(`Messages tool_call arguments parse error: ${e.message}`);
      }
      content.push({
        type: 'tool_use',
        // Anthropic clients pair tool_result blocks to this id, so never leave it empty.
        id: tc.id || `toolu_${randomUUID().replace(/-/g, '').slice(0, 24)}`,
        name: tc.function?.name || 'unknown',
        input,
      });
    }
  } else {
    content.push({ type: 'text', text: message?.content || '' });
  }

  const stopMap = { stop: 'end_turn', length: 'max_tokens', tool_calls: 'tool_use' };
  return {
    id: msgId,
    type: 'message',
    role: 'assistant',
    content,
    model: model || result.model,
    stop_reason: stopMap[choice?.finish_reason] || 'end_turn',
    stop_sequence: null,
    usage: {
      input_tokens: usage.prompt_tokens || usage.input_tokens || 0,
      output_tokens: usage.completion_tokens || usage.output_tokens || 0,
      cache_creation_input_tokens: usage.cache_creation_input_tokens || 0,
      cache_read_input_tokens: usage.prompt_tokens_details?.cached_tokens || 0,
    },
  };
}
// --- Streaming translator: intercepts OpenAI SSE, emits Anthropic SSE ----

/**
 * Incrementally converts an OpenAI `chat.completion.chunk` SSE stream into
 * Anthropic Messages SSE events written to `res`: message_start,
 * content_block_start / content_block_delta / content_block_stop,
 * message_delta and message_stop.
 *
 * Content blocks are opened lazily and strictly one at a time. Switching
 * between text, thinking, or a new tool call closes the previous block first.
 */
class AnthropicStreamTranslator {
  /**
   * @param {object} res - Destination response; must expose `write()` and `writableEnded`.
   * @param {string} msgId - Id reported in message_start.
   * @param {string} model - Model name reported in message_start.
   */
  constructor(res, msgId, model) {
    this.res = res;
    this.msgId = msgId;
    this.model = model;
    // Currently open content block: null | { type, index },
    // where type is 'text' | 'thinking' | 'tool_use'.
    this.current = null;
    // Index that the next opened content block will receive.
    this.blockIndex = 0;
    // OpenAI tool_call index -> { id, name, blockIndex, argsBuffered }
    this.toolCallBufs = new Map();
    this.finalUsage = null;
    this.stopReason = 'end_turn';
    this.messageStarted = false;
    this.messageStopped = false;
    // Text received but not yet terminated by a blank line.
    this.pendingSseBuf = '';
    // Streaming decoder, so a multi-byte UTF-8 character split across two
    // binary writes is reassembled instead of becoming U+FFFD.
    this.decoder = new TextDecoder('utf-8');
  }

  /** Write one SSE event to the destination unless it has already ended. */
  send(event, data) {
    if (!this.res.writableEnded) {
      this.res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
    }
  }

  /** Emit message_start exactly once. */
  startMessage() {
    if (this.messageStarted) return;
    this.messageStarted = true;
    this.send('message_start', {
      type: 'message_start',
      message: {
        id: this.msgId,
        type: 'message',
        role: 'assistant',
        content: [],
        model: this.model,
        stop_reason: null,
        stop_sequence: null,
        usage: { input_tokens: 0, output_tokens: 0, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 },
      },
    });
  }

  /** Close any open block, then open a new block of `type` at the next index. */
  startBlock(type, extra = {}) {
    this.closeCurrentBlock();
    this.current = { type, index: this.blockIndex };
    let content_block;
    if (type === 'text') content_block = { type: 'text', text: '' };
    else if (type === 'thinking') content_block = { type: 'thinking', thinking: '' };
    else if (type === 'tool_use') content_block = { type: 'tool_use', id: extra.id, name: extra.name, input: {} };
    this.send('content_block_start', {
      type: 'content_block_start',
      index: this.blockIndex,
      content_block,
    });
  }

  /** Emit content_block_stop for the open block (if any) and advance the index. */
  closeCurrentBlock() {
    if (!this.current) return;
    this.send('content_block_stop', { type: 'content_block_stop', index: this.current.index });
    this.blockIndex++;
    this.current = null;
  }

  emitTextDelta(text) {
    if (!text) return;
    if (this.current?.type !== 'text') this.startBlock('text');
    this.send('content_block_delta', {
      type: 'content_block_delta',
      index: this.current.index,
      delta: { type: 'text_delta', text },
    });
  }

  emitThinkingDelta(text) {
    if (!text) return;
    if (this.current?.type !== 'thinking') this.startBlock('thinking');
    this.send('content_block_delta', {
      type: 'content_block_delta',
      index: this.current.index,
      delta: { type: 'thinking_delta', thinking: text },
    });
  }

  /**
   * Handle one OpenAI streamed tool_call fragment.
   * The first fragment for an index opens a tool_use block. Argument
   * fragments are forwarded as input_json_delta events.
   */
  emitToolCallDelta(toolCall) {
    const idx = toolCall.index ?? 0;
    let buf = this.toolCallBufs.get(idx);
    if (!buf) {
      // New tool call: open a tool_use block. Clients pair the later
      // tool_result with this id, so synthesize one if upstream omitted it.
      const id = toolCall.id || `toolu_${randomUUID().replace(/-/g, '').slice(0, 24)}`;
      const name = toolCall.function?.name || 'unknown';
      this.startBlock('tool_use', { id, name });
      buf = { id, name, blockIndex: this.current.index, argsBuffered: '' };
      this.toolCallBufs.set(idx, buf);
    }
    const argsChunk = toolCall.function?.arguments || '';
    if (argsChunk) {
      buf.argsBuffered += argsChunk;
      this.send('content_block_delta', {
        type: 'content_block_delta',
        index: buf.blockIndex,
        delta: { type: 'input_json_delta', partial_json: argsChunk },
      });
    }
  }

  /** Translate one parsed chat.completion.chunk object. */
  processChunk(chunk) {
    this.startMessage();
    const choice = chunk.choices?.[0];
    if (choice) {
      const delta = choice.delta || {};
      if (delta.reasoning_content) this.emitThinkingDelta(delta.reasoning_content);
      if (delta.content) this.emitTextDelta(delta.content);
      if (Array.isArray(delta.tool_calls)) {
        for (const tc of delta.tool_calls) this.emitToolCallDelta(tc);
      }
      if (choice.finish_reason) {
        const stopMap = { stop: 'end_turn', length: 'max_tokens', tool_calls: 'tool_use' };
        this.stopReason = stopMap[choice.finish_reason] || 'end_turn';
      }
    }
    if (chunk.usage) this.finalUsage = chunk.usage;
  }

  /**
   * Close the Anthropic message (idempotent).
   * Any buffered trailing frame is drained first, and message_start is
   * guaranteed to precede message_delta even if no chunk ever arrived.
   */
  finish() {
    if (this.messageStopped) return;
    this.messageStopped = true;
    // Drain a final frame that arrived without its terminating blank line,
    // plus any bytes still held by the UTF-8 decoder.
    const tail = this.pendingSseBuf + this.decoder.decode();
    this.pendingSseBuf = '';
    if (tail) this.processFrame(tail);
    this.startMessage();
    this.closeCurrentBlock();
    const u = this.finalUsage || {};
    this.send('message_delta', {
      type: 'message_delta',
      delta: { stop_reason: this.stopReason, stop_sequence: null },
      usage: {
        input_tokens: u.prompt_tokens || u.input_tokens || 0,
        output_tokens: u.completion_tokens || u.output_tokens || 0,
        cache_creation_input_tokens: u.cache_creation_input_tokens || 0,
        cache_read_input_tokens: u.prompt_tokens_details?.cached_tokens || 0,
      },
    });
    this.send('message_stop', { type: 'message_stop' });
  }

  /** Parse the `data: ` lines of one SSE frame and translate each JSON payload. */
  processFrame(frame) {
    for (const line of frame.split('\n')) {
      if (!line.startsWith('data: ')) continue;
      const payload = line.slice(6);
      if (payload === '[DONE]') continue;
      try {
        this.processChunk(JSON.parse(payload));
      } catch (e) {
        log.warn(`Messages SSE parse error: ${e.message}`);
      }
    }
  }

  /**
   * SSE parser. handleChatCompletions writes `data: {...}\n\n` frames.
   * Input is accumulated, and each complete frame is flushed as translated
   * events. Input arriving after finish() is ignored, so nothing is emitted
   * after message_stop.
   * @param {string|Uint8Array} rawChunk
   */
  feed(rawChunk) {
    if (this.messageStopped) return;
    this.pendingSseBuf += ArrayBuffer.isView(rawChunk)
      ? this.decoder.decode(rawChunk, { stream: true })
      : String(rawChunk);
    let sep;
    while ((sep = this.pendingSseBuf.indexOf('\n\n')) !== -1) {
      const frame = this.pendingSseBuf.slice(0, sep);
      this.pendingSseBuf = this.pendingSseBuf.slice(sep + 2);
      this.processFrame(frame);
    }
  }
}
// --- Fake ServerResponse that pipes writes into the translator ------------

/**
 * Build a minimal stand-in for an http.ServerResponse that the chat.js stream
 * handler writes to. `write()` feeds the translator. `end()` finishes the
 * translator once and fires 'close'.
 * NOTE(review): only the members below are implemented (`emit` is a no-op);
 * this assumes chat.js uses nothing else from ServerResponse - confirm.
 *
 * @param {{ feed(chunk: any): void, finish(): void }} translator
 * @returns {object} Response-like object.
 */
function createCaptureRes(translator) {
  const listeners = new Map();
  const fire = (event) => {
    // Iterate over a snapshot: once() wrappers remove themselves during
    // dispatch, which would shift the live array and skip the next listener.
    const cbs = [...(listeners.get(event) || [])];
    for (const cb of cbs) {
      try {
        cb();
      } catch (e) {
        // One failing listener must not stop the others from running.
        log.warn(`Messages capture '${event}' listener error: ${e?.message}`);
      }
    }
  };
  return {
    writableEnded: false,
    headersSent: false,
    writeHead() { this.headersSent = true; },
    write(chunk) {
      translator.feed(chunk);
      return true;
    },
    end(chunk) {
      if (this.writableEnded) return;
      if (chunk) translator.feed(chunk);
      translator.finish();
      this.writableEnded = true;
      fire('close');
    },
    // Fire 'close' without marking writableEnded=true so chat.js's
    // close handler sees an un-ended stream and triggers its abort path.
    _clientDisconnected() { fire('close'); },
    on(event, cb) {
      if (!listeners.has(event)) listeners.set(event, []);
      listeners.get(event).push(cb);
      return this;
    },
    once(event, cb) {
      const self = this;
      const wrapped = function onceWrapper() {
        self.off(event, wrapped);
        cb.apply(self, arguments);
      };
      return self.on(event, wrapped);
    },
    off(event, cb) {
      const arr = listeners.get(event);
      if (arr) {
        const idx = arr.indexOf(cb);
        if (idx !== -1) arr.splice(idx, 1);
      }
      return this;
    },
    removeListener(event, cb) { return this.off(event, cb); },
    emit() { return true; },
  };
}
// --- Main entry -----------------------------------------------------------

/**
 * Entry point for POST /v1/messages.
 *
 * Non-streaming requests are forwarded to handleChatCompletions, and the
 * result is converted with openAIToAnthropic. Streaming requests return a
 * `handler(realRes)` that runs the upstream stream handler against a capture
 * response, whose writes are translated to Anthropic SSE on `realRes`.
 *
 * @param {object} body - Anthropic Messages request body.
 * @returns {Promise<object>} `{ status, body }` for non-streaming responses and
 *   upstream errors, or `{ status: 200, stream: true, headers, handler }` for streams.
 */
export async function handleMessages(body) {
  const msgId = genMsgId();
  const requestedModel = body.model || 'claude-sonnet-4.6';
  const wantStream = !!body.stream;
  const openaiBody = anthropicToOpenAI(body);

  if (!wantStream) {
    const result = await handleChatCompletions({ ...openaiBody, stream: false });
    if (result.status !== 200) {
      return {
        status: result.status,
        body: {
          type: 'error',
          error: {
            type: result.body?.error?.type || 'api_error',
            message: result.body?.error?.message || 'Unknown error',
          },
        },
      };
    }
    return { status: 200, body: openAIToAnthropic(result.body, requestedModel, msgId) };
  }

  // Streaming path: ask handleChatCompletions for its streaming handler and
  // point its writes at our translator shim. This lets the upstream Cascade
  // poll loop drive the downstream SSE in real time, with no buffer-then-replay.
  const streamResult = await handleChatCompletions({ ...openaiBody, stream: true });
  if (!streamResult.stream) {
    // The OpenAI path returned a non-stream error (e.g. 403 model_not_entitled).
    return {
      status: streamResult.status || 502,
      body: {
        type: 'error',
        error: {
          type: streamResult.body?.error?.type || 'api_error',
          message: streamResult.body?.error?.message || 'Upstream error',
        },
      },
    };
  }

  return {
    status: 200,
    stream: true,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no',
    },
    async handler(realRes) {
      const translator = new AnthropicStreamTranslator(realRes, msgId, requestedModel);
      const captureRes = createCaptureRes(translator);
      // Forward client disconnect so the upstream cascade is cancelled.
      // We don't call captureRes.end() here: that would set writableEnded=true
      // and suppress the abort path inside chat.js's stream handler.
      realRes.on('close', () => {
        if (!captureRes.writableEnded) captureRes._clientDisconnected();
      });

      let failureMessage = null;
      try {
        await streamResult.handler(captureRes);
      } catch (e) {
        log.error(`Messages stream error: ${e?.message}`);
        failureMessage = e?.message || 'Upstream stream error';
      }

      // Once the client socket is gone there is nobody left to write to.
      if (!realRes.destroyed) {
        if (failureMessage !== null && !translator.messageStopped) {
          // Report the failure as an Anthropic `error` event, so the client does
          // not mistake truncated output for a complete end_turn reply.
          // messageStopped is set first so no message_stop follows the error.
          translator.messageStopped = true;
          translator.send('error', {
            type: 'error',
            error: { type: 'api_error', message: failureMessage },
          });
        } else {
          // Terminates the message if the upstream handler returned without
          // ending the capture response; a no-op when it already did.
          translator.finish();
        }
      }
      if (!realRes.writableEnded) realRes.end();
    },
  };
}