| package stream |
|
|
| import ( |
| "context" |
| "io" |
| "time" |
|
|
| "ds2api/internal/sse" |
| ) |
|
|
| type StopReason string |
|
|
| const ( |
| StopReasonNone StopReason = "" |
| StopReasonContextCancelled StopReason = "context_cancelled" |
| StopReasonNoContentTimeout StopReason = "no_content_timeout" |
| StopReasonIdleTimeout StopReason = "idle_timeout" |
| StopReasonUpstreamCompleted StopReason = "upstream_completed" |
| StopReasonHandlerRequested StopReason = "handler_requested" |
| ) |
|
|
| type ConsumeConfig struct { |
| Context context.Context |
| Body io.Reader |
| ThinkingEnabled bool |
| InitialType string |
| KeepAliveInterval time.Duration |
| IdleTimeout time.Duration |
| MaxKeepAliveNoInput int |
| } |
|
|
| type ParsedDecision struct { |
| Stop bool |
| StopReason StopReason |
| ContentSeen bool |
| } |
|
|
| type ConsumeHooks struct { |
| OnParsed func(parsed sse.LineResult) ParsedDecision |
| OnKeepAlive func() |
| OnFinalize func(reason StopReason, scannerErr error) |
| OnContextDone func() |
| } |
|
|
| func ConsumeSSE(cfg ConsumeConfig, hooks ConsumeHooks) { |
| if cfg.Context == nil { |
| cfg.Context = context.Background() |
| } |
| initialType := cfg.InitialType |
| if initialType == "" { |
| if cfg.ThinkingEnabled { |
| initialType = "thinking" |
| } else { |
| initialType = "text" |
| } |
| } |
| parsedLines, done := sse.StartParsedLinePump(cfg.Context, cfg.Body, cfg.ThinkingEnabled, initialType) |
|
|
| var ticker *time.Ticker |
| if cfg.KeepAliveInterval > 0 { |
| ticker = time.NewTicker(cfg.KeepAliveInterval) |
| defer ticker.Stop() |
| } |
|
|
| hasContent := false |
| lastContent := time.Now() |
| keepaliveCount := 0 |
|
|
| finalize := func(reason StopReason, scannerErr error) { |
| if hooks.OnFinalize != nil { |
| hooks.OnFinalize(reason, scannerErr) |
| } |
| } |
| contextDone := func() bool { |
| if cfg.Context.Err() == nil { |
| return false |
| } |
| if hooks.OnContextDone != nil { |
| hooks.OnContextDone() |
| } |
| return true |
| } |
|
|
| for { |
| if contextDone() { |
| return |
| } |
| select { |
| case <-cfg.Context.Done(): |
| if contextDone() { |
| return |
| } |
| return |
| case <-tickCh(ticker): |
| if contextDone() { |
| return |
| } |
| if !hasContent { |
| keepaliveCount++ |
| if cfg.MaxKeepAliveNoInput > 0 && keepaliveCount >= cfg.MaxKeepAliveNoInput { |
| finalize(StopReasonNoContentTimeout, nil) |
| return |
| } |
| } |
| if hasContent && cfg.IdleTimeout > 0 && time.Since(lastContent) > cfg.IdleTimeout { |
| finalize(StopReasonIdleTimeout, nil) |
| return |
| } |
| if hooks.OnKeepAlive != nil { |
| hooks.OnKeepAlive() |
| } |
| case parsed, ok := <-parsedLines: |
| if contextDone() { |
| return |
| } |
| if !ok { |
| finalize(StopReasonUpstreamCompleted, <-done) |
| return |
| } |
| if hooks.OnParsed == nil { |
| continue |
| } |
| decision := hooks.OnParsed(parsed) |
| if decision.ContentSeen { |
| hasContent = true |
| lastContent = time.Now() |
| keepaliveCount = 0 |
| } |
| if decision.Stop { |
| reason := decision.StopReason |
| if reason == StopReasonNone { |
| reason = StopReasonHandlerRequested |
| } |
| finalize(reason, nil) |
| return |
| } |
| } |
| } |
| } |
|
|
| func tickCh(ticker *time.Ticker) <-chan time.Time { |
| if ticker == nil { |
| return nil |
| } |
| return ticker.C |
| } |
|
|