package httpclient
import (
"context"
"errors"
"io"
"sync"
"github.com/tmaxmax/go-sse"
"github.com/looplj/axonhub/internal/log"
)
// decoderRegistry maps content-type strings to the factories that build
// stream decoders for them. Access to the map is serialized through mu.
type decoderRegistry struct {
	// mu guards decoders.
	mu sync.RWMutex
	// decoders is keyed by the content-type string used at registration.
	decoders map[string]StreamDecoderFactory
}
// globalRegistry is the package-level decoder registry. Its map is created
// eagerly so that it is never nil when written to.
var globalRegistry = &decoderRegistry{
	decoders: map[string]StreamDecoderFactory{},
}
// RegisterDecoder stores factory in the global registry under contentType,
// replacing any factory previously registered for the same string.
// The write is performed while holding the registry's write lock.
func RegisterDecoder(contentType string, factory StreamDecoderFactory) {
	reg := globalRegistry

	reg.mu.Lock()
	defer reg.mu.Unlock()

	reg.decoders[contentType] = factory
}
// GetDecoder looks up the decoder factory registered for contentType.
// The lookup is an exact string match on the registered key. The boolean
// result reports whether an entry was found. The read is performed while
// holding the registry's read lock.
func GetDecoder(contentType string) (StreamDecoderFactory, bool) {
	reg := globalRegistry

	reg.mu.RLock()
	defer reg.mu.RUnlock()

	factory, ok := reg.decoders[contentType]

	return factory, ok
}
// NewDefaultSSEDecoder builds the default Server-Sent Events decoder.
//
// ctx is stored on the decoder, and rc is handed to a go-sse Stream that the
// decoder receives events from.
func NewDefaultSSEDecoder(ctx context.Context, rc io.ReadCloser) StreamDecoder {
	// Image generation produces very large events, so an explicit maximum
	// event size of 32 MiB is configured on the stream.
	const maxEventSize = 32 * 1024 * 1024

	cfg := &sse.StreamConfig{MaxEventSize: maxEventSize}

	return &defaultSSEDecoder{
		ctx:       ctx,
		sseStream: sse.NewStreamWithConfig(rc, cfg),
	}
}
// Compile-time check that *defaultSSEDecoder satisfies StreamDecoder.
var _ StreamDecoder = (*defaultSSEDecoder)(nil)
// defaultSSEDecoder is the StreamDecoder for Server-Sent Events, backed by a
// go-sse Stream.
//
// It is NOT concurrency-safe: do not call Next or Close from multiple
// goroutines. Close is idempotent and may be called several times in sequence.
//
//nolint:containedctx // Checked.
type defaultSSEDecoder struct {
	// ctx is checked for cancellation by Next and passed to the logger.
	ctx context.Context
	// sseStream is the go-sse stream that events are received from.
	sseStream *sse.Stream
	// current is the event produced by the most recent successful Next.
	current *StreamEvent
	// err is the error that stopped iteration; a clean io.EOF is not stored.
	err error

	// closed records that Close has already run.
	closed bool
	// closeErr is the result of closing sseStream, returned again by
	// subsequent Close calls.
	closeErr error
}
// Next advances the decoder to the next event in the stream.
//
// It returns true when a new event was received and is available through
// Current. It returns false when an earlier call already recorded an error or
// the decoder is closed, when the context is done, or when receiving from the
// stream fails. In the latter two cases the decoder is closed before
// returning. Reaching io.EOF ends iteration without recording an error.
func (s *defaultSSEDecoder) Next() bool {
	// A decoder that already failed or was closed never yields again.
	if s.err != nil || s.closed {
		return false
	}

	// Non-blocking cancellation check before reading from the stream.
	select {
	case <-s.ctx.Done():
		log.Debug(s.ctx, "SSE stream closed")

		s.err = s.ctx.Err()
		// The close result is kept in closeErr and returned by later Close calls.
		_ = s.Close()

		return false
	default:
	}

	event, err := s.sseStream.Recv()
	if err != nil {
		if atEOF := errors.Is(err, io.EOF); atEOF {
			// Normal end of stream: nothing is stored in s.err.
			log.Debug(s.ctx, "SSE stream closed")
		} else {
			s.err = err
		}

		// The close result is kept in closeErr and returned by later Close calls.
		_ = s.Close()

		return false
	}

	log.Debug(s.ctx, "SSE event received", log.Any("event", event))

	// Publish the received event as the current one.
	next := StreamEvent{
		LastEventID: event.LastEventID,
		Type:        event.Type,
		Data:        []byte(event.Data),
	}
	s.current = &next

	return true
}
// Current returns the event stored by the most recent successful call to
// Next. It is nil until Next has returned true at least once.
func (s *defaultSSEDecoder) Current() *StreamEvent {
	return s.current
}
// Err returns the error recorded by Next, if any. It is nil while iteration
// is still in progress and also after the stream ended with io.EOF.
func (s *defaultSSEDecoder) Err() error {
	return s.err
}
// Close closes the underlying SSE stream.
//
// The first call marks the decoder as closed and, when a stream is present,
// closes it and remembers the result. Every later call returns that same
// remembered result without closing again.
//
// NOT concurrency-safe: callers must not call Close concurrently with Next.
func (s *defaultSSEDecoder) Close() error {
	if !s.closed {
		s.closed = true

		if stream := s.sseStream; stream != nil {
			s.closeErr = stream.Close()
			log.Debug(s.ctx, "SSE stream closed")
		}
	}

	return s.closeErr
}
// init registers NewDefaultSSEDecoder for the SSE content type, both with and
// without the explicit UTF-8 charset parameter.
func init() {
	sseContentTypes := []string{
		"text/event-stream",
		"text/event-stream; charset=utf-8",
	}

	for _, contentType := range sseContentTypes {
		RegisterDecoder(contentType, NewDefaultSSEDecoder)
	}
}