| package chat |
|
|
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"strings"
	"time"

	"ds2api/internal/assistantturn"
	"ds2api/internal/auth"
	"ds2api/internal/completionruntime"
	"ds2api/internal/config"
	dsprotocol "ds2api/internal/deepseek/protocol"
	openaifmt "ds2api/internal/format/openai"
	"ds2api/internal/promptcompat"
	"ds2api/internal/sse"
	streamengine "ds2api/internal/stream"
)
|
|
| func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) { |
| if isVercelStreamReleaseRequest(r) { |
| h.handleVercelStreamRelease(w, r) |
| return |
| } |
| if isVercelStreamPowRequest(r) { |
| h.handleVercelStreamPow(w, r) |
| return |
| } |
| if isVercelStreamPrepareRequest(r) { |
| h.handleVercelStreamPrepare(w, r) |
| return |
| } |
|
|
| a, err := h.Auth.Determine(r) |
| if err != nil { |
| status := http.StatusUnauthorized |
| detail := err.Error() |
| if err == auth.ErrNoAccount { |
| status = http.StatusTooManyRequests |
| } |
| writeOpenAIError(w, status, detail) |
| return |
| } |
| var sessionID string |
| defer func() { |
| h.autoDeleteRemoteSession(r.Context(), a, sessionID) |
| h.Auth.Release(a) |
| }() |
|
|
| r = r.WithContext(auth.WithAuth(r.Context(), a)) |
|
|
| r.Body = http.MaxBytesReader(w, r.Body, openAIGeneralMaxSize) |
| var req map[string]any |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { |
| if strings.Contains(strings.ToLower(err.Error()), "too large") { |
| writeOpenAIError(w, http.StatusRequestEntityTooLarge, "request body too large") |
| return |
| } |
| writeOpenAIError(w, http.StatusBadRequest, "invalid json") |
| return |
| } |
| if err := h.preprocessInlineFileInputs(r.Context(), a, req); err != nil { |
| writeOpenAIInlineFileError(w, err) |
| return |
| } |
| stdReq, err := promptcompat.NormalizeOpenAIChatRequest(h.Store, req, requestTraceID(r)) |
| if err != nil { |
| writeOpenAIError(w, http.StatusBadRequest, err.Error()) |
| return |
| } |
| stdReq, err = h.applyCurrentInputFile(r.Context(), a, stdReq) |
| if err != nil { |
| status, message := mapCurrentInputFileError(err) |
| writeOpenAIError(w, status, message) |
| return |
| } |
| historySession := startChatHistory(h.ChatHistory, r, a, stdReq) |
|
|
| if !stdReq.Stream { |
| result, outErr := completionruntime.ExecuteNonStreamWithRetry(r.Context(), h.DS, a, stdReq, completionruntime.Options{ |
| RetryEnabled: true, |
| CurrentInputFile: h.Store, |
| }) |
| sessionID = result.SessionID |
| if outErr != nil { |
| if historySession != nil { |
| historySession.error(outErr.Status, outErr.Message, outErr.Code, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text)) |
| } |
| writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) |
| return |
| } |
| respBody := openaifmt.BuildChatCompletionWithToolCalls(result.SessionID, stdReq.ResponseModel, result.Turn.Prompt, result.Turn.Thinking, result.Turn.Text, result.Turn.ToolCalls, stdReq.ToolsRaw) |
| respBody["usage"] = assistantturn.OpenAIChatUsage(result.Turn) |
| finishReason := assistantturn.FinalizeTurn(result.Turn, assistantturn.FinalizeOptions{}).FinishReason |
| if historySession != nil { |
| historySession.success(http.StatusOK, historyThinkingForArchive(result.Turn.RawThinking, result.Turn.DetectionThinking, result.Turn.Thinking), historyTextForArchive(result.Turn.RawText, result.Turn.Text), finishReason, assistantturn.OpenAIChatUsage(result.Turn)) |
| } |
| writeJSON(w, http.StatusOK, respBody) |
| return |
| } |
|
|
| start, outErr := completionruntime.StartCompletion(r.Context(), h.DS, a, stdReq, completionruntime.Options{ |
| CurrentInputFile: h.Store, |
| }) |
| sessionID = start.SessionID |
| if outErr != nil { |
| if historySession != nil { |
| historySession.error(outErr.Status, outErr.Message, outErr.Code, "", "") |
| } |
| writeOpenAIErrorWithCode(w, outErr.Status, outErr.Message, outErr.Code) |
| return |
| } |
| streamReq := start.Request |
| refFileTokens := streamReq.RefFileTokens |
| h.handleStreamWithRetry(w, r, a, start.Response, start.Payload, start.Pow, sessionID, streamReq.ResponseModel, streamReq.PromptTokenText, refFileTokens, streamReq.Thinking, streamReq.Search, streamReq.ToolNames, streamReq.ToolsRaw, streamReq.ToolChoice, historySession) |
| } |
|
|
| func (h *Handler) autoDeleteRemoteSession(ctx context.Context, a *auth.RequestAuth, sessionID string) { |
| mode := h.Store.AutoDeleteMode() |
| if mode == "none" || a.DeepSeekToken == "" { |
| return |
| } |
|
|
| deleteBaseCtx := context.WithoutCancel(ctx) |
| deleteCtx, cancel := context.WithTimeout(deleteBaseCtx, 10*time.Second) |
| defer cancel() |
|
|
| switch mode { |
| case "single": |
| if sessionID == "" { |
| config.Logger.Warn("[auto_delete_sessions] skipped single-session delete because session_id is empty", "account", a.AccountID) |
| return |
| } |
| _, err := h.DS.DeleteSessionForToken(deleteCtx, a.DeepSeekToken, sessionID) |
| if err != nil { |
| config.Logger.Warn("[auto_delete_sessions] failed", "account", a.AccountID, "mode", mode, "session_id", sessionID, "error", err) |
| return |
| } |
| config.Logger.Debug("[auto_delete_sessions] success", "account", a.AccountID, "mode", mode, "session_id", sessionID) |
| case "all": |
| if err := h.DS.DeleteAllSessionsForToken(deleteCtx, a.DeepSeekToken); err != nil { |
| config.Logger.Warn("[auto_delete_sessions] failed", "account", a.AccountID, "mode", mode, "error", err) |
| return |
| } |
| config.Logger.Debug("[auto_delete_sessions] success", "account", a.AccountID, "mode", mode) |
| default: |
| config.Logger.Warn("[auto_delete_sessions] unknown mode", "account", a.AccountID, "mode", mode) |
| } |
| } |
|
|
| func (h *Handler) handleNonStream(w http.ResponseWriter, resp *http.Response, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) { |
| if resp.StatusCode != http.StatusOK { |
| defer func() { _ = resp.Body.Close() }() |
| body, _ := io.ReadAll(resp.Body) |
| if historySession != nil { |
| historySession.error(resp.StatusCode, string(body), "error", "", "") |
| } |
| writeOpenAIError(w, resp.StatusCode, string(body)) |
| return |
| } |
| result := sse.CollectStream(resp, thinkingEnabled, true) |
|
|
| turn := assistantturn.BuildTurnFromCollected(result, assistantturn.BuildOptions{ |
| Model: model, |
| Prompt: finalPrompt, |
| RefFileTokens: refFileTokens, |
| SearchEnabled: searchEnabled, |
| ToolNames: toolNames, |
| ToolsRaw: toolsRaw, |
| ToolChoice: promptcompat.DefaultToolChoicePolicy(), |
| }) |
| outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{}) |
| if outcome.ShouldFail { |
| status, message, code := outcome.Error.Status, outcome.Error.Message, outcome.Error.Code |
| if historySession != nil { |
| historySession.error(status, message, code, historyThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), historyTextForArchive(turn.RawText, turn.Text)) |
| } |
| writeOpenAIErrorWithCode(w, status, message, code) |
| return |
| } |
| respBody := openaifmt.BuildChatCompletionWithToolCalls(completionID, model, finalPrompt, turn.Thinking, turn.Text, turn.ToolCalls, toolsRaw) |
| respBody["usage"] = assistantturn.OpenAIChatUsage(turn) |
| if historySession != nil { |
| historySession.success(http.StatusOK, historyThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), historyTextForArchive(turn.RawText, turn.Text), outcome.FinishReason, assistantturn.OpenAIChatUsage(turn)) |
| } |
| writeJSON(w, http.StatusOK, respBody) |
| } |
|
|
| func (h *Handler) handleStream(w http.ResponseWriter, r *http.Request, resp *http.Response, completionID, model, finalPrompt string, refFileTokens int, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySession *chatHistorySession) { |
| defer func() { _ = resp.Body.Close() }() |
| if resp.StatusCode != http.StatusOK { |
| body, _ := io.ReadAll(resp.Body) |
| if historySession != nil { |
| historySession.error(resp.StatusCode, string(body), "error", "", "") |
| } |
| writeOpenAIError(w, resp.StatusCode, string(body)) |
| return |
| } |
| w.Header().Set("Content-Type", "text/event-stream") |
| w.Header().Set("Cache-Control", "no-cache, no-transform") |
| w.Header().Set("Connection", "keep-alive") |
| w.Header().Set("X-Accel-Buffering", "no") |
| rc := http.NewResponseController(w) |
| _, canFlush := w.(http.Flusher) |
| if !canFlush { |
| config.Logger.Warn("[stream] response writer does not support flush; streaming may be buffered") |
| } |
|
|
| created := time.Now().Unix() |
| bufferToolContent := len(toolNames) > 0 |
| emitEarlyToolDeltas := h.toolcallFeatureMatchEnabled() && h.toolcallEarlyEmitHighConfidence() |
| stripReferenceMarkers := stripReferenceMarkersEnabled() |
| initialType := "text" |
| if thinkingEnabled { |
| initialType = "thinking" |
| } |
|
|
| streamRuntime := newChatStreamRuntime( |
| w, |
| rc, |
| canFlush, |
| completionID, |
| created, |
| model, |
| finalPrompt, |
| thinkingEnabled, |
| searchEnabled, |
| stripReferenceMarkers, |
| toolNames, |
| toolsRaw, |
| promptcompat.DefaultToolChoicePolicy(), |
| bufferToolContent, |
| emitEarlyToolDeltas, |
| ) |
| streamRuntime.refFileTokens = refFileTokens |
|
|
| streamengine.ConsumeSSE(streamengine.ConsumeConfig{ |
| Context: r.Context(), |
| Body: resp.Body, |
| ThinkingEnabled: thinkingEnabled, |
| InitialType: initialType, |
| KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, |
| IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, |
| MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, |
| }, streamengine.ConsumeHooks{ |
| OnKeepAlive: func() { |
| streamRuntime.sendKeepAlive() |
| }, |
| OnParsed: func(parsed sse.LineResult) streamengine.ParsedDecision { |
| decision := streamRuntime.onParsed(parsed) |
| if historySession != nil { |
| historySession.progress(streamRuntime.historyThinking(), streamRuntime.historyText()) |
| } |
| return decision |
| }, |
| OnFinalize: func(reason streamengine.StopReason, _ error) { |
| if string(reason) == "content_filter" { |
| streamRuntime.finalize("content_filter", false) |
| } else { |
| streamRuntime.finalize("stop", false) |
| } |
| if historySession == nil { |
| return |
| } |
| if streamRuntime.finalErrorMessage != "" { |
| historySession.error(streamRuntime.finalErrorStatus, streamRuntime.finalErrorMessage, streamRuntime.finalErrorCode, streamRuntime.historyThinking(), streamRuntime.historyText()) |
| return |
| } |
| historySession.success(http.StatusOK, streamRuntime.historyThinking(), streamRuntime.historyText(), streamRuntime.finalFinishReason, streamRuntime.finalUsage) |
| }, |
| OnContextDone: func() { |
| streamRuntime.markContextCancelled() |
| if historySession != nil { |
| historySession.stopped(streamRuntime.historyThinking(), streamRuntime.historyText(), string(streamengine.StopReasonContextCancelled)) |
| } |
| }, |
| }) |
| } |
|
|