| package chat |
|
|
| import ( |
| "context" |
| "io" |
| "net/http" |
| "strings" |
| "time" |
|
|
| "ds2api/internal/assistantturn" |
| "ds2api/internal/auth" |
| "ds2api/internal/config" |
| dsprotocol "ds2api/internal/deepseek/protocol" |
| openaifmt "ds2api/internal/format/openai" |
| "ds2api/internal/promptcompat" |
| "ds2api/internal/sse" |
| streamengine "ds2api/internal/stream" |
| ) |
|
|
| type chatNonStreamResult struct { |
| rawThinking string |
| rawText string |
| thinking string |
| toolDetectionThinking string |
| text string |
| contentFilter bool |
| detectedCalls int |
| body map[string]any |
| finishReason string |
| responseMessageID int |
| outputError *assistantturn.OutputError |
| } |
|
|
| func (r chatNonStreamResult) historyText() string { |
| return historyTextForArchive(r.rawText, r.text) |
| } |
|
|
| func (r chatNonStreamResult) historyThinking() string { |
| return historyThinkingForArchive(r.rawThinking, r.toolDetectionThinking, r.thinking) |
| } |
|
|
| func (h *Handler) handleNonStreamWithRetry(w http.ResponseWriter, ctx context.Context, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) { |
| attempts := 0 |
| currentResp := resp |
| usagePrompt := finalPrompt |
| accumulatedThinking := "" |
| accumulatedRawThinking := "" |
| accumulatedToolDetectionThinking := "" |
| for { |
| result, ok := h.collectChatNonStreamAttempt(w, currentResp, completionID, model, usagePrompt, thinkingEnabled, searchEnabled, toolNames, toolsRaw) |
| if !ok { |
| return |
| } |
| accumulatedThinking += sse.TrimContinuationOverlap(accumulatedThinking, result.thinking) |
| accumulatedRawThinking += sse.TrimContinuationOverlap(accumulatedRawThinking, result.rawThinking) |
| accumulatedToolDetectionThinking += sse.TrimContinuationOverlap(accumulatedToolDetectionThinking, result.toolDetectionThinking) |
| result.thinking = accumulatedThinking |
| result.rawThinking = accumulatedRawThinking |
| result.toolDetectionThinking = accumulatedToolDetectionThinking |
| detected := detectAssistantToolCalls(result.rawText, result.text, result.rawThinking, result.toolDetectionThinking, toolNames) |
| result.detectedCalls = len(detected.Calls) |
| result.body = openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, result.thinking, result.text, detected.Calls, toolsRaw) |
| addRefFileTokensToUsage(result.body, refFileTokens) |
| result.finishReason = chatFinishReason(result.body) |
| if !shouldRetryChatNonStream(result, attempts) { |
| h.finishChatNonStreamResult(w, result, attempts, usagePrompt, refFileTokens, historySession) |
| return |
| } |
|
|
| attempts++ |
| config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "parent_message_id", result.responseMessageID) |
| retryPow, powErr := h.DS.GetPow(ctx, a, 3) |
| if powErr != nil { |
| config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", powErr) |
| retryPow = pow |
| } |
| retryPayload := clonePayloadForEmptyOutputRetry(payload, result.responseMessageID) |
| nextResp, err := h.DS.CallCompletion(ctx, a, retryPayload, retryPow, 3) |
| if err != nil { |
| if historySession != nil { |
| historySession.error(http.StatusInternalServerError, "Failed to get completion.", "error", result.historyThinking(), result.historyText()) |
| } |
| writeOpenAIError(w, http.StatusInternalServerError, "Failed to get completion.") |
| config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", false, "retry_attempt", attempts, "error", err) |
| return |
| } |
| usagePrompt = usagePromptWithEmptyOutputRetry(usagePrompt, attempts) |
| currentResp = nextResp |
| } |
| } |
|
|
| func (h *Handler) collectChatNonStreamAttempt(w http.ResponseWriter, resp *http.Response, completionID, model, usagePrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any) (chatNonStreamResult, bool) { |
| if resp.StatusCode != http.StatusOK { |
| defer func() { _ = resp.Body.Close() }() |
| body, _ := io.ReadAll(resp.Body) |
| writeOpenAIError(w, resp.StatusCode, string(body)) |
| return chatNonStreamResult{}, false |
| } |
| result := sse.CollectStream(resp, thinkingEnabled, true) |
| turn := assistantturn.BuildTurnFromCollected(result, assistantturn.BuildOptions{ |
| Model: model, |
| Prompt: usagePrompt, |
| SearchEnabled: searchEnabled, |
| ToolNames: toolNames, |
| ToolsRaw: toolsRaw, |
| }) |
| respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, usagePrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw) |
| return chatNonStreamResult{ |
| rawThinking: result.Thinking, |
| rawText: result.Text, |
| thinking: turn.Thinking, |
| toolDetectionThinking: result.ToolDetectionThinking, |
| text: turn.Text, |
| contentFilter: result.ContentFilter, |
| detectedCalls: len(turn.ToolCalls), |
| body: respBody, |
| finishReason: chatFinishReason(respBody), |
| responseMessageID: result.ResponseMessageID, |
| outputError: turn.Error, |
| }, true |
| } |
|
|
| func (h *Handler) finishChatNonStreamResult(w http.ResponseWriter, result chatNonStreamResult, attempts int, usagePrompt string, refFileTokens int, historySession *chatHistorySession) { |
| if result.detectedCalls == 0 && strings.TrimSpace(result.text) == "" { |
| status, message, code := upstreamEmptyOutputDetail(result.contentFilter, result.text, result.thinking) |
| if result.outputError != nil { |
| status, message, code = result.outputError.Status, result.outputError.Message, result.outputError.Code |
| } |
| if historySession != nil { |
| historySession.error(status, message, code, result.historyThinking(), result.historyText()) |
| } |
| writeOpenAIErrorWithCode(w, status, message, code) |
| config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", "none", "content_filter", result.contentFilter) |
| return |
| } |
| if historySession != nil { |
| historySession.success(http.StatusOK, result.historyThinking(), result.historyText(), result.finishReason, openaifmt.BuildChatUsageForModel("", usagePrompt, result.thinking, result.text, refFileTokens)) |
| } |
| writeJSON(w, http.StatusOK, result.body) |
| source := "first_attempt" |
| if attempts > 0 { |
| source = "synthetic_retry" |
| } |
| config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", false, "retry_attempts", attempts, "success_source", source) |
| } |
|
|
| func chatFinishReason(respBody map[string]any) string { |
| if choices, ok := respBody["choices"].([]map[string]any); ok && len(choices) > 0 { |
| if fr, _ := choices[0]["finish_reason"].(string); strings.TrimSpace(fr) != "" { |
| return fr |
| } |
| } |
| return "stop" |
| } |
|
|
| func shouldRetryChatNonStream(result chatNonStreamResult, attempts int) bool { |
| return emptyOutputRetryEnabled() && |
| attempts < emptyOutputRetryMaxAttempts() && |
| !result.contentFilter && |
| result.detectedCalls == 0 && |
| strings.TrimSpace(result.text) == "" |
| } |
|
|
| func (h *Handler) handleStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) { |
| streamRuntime, initialType, ok := h.prepareChatStreamRuntime(w, resp, completionID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, historySession) |
| if !ok { |
| return |
| } |
| attempts := 0 |
| currentResp := resp |
| for { |
| terminalWritten, retryable := h.consumeChatStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, historySession, attempts < emptyOutputRetryMaxAttempts()) |
| if terminalWritten { |
| logChatStreamTerminal(streamRuntime, attempts) |
| return |
| } |
| if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() { |
| streamRuntime.finalize("stop", false) |
| recordChatStreamHistory(streamRuntime, historySession) |
| config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none") |
| return |
| } |
| attempts++ |
| config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID) |
| retryPow, powErr := h.DS.GetPow(r.Context(), a, 3) |
| if powErr != nil { |
| config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", powErr) |
| retryPow = pow |
| } |
| nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3) |
| if err != nil { |
| failChatStreamRetry(streamRuntime, historySession, http.StatusInternalServerError, "Failed to get completion.", "error") |
| config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "chat.completions", "stream", true, "retry_attempt", attempts, "error", err) |
| return |
| } |
| if nextResp.StatusCode != http.StatusOK { |
| defer func() { _ = nextResp.Body.Close() }() |
| body, _ := io.ReadAll(nextResp.Body) |
| failChatStreamRetry(streamRuntime, historySession, nextResp.StatusCode, string(body), "error") |
| return |
| } |
| streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) |
| currentResp = nextResp |
| } |
| } |
|
|
| func (h *Handler) prepareChatStreamRuntime(w http.ResponseWriter, resp *http.Response, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, historySession *chatHistorySession) (*chatStreamRuntime, string, bool) { |
| if resp.StatusCode != http.StatusOK { |
| defer func() { _ = resp.Body.Close() }() |
| body, _ := io.ReadAll(resp.Body) |
| if historySession != nil { |
| historySession.error(resp.StatusCode, string(body), "error", "", "") |
| } |
| writeOpenAIError(w, resp.StatusCode, string(body)) |
| return nil, "", false |
| } |
| w.Header().Set("Content-Type", "text/event-stream") |
| w.Header().Set("Cache-Control", "no-cache, no-transform") |
| w.Header().Set("Connection", "keep-alive") |
| w.Header().Set("X-Accel-Buffering", "no") |
| rc := http.NewResponseController(w) |
| _, canFlush := w.(http.Flusher) |
| if !canFlush { |
| config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered") |
| } |
| initialType := "text" |
| if thinkingEnabled { |
| initialType = "thinking" |
| } |
| streamRuntime := newChatStreamRuntime( |
| w, rc, canFlush, completionID, time.Now().Unix(), model, finalPrompt, |
| thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw, |
| toolChoice, |
| len(toolNames) > 0, h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(), |
| ) |
| streamRuntime.refFileTokens = refFileTokens |
| return streamRuntime, initialType, true |
| } |
|
|
| func (h *Handler) consumeChatStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *chatStreamRuntime, initialType string, thinkingEnabled bool, historySession *chatHistorySession, allowDeferEmpty bool) (bool, bool) { |
| defer func() { _ = resp.Body.Close() }() |
| finalReason := "stop" |
| streamengine.ConsumeSSE(streamengine.ConsumeConfig{ |
| Context: r.Context(), |
| Body: resp.Body, |
| ThinkingEnabled: thinkingEnabled, |
| InitialType: initialType, |
| KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, |
| IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, |
| MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, |
| }, streamengine.ConsumeHooks{ |
| OnKeepAlive: streamRuntime.sendKeepAlive, |
| OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision { |
| decision := streamRuntime.onParsed(parsed) |
| if historySession != nil { |
| historySession.progress(streamRuntime.historyThinking(), streamRuntime.historyText()) |
| } |
| return decision |
| }, |
| OnFinalize: func(reason streamengine.StopReason, _ error) { |
| if string(reason) == "content_filter" { |
| finalReason = "content_filter" |
| } |
| }, |
| OnContextDone: func() { |
| streamRuntime.markContextCancelled() |
| if historySession != nil { |
| historySession.stopped(streamRuntime.historyThinking(), streamRuntime.historyText(), string(streamengine.StopReasonContextCancelled)) |
| } |
| }, |
| }) |
| if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { |
| return true, false |
| } |
| terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter") |
| if terminalWritten { |
| recordChatStreamHistory(streamRuntime, historySession) |
| return true, false |
| } |
| return false, true |
| } |
|
|
| func recordChatStreamHistory(streamRuntime *chatStreamRuntime, historySession *chatHistorySession) { |
| if historySession == nil { |
| return |
| } |
| if streamRuntime.finalErrorMessage != "" { |
| historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.historyThinking(), streamRuntime.historyText()) |
| return |
| } |
| historySession.success(http.StatusOK, streamRuntime.historyThinking(), streamRuntime.historyText(), streamRuntime.finalFinishReason, streamRuntime.finalUsage) |
| } |
|
|
| func failChatStreamRetry(streamRuntime *chatStreamRuntime, historySession *chatHistorySession, status int, message, code string) { |
| streamRuntime.sendFailedChunk(status, message, code) |
| if historySession != nil { |
| historySession.error(status, message, code, streamRuntime.historyThinking(), streamRuntime.historyText()) |
| } |
| } |
|
|
| func logChatStreamTerminal(streamRuntime *chatStreamRuntime, attempts int) { |
| source := "first_attempt" |
| if attempts > 0 { |
| source = "synthetic_retry" |
| } |
| if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { |
| config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode) |
| return |
| } |
| if streamRuntime.finalErrorMessage != "" { |
| config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) |
| return |
| } |
| config.Logger.Info("[openai_empty_retry] completed", "surface", "chat.completions", "stream", true, "retry_attempts", attempts, "success_source", source) |
| } |
|
|