| package gemini |
|
|
| import ( |
| "encoding/json" |
| "io" |
| "net/http" |
| "strings" |
| "time" |
|
|
| "ds2api/internal/assistantturn" |
| dsprotocol "ds2api/internal/deepseek/protocol" |
| "ds2api/internal/responsehistory" |
| "ds2api/internal/sse" |
| streamengine "ds2api/internal/stream" |
| ) |
|
|
| |
| func (h *Handler) handleStreamGenerateContent(w http.ResponseWriter, r *http.Request, resp *http.Response, model, finalPrompt string, thinkingEnabled, searchEnabled bool, toolNames []string, toolsRaw any, historySessions ...*responsehistory.Session) { |
| var historySession *responsehistory.Session |
| if len(historySessions) > 0 { |
| historySession = historySessions[0] |
| } |
| defer func() { _ = resp.Body.Close() }() |
| if resp.StatusCode != http.StatusOK { |
| body, _ := io.ReadAll(resp.Body) |
| if historySession != nil { |
| historySession.Error(resp.StatusCode, strings.TrimSpace(string(body)), "error", "", "") |
| } |
| writeGeminiError(w, resp.StatusCode, strings.TrimSpace(string(body))) |
| return |
| } |
|
|
| w.Header().Set("Content-Type", "text/event-stream") |
| w.Header().Set("Cache-Control", "no-cache, no-transform") |
| w.Header().Set("Connection", "keep-alive") |
| w.Header().Set("X-Accel-Buffering", "no") |
|
|
| rc := http.NewResponseController(w) |
| _, canFlush := w.(http.Flusher) |
| runtime := newGeminiStreamRuntime(w, rc, canFlush, model, finalPrompt, thinkingEnabled, searchEnabled, stripReferenceMarkersEnabled(), toolNames, toolsRaw, historySession) |
|
|
| initialType := "text" |
| if thinkingEnabled { |
| initialType = "thinking" |
| } |
| streamengine.ConsumeSSE(streamengine.ConsumeConfig{ |
| Context: r.Context(), |
| Body: resp.Body, |
| ThinkingEnabled: thinkingEnabled, |
| InitialType: initialType, |
| KeepAliveInterval: time.Duration(dsprotocol.KeepAliveTimeout) * time.Second, |
| IdleTimeout: time.Duration(dsprotocol.StreamIdleTimeout) * time.Second, |
| MaxKeepAliveNoInput: dsprotocol.MaxKeepaliveCount, |
| }, streamengine.ConsumeHooks{ |
| OnParsed: runtime.onParsed, |
| OnFinalize: func(_ streamengine.StopReason, _ error) { |
| runtime.finalize() |
| }, |
| }) |
| } |
|
|
| |
| type geminiStreamRuntime struct { |
| w http.ResponseWriter |
| rc *http.ResponseController |
| canFlush bool |
|
|
| model string |
| finalPrompt string |
|
|
| thinkingEnabled bool |
| searchEnabled bool |
| bufferContent bool |
| stripReferenceMarkers bool |
| toolNames []string |
| toolsRaw any |
|
|
| accumulator *assistantturn.Accumulator |
| contentFilter bool |
| responseMessageID int |
| history *responsehistory.Session |
| } |
|
|
| |
| func newGeminiStreamRuntime( |
| w http.ResponseWriter, |
| rc *http.ResponseController, |
| canFlush bool, |
| model string, |
| finalPrompt string, |
| thinkingEnabled bool, |
| searchEnabled bool, |
| stripReferenceMarkers bool, |
| toolNames []string, |
| toolsRaw any, |
| history *responsehistory.Session, |
| ) *geminiStreamRuntime { |
| return &geminiStreamRuntime{ |
| w: w, |
| rc: rc, |
| canFlush: canFlush, |
| model: model, |
| finalPrompt: finalPrompt, |
| thinkingEnabled: thinkingEnabled, |
| searchEnabled: searchEnabled, |
| bufferContent: len(toolNames) > 0, |
| stripReferenceMarkers: stripReferenceMarkers, |
| toolNames: toolNames, |
| toolsRaw: toolsRaw, |
| history: history, |
| accumulator: assistantturn.NewAccumulator(assistantturn.AccumulatorOptions{ |
| ThinkingEnabled: thinkingEnabled, |
| SearchEnabled: searchEnabled, |
| StripReferenceMarkers: stripReferenceMarkers, |
| }), |
| } |
| } |
|
|
| |
| func (s *geminiStreamRuntime) sendChunk(payload map[string]any) { |
| b, _ := json.Marshal(payload) |
| _, _ = s.w.Write([]byte("data: ")) |
| _, _ = s.w.Write(b) |
| _, _ = s.w.Write([]byte("\n\n")) |
| if s.canFlush { |
| _ = s.rc.Flush() |
| } |
| } |
|
|
| |
| func (s *geminiStreamRuntime) onParsed(parsed sse.LineResult) streamengine.ParsedDecision { |
| if !parsed.Parsed { |
| return streamengine.ParsedDecision{} |
| } |
| if parsed.ResponseMessageID > 0 { |
| s.responseMessageID = parsed.ResponseMessageID |
| } |
| if parsed.ContentFilter || parsed.ErrorMessage != "" || parsed.Stop { |
| if parsed.ContentFilter { |
| s.contentFilter = true |
| } |
| return streamengine.ParsedDecision{Stop: true} |
| } |
|
|
| accumulated := s.accumulator.Apply(parsed) |
| for _, p := range accumulated.Parts { |
| if p.Type == "thinking" { |
| if p.VisibleText == "" || s.bufferContent { |
| continue |
| } |
| s.sendChunk(map[string]any{ |
| "candidates": []map[string]any{ |
| { |
| "index": 0, |
| "content": map[string]any{ |
| "role": "model", |
| "parts": []map[string]any{{"text": p.VisibleText, "thought": true}}, |
| }, |
| }, |
| }, |
| "modelVersion": s.model, |
| }) |
| continue |
| } |
| if p.RawText == "" || p.CitationOnly || p.VisibleText == "" { |
| continue |
| } |
| if s.bufferContent { |
| continue |
| } |
| s.sendChunk(map[string]any{ |
| "candidates": []map[string]any{ |
| { |
| "index": 0, |
| "content": map[string]any{ |
| "role": "model", |
| "parts": []map[string]any{{"text": p.VisibleText}}, |
| }, |
| }, |
| }, |
| "modelVersion": s.model, |
| }) |
| } |
| if s.history != nil { |
| rawText, text, rawThinking, thinking, detectionThinking := s.accumulator.Snapshot() |
| s.history.Progress( |
| responsehistory.ThinkingForArchive(rawThinking, detectionThinking, thinking), |
| responsehistory.TextForArchive(rawText, text), |
| ) |
| } |
| return streamengine.ParsedDecision{ContentSeen: accumulated.ContentSeen} |
| } |
|
|
| |
| func (s *geminiStreamRuntime) finalize() { |
| rawText, text, rawThinking, thinking, detectionThinking := s.accumulator.Snapshot() |
| turn := assistantturn.BuildTurnFromStreamSnapshot(assistantturn.StreamSnapshot{ |
| RawText: rawText, |
| VisibleText: text, |
| RawThinking: rawThinking, |
| VisibleThinking: thinking, |
| DetectionThinking: detectionThinking, |
| ContentFilter: s.contentFilter, |
| ResponseMessageID: s.responseMessageID, |
| }, assistantturn.BuildOptions{ |
| Model: s.model, |
| Prompt: s.finalPrompt, |
| SearchEnabled: s.searchEnabled, |
| StripReferenceMarkers: s.stripReferenceMarkers, |
| ToolNames: s.toolNames, |
| ToolsRaw: s.toolsRaw, |
| }) |
| outcome := assistantturn.FinalizeTurn(turn, assistantturn.FinalizeOptions{}) |
| if s.history != nil { |
| s.history.Success( |
| http.StatusOK, |
| responsehistory.ThinkingForArchive(turn.RawThinking, turn.DetectionThinking, turn.Thinking), |
| responsehistory.TextForArchive(turn.RawText, turn.Text), |
| assistantturn.FinishReason(turn), |
| responsehistory.GenericUsage(turn), |
| ) |
| } |
|
|
| if s.bufferContent { |
| parts := buildGeminiPartsFromTurn(turn) |
| s.sendChunk(map[string]any{ |
| "candidates": []map[string]any{ |
| { |
| "index": 0, |
| "content": map[string]any{ |
| "role": "model", |
| "parts": parts, |
| }, |
| }, |
| }, |
| "modelVersion": s.model, |
| }) |
| } |
|
|
| s.sendChunk(map[string]any{ |
| "candidates": []map[string]any{ |
| { |
| "index": 0, |
| "content": map[string]any{ |
| "role": "model", |
| "parts": []map[string]any{ |
| {"text": ""}, |
| }, |
| }, |
| "finishReason": "STOP", |
| }, |
| }, |
| "modelVersion": s.model, |
| "usageMetadata": map[string]any{ |
| "promptTokenCount": outcome.Usage.InputTokens, |
| "candidatesTokenCount": outcome.Usage.OutputTokens, |
| "totalTokenCount": outcome.Usage.TotalTokens, |
| }, |
| }) |
| } |
|
|