// GodSpeed / frontend / src / hooks / useSSEStream.ts
// AdithyaVardan — fix: type errors, merge OAuth branch, upgrade BM25Store (e2766a5)
import { useCallback, useRef, useState } from 'react'
import { ssePost } from '@/lib/http'
import type { QueryInput, SSEEventMap, SSEEventName } from '@/types/api'
// Hard timeout, in milliseconds, armed when a query starts. The timer is cleared
// as soon as the first SSE event arrives (or the request settles), so it bounds
// the wait for the first event rather than the length of the whole stream.
const TIMEOUT_MS = 30_000
/**
 * Optional per-event handlers passed to `stream()`.
 *
 * There is one optional key per SSE event name; each handler receives the
 * payload type that `SSEEventMap` declares for that event.
 */
type Callbacks = {
[K in SSEEventName]?: (data: SSEEventMap[K]) => void
}
/**
 * Lifecycle of a streamed query as exposed by `useSSEStream`:
 * - `idle`      — initial state, and the state after `cancel()`
 * - `loading`   — request started, no SSE event received yet
 * - `streaming` — at least one SSE event carrying data has arrived
 * - `complete`  — a `done` event was received
 * - `error`     — timeout, server `error` event, malformed data, or a failed/dropped connection
 */
export type StreamState = 'idle' | 'loading' | 'streaming' | 'complete' | 'error'
/** One parsed SSE frame: the event it belongs to and its raw (still JSON-encoded) payload. */
type ParsedSSEMessage = {
  eventName: SSEEventName
  data: string
}

/**
 * Extracts the event name and data payload from a single SSE frame (the text
 * between two blank lines).
 *
 * - Accepts both LF and CRLF line endings.
 * - Accepts `event:` / `data:` with or without the single optional space after the colon.
 * - Joins multiple `data:` lines with `\n`, as the SSE format prescribes.
 *
 * @param message      Raw frame text, without the terminating blank line.
 * @param defaultEvent Event name to use when the frame has no `event:` line.
 * @returns The parsed frame, or `null` when the frame carries no data.
 */
function parseSSEMessage(message: string, defaultEvent: SSEEventName): ParsedSSEMessage | null {
  let eventName = defaultEvent
  const dataLines: string[] = []

  for (const line of message.split(/\r?\n/)) {
    if (line.startsWith('event:')) {
      // The server controls this value; it is trusted to be a known event name.
      eventName = line.slice('event:'.length).trim() as SSEEventName
    } else if (line.startsWith('data:')) {
      const value = line.slice('data:'.length)
      // Only a single leading space is part of the field syntax; keep any others.
      dataLines.push(value.startsWith(' ') ? value.slice(1) : value)
    }
  }

  const data = dataLines.join('\n')
  return data ? { eventName, data } : null
}

/**
 * React hook that POSTs a query to `/agent/query` and consumes the response as
 * a Server-Sent Events stream, dispatching each event to the matching callback.
 *
 * Behavior:
 * - Starting a new stream aborts any stream still in flight.
 * - If no event arrives within `TIMEOUT_MS`, the request is aborted and reported
 *   as an error. The timer is cleared once the first event arrives.
 * - Each frame's data is parsed as JSON. Frames that fail to parse are skipped;
 *   after 5 consecutive malformed frames the request is aborted and reported.
 * - A `done` event moves the state to `complete`; an `error` event moves it to
 *   `error` with the server-provided message.
 * - If the response ends without a `done` event (and without a server `error`
 *   event), a "connection dropped" error is reported.
 * - An exception thrown by a callback aborts the stream and is reported through
 *   the normal error path instead of being silently swallowed.
 *
 * @returns
 * - `state`: current {@link StreamState}
 * - `error`: last error message, or `null`
 * - `firstEventArrived`: ref whose `.current` is `true` once the first event of
 *   the current stream has arrived (a ref, so reading it does not re-render)
 * - `stream(input, callbacks)`: starts a new streamed query
 * - `cancel()`: aborts the in-flight query and resets the state to `idle`
 */
export function useSSEStream() {
  const [state, setState] = useState<StreamState>('idle')
  const [error, setError] = useState<string | null>(null)
  const firstEventRef = useRef(false)
  const completedRef = useRef(false)
  const abortRef = useRef<AbortController | null>(null)

  const stream = useCallback(
    async (input: QueryInput, callbacks: Callbacks) => {
      // Cancel any in-flight request
      abortRef.current?.abort()
      const controller = new AbortController()
      abortRef.current = controller

      firstEventRef.current = false
      completedRef.current = false
      setState('loading')
      setError(null)

      // Single place that records a failure in hook state and notifies the caller.
      const fail = (message: string) => {
        setError(message)
        setState('error')
        callbacks.error?.({ message })
      }

      // Hard timeout — fire error event if backend stalls before the first event
      const timeoutId = setTimeout(() => {
        controller.abort()
        fail(`Query timed out after ${TIMEOUT_MS / 1000} seconds`)
      }, TIMEOUT_MS)

      try {
        const res = await ssePost('/agent/query', input, controller.signal)
        if (!res.body) throw new Error('Response has no body to stream')

        const reader = res.body.getReader()
        const decoder = new TextDecoder()
        // Event name assumed for frames that carry data but no `event:` line.
        const defaultEvent: SSEEventName = 'answer_chunk'
        const MALFORMED_THRESHOLD = 5

        let buffer = ''
        let malformedCount = 0
        // Set once the server itself reported an error, so the end-of-stream
        // check does not overwrite that message with "connection dropped".
        let serverErrored = false

        while (true) {
          const { done, value } = await reader.read()
          if (done) break
          buffer += decoder.decode(value, { stream: true })

          // SSE messages are separated by a blank line (LF or CRLF endings).
          // The last piece may be an incomplete frame; keep it for the next chunk.
          const messages = buffer.split(/\r?\n\r?\n/)
          buffer = messages.pop() ?? ''

          for (const message of messages) {
            if (!message.trim()) continue
            const parsed = parseSSEMessage(message, defaultEvent)
            if (!parsed) continue

            // First event received — transition loading → streaming
            if (!firstEventRef.current) {
              firstEventRef.current = true
              setState('streaming')
              clearTimeout(timeoutId)
            }

            // Only JSON decoding is guarded here, so handler exceptions are
            // never mistaken for malformed server data.
            let payload: unknown
            try {
              payload = JSON.parse(parsed.data)
            } catch {
              malformedCount++
              if (malformedCount >= MALFORMED_THRESHOLD) {
                // Release the connection before giving up on the stream.
                controller.abort()
                fail('Stream error — received malformed data from server')
                return
              }
              continue
            }
            malformedCount = 0

            const handler = (callbacks as Record<string, unknown>)[parsed.eventName]
            if (typeof handler === 'function') handler.call(callbacks, payload)

            if (parsed.eventName === 'done') {
              completedRef.current = true
              setState('complete')
            } else if (parsed.eventName === 'error') {
              serverErrored = true
              const reported = (payload as { message?: unknown } | null)?.message
              setError(typeof reported === 'string' ? reported : 'Unknown error')
              setState('error')
            }
          }
        }

        // Stream ended without a done event — treat as incomplete. This also
        // covers a response that closed before sending any event, which would
        // otherwise leave the state stuck in `loading`.
        if (!completedRef.current && !serverErrored) {
          fail('Connection dropped — answer may be incomplete')
        }
      } catch (err) {
        // Aborts are intentional: cancel(), a superseding stream() call, or a
        // timeout that has already been reported.
        if (controller.signal.aborted || (err instanceof Error && err.name === 'AbortError')) {
          return
        }
        // Make sure the connection is released if the failure came from our side
        // (for example a callback that threw) rather than from the network.
        controller.abort()
        fail(err instanceof Error ? err.message : 'Unknown error')
      } finally {
        clearTimeout(timeoutId)
      }
    },
    // eslint-disable-next-line react-hooks/exhaustive-deps
    [],
  )

  const cancel = useCallback(() => {
    abortRef.current?.abort()
    setState('idle')
  }, [])

  return {
    state,
    error,
    firstEventArrived: firstEventRef,
    stream,
    cancel,
  }
}