| package responses |
|
|
| import ( |
| "io" |
| "net/http" |
| "strings" |
| "time" |
|
|
| "ds2api/internal/auth" |
| "ds2api/internal/config" |
| dsprotocol "ds2api/internal/deepseek/protocol" |
| "ds2api/internal/promptcompat" |
| "ds2api/internal/responsehistory" |
| streamengine "ds2api/internal/stream" |
| ) |
|
|
| func (h *Handler) handleResponsesStreamWithRetry(w http.ResponseWriter, r *http.Request, a *auth.RequestAuth, resp *http.Response, payload map[string]any, pow, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) { |
| streamRuntime, initialType, ok := h.prepareResponsesStreamRuntime(w, resp, owner, responseID, model, finalPrompt, refFileTokens, thinkingEnabled, searchEnabled, toolNames, toolsRaw, toolChoice, traceID, historySession) |
| if !ok { |
| return |
| } |
| attempts := 0 |
| currentResp := resp |
| for { |
| terminalWritten, retryable := h.consumeResponsesStreamAttempt(r, currentResp, streamRuntime, initialType, thinkingEnabled, attempts < emptyOutputRetryMaxAttempts()) |
| if terminalWritten { |
| logResponsesStreamTerminal(streamRuntime, attempts) |
| return |
| } |
| if !retryable || !emptyOutputRetryEnabled() || attempts >= emptyOutputRetryMaxAttempts() { |
| streamRuntime.finalize("stop", false) |
| config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) |
| return |
| } |
| attempts++ |
| config.Logger.Info("[openai_empty_retry] attempting synthetic retry", "surface", "responses", "stream", true, "retry_attempt", attempts, "parent_message_id", streamRuntime.responseMessageID) |
| retryPow, powErr := h.DS.GetPow(r.Context(), a, 3) |
| if powErr != nil { |
| config.Logger.Warn("[openai_empty_retry] retry PoW fetch failed, falling back to original PoW", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", powErr) |
| retryPow = pow |
| } |
| nextResp, err := h.DS.CallCompletion(r.Context(), a, clonePayloadForEmptyOutputRetry(payload, streamRuntime.responseMessageID), retryPow, 3) |
| if err != nil { |
| streamRuntime.failResponse(http.StatusInternalServerError, "Failed to get completion.", "error") |
| config.Logger.Warn("[openai_empty_retry] retry request failed", "surface", "responses", "stream", true, "retry_attempt", attempts, "error", err) |
| return |
| } |
| if nextResp.StatusCode != http.StatusOK { |
| defer func() { _ = nextResp.Body.Close() }() |
| body, _ := io.ReadAll(nextResp.Body) |
| streamRuntime.failResponse(nextResp.StatusCode, strings.TrimSpace(string(body)), "error") |
| return |
| } |
| streamRuntime.finalPrompt = usagePromptWithEmptyOutputRetry(finalPrompt, attempts) |
| currentResp = nextResp |
| } |
| } |
|
|
| func (h *Handler) prepareResponsesStreamRuntime(w http.ResponseWriter, resp *http.Response, owner, responseID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, toolChoice promptcompat.ToolChoicePolicy, traceID string, historySession *responsehistory.Session) (*responsesStreamRuntime, string, bool) { |
| if resp.StatusCode != http.StatusOK { |
| defer func() { _ = resp.Body.Close() }() |
| body, _ := io.ReadAll(resp.Body) |
| if historySession != nil { |
| historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") |
| } |
| writeOpenAIError(w, resp.StatusCode, strings.TrimSpace(string(body))) |
| return nil, "", false |
| } |
| w.Header().Set("Content-Type", "text/event-stream") |
| w.Header().Set("Cache-Control", "no-cache, no-transform") |
| w.Header().Set("Connection", "keep-alive") |
| w.Header().Set("X-Accel-Buffering", "no") |
| rc := http.NewResponseController(w) |
| _, canFlush := w.(http.Flusher) |
| initialType := "text" |
| if thinkingEnabled { |
| initialType = "thinking" |
| } |
| streamRuntime := newResponsesStreamRuntime( |
| w, rc, canFlush, responseID, model, finalPrompt, thinkingEnabled, searchEnabled, |
| stripReferenceMarkersEnabled(), toolNames, toolsRaw, len(toolNames) > 0, |
| h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence(), |
| toolChoice, traceID, func(obj map[string]any) { |
| h.getResponseStore().put(owner, responseID, obj) |
| }, historySession, |
| ) |
| streamRuntime.refFileTokens = refFileTokens |
| streamRuntime.sendCreated() |
| return streamRuntime, initialType, true |
| } |
|
|
| func (h *Handler) consumeResponsesStreamAttempt(r *http.Request, resp *http.Response, streamRuntime *responsesStreamRuntime, initialType string, thinkingEnabled bool, allowDeferEmpty bool) (bool, bool) { |
| defer func() { _ = resp.Body.Close() }() |
| finalReason := "stop" |
| streamengine.ConsumeSSE(streamengine.ConsumeConfig{ |
| Context: r.Context(), |
| Body: resp.Body, |
| ThinkingEnabled: thinkingEnabled, |
| InitialType: initialType, |
| KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, |
| IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, |
| MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, |
| }, streamengine.ConsumeHooks{ |
| OnParsed: streamRuntime.onParsed, |
| OnFinalize: func(reason streamengine.StopReason, _ error) { |
| if string(reason) == "content_filter" { |
| finalReason = "content_filter" |
| } |
| }, |
| OnContextDone: func() { |
| streamRuntime.markContextCancelled() |
| }, |
| }) |
| if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { |
| return true, false |
| } |
| terminalWritten := streamRuntime.finalize(finalReason, allowDeferEmpty && finalReason != "content_filter") |
| if terminalWritten { |
| return true, false |
| } |
| return false, true |
| } |
|
|
| func logResponsesStreamTerminal(streamRuntime *responsesStreamRuntime, attempts int) { |
| source := "first_attempt" |
| if attempts > 0 { |
| source = "synthetic_retry" |
| } |
| if streamRuntime.finalErrorCode == string(streamengine.StopReasonContextCancelled) { |
| config.Logger.Info("[openai_empty_retry] terminal cancelled", "surface", "responses", "stream", true, "retry_attempts", attempts, "error_code", streamRuntime.finalErrorCode) |
| return |
| } |
| if streamRuntime.failed { |
| config.Logger.Info("[openai_empty_retry] terminal empty output", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", "none", "error_code", streamRuntime.finalErrorCode) |
| return |
| } |
| config.Logger.Info("[openai_empty_retry] completed", "surface", "responses", "stream", true, "retry_attempts", attempts, "success_source", source) |
| } |
|
|