File size: 3,310 Bytes
9dfccd9
 
 
 
68af3c5
9dfccd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
beac0a3
 
9dfccd9
 
68af3c5
 
 
 
 
9dfccd9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import { useCallback, useEffect, useRef, useState } from 'react'
import { env } from '@/config/env'
import type { GraphNode, GraphEdge, GraphDoneEvent } from '@/types/api'

// Maximum number of automatic reconnect attempts made after the socket closes unexpectedly.
const MAX_RETRIES   = 1
// Backoff base in milliseconds: the wait before reconnect attempt N (starting at 0) is BASE_DELAY_MS * 2^N.
const BASE_DELAY_MS = 1000

/** Handlers supplied to `connect`; each one is invoked from the WebSocket event handlers. */
type Callbacks = {
  /** Called for every frame whose `event` field is `'node'`. */
  onNode:  (node: GraphNode) => void
  /** Called for every frame whose `event` field is `'edge'`. */
  onEdge:  (edge: GraphEdge) => void
  /** Called once the server sends a frame whose `event` field is `'done'`. */
  onDone:  (summary: GraphDoneEvent) => void
  /** Called on a server `'error'` frame, a WebSocket error event, or when retries are exhausted. */
  onError: (msg: string) => void
}

/**
 * Lifecycle of the graph stream as exposed by `useGraphStream`:
 * - `idle`       — initial state, and the state after `disconnect()`
 * - `connecting` — `connect()` was called (also set at the start of each retry)
 * - `streaming`  — the socket has opened
 * - `done`       — the server sent a `done` frame
 * - `error`      — the server sent an `error` frame, or retries were exhausted
 * - `retrying`   — the socket closed unexpectedly and a reconnect is scheduled
 */
export type GraphStreamState = 'idle' | 'connecting' | 'streaming' | 'done' | 'error' | 'retrying'

/**
 * React hook that streams graph nodes and edges over a WebSocket.
 *
 * `connect` opens `${env.wsBaseUrl}/graph/stream` and forwards each JSON frame
 * to the supplied callbacks according to its `event` field (`node`, `edge`,
 * `done`, `error`). If the socket closes before a terminal `done`/`error`
 * frame arrives, the hook reconnects up to `MAX_RETRIES` times, waiting
 * `BASE_DELAY_MS * 2^attempt` milliseconds before each attempt.
 *
 * Only one socket is live at a time: calling `connect` again, or calling
 * `disconnect`, tears down the previous socket and any pending retry, and
 * events from a superseded socket are ignored.
 *
 * @returns
 *  - `gState`: current stream state
 *  - `retryCount`: the attempt index of the most recent `connect` (0 on the first try)
 *  - `firstNodeArrived`: a ref (not state) whose `.current` becomes `true` once a
 *    `node` frame has been received on the current connection; it is reset to
 *    `false` on every `connect`, including retries, and does not trigger re-renders
 *  - `connect(callbacks, attempt = 0)`: open the stream
 *  - `disconnect()`: close the stream and reset to `idle`
 */
export function useGraphStream() {
  const [gState, setGState]       = useState<GraphStreamState>('idle')
  const [retryCount, setRetry]    = useState(0)
  const firstNodeRef               = useRef(false)
  const wsRef                      = useRef<WebSocket | null>(null)
  const callbacksRef               = useRef<Callbacks | null>(null)
  const retryTimerRef              = useRef<ReturnType<typeof setTimeout> | null>(null)
  // True while the stream is expected to stay open; cleared on done/error/disconnect
  // so that the subsequent close event does not schedule a reconnect.
  const activeRef                  = useRef(false)

  const connect = useCallback((callbacks: Callbacks, attempt = 0) => {
    // Tear down whatever a previous connect() left behind so that we never
    // hold two sockets or a stray retry timer at the same time.
    if (retryTimerRef.current !== null) {
      clearTimeout(retryTimerRef.current)
      retryTimerRef.current = null
    }
    const previous = wsRef.current
    wsRef.current = null // detach first so the old socket's handlers see it as stale
    previous?.close()

    callbacksRef.current = callbacks
    activeRef.current    = true
    firstNodeRef.current = false

    setGState('connecting')
    setRetry(attempt)

    const ws = new WebSocket(`${env.wsBaseUrl}/graph/stream`)
    wsRef.current = ws

    // A socket that has been replaced or disconnected must not touch shared state.
    const isCurrent = () => wsRef.current === ws

    ws.onopen = () => {
      if (isCurrent()) setGState('streaming')
    }

    ws.onmessage = (evt) => {
      if (!isCurrent()) return

      // Parse in isolation: only malformed frames are ignored here, so an
      // exception thrown by a caller-supplied callback is not swallowed.
      let parsed: unknown
      try {
        parsed = JSON.parse(evt.data as string)
      } catch {
        // Non-JSON frame — ignore
        return
      }
      if (typeof parsed !== 'object' || parsed === null) return

      const frame = parsed as { event?: unknown; message?: unknown }
      const handlers = callbacksRef.current

      switch (frame.event) {
        case 'node':
          firstNodeRef.current = true
          handlers?.onNode(parsed as GraphNode)
          break
        case 'edge':
          handlers?.onEdge(parsed as GraphEdge)
          break
        case 'done':
          // Server is done — prevent onclose from triggering a reconnect
          activeRef.current = false
          setGState('done')
          handlers?.onDone(parsed as GraphDoneEvent)
          break
        case 'error':
          // Server signalled a terminal error — stop retrying
          activeRef.current = false
          setGState('error')
          handlers?.onError(
            typeof frame.message === 'string' ? frame.message : 'Graph unavailable',
          )
          break
        default:
          // Unknown event type — ignore
          break
      }
    }

    ws.onerror = () => {
      // Closing a socket that is still connecting fires `error`; skip it when
      // this socket was deliberately replaced or disconnected.
      if (!isCurrent()) return
      callbacksRef.current?.onError('WebSocket error')
    }

    ws.onclose = () => {
      if (!isCurrent() || !activeRef.current) return
      if (attempt < MAX_RETRIES) {
        const delay = BASE_DELAY_MS * Math.pow(2, attempt)
        setGState('retrying')
        retryTimerRef.current = setTimeout(() => {
          retryTimerRef.current = null
          if (activeRef.current && callbacksRef.current) {
            connect(callbacksRef.current, attempt + 1)
          }
        }, delay)
      } else {
        // Retries exhausted: the stream is over until connect() is called again.
        activeRef.current = false
        setGState('error')
        callbacksRef.current?.onError('Graph stream disconnected after max retries')
      }
    }
  }, [])

  const disconnect = useCallback(() => {
    activeRef.current = false
    if (retryTimerRef.current !== null) {
      clearTimeout(retryTimerRef.current)
      retryTimerRef.current = null
    }
    // Detach before closing so the socket's own handlers treat it as stale.
    const ws = wsRef.current
    wsRef.current = null
    ws?.close()
    setGState('idle')
    setRetry(0)
  }, [])

  // Clean up on unmount
  useEffect(() => () => { disconnect() }, [disconnect])

  return {
    gState,
    retryCount,
    firstNodeArrived: firstNodeRef,
    connect,
    disconnect,
  }
}