File size: 3,106 Bytes
8d3471e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package stream

import (
	"context"
	"io"
	"time"

	"ds2api/internal/sse"
)

// StopReason names the cause for which ConsumeSSE stopped consuming a stream.
// It is the first argument handed to ConsumeHooks.OnFinalize.
type StopReason string

// Stop reasons, as used by ConsumeSSE:
//
//   - StopReasonNone: the zero value. A ParsedDecision that stops with this
//     reason is reported as StopReasonHandlerRequested instead.
//   - StopReasonContextCancelled: not passed to OnFinalize by ConsumeSSE, which
//     signals cancellation through OnContextDone. Presumably provided for
//     callers that label a cancellation themselves; confirm against call sites.
//   - StopReasonNoContentTimeout: MaxKeepAliveNoInput keep-alive ticks elapsed
//     before any content was seen.
//   - StopReasonIdleTimeout: content was seen earlier, but a keep-alive tick
//     found the gap since the last content larger than IdleTimeout.
//   - StopReasonUpstreamCompleted: the parsed-line channel was closed.
//   - StopReasonHandlerRequested: OnParsed asked to stop without giving a
//     reason of its own.
const (
	StopReasonNone              StopReason = ""
	StopReasonContextCancelled  StopReason = "context_cancelled"
	StopReasonNoContentTimeout  StopReason = "no_content_timeout"
	StopReasonIdleTimeout       StopReason = "idle_timeout"
	StopReasonUpstreamCompleted StopReason = "upstream_completed"
	StopReasonHandlerRequested  StopReason = "handler_requested"
)

// ConsumeConfig carries the inputs for ConsumeSSE.
//
//   - Context: cancels consumption; nil is replaced by context.Background().
//   - Body: the stream handed to sse.StartParsedLinePump.
//   - ThinkingEnabled: forwarded to the line pump; it also selects the default
//     InitialType ("thinking" when true, "text" when false).
//   - InitialType: forwarded to the line pump; an empty value means "use the
//     default derived from ThinkingEnabled".
//   - KeepAliveInterval: period of the keep-alive ticker. When it is not
//     positive no ticker is created, so OnKeepAlive never fires and neither of
//     the two timeouts below is ever checked.
//   - IdleTimeout: once content has been seen, the longest allowed gap since
//     the last content, checked on each tick; a non-positive value disables it.
//   - MaxKeepAliveNoInput: number of ticks allowed before any content has been
//     seen; a non-positive value disables it.
type ConsumeConfig struct {
	Context             context.Context
	Body                io.Reader
	ThinkingEnabled     bool
	InitialType         string
	KeepAliveInterval   time.Duration
	IdleTimeout         time.Duration
	MaxKeepAliveNoInput int
}

// ParsedDecision is what ConsumeHooks.OnParsed returns to steer ConsumeSSE
// after each parsed line.
//
//   - Stop: when true, ConsumeSSE calls OnFinalize and returns.
//   - StopReason: the reason reported to OnFinalize when Stop is true;
//     StopReasonNone is replaced by StopReasonHandlerRequested.
//   - ContentSeen: when true, the line counts as content: the idle clock is
//     restarted and the no-content tick counter is reset. It is applied before
//     Stop is examined.
type ParsedDecision struct {
	Stop        bool
	StopReason  StopReason
	ContentSeen bool
}

// ConsumeHooks are the callbacks ConsumeSSE invokes while consuming a stream.
// Every hook is optional; a nil hook is simply not called.
//
//   - OnParsed: called with each parsed line received from the line pump. When
//     it is nil, lines are still received but otherwise ignored.
//   - OnKeepAlive: called on each keep-alive tick that did not trigger a
//     timeout.
//   - OnFinalize: called right before ConsumeSSE returns because of a timeout,
//     upstream completion, or a stop requested by OnParsed. scannerErr is the
//     value received from the second channel returned by
//     sse.StartParsedLinePump on upstream completion, and nil in the other
//     cases. It is not called when the context is done.
//   - OnContextDone: called right before ConsumeSSE returns because
//     the context reported an error.
type ConsumeHooks struct {
	OnParsed      func(parsed sse.LineResult) ParsedDecision
	OnKeepAlive   func()
	OnFinalize    func(reason StopReason, scannerErr error)
	OnContextDone func()
}

// ConsumeSSE receives the parsed lines that sse.StartParsedLinePump produces
// for cfg.Body and dispatches them to hooks. It blocks until the line channel
// is closed, OnParsed asks to stop, a timeout trips, or cfg.Context is done.
//
// A nil cfg.Context is replaced by context.Background(). An empty
// cfg.InitialType falls back to "thinking" when cfg.ThinkingEnabled is set and
// to "text" otherwise.
//
// Both timeouts are evaluated only on keep-alive ticks, so they are inactive
// when cfg.KeepAliveInterval is not positive:
//   - before any content is seen, each tick is counted and the stream ends
//     with StopReasonNoContentTimeout once cfg.MaxKeepAliveNoInput is reached;
//   - after content has been seen, the stream ends with StopReasonIdleTimeout
//     when more than cfg.IdleTimeout has passed since the last content.
//
// hooks.OnFinalize is called for every stop except context cancellation, which
// is reported through hooks.OnContextDone instead.
func ConsumeSSE(cfg ConsumeConfig, hooks ConsumeHooks) {
	if cfg.Context == nil {
		cfg.Context = context.Background()
	}
	ctx := cfg.Context

	streamType := cfg.InitialType
	if streamType == "" {
		streamType = "text"
		if cfg.ThinkingEnabled {
			streamType = "thinking"
		}
	}
	lines, pumpDone := sse.StartParsedLinePump(ctx, cfg.Body, cfg.ThinkingEnabled, streamType)

	var keepAlive *time.Ticker
	if cfg.KeepAliveInterval > 0 {
		keepAlive = time.NewTicker(cfg.KeepAliveInterval)
		defer keepAlive.Stop()
	}
	// A nil channel never becomes ready, which switches the tick case off
	// when no ticker was created.
	ticks := tickCh(keepAlive)

	var (
		sawContent  bool
		lastSeen    = time.Now()
		silentTicks int
	)

	// finish reports a non-cancellation stop to the optional OnFinalize hook.
	finish := func(reason StopReason, scannerErr error) {
		if hooks.OnFinalize == nil {
			return
		}
		hooks.OnFinalize(reason, scannerErr)
	}
	// cancelled reports whether the context has an error and, if so, notifies
	// the optional OnContextDone hook.
	cancelled := func() bool {
		done := ctx.Err() != nil
		if done && hooks.OnContextDone != nil {
			hooks.OnContextDone()
		}
		return done
	}

	for !cancelled() {
		select {
		case <-ctx.Done():
			cancelled()
			return

		case <-ticks:
			// select picks randomly among ready cases, so re-check the
			// context before acting on the tick.
			if cancelled() {
				return
			}
			switch {
			case !sawContent:
				silentTicks++
				if cfg.MaxKeepAliveNoInput > 0 && silentTicks >= cfg.MaxKeepAliveNoInput {
					finish(StopReasonNoContentTimeout, nil)
					return
				}
			case cfg.IdleTimeout > 0 && time.Since(lastSeen) > cfg.IdleTimeout:
				finish(StopReasonIdleTimeout, nil)
				return
			}
			if hooks.OnKeepAlive != nil {
				hooks.OnKeepAlive()
			}

		case line, open := <-lines:
			if cancelled() {
				return
			}
			if !open {
				// The line channel is closed; wait for the pump's final
				// result and pass it along.
				finish(StopReasonUpstreamCompleted, <-pumpDone)
				return
			}
			if hooks.OnParsed == nil {
				continue
			}

			verdict := hooks.OnParsed(line)
			if verdict.ContentSeen {
				sawContent = true
				lastSeen = time.Now()
				silentTicks = 0
			}
			if !verdict.Stop {
				continue
			}

			why := verdict.StopReason
			if why == StopReasonNone {
				why = StopReasonHandlerRequested
			}
			finish(why, nil)
			return
		}
	}
}

func tickCh(ticker *time.Ticker) <-chan time.Time {
	if ticker == nil {
		return nil
	}
	return ticker.C
}