File size: 3,106 Bytes
8d3471e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 | package stream
import (
"context"
"io"
"time"
"ds2api/internal/sse"
)
type StopReason string
const (
StopReasonNone StopReason = ""
StopReasonContextCancelled StopReason = "context_cancelled"
StopReasonNoContentTimeout StopReason = "no_content_timeout"
StopReasonIdleTimeout StopReason = "idle_timeout"
StopReasonUpstreamCompleted StopReason = "upstream_completed"
StopReasonHandlerRequested StopReason = "handler_requested"
)
type ConsumeConfig struct {
Context context.Context
Body io.Reader
ThinkingEnabled bool
InitialType string
KeepAliveInterval time.Duration
IdleTimeout time.Duration
MaxKeepAliveNoInput int
}
type ParsedDecision struct {
Stop bool
StopReason StopReason
ContentSeen bool
}
type ConsumeHooks struct {
OnParsed func(parsed sse.LineResult) ParsedDecision
OnKeepAlive func()
OnFinalize func(reason StopReason, scannerErr error)
OnContextDone func()
}
func ConsumeSSE(cfg ConsumeConfig, hooks ConsumeHooks) {
if cfg.Context == nil {
cfg.Context = context.Background()
}
initialType := cfg.InitialType
if initialType == "" {
if cfg.ThinkingEnabled {
initialType = "thinking"
} else {
initialType = "text"
}
}
parsedLines, done := sse.StartParsedLinePump(cfg.Context, cfg.Body, cfg.ThinkingEnabled, initialType)
var ticker *time.Ticker
if cfg.KeepAliveInterval > 0 {
ticker = time.NewTicker(cfg.KeepAliveInterval)
defer ticker.Stop()
}
hasContent := false
lastContent := time.Now()
keepaliveCount := 0
finalize := func(reason StopReason, scannerErr error) {
if hooks.OnFinalize != nil {
hooks.OnFinalize(reason, scannerErr)
}
}
contextDone := func() bool {
if cfg.Context.Err() == nil {
return false
}
if hooks.OnContextDone != nil {
hooks.OnContextDone()
}
return true
}
for {
if contextDone() {
return
}
select {
case <-cfg.Context.Done():
if contextDone() {
return
}
return
case <-tickCh(ticker):
if contextDone() {
return
}
if !hasContent {
keepaliveCount++
if cfg.MaxKeepAliveNoInput > 0 && keepaliveCount >= cfg.MaxKeepAliveNoInput {
finalize(StopReasonNoContentTimeout, nil)
return
}
}
if hasContent && cfg.IdleTimeout > 0 && time.Since(lastContent) > cfg.IdleTimeout {
finalize(StopReasonIdleTimeout, nil)
return
}
if hooks.OnKeepAlive != nil {
hooks.OnKeepAlive()
}
case parsed, ok := <-parsedLines:
if contextDone() {
return
}
if !ok {
finalize(StopReasonUpstreamCompleted, <-done)
return
}
if hooks.OnParsed == nil {
continue
}
decision := hooks.OnParsed(parsed)
if decision.ContentSeen {
hasContent = true
lastContent = time.Now()
keepaliveCount = 0
}
if decision.Stop {
reason := decision.StopReason
if reason == StopReasonNone {
reason = StopReasonHandlerRequested
}
finalize(reason, nil)
return
}
}
}
}
func tickCh(ticker *time.Ticker) <-chan time.Time {
if ticker == nil {
return nil
}
return ticker.C
}
|