Row-proxy / internal /handler /anthropic.go
Moge-Row's picture
fix undefined TrackUsage and cleanup keys.go
5c106f0
raw
history blame
29.5 kB
package handler
import (
"bufio"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/google/uuid"
"zai-proxy/internal/auth"
"zai-proxy/internal/filter"
"zai-proxy/internal/logger"
"zai-proxy/internal/model"
"zai-proxy/internal/upstream"
)
// HandleMessages serves Anthropic Messages API requests (/v1/messages).
//
// The handler authenticates the caller, runs the per-key check, swaps proxy
// keys for an anonymous upstream token, decodes the request, converts it to
// the internal format, calls upstream and finally relays the reply either as
// an SSE stream or as one JSON document, depending on the request's Stream
// flag. Every failure is reported in Anthropic's error envelope.
func HandleMessages(w http.ResponseWriter, r *http.Request) {
	// The key is taken from x-api-key first, then from a Bearer Authorization header.
	apiKey := r.Header.Get("x-api-key")
	if apiKey == "" {
		apiKey = strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
	}
	if apiKey == "" {
		writeAnthropicError(w, http.StatusUnauthorized, "authentication_error", "Missing API key")
		return
	}

	if allowed, why := CheckAndTrack(apiKey, 0); !allowed {
		writeAnthropicError(w, http.StatusTooManyRequests, "rate_limit_error", why)
		return
	}

	// Once the check has passed, every exit path below records a fixed
	// usage amount of 150 against the key.
	defer TrackUsage(apiKey, 150)

	// "free" and "RWPX-" keys are not forwarded; an anonymous token is used instead.
	upstreamToken := apiKey
	if apiKey == "free" || strings.HasPrefix(apiKey, "RWPX-") {
		anon, tokenErr := auth.GetAnonymousToken()
		if tokenErr != nil {
			logger.LogError("Failed to get anonymous token: %v", tokenErr)
			writeAnthropicError(w, http.StatusInternalServerError, "api_error", "Failed to get anonymous token")
			return
		}
		upstreamToken = anon
	}

	var req model.AnthropicRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&req); decodeErr != nil {
		writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", "Invalid request body")
		return
	}
	if req.MaxTokens == 0 {
		req.MaxTokens = 8192
	}

	// Thinking counts as enabled only when the request explicitly says so.
	thinkingEnabled := req.Thinking != nil && req.Thinking.Type == "enabled"

	// Map the requested Claude model name to the upstream model name.
	resolvedModel, _ := model.ResolveClaudeModel(req.Model, thinkingEnabled)

	messages, tools, toolChoice := convertAnthropicToInternal(req)

	resp, modelName, err := upstream.MakeUpstreamRequest(upstreamToken, messages, resolvedModel, tools, toolChoice)
	if err != nil {
		logger.LogError("Upstream request failed: %v", err)
		writeAnthropicError(w, http.StatusBadGateway, "api_error", "Upstream error")
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Log at most the first 500 bytes of the upstream error body.
		raw, _ := io.ReadAll(resp.Body)
		if len(raw) > 500 {
			raw = raw[:500]
		}
		logger.LogError("Upstream error: status=%d, body=%s", resp.StatusCode, string(raw))
		writeAnthropicError(w, resp.StatusCode, "api_error", "Upstream error")
		return
	}

	messageID := fmt.Sprintf("msg_%s", uuid.New().String()[:24])
	if !req.Stream {
		handleAnthropicNonStream(w, resp.Body, messageID, modelName, req.Model, tools)
		return
	}
	handleAnthropicStream(w, resp.Body, messageID, modelName, req.Model, tools)
}
// convertAnthropicToInternal converts an Anthropic request into the internal
// message list, the tool definitions and the tool-choice value.
//
// Conversion rules, as implemented below:
//   - req.System (a string or an array of content blocks) becomes one leading
//     "system" message; it is omitted when the resulting text is empty.
//   - A "user" message without content blocks becomes one "user" message. With
//     content blocks, every "text" block becomes its own "user" message and
//     every "tool_result" block becomes a "tool" message carrying the
//     tool_use id; "image" blocks are skipped.
//   - An "assistant" message without content blocks becomes one "assistant"
//     message. With content blocks, "text" blocks are concatenated,
//     "tool_use" blocks become tool calls and "thinking" blocks are dropped.
//   - Messages with any other role are ignored.
//   - Tools are wrapped as "function" tools.
//   - tool_choice maps auto->"auto", any->"required", none->"none" and
//     tool-><function selector>; anything else yields nil.
//
// The returned slices are nil when there is nothing to put in them.
func convertAnthropicToInternal(req model.AnthropicRequest) ([]model.Message, []model.Tool, interface{}) {
	// flattenText reduces a content value to plain text: a string is returned
	// as is, an array of blocks contributes the "text" field of each block in
	// order, and any other value (including nil) yields "".
	flattenText := func(v interface{}) string {
		switch c := v.(type) {
		case string:
			return c
		case []interface{}:
			var sb strings.Builder
			for _, item := range c {
				part, ok := item.(map[string]interface{})
				if !ok {
					continue
				}
				if t, ok := part["text"].(string); ok {
					sb.WriteString(t)
				}
			}
			return sb.String()
		}
		return ""
	}

	var messages []model.Message

	// System prompt first, if it carries any text.
	if systemText := flattenText(req.System); systemText != "" {
		messages = append(messages, model.Message{
			Role:    "system",
			Content: systemText,
		})
	}

	for _, msg := range req.Messages {
		switch msg.Role {
		case "user":
			text, blocks := msg.ParseContent()
			if len(blocks) == 0 {
				// Plain text message.
				messages = append(messages, model.Message{
					Role:    "user",
					Content: text,
				})
				break
			}
			for _, block := range blocks {
				switch block.Type {
				case "text":
					messages = append(messages, model.Message{
						Role:    "user",
						Content: block.Text,
					})
				case "tool_result":
					// A tool message is emitted even when the result text is empty.
					messages = append(messages, model.Message{
						Role:       "tool",
						Content:    flattenText(block.Content),
						ToolCallID: block.ToolUseID,
					})
				case "image":
					// Image blocks are not forwarded.
				}
			}

		case "assistant":
			// Parse once and reuse both results.
			text, blocks := msg.ParseContent()
			if len(blocks) == 0 {
				messages = append(messages, model.Message{
					Role:    "assistant",
					Content: text,
				})
				break
			}
			var textContent strings.Builder
			var toolCalls []model.ToolCall
			for _, block := range blocks {
				switch block.Type {
				case "text":
					textContent.WriteString(block.Text)
				case "thinking":
					// Thinking blocks from history are not sent upstream.
				case "tool_use":
					// A missing input is sent as an empty JSON object.
					argsStr := "{}"
					if block.Input != nil {
						argsStr = string(block.Input)
					}
					toolCalls = append(toolCalls, model.ToolCall{
						ID:   block.ID,
						Type: "function",
						Function: model.FunctionCall{
							Name:      block.Name,
							Arguments: argsStr,
						},
					})
				}
			}
			messages = append(messages, model.Message{
				Role:      "assistant",
				Content:   textContent.String(),
				ToolCalls: toolCalls,
			})
		}
	}

	// Wrap Anthropic tool definitions as "function" tools. The slice stays nil
	// when the request declares no tools.
	var tools []model.Tool
	if n := len(req.Tools); n > 0 {
		tools = make([]model.Tool, 0, n)
		for _, t := range req.Tools {
			tools = append(tools, model.Tool{
				Type: "function",
				Function: model.ToolFunction{
					Name:        t.Name,
					Description: t.Description,
					Parameters:  t.InputSchema,
				},
			})
		}
	}

	// Translate tool_choice; only the object form is recognised.
	var toolChoice interface{}
	if tc, ok := req.ToolChoice.(map[string]interface{}); ok {
		kind, _ := tc["type"].(string)
		switch kind {
		case "auto":
			toolChoice = "auto"
		case "any":
			toolChoice = "required"
		case "none":
			toolChoice = "none"
		case "tool":
			// Without a string name the choice is left nil.
			if name, ok := tc["name"].(string); ok {
				toolChoice = map[string]interface{}{
					"type":     "function",
					"function": map[string]interface{}{"name": name},
				}
			}
		}
	}

	return messages, tools, toolChoice
}
// handleAnthropicStream reads the upstream SSE body line by line and re-emits it
// to the client as an Anthropic-style event stream: message_start, then
// content_block_start / content_block_delta / content_block_stop events for
// thinking, text and tool_use blocks, then message_delta and message_stop.
//
// Parameters:
//   - w: response writer; it must also implement http.Flusher, otherwise a 500
//     error is written and nothing is streamed.
//   - body: upstream SSE body; it is only read here, not closed.
//   - messageID: id placed in the message_start event.
//   - modelName: not referenced in this function.
//   - requestModel: model name echoed in the message_start event.
//   - tools: when non-empty, enables function tool-call parsing and
//     <tool_call> extraction from the answer text.
//
// NOTE(review): when tools is non-empty, answer text is appended to
// promptToolBuffer and is only emitted once a <tool_call> marker shows up or
// the upstream stream ends, so plain text is not streamed incrementally in
// that mode.
func handleAnthropicStream(w http.ResponseWriter, body io.ReadCloser, messageID, modelName, requestModel string, tools []model.Tool) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// Every event is flushed individually, so the writer has to support flushing.
flusher, ok := w.(http.Flusher)
if !ok {
writeAnthropicError(w, http.StatusInternalServerError, "api_error", "Streaming not supported")
return
}
// Send message_start
// The initial message has empty content, an empty stop reason and zero usage.
msgStart := model.AnthropicMessageStart{
Type: "message_start",
Message: model.AnthropicResponse{
ID: messageID,
Type: "message",
Role: "assistant",
Content: []model.AnthropicContentBlock{},
Model: requestModel,
StopReason: "",
Usage: model.AnthropicUsage{InputTokens: 0, OutputTokens: 0},
},
}
sendAnthropicSSE(w, flusher, "message_start", msgStart)
scanner := bufio.NewScanner(body)
// Raise the scanner's line limit to 1 MiB.
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
searchRefFilter := filter.NewSearchRefFilter()
thinkingFilter := &filter.ThinkingFilter{}
// Streaming state:
//   contentBlockIndex        index used for the block currently open (or the next one to open)
//   inThinkingBlock/inTextBlock/inToolUseBlock  which kind of block is currently open
//   hasContent               set once any thinking or text delta has been produced
//   totalContentOutputLength rune count of answer content seen so far; used to cut the
//                            new tail out of edit content in the "other"/"tool_call" phases
//   hasToolCalls             set once any tool call has been emitted
//   collectedToolCalls       tool calls gathered so far (assigned below but never read here)
//   promptToolBuffer         answer text held back for <tool_call> detection
contentBlockIndex := 0
inThinkingBlock := false
inTextBlock := false
inToolUseBlock := false
hasContent := false
totalContentOutputLength := 0
hasToolCalls := false
var collectedToolCalls []model.ToolCall
promptToolBuffer := ""
for scanner.Scan() {
line := scanner.Text()
logger.LogDebug("[Anthropic-Upstream] %s", line)
// Only "data: " lines carry payloads; everything else is skipped.
if !strings.HasPrefix(line, "data: ") {
continue
}
payload := strings.TrimPrefix(line, "data: ")
if payload == "[DONE]" {
break
}
var upstreamData model.UpstreamData
// Payloads that are not valid JSON are silently skipped.
if err := json.Unmarshal([]byte(payload), &upstreamData); err != nil {
continue
}
if upstreamData.Data.Phase == "done" {
break
}
// Handle thinking phase
if upstreamData.Data.Phase == "thinking" && upstreamData.Data.DeltaContent != "" {
// Coming back to "thinking" after another phase starts a new thinking round.
isNewThinkingRound := false
if thinkingFilter.LastPhase != "" && thinkingFilter.LastPhase != "thinking" {
thinkingFilter.ResetForNewRound()
thinkingFilter.ThinkingRoundCount++
isNewThinkingRound = true
}
thinkingFilter.LastPhase = "thinking"
reasoningContent := thinkingFilter.ProcessThinking(upstreamData.Data.DeltaContent)
// Separate the second and later rounds from the previous one with a blank line.
if isNewThinkingRound && thinkingFilter.ThinkingRoundCount > 1 && reasoningContent != "" {
reasoningContent = "\n\n" + reasoningContent
}
if reasoningContent != "" {
thinkingFilter.LastOutputChunk = reasoningContent
reasoningContent = searchRefFilter.Process(reasoningContent)
if reasoningContent != "" {
// Close previous non-thinking block if open
// NOTE(review): only an open text block is closed here; an open tool_use
// block would stay open and the thinking block would reuse its index.
if inTextBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inTextBlock = false
}
// Start thinking block if not already in one
if !inThinkingBlock {
sendAnthropicSSE(w, flusher, "content_block_start", model.AnthropicContentBlockStart{
Type: "content_block_start",
Index: contentBlockIndex,
ContentBlock: model.AnthropicContentBlock{Type: "thinking", Thinking: ""},
})
inThinkingBlock = true
}
hasContent = true
sendAnthropicSSE(w, flusher, "content_block_delta", model.AnthropicContentBlockDelta{
Type: "content_block_delta",
Index: contentBlockIndex,
Delta: model.AnthropicContentBlockDelta2{Type: "thinking_delta", Thinking: reasoningContent},
})
}
}
continue
}
// Remember the most recent non-empty phase for new-round detection above.
if upstreamData.Data.Phase != "" {
thinkingFilter.LastPhase = upstreamData.Data.Phase
}
// Filter search results, image searches, mcp, etc.
editContent := upstreamData.GetEditContent()
// Search results are handed to the reference filter and never emitted directly.
if editContent != "" && filter.IsSearchResultContent(editContent) {
if results := filter.ParseSearchResults(editContent); len(results) > 0 {
searchRefFilter.AddSearchResults(results)
}
continue
}
// For "search_image" and "mcp" edits only the text returned by
// ExtractTextBeforeGlmBlock is forwarded.
if editContent != "" && strings.Contains(editContent, `"search_image"`) {
textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent)
if textBeforeBlock != "" {
emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, searchRefFilter.Process(textBeforeBlock))
}
continue
}
if editContent != "" && strings.Contains(editContent, `"mcp"`) {
textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent)
if textBeforeBlock != "" {
emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, searchRefFilter.Process(textBeforeBlock))
}
continue
}
if editContent != "" && filter.IsSearchToolCall(editContent, upstreamData.Data.Phase) {
continue
}
// Handle function tool calls
// Only considered when the request declared tools.
if len(tools) > 0 && editContent != "" && filter.IsFunctionToolCall(editContent, upstreamData.Data.Phase) {
if toolCalls := filter.ParseFunctionToolCalls(editContent); len(toolCalls) > 0 {
// Give every call without an id a generated "toolu_" id.
for i := range toolCalls {
if toolCalls[i].ID == "" {
toolCalls[i].ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24])
}
}
// NOTE(review): this replaces, rather than appends to, earlier collected calls.
collectedToolCalls = toolCalls
hasToolCalls = true
// Close thinking/text blocks
if inThinkingBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inThinkingBlock = false
}
if inTextBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inTextBlock = false
}
for _, tc := range toolCalls {
emitAnthropicToolUse(w, flusher, &contentBlockIndex, &inToolUseBlock, tc)
}
}
continue
}
// Flush thinking filter
// Anything Flush returns is emitted as a thinking delta before the answer content.
if thinkingRemaining := thinkingFilter.Flush(); thinkingRemaining != "" {
thinkingFilter.LastOutputChunk = thinkingRemaining
processedRemaining := searchRefFilter.Process(thinkingRemaining)
if processedRemaining != "" {
if !inThinkingBlock {
// Close text block if open
if inTextBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inTextBlock = false
}
sendAnthropicSSE(w, flusher, "content_block_start", model.AnthropicContentBlockStart{
Type: "content_block_start",
Index: contentBlockIndex,
ContentBlock: model.AnthropicContentBlock{Type: "thinking", Thinking: ""},
})
inThinkingBlock = true
}
hasContent = true
sendAnthropicSSE(w, flusher, "content_block_delta", model.AnthropicContentBlockDelta{
Type: "content_block_delta",
Index: contentBlockIndex,
Delta: model.AnthropicContentBlockDelta2{Type: "thinking_delta", Thinking: processedRemaining},
})
}
}
// Extract content
// Three sources, in priority order:
//   1. "answer" phase with delta content: the delta itself.
//   2. "answer" phase with edit content containing "</details>": the text after
//      that marker (minus one leading newline); the output length is reset to it.
//   3. "other"/"tool_call" phase with edit content: the runes beyond what was
//      already counted, or the whole edit content when it is not longer than that.
content := ""
if upstreamData.Data.Phase == "answer" && upstreamData.Data.DeltaContent != "" {
content = upstreamData.Data.DeltaContent
} else if upstreamData.Data.Phase == "answer" && editContent != "" {
if strings.Contains(editContent, "</details>") {
if idx := strings.Index(editContent, "</details>"); idx != -1 {
afterDetails := editContent[idx+len("</details>"):]
if strings.HasPrefix(afterDetails, "\n") {
content = afterDetails[1:]
} else {
content = afterDetails
}
totalContentOutputLength = len([]rune(content))
}
}
} else if (upstreamData.Data.Phase == "other" || upstreamData.Data.Phase == "tool_call") && editContent != "" {
fullContentRunes := []rune(editContent)
if len(fullContentRunes) > totalContentOutputLength {
content = string(fullContentRunes[totalContentOutputLength:])
totalContentOutputLength = len(fullContentRunes)
} else {
content = editContent
}
}
if content == "" {
continue
}
content = searchRefFilter.Process(content)
if content == "" {
continue
}
hasContent = true
// Answer deltas advance the output length by the filtered content's rune count.
if upstreamData.Data.Phase == "answer" && upstreamData.Data.DeltaContent != "" {
totalContentOutputLength += len([]rune(content))
}
// Prompt tool extraction: buffer answer text for <tool_call> detection
if len(tools) > 0 {
promptToolBuffer += content
for {
openIdx := strings.Index(promptToolBuffer, "<tool_call>")
if openIdx == -1 {
break
}
// Text in front of the marker cannot be part of a tool call; emit it now.
if openIdx > 0 {
safeContent := promptToolBuffer[:openIdx]
promptToolBuffer = promptToolBuffer[openIdx:]
if safeContent != "" {
emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, safeContent)
}
}
// The buffer now starts with "<tool_call>". The block ends at the earliest of:
// the end of "</tool_call>", the end of a "</think>" after the opening tag,
// or the start of the next "<tool_call>".
afterOpen := promptToolBuffer[len("<tool_call>"):]
closeIdx := strings.Index(promptToolBuffer, "</tool_call>")
thinkCloseIdx := strings.Index(afterOpen, "</think>")
nextOpenIdx := strings.Index(afterOpen, "<tool_call>")
blockEnd := -1
if closeIdx != -1 {
blockEnd = closeIdx + len("</tool_call>")
}
if thinkCloseIdx != -1 {
candidate := len("<tool_call>") + thinkCloseIdx + len("</think>")
if blockEnd == -1 || candidate < blockEnd {
blockEnd = candidate
}
}
if nextOpenIdx != -1 {
candidate := len("<tool_call>") + nextOpenIdx
if blockEnd == -1 || candidate < blockEnd {
blockEnd = candidate
}
}
// No terminator yet: keep buffering until more content arrives.
if blockEnd == -1 {
break
}
block := promptToolBuffer[:blockEnd]
promptToolBuffer = promptToolBuffer[blockEnd:]
// The first return value (the cleaned text) is discarded here, so a block
// that yields no tool calls is dropped from the output.
_, ptToolCalls := filter.ExtractPromptToolCalls(block)
if len(ptToolCalls) > 0 {
collectedToolCalls = append(collectedToolCalls, ptToolCalls...)
hasToolCalls = true
// Close thinking/text blocks before emitting tool use
if inThinkingBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inThinkingBlock = false
}
if inTextBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inTextBlock = false
}
// tc is a copy, so the generated id goes into the emitted event only,
// not into collectedToolCalls.
for _, tc := range ptToolCalls {
tc.ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24])
emitAnthropicToolUse(w, flusher, &contentBlockIndex, &inToolUseBlock, tc)
}
}
}
// With tools present nothing is emitted directly; the remainder stays buffered.
continue
}
emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, content)
}
// A read error is logged only; the stream is still terminated normally below.
if err := scanner.Err(); err != nil {
logger.LogError("[Anthropic-Upstream] scanner error: %v", err)
}
// Flush remaining prompt tool buffer
// Tool calls found in the remainder are emitted first, then any remaining clean text.
if promptToolBuffer != "" {
cleanContent, ptToolCalls := filter.ExtractPromptToolCalls(promptToolBuffer)
if len(ptToolCalls) > 0 {
collectedToolCalls = append(collectedToolCalls, ptToolCalls...)
hasToolCalls = true
if inThinkingBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inThinkingBlock = false
}
if inTextBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inTextBlock = false
}
for _, tc := range ptToolCalls {
tc.ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24])
emitAnthropicToolUse(w, flusher, &contentBlockIndex, &inToolUseBlock, tc)
}
}
if cleanContent != "" {
emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, cleanContent)
}
promptToolBuffer = ""
}
// Flush search ref filter
if remaining := searchRefFilter.Flush(); remaining != "" {
emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, remaining)
}
if !hasContent && !hasToolCalls {
logger.LogError("Anthropic stream response 200 but no content received")
}
// Close any open blocks
if inThinkingBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inThinkingBlock = false
}
if inTextBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inTextBlock = false
}
if inToolUseBlock {
sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
Type: "content_block_stop", Index: contentBlockIndex,
})
contentBlockIndex++
inToolUseBlock = false
}
// Determine stop reason
// "tool_use" if any tool call was emitted, otherwise "end_turn".
stopReason := "end_turn"
if hasToolCalls {
stopReason = "tool_use"
}
// Send message_delta with stop_reason and usage
// Output tokens are not measured: the value is 100 per closed content block.
sendAnthropicSSE(w, flusher, "message_delta", model.AnthropicMessageDelta{
Type: "message_delta",
Delta: struct {
StopReason string `json:"stop_reason"`
StopSequence *string `json:"stop_sequence"`
}{
StopReason: stopReason,
},
Usage: model.AnthropicUsage{OutputTokens: contentBlockIndex * 100}, // Rough estimate
})
// Send message_stop
sendAnthropicSSE(w, flusher, "message_stop", model.AnthropicMessageStop{Type: "message_stop"})
// Suppress unused variable warnings
_ = inThinkingBlock
_ = inTextBlock
_ = inToolUseBlock
_ = contentBlockIndex
}
// handleAnthropicNonStream drains the upstream SSE body, aggregates thinking
// text, answer text and tool calls, and writes a single Anthropic response as
// JSON.
//
// Parameters:
//   - w: response writer receiving the JSON document.
//   - body: upstream SSE body; it is only read here, not closed.
//   - messageID: id placed in the response.
//   - modelName: not referenced in this function.
//   - requestModel: model name echoed in the response.
//   - tools: when non-empty, enables function tool-call parsing and
//     prompt-style tool-call extraction from the answer text.
//
// The response always contains at least one content block (an empty text
// block if nothing was produced). The stop reason is "tool_use" when any tool
// call was collected and "end_turn" otherwise. Usage numbers are estimates:
// input tokens are fixed at 100 and output tokens are len(answer text)/4.
//
// A scanner read error is logged and whatever was collected up to that point
// is still returned. Tool-call arguments that are not valid JSON are replaced
// by "{}" so that encoding the response cannot fail on them.
func handleAnthropicNonStream(w http.ResponseWriter, body io.ReadCloser, messageID, modelName, requestModel string, tools []model.Tool) {
	scanner := bufio.NewScanner(body)
	// Raise the scanner's line limit to 1 MiB.
	scanner.Buffer(make([]byte, 1024*1024), 1024*1024)

	var chunks []string
	var reasoningChunks []string
	thinkingFilter := &filter.ThinkingFilter{}
	searchRefFilter := filter.NewSearchRefFilter()
	hasThinking := false
	var collectedToolCalls []model.ToolCall

	for scanner.Scan() {
		line := scanner.Text()
		// Only "data: " lines carry payloads.
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		payload := strings.TrimPrefix(line, "data: ")
		if payload == "[DONE]" {
			break
		}
		var upstreamData model.UpstreamData
		// Payloads that are not valid JSON are skipped.
		if err := json.Unmarshal([]byte(payload), &upstreamData); err != nil {
			continue
		}
		if upstreamData.Data.Phase == "done" {
			break
		}

		// Thinking phase: collect reasoning text.
		if upstreamData.Data.Phase == "thinking" && upstreamData.Data.DeltaContent != "" {
			// Coming back to "thinking" after another phase starts a new round;
			// the second and later rounds are separated by a blank line.
			if thinkingFilter.LastPhase != "" && thinkingFilter.LastPhase != "thinking" {
				thinkingFilter.ResetForNewRound()
				thinkingFilter.ThinkingRoundCount++
				if thinkingFilter.ThinkingRoundCount > 1 {
					reasoningChunks = append(reasoningChunks, "\n\n")
				}
			}
			thinkingFilter.LastPhase = "thinking"
			hasThinking = true
			reasoningContent := thinkingFilter.ProcessThinking(upstreamData.Data.DeltaContent)
			if reasoningContent != "" {
				thinkingFilter.LastOutputChunk = reasoningContent
				reasoningChunks = append(reasoningChunks, reasoningContent)
			}
			continue
		}
		if upstreamData.Data.Phase != "" {
			thinkingFilter.LastPhase = upstreamData.Data.Phase
		}

		editContent := upstreamData.GetEditContent()

		// Search results feed the reference filter and are not part of the answer.
		if editContent != "" && filter.IsSearchResultContent(editContent) {
			if results := filter.ParseSearchResults(editContent); len(results) > 0 {
				searchRefFilter.AddSearchResults(results)
			}
			continue
		}
		// For "search_image" and "mcp" edits only the text returned by
		// ExtractTextBeforeGlmBlock is kept.
		if editContent != "" && strings.Contains(editContent, `"search_image"`) {
			textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent)
			if textBeforeBlock != "" {
				chunks = append(chunks, textBeforeBlock)
			}
			continue
		}
		if editContent != "" && strings.Contains(editContent, `"mcp"`) {
			textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent)
			if textBeforeBlock != "" {
				chunks = append(chunks, textBeforeBlock)
			}
			continue
		}
		if editContent != "" && filter.IsSearchToolCall(editContent, upstreamData.Data.Phase) {
			continue
		}

		// Function tool calls, only when the request declared tools.
		if len(tools) > 0 && editContent != "" && filter.IsFunctionToolCall(editContent, upstreamData.Data.Phase) {
			if toolCalls := filter.ParseFunctionToolCalls(editContent); len(toolCalls) > 0 {
				for i := range toolCalls {
					if toolCalls[i].ID == "" {
						toolCalls[i].ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24])
					}
				}
				// The latest parsed set replaces any earlier one.
				collectedToolCalls = toolCalls
			}
			continue
		}

		// Answer content: a delta, the text after "</details>" in an edit, or
		// the whole edit content in the "other"/"tool_call" phases.
		content := ""
		if upstreamData.Data.Phase == "answer" && upstreamData.Data.DeltaContent != "" {
			content = upstreamData.Data.DeltaContent
		} else if upstreamData.Data.Phase == "answer" && editContent != "" {
			if strings.Contains(editContent, "</details>") {
				reasoningContent := thinkingFilter.ExtractIncrementalThinking(editContent)
				if reasoningContent != "" {
					reasoningChunks = append(reasoningChunks, reasoningContent)
				}
				if idx := strings.Index(editContent, "</details>"); idx != -1 {
					afterDetails := editContent[idx+len("</details>"):]
					// Drop a single leading newline after the marker.
					if strings.HasPrefix(afterDetails, "\n") {
						content = afterDetails[1:]
					} else {
						content = afterDetails
					}
				}
			}
		} else if (upstreamData.Data.Phase == "other" || upstreamData.Data.Phase == "tool_call") && editContent != "" {
			content = editContent
		}
		if content != "" {
			chunks = append(chunks, content)
		}
	}
	// Same handling as the streaming handler: log the read error and go on
	// with what has been collected.
	if err := scanner.Err(); err != nil {
		logger.LogError("[Anthropic-Upstream] scanner error: %v", err)
	}

	fullContent := strings.Join(chunks, "")
	fullContent = searchRefFilter.Process(fullContent) + searchRefFilter.Flush()
	fullReasoning := strings.Join(reasoningChunks, "")
	fullReasoning = searchRefFilter.Process(fullReasoning) + searchRefFilter.Flush()

	// Prompt-style tool calls are looked for only when no function tool call
	// was collected from the stream.
	if len(tools) > 0 && len(collectedToolCalls) == 0 {
		cleanContent, promptToolCalls := filter.ExtractPromptToolCalls(fullContent)
		if len(promptToolCalls) > 0 {
			collectedToolCalls = promptToolCalls
			fullContent = cleanContent
		}
	}

	// Build response content blocks: thinking, then text, then tool_use.
	var contentBlocks []model.AnthropicContentBlock
	if hasThinking && fullReasoning != "" {
		contentBlocks = append(contentBlocks, model.AnthropicContentBlock{
			Type:     "thinking",
			Thinking: fullReasoning,
		})
	}
	if fullContent != "" {
		contentBlocks = append(contentBlocks, model.AnthropicContentBlock{
			Type: "text",
			Text: fullContent,
		})
	}
	for _, tc := range collectedToolCalls {
		if tc.ID == "" {
			tc.ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24])
		}
		// The arguments are embedded verbatim as raw JSON. Empty or malformed
		// arguments would make encoding the whole response fail (leaving the
		// client with an empty body), so they are replaced by an empty object.
		args := tc.Function.Arguments
		if !json.Valid([]byte(args)) {
			logger.LogError("[Anthropic] tool call %v has invalid JSON arguments, using {}", tc.Function.Name)
			args = "{}"
		}
		contentBlocks = append(contentBlocks, model.AnthropicContentBlock{
			Type:  "tool_use",
			ID:    tc.ID,
			Name:  tc.Function.Name,
			Input: json.RawMessage(args),
		})
	}
	// Never return an empty content array.
	if len(contentBlocks) == 0 {
		contentBlocks = append(contentBlocks, model.AnthropicContentBlock{
			Type: "text",
			Text: "",
		})
	}

	stopReason := "end_turn"
	if len(collectedToolCalls) > 0 {
		stopReason = "tool_use"
	}

	response := model.AnthropicResponse{
		ID:         messageID,
		Type:       "message",
		Role:       "assistant",
		Content:    contentBlocks,
		Model:      requestModel,
		StopReason: stopReason,
		Usage:      model.AnthropicUsage{InputTokens: 100, OutputTokens: len(fullContent) / 4},
	}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(response); err != nil {
		logger.LogError("[Anthropic] failed to encode response: %v", err)
	}
}
// emitAnthropicTextDelta streams one piece of answer text as a text_delta
// event and keeps the content-block state consistent while doing so.
//
// Empty text is a no-op. Otherwise an open thinking block and an open
// tool_use block are closed (in that order, each advancing
// *contentBlockIndex), a text block is opened if none is open, *hasContent is
// set and the delta is sent at the current index. All state is updated
// through the pointers passed in.
func emitAnthropicTextDelta(w http.ResponseWriter, flusher http.Flusher, contentBlockIndex *int, inThinkingBlock, inTextBlock, inToolUseBlock *bool, hasContent *bool, text string) {
	if len(text) == 0 {
		return
	}

	// Text cannot share a block with thinking or tool_use output, so finish
	// whichever of those is still open before going on.
	for _, open := range []*bool{inThinkingBlock, inToolUseBlock} {
		if !*open {
			continue
		}
		sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
			Type:  "content_block_stop",
			Index: *contentBlockIndex,
		})
		*contentBlockIndex++
		*open = false
	}

	// Open a text block on first use.
	if !*inTextBlock {
		opening := model.AnthropicContentBlockStart{
			Type:         "content_block_start",
			Index:        *contentBlockIndex,
			ContentBlock: model.AnthropicContentBlock{Type: "text", Text: ""},
		}
		sendAnthropicSSE(w, flusher, "content_block_start", opening)
		*inTextBlock = true
	}

	*hasContent = true
	chunk := model.AnthropicContentBlockDelta{
		Type:  "content_block_delta",
		Index: *contentBlockIndex,
		Delta: model.AnthropicContentBlockDelta2{Type: "text_delta", Text: text},
	}
	sendAnthropicSSE(w, flusher, "content_block_delta", chunk)
}
// emitAnthropicToolUse streams one tool call as a tool_use content block: a
// content_block_start carrying the id and name with an empty input object,
// followed by a single input_json_delta holding the complete arguments.
//
// If a tool_use block is already open it is stopped first and
// *contentBlockIndex advances. The new block is left open
// (*inToolUseBlock is true on return); closing it is up to the caller. A
// call without an id receives a generated "toolu_" id.
func emitAnthropicToolUse(w http.ResponseWriter, flusher http.Flusher, contentBlockIndex *int, inToolUseBlock *bool, tc model.ToolCall) {
	// Finish the preceding tool_use block, if any.
	if *inToolUseBlock {
		sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{
			Type:  "content_block_stop",
			Index: *contentBlockIndex,
		})
		*contentBlockIndex++
	}

	id := tc.ID
	if id == "" {
		id = fmt.Sprintf("toolu_%s", uuid.New().String()[:24])
	}

	// Announce the block; the real input follows in the delta below.
	opening := model.AnthropicContentBlockStart{
		Type:  "content_block_start",
		Index: *contentBlockIndex,
		ContentBlock: model.AnthropicContentBlock{
			Type:  "tool_use",
			ID:    id,
			Name:  tc.Function.Name,
			Input: json.RawMessage("{}"),
		},
	}
	sendAnthropicSSE(w, flusher, "content_block_start", opening)
	*inToolUseBlock = true

	// The arguments are sent in one piece rather than incrementally.
	arguments := model.AnthropicContentBlockDelta{
		Type:  "content_block_delta",
		Index: *contentBlockIndex,
		Delta: model.AnthropicContentBlockDelta2{Type: "input_json_delta", PartialJSON: tc.Function.Arguments},
	}
	sendAnthropicSSE(w, flusher, "content_block_delta", arguments)
}
// sendAnthropicSSE serialises data as JSON and writes it as one server-sent
// event in the form "event: <type>\ndata: <json>\n\n", flushing right away so
// the client receives it immediately. If data cannot be marshalled the error
// is logged and nothing is written.
func sendAnthropicSSE(w http.ResponseWriter, flusher http.Flusher, eventType string, data interface{}) {
	payload, marshalErr := json.Marshal(data)
	if marshalErr == nil {
		fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, payload)
		flusher.Flush()
		return
	}
	logger.LogError("[Anthropic-SSE] marshal error: %v", marshalErr)
}
// writeAnthropicError sends an error reply in Anthropic's envelope:
//
//	{"type": "error", "error": {"type": <errorType>, "message": <message>}}
//
// The Content-Type is set to application/json and statusCode is written as
// the HTTP status before the body.
func writeAnthropicError(w http.ResponseWriter, statusCode int, errorType, message string) {
	detail := map[string]interface{}{
		"type":    errorType,
		"message": message,
	}
	envelope := map[string]interface{}{
		"type":  "error",
		"error": detail,
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(statusCode)
	json.NewEncoder(w).Encode(envelope)
}