Spaces:
Paused
Paused
| package handler | |
| import ( | |
| "bufio" | |
| "encoding/json" | |
| "fmt" | |
| "io" | |
| "net/http" | |
| "strings" | |
| "github.com/google/uuid" | |
| "zai-proxy/internal/auth" | |
| "zai-proxy/internal/filter" | |
| "zai-proxy/internal/logger" | |
| "zai-proxy/internal/model" | |
| "zai-proxy/internal/upstream" | |
| ) | |
| // HandleMessages handles Anthropic Messages API requests (/v1/messages) | |
| func HandleMessages(w http.ResponseWriter, r *http.Request) { | |
| apiKey := r.Header.Get("x-api-key") | |
| if apiKey == "" { | |
| apiKey = strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ") | |
| } | |
| if apiKey == "" { | |
| writeAnthropicError(w, http.StatusUnauthorized, "authentication_error", "Missing API key") | |
| return | |
| } | |
| ok, reason := CheckAndTrack(apiKey, 0) | |
| if !ok { | |
| writeAnthropicError(w, http.StatusTooManyRequests, "rate_limit_error", reason) | |
| return | |
| } | |
| token := apiKey | |
| defer func() { TrackUsage(apiKey, 150) }() | |
| if token == "free" || strings.HasPrefix(token, "RWPX-") { | |
| anonymousToken, err := auth.GetAnonymousToken() | |
| if err != nil { | |
| logger.LogError("Failed to get anonymous token: %v", err) | |
| writeAnthropicError(w, http.StatusInternalServerError, "api_error", "Failed to get anonymous token") | |
| return | |
| } | |
| token = anonymousToken | |
| } | |
| var req model.AnthropicRequest | |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | |
| writeAnthropicError(w, http.StatusBadRequest, "invalid_request_error", "Invalid request body") | |
| return | |
| } | |
| if req.MaxTokens == 0 { | |
| req.MaxTokens = 8192 | |
| } | |
| // Determine if thinking is enabled | |
| thinkingEnabled := false | |
| if req.Thinking != nil && req.Thinking.Type == "enabled" { | |
| thinkingEnabled = true | |
| } | |
| // Resolve Claude model name to GLM model name | |
| resolvedModel, _ := model.ResolveClaudeModel(req.Model, thinkingEnabled) | |
| // Convert Anthropic messages to internal format | |
| messages, tools, toolChoice := convertAnthropicToInternal(req) | |
| resp, modelName, err := upstream.MakeUpstreamRequest(token, messages, resolvedModel, tools, toolChoice) | |
| if err != nil { | |
| logger.LogError("Upstream request failed: %v", err) | |
| writeAnthropicError(w, http.StatusBadGateway, "api_error", "Upstream error") | |
| return | |
| } | |
| defer resp.Body.Close() | |
| if resp.StatusCode != http.StatusOK { | |
| body, _ := io.ReadAll(resp.Body) | |
| bodyStr := string(body) | |
| if len(bodyStr) > 500 { | |
| bodyStr = bodyStr[:500] | |
| } | |
| logger.LogError("Upstream error: status=%d, body=%s", resp.StatusCode, bodyStr) | |
| writeAnthropicError(w, resp.StatusCode, "api_error", "Upstream error") | |
| return | |
| } | |
| messageID := fmt.Sprintf("msg_%s", uuid.New().String()[:24]) | |
| if req.Stream { | |
| handleAnthropicStream(w, resp.Body, messageID, modelName, req.Model, tools) | |
| } else { | |
| handleAnthropicNonStream(w, resp.Body, messageID, modelName, req.Model, tools) | |
| } | |
| } | |
| // convertAnthropicToInternal converts Anthropic request format to internal Message/Tool format | |
| func convertAnthropicToInternal(req model.AnthropicRequest) ([]model.Message, []model.Tool, interface{}) { | |
| var messages []model.Message | |
| // Convert system field to a system role message | |
| if req.System != nil { | |
| systemText := "" | |
| switch s := req.System.(type) { | |
| case string: | |
| systemText = s | |
| case []interface{}: | |
| // Array of content blocks | |
| for _, item := range s { | |
| if block, ok := item.(map[string]interface{}); ok { | |
| if t, ok := block["text"].(string); ok { | |
| systemText += t | |
| } | |
| } | |
| } | |
| } | |
| if systemText != "" { | |
| messages = append(messages, model.Message{ | |
| Role: "system", | |
| Content: systemText, | |
| }) | |
| } | |
| } | |
| // Convert Anthropic messages to internal format | |
| for _, msg := range req.Messages { | |
| switch msg.Role { | |
| case "user": | |
| text, blocks := msg.ParseContent() | |
| if len(blocks) == 0 { | |
| // Simple text message | |
| messages = append(messages, model.Message{ | |
| Role: "user", | |
| Content: text, | |
| }) | |
| } else { | |
| // Process content blocks - may contain tool_result | |
| for _, block := range blocks { | |
| switch block.Type { | |
| case "text": | |
| messages = append(messages, model.Message{ | |
| Role: "user", | |
| Content: block.Text, | |
| }) | |
| case "tool_result": | |
| // Convert tool_result to tool role message | |
| resultContent := "" | |
| switch c := block.Content.(type) { | |
| case string: | |
| resultContent = c | |
| case []interface{}: | |
| for _, item := range c { | |
| if part, ok := item.(map[string]interface{}); ok { | |
| if t, ok := part["text"].(string); ok { | |
| resultContent += t | |
| } | |
| } | |
| } | |
| } | |
| messages = append(messages, model.Message{ | |
| Role: "tool", | |
| Content: resultContent, | |
| ToolCallID: block.ToolUseID, | |
| }) | |
| case "image": | |
| // Skip image blocks for now | |
| } | |
| } | |
| } | |
| case "assistant": | |
| _, blocks := msg.ParseContent() | |
| if len(blocks) == 0 { | |
| // Simple text | |
| text, _ := msg.ParseContent() | |
| messages = append(messages, model.Message{ | |
| Role: "assistant", | |
| Content: text, | |
| }) | |
| } else { | |
| // Assistant message with content blocks | |
| var textContent string | |
| var toolCalls []model.ToolCall | |
| for _, block := range blocks { | |
| switch block.Type { | |
| case "text": | |
| textContent += block.Text | |
| case "thinking": | |
| // Skip thinking blocks in history - upstream doesn't need them | |
| case "tool_use": | |
| argsStr := "{}" | |
| if block.Input != nil { | |
| argsStr = string(block.Input) | |
| } | |
| toolCalls = append(toolCalls, model.ToolCall{ | |
| ID: block.ID, | |
| Type: "function", | |
| Function: model.FunctionCall{ | |
| Name: block.Name, | |
| Arguments: argsStr, | |
| }, | |
| }) | |
| } | |
| } | |
| messages = append(messages, model.Message{ | |
| Role: "assistant", | |
| Content: textContent, | |
| ToolCalls: toolCalls, | |
| }) | |
| } | |
| } | |
| } | |
| // Convert Anthropic tools to OpenAI format | |
| var tools []model.Tool | |
| for _, t := range req.Tools { | |
| tools = append(tools, model.Tool{ | |
| Type: "function", | |
| Function: model.ToolFunction{ | |
| Name: t.Name, | |
| Description: t.Description, | |
| Parameters: t.InputSchema, | |
| }, | |
| }) | |
| } | |
| // Convert tool_choice | |
| var toolChoice interface{} | |
| if req.ToolChoice != nil { | |
| switch tc := req.ToolChoice.(type) { | |
| case map[string]interface{}: | |
| tcType, _ := tc["type"].(string) | |
| switch tcType { | |
| case "auto": | |
| toolChoice = "auto" | |
| case "any": | |
| toolChoice = "required" | |
| case "none": | |
| toolChoice = "none" | |
| case "tool": | |
| if name, ok := tc["name"].(string); ok { | |
| toolChoice = map[string]interface{}{ | |
| "type": "function", | |
| "function": map[string]interface{}{"name": name}, | |
| } | |
| } | |
| } | |
| } | |
| } | |
| return messages, tools, toolChoice | |
| } | |
| // handleAnthropicStream processes upstream SSE and converts to Anthropic streaming format | |
| func handleAnthropicStream(w http.ResponseWriter, body io.ReadCloser, messageID, modelName, requestModel string, tools []model.Tool) { | |
| w.Header().Set("Content-Type", "text/event-stream") | |
| w.Header().Set("Cache-Control", "no-cache") | |
| w.Header().Set("Connection", "keep-alive") | |
| flusher, ok := w.(http.Flusher) | |
| if !ok { | |
| writeAnthropicError(w, http.StatusInternalServerError, "api_error", "Streaming not supported") | |
| return | |
| } | |
| // Send message_start | |
| msgStart := model.AnthropicMessageStart{ | |
| Type: "message_start", | |
| Message: model.AnthropicResponse{ | |
| ID: messageID, | |
| Type: "message", | |
| Role: "assistant", | |
| Content: []model.AnthropicContentBlock{}, | |
| Model: requestModel, | |
| StopReason: "", | |
| Usage: model.AnthropicUsage{InputTokens: 0, OutputTokens: 0}, | |
| }, | |
| } | |
| sendAnthropicSSE(w, flusher, "message_start", msgStart) | |
| scanner := bufio.NewScanner(body) | |
| scanner.Buffer(make([]byte, 1024*1024), 1024*1024) | |
| searchRefFilter := filter.NewSearchRefFilter() | |
| thinkingFilter := &filter.ThinkingFilter{} | |
| contentBlockIndex := 0 | |
| inThinkingBlock := false | |
| inTextBlock := false | |
| inToolUseBlock := false | |
| hasContent := false | |
| totalContentOutputLength := 0 | |
| hasToolCalls := false | |
| var collectedToolCalls []model.ToolCall | |
| promptToolBuffer := "" | |
| for scanner.Scan() { | |
| line := scanner.Text() | |
| logger.LogDebug("[Anthropic-Upstream] %s", line) | |
| if !strings.HasPrefix(line, "data: ") { | |
| continue | |
| } | |
| payload := strings.TrimPrefix(line, "data: ") | |
| if payload == "[DONE]" { | |
| break | |
| } | |
| var upstreamData model.UpstreamData | |
| if err := json.Unmarshal([]byte(payload), &upstreamData); err != nil { | |
| continue | |
| } | |
| if upstreamData.Data.Phase == "done" { | |
| break | |
| } | |
| // Handle thinking phase | |
| if upstreamData.Data.Phase == "thinking" && upstreamData.Data.DeltaContent != "" { | |
| isNewThinkingRound := false | |
| if thinkingFilter.LastPhase != "" && thinkingFilter.LastPhase != "thinking" { | |
| thinkingFilter.ResetForNewRound() | |
| thinkingFilter.ThinkingRoundCount++ | |
| isNewThinkingRound = true | |
| } | |
| thinkingFilter.LastPhase = "thinking" | |
| reasoningContent := thinkingFilter.ProcessThinking(upstreamData.Data.DeltaContent) | |
| if isNewThinkingRound && thinkingFilter.ThinkingRoundCount > 1 && reasoningContent != "" { | |
| reasoningContent = "\n\n" + reasoningContent | |
| } | |
| if reasoningContent != "" { | |
| thinkingFilter.LastOutputChunk = reasoningContent | |
| reasoningContent = searchRefFilter.Process(reasoningContent) | |
| if reasoningContent != "" { | |
| // Close previous non-thinking block if open | |
| if inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inTextBlock = false | |
| } | |
| // Start thinking block if not already in one | |
| if !inThinkingBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_start", model.AnthropicContentBlockStart{ | |
| Type: "content_block_start", | |
| Index: contentBlockIndex, | |
| ContentBlock: model.AnthropicContentBlock{Type: "thinking", Thinking: ""}, | |
| }) | |
| inThinkingBlock = true | |
| } | |
| hasContent = true | |
| sendAnthropicSSE(w, flusher, "content_block_delta", model.AnthropicContentBlockDelta{ | |
| Type: "content_block_delta", | |
| Index: contentBlockIndex, | |
| Delta: model.AnthropicContentBlockDelta2{Type: "thinking_delta", Thinking: reasoningContent}, | |
| }) | |
| } | |
| } | |
| continue | |
| } | |
| if upstreamData.Data.Phase != "" { | |
| thinkingFilter.LastPhase = upstreamData.Data.Phase | |
| } | |
| // Filter search results, image searches, mcp, etc. | |
| editContent := upstreamData.GetEditContent() | |
| if editContent != "" && filter.IsSearchResultContent(editContent) { | |
| if results := filter.ParseSearchResults(editContent); len(results) > 0 { | |
| searchRefFilter.AddSearchResults(results) | |
| } | |
| continue | |
| } | |
| if editContent != "" && strings.Contains(editContent, `"search_image"`) { | |
| textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent) | |
| if textBeforeBlock != "" { | |
| emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, searchRefFilter.Process(textBeforeBlock)) | |
| } | |
| continue | |
| } | |
| if editContent != "" && strings.Contains(editContent, `"mcp"`) { | |
| textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent) | |
| if textBeforeBlock != "" { | |
| emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, searchRefFilter.Process(textBeforeBlock)) | |
| } | |
| continue | |
| } | |
| if editContent != "" && filter.IsSearchToolCall(editContent, upstreamData.Data.Phase) { | |
| continue | |
| } | |
| // Handle function tool calls | |
| if len(tools) > 0 && editContent != "" && filter.IsFunctionToolCall(editContent, upstreamData.Data.Phase) { | |
| if toolCalls := filter.ParseFunctionToolCalls(editContent); len(toolCalls) > 0 { | |
| for i := range toolCalls { | |
| if toolCalls[i].ID == "" { | |
| toolCalls[i].ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24]) | |
| } | |
| } | |
| collectedToolCalls = toolCalls | |
| hasToolCalls = true | |
| // Close thinking/text blocks | |
| if inThinkingBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inThinkingBlock = false | |
| } | |
| if inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inTextBlock = false | |
| } | |
| for _, tc := range toolCalls { | |
| emitAnthropicToolUse(w, flusher, &contentBlockIndex, &inToolUseBlock, tc) | |
| } | |
| } | |
| continue | |
| } | |
| // Flush thinking filter | |
| if thinkingRemaining := thinkingFilter.Flush(); thinkingRemaining != "" { | |
| thinkingFilter.LastOutputChunk = thinkingRemaining | |
| processedRemaining := searchRefFilter.Process(thinkingRemaining) | |
| if processedRemaining != "" { | |
| if !inThinkingBlock { | |
| // Close text block if open | |
| if inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inTextBlock = false | |
| } | |
| sendAnthropicSSE(w, flusher, "content_block_start", model.AnthropicContentBlockStart{ | |
| Type: "content_block_start", | |
| Index: contentBlockIndex, | |
| ContentBlock: model.AnthropicContentBlock{Type: "thinking", Thinking: ""}, | |
| }) | |
| inThinkingBlock = true | |
| } | |
| hasContent = true | |
| sendAnthropicSSE(w, flusher, "content_block_delta", model.AnthropicContentBlockDelta{ | |
| Type: "content_block_delta", | |
| Index: contentBlockIndex, | |
| Delta: model.AnthropicContentBlockDelta2{Type: "thinking_delta", Thinking: processedRemaining}, | |
| }) | |
| } | |
| } | |
| // Extract content | |
| content := "" | |
| if upstreamData.Data.Phase == "answer" && upstreamData.Data.DeltaContent != "" { | |
| content = upstreamData.Data.DeltaContent | |
| } else if upstreamData.Data.Phase == "answer" && editContent != "" { | |
| if strings.Contains(editContent, "</details>") { | |
| if idx := strings.Index(editContent, "</details>"); idx != -1 { | |
| afterDetails := editContent[idx+len("</details>"):] | |
| if strings.HasPrefix(afterDetails, "\n") { | |
| content = afterDetails[1:] | |
| } else { | |
| content = afterDetails | |
| } | |
| totalContentOutputLength = len([]rune(content)) | |
| } | |
| } | |
| } else if (upstreamData.Data.Phase == "other" || upstreamData.Data.Phase == "tool_call") && editContent != "" { | |
| fullContentRunes := []rune(editContent) | |
| if len(fullContentRunes) > totalContentOutputLength { | |
| content = string(fullContentRunes[totalContentOutputLength:]) | |
| totalContentOutputLength = len(fullContentRunes) | |
| } else { | |
| content = editContent | |
| } | |
| } | |
| if content == "" { | |
| continue | |
| } | |
| content = searchRefFilter.Process(content) | |
| if content == "" { | |
| continue | |
| } | |
| hasContent = true | |
| if upstreamData.Data.Phase == "answer" && upstreamData.Data.DeltaContent != "" { | |
| totalContentOutputLength += len([]rune(content)) | |
| } | |
| // Prompt tool extraction: buffer answer text for <tool_call> detection | |
| if len(tools) > 0 { | |
| promptToolBuffer += content | |
| for { | |
| openIdx := strings.Index(promptToolBuffer, "<tool_call>") | |
| if openIdx == -1 { | |
| break | |
| } | |
| if openIdx > 0 { | |
| safeContent := promptToolBuffer[:openIdx] | |
| promptToolBuffer = promptToolBuffer[openIdx:] | |
| if safeContent != "" { | |
| emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, safeContent) | |
| } | |
| } | |
| afterOpen := promptToolBuffer[len("<tool_call>"):] | |
| closeIdx := strings.Index(promptToolBuffer, "</tool_call>") | |
| thinkCloseIdx := strings.Index(afterOpen, "</think>") | |
| nextOpenIdx := strings.Index(afterOpen, "<tool_call>") | |
| blockEnd := -1 | |
| if closeIdx != -1 { | |
| blockEnd = closeIdx + len("</tool_call>") | |
| } | |
| if thinkCloseIdx != -1 { | |
| candidate := len("<tool_call>") + thinkCloseIdx + len("</think>") | |
| if blockEnd == -1 || candidate < blockEnd { | |
| blockEnd = candidate | |
| } | |
| } | |
| if nextOpenIdx != -1 { | |
| candidate := len("<tool_call>") + nextOpenIdx | |
| if blockEnd == -1 || candidate < blockEnd { | |
| blockEnd = candidate | |
| } | |
| } | |
| if blockEnd == -1 { | |
| break | |
| } | |
| block := promptToolBuffer[:blockEnd] | |
| promptToolBuffer = promptToolBuffer[blockEnd:] | |
| _, ptToolCalls := filter.ExtractPromptToolCalls(block) | |
| if len(ptToolCalls) > 0 { | |
| collectedToolCalls = append(collectedToolCalls, ptToolCalls...) | |
| hasToolCalls = true | |
| // Close thinking/text blocks before emitting tool use | |
| if inThinkingBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inThinkingBlock = false | |
| } | |
| if inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inTextBlock = false | |
| } | |
| for _, tc := range ptToolCalls { | |
| tc.ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24]) | |
| emitAnthropicToolUse(w, flusher, &contentBlockIndex, &inToolUseBlock, tc) | |
| } | |
| } | |
| } | |
| continue | |
| } | |
| emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, content) | |
| } | |
| if err := scanner.Err(); err != nil { | |
| logger.LogError("[Anthropic-Upstream] scanner error: %v", err) | |
| } | |
| // Flush remaining prompt tool buffer | |
| if promptToolBuffer != "" { | |
| cleanContent, ptToolCalls := filter.ExtractPromptToolCalls(promptToolBuffer) | |
| if len(ptToolCalls) > 0 { | |
| collectedToolCalls = append(collectedToolCalls, ptToolCalls...) | |
| hasToolCalls = true | |
| if inThinkingBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inThinkingBlock = false | |
| } | |
| if inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inTextBlock = false | |
| } | |
| for _, tc := range ptToolCalls { | |
| tc.ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24]) | |
| emitAnthropicToolUse(w, flusher, &contentBlockIndex, &inToolUseBlock, tc) | |
| } | |
| } | |
| if cleanContent != "" { | |
| emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, cleanContent) | |
| } | |
| promptToolBuffer = "" | |
| } | |
| // Flush search ref filter | |
| if remaining := searchRefFilter.Flush(); remaining != "" { | |
| emitAnthropicTextDelta(w, flusher, &contentBlockIndex, &inThinkingBlock, &inTextBlock, &inToolUseBlock, &hasContent, remaining) | |
| } | |
| if !hasContent && !hasToolCalls { | |
| logger.LogError("Anthropic stream response 200 but no content received") | |
| } | |
| // Close any open blocks | |
| if inThinkingBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inThinkingBlock = false | |
| } | |
| if inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inTextBlock = false | |
| } | |
| if inToolUseBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: contentBlockIndex, | |
| }) | |
| contentBlockIndex++ | |
| inToolUseBlock = false | |
| } | |
| // Determine stop reason | |
| stopReason := "end_turn" | |
| if hasToolCalls { | |
| stopReason = "tool_use" | |
| } | |
| // Send message_delta with stop_reason and usage | |
| sendAnthropicSSE(w, flusher, "message_delta", model.AnthropicMessageDelta{ | |
| Type: "message_delta", | |
| Delta: struct { | |
| StopReason string `json:"stop_reason"` | |
| StopSequence *string `json:"stop_sequence"` | |
| }{ | |
| StopReason: stopReason, | |
| }, | |
| Usage: model.AnthropicUsage{OutputTokens: contentBlockIndex * 100}, // Rough estimate | |
| }) | |
| // Send message_stop | |
| sendAnthropicSSE(w, flusher, "message_stop", model.AnthropicMessageStop{Type: "message_stop"}) | |
| // Suppress unused variable warnings | |
| _ = inThinkingBlock | |
| _ = inTextBlock | |
| _ = inToolUseBlock | |
| _ = contentBlockIndex | |
| } | |
| // handleAnthropicNonStream collects all upstream data and returns an Anthropic response | |
| func handleAnthropicNonStream(w http.ResponseWriter, body io.ReadCloser, messageID, modelName, requestModel string, tools []model.Tool) { | |
| scanner := bufio.NewScanner(body) | |
| scanner.Buffer(make([]byte, 1024*1024), 1024*1024) | |
| var chunks []string | |
| var reasoningChunks []string | |
| thinkingFilter := &filter.ThinkingFilter{} | |
| searchRefFilter := filter.NewSearchRefFilter() | |
| hasThinking := false | |
| var collectedToolCalls []model.ToolCall | |
| for scanner.Scan() { | |
| line := scanner.Text() | |
| if !strings.HasPrefix(line, "data: ") { | |
| continue | |
| } | |
| payload := strings.TrimPrefix(line, "data: ") | |
| if payload == "[DONE]" { | |
| break | |
| } | |
| var upstreamData model.UpstreamData | |
| if err := json.Unmarshal([]byte(payload), &upstreamData); err != nil { | |
| continue | |
| } | |
| if upstreamData.Data.Phase == "done" { | |
| break | |
| } | |
| if upstreamData.Data.Phase == "thinking" && upstreamData.Data.DeltaContent != "" { | |
| if thinkingFilter.LastPhase != "" && thinkingFilter.LastPhase != "thinking" { | |
| thinkingFilter.ResetForNewRound() | |
| thinkingFilter.ThinkingRoundCount++ | |
| if thinkingFilter.ThinkingRoundCount > 1 { | |
| reasoningChunks = append(reasoningChunks, "\n\n") | |
| } | |
| } | |
| thinkingFilter.LastPhase = "thinking" | |
| hasThinking = true | |
| reasoningContent := thinkingFilter.ProcessThinking(upstreamData.Data.DeltaContent) | |
| if reasoningContent != "" { | |
| thinkingFilter.LastOutputChunk = reasoningContent | |
| reasoningChunks = append(reasoningChunks, reasoningContent) | |
| } | |
| continue | |
| } | |
| if upstreamData.Data.Phase != "" { | |
| thinkingFilter.LastPhase = upstreamData.Data.Phase | |
| } | |
| editContent := upstreamData.GetEditContent() | |
| if editContent != "" && filter.IsSearchResultContent(editContent) { | |
| if results := filter.ParseSearchResults(editContent); len(results) > 0 { | |
| searchRefFilter.AddSearchResults(results) | |
| } | |
| continue | |
| } | |
| if editContent != "" && strings.Contains(editContent, `"search_image"`) { | |
| textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent) | |
| if textBeforeBlock != "" { | |
| chunks = append(chunks, textBeforeBlock) | |
| } | |
| continue | |
| } | |
| if editContent != "" && strings.Contains(editContent, `"mcp"`) { | |
| textBeforeBlock := filter.ExtractTextBeforeGlmBlock(editContent) | |
| if textBeforeBlock != "" { | |
| chunks = append(chunks, textBeforeBlock) | |
| } | |
| continue | |
| } | |
| if editContent != "" && filter.IsSearchToolCall(editContent, upstreamData.Data.Phase) { | |
| continue | |
| } | |
| if len(tools) > 0 && editContent != "" && filter.IsFunctionToolCall(editContent, upstreamData.Data.Phase) { | |
| if toolCalls := filter.ParseFunctionToolCalls(editContent); len(toolCalls) > 0 { | |
| for i := range toolCalls { | |
| if toolCalls[i].ID == "" { | |
| toolCalls[i].ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24]) | |
| } | |
| } | |
| collectedToolCalls = toolCalls | |
| } | |
| continue | |
| } | |
| content := "" | |
| if upstreamData.Data.Phase == "answer" && upstreamData.Data.DeltaContent != "" { | |
| content = upstreamData.Data.DeltaContent | |
| } else if upstreamData.Data.Phase == "answer" && editContent != "" { | |
| if strings.Contains(editContent, "</details>") { | |
| reasoningContent := thinkingFilter.ExtractIncrementalThinking(editContent) | |
| if reasoningContent != "" { | |
| reasoningChunks = append(reasoningChunks, reasoningContent) | |
| } | |
| if idx := strings.Index(editContent, "</details>"); idx != -1 { | |
| afterDetails := editContent[idx+len("</details>"):] | |
| if strings.HasPrefix(afterDetails, "\n") { | |
| content = afterDetails[1:] | |
| } else { | |
| content = afterDetails | |
| } | |
| } | |
| } | |
| } else if (upstreamData.Data.Phase == "other" || upstreamData.Data.Phase == "tool_call") && editContent != "" { | |
| content = editContent | |
| } | |
| if content != "" { | |
| chunks = append(chunks, content) | |
| } | |
| } | |
| fullContent := strings.Join(chunks, "") | |
| fullContent = searchRefFilter.Process(fullContent) + searchRefFilter.Flush() | |
| fullReasoning := strings.Join(reasoningChunks, "") | |
| fullReasoning = searchRefFilter.Process(fullReasoning) + searchRefFilter.Flush() | |
| // Extract prompt tool calls from answer text | |
| if len(tools) > 0 && len(collectedToolCalls) == 0 { | |
| cleanContent, promptToolCalls := filter.ExtractPromptToolCalls(fullContent) | |
| if len(promptToolCalls) > 0 { | |
| collectedToolCalls = promptToolCalls | |
| fullContent = cleanContent | |
| } | |
| } | |
| // Build response content blocks | |
| var contentBlocks []model.AnthropicContentBlock | |
| if hasThinking && fullReasoning != "" { | |
| contentBlocks = append(contentBlocks, model.AnthropicContentBlock{ | |
| Type: "thinking", | |
| Thinking: fullReasoning, | |
| }) | |
| } | |
| if fullContent != "" { | |
| contentBlocks = append(contentBlocks, model.AnthropicContentBlock{ | |
| Type: "text", | |
| Text: fullContent, | |
| }) | |
| } | |
| for _, tc := range collectedToolCalls { | |
| if tc.ID == "" { | |
| tc.ID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24]) | |
| } | |
| contentBlocks = append(contentBlocks, model.AnthropicContentBlock{ | |
| Type: "tool_use", | |
| ID: tc.ID, | |
| Name: tc.Function.Name, | |
| Input: json.RawMessage(tc.Function.Arguments), | |
| }) | |
| } | |
| if len(contentBlocks) == 0 { | |
| contentBlocks = append(contentBlocks, model.AnthropicContentBlock{ | |
| Type: "text", | |
| Text: "", | |
| }) | |
| } | |
| stopReason := "end_turn" | |
| if len(collectedToolCalls) > 0 { | |
| stopReason = "tool_use" | |
| } | |
| response := model.AnthropicResponse{ | |
| ID: messageID, | |
| Type: "message", | |
| Role: "assistant", | |
| Content: contentBlocks, | |
| Model: requestModel, | |
| StopReason: stopReason, | |
| Usage: model.AnthropicUsage{InputTokens: 100, OutputTokens: len(fullContent) / 4}, | |
| } | |
| w.Header().Set("Content-Type", "application/json") | |
| json.NewEncoder(w).Encode(response) | |
| } | |
| // emitAnthropicTextDelta sends a text content delta, managing block lifecycle | |
| func emitAnthropicTextDelta(w http.ResponseWriter, flusher http.Flusher, contentBlockIndex *int, inThinkingBlock, inTextBlock, inToolUseBlock *bool, hasContent *bool, text string) { | |
| if text == "" { | |
| return | |
| } | |
| // Close thinking block if transitioning to text | |
| if *inThinkingBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: *contentBlockIndex, | |
| }) | |
| *contentBlockIndex++ | |
| *inThinkingBlock = false | |
| } | |
| // Close tool_use block if transitioning to text | |
| if *inToolUseBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: *contentBlockIndex, | |
| }) | |
| *contentBlockIndex++ | |
| *inToolUseBlock = false | |
| } | |
| // Start text block if not in one | |
| if !*inTextBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_start", model.AnthropicContentBlockStart{ | |
| Type: "content_block_start", | |
| Index: *contentBlockIndex, | |
| ContentBlock: model.AnthropicContentBlock{Type: "text", Text: ""}, | |
| }) | |
| *inTextBlock = true | |
| } | |
| *hasContent = true | |
| sendAnthropicSSE(w, flusher, "content_block_delta", model.AnthropicContentBlockDelta{ | |
| Type: "content_block_delta", | |
| Index: *contentBlockIndex, | |
| Delta: model.AnthropicContentBlockDelta2{Type: "text_delta", Text: text}, | |
| }) | |
| } | |
| // emitAnthropicToolUse sends a tool_use content block (start + input_json_delta + stop) | |
| func emitAnthropicToolUse(w http.ResponseWriter, flusher http.Flusher, contentBlockIndex *int, inToolUseBlock *bool, tc model.ToolCall) { | |
| // Close previous tool_use block if open | |
| if *inToolUseBlock { | |
| sendAnthropicSSE(w, flusher, "content_block_stop", model.AnthropicContentBlockStop{ | |
| Type: "content_block_stop", Index: *contentBlockIndex, | |
| }) | |
| *contentBlockIndex++ | |
| } | |
| toolID := tc.ID | |
| if toolID == "" { | |
| toolID = fmt.Sprintf("toolu_%s", uuid.New().String()[:24]) | |
| } | |
| // Send content_block_start with tool_use | |
| sendAnthropicSSE(w, flusher, "content_block_start", model.AnthropicContentBlockStart{ | |
| Type: "content_block_start", | |
| Index: *contentBlockIndex, | |
| ContentBlock: model.AnthropicContentBlock{ | |
| Type: "tool_use", | |
| ID: toolID, | |
| Name: tc.Function.Name, | |
| Input: json.RawMessage("{}"), | |
| }, | |
| }) | |
| *inToolUseBlock = true | |
| // Send input as a single delta | |
| sendAnthropicSSE(w, flusher, "content_block_delta", model.AnthropicContentBlockDelta{ | |
| Type: "content_block_delta", | |
| Index: *contentBlockIndex, | |
| Delta: model.AnthropicContentBlockDelta2{Type: "input_json_delta", PartialJSON: tc.Function.Arguments}, | |
| }) | |
| } | |
| // sendAnthropicSSE writes an SSE event in Anthropic format: "event: <type>\ndata: <json>\n\n" | |
| func sendAnthropicSSE(w http.ResponseWriter, flusher http.Flusher, eventType string, data interface{}) { | |
| jsonData, err := json.Marshal(data) | |
| if err != nil { | |
| logger.LogError("[Anthropic-SSE] marshal error: %v", err) | |
| return | |
| } | |
| fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, jsonData) | |
| flusher.Flush() | |
| } | |
// writeAnthropicError sends statusCode with a JSON body in Anthropic's error
// envelope: {"type":"error","error":{"type":<errorType>,"message":<message>}}.
// The Content-Type header is set to application/json before the status is
// written.
func writeAnthropicError(w http.ResponseWriter, statusCode int, errorType, message string) {
	detail := map[string]interface{}{
		"type":    errorType,
		"message": message,
	}
	envelope := map[string]interface{}{
		"type":  "error",
		"error": detail,
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(statusCode)
	json.NewEncoder(w).Encode(envelope)
}