Spaces:
Running
Running
| "use client"; | |
| import { useEffect, useMemo, useRef, useState } from "react"; | |
| import { | |
| API_BASE, | |
| PHASE_NAMES, | |
| SSE_EVENT_TYPES, | |
| SSE_TO_PHASE_INDEX, | |
| type AnySseEventType, | |
| type PhaseState, | |
| type PhaseStatus, | |
| } from "@/lib/api"; | |
/**
 * Most recently handled SSE event, exposed to UI consumers
 * (drives trigger button labels).
 */
export interface LatestSseEvent {
  /** Name of the SSE event, including the `hello` / `heartbeat` types. */
  type: "hello" | "heartbeat" | AnySseEventType;
  /** Parsed JSON payload carried by the event. */
  data: Record<string, unknown>;
  /** Client-side `Date.now()` value (epoch milliseconds) taken when the event was handled. */
  receivedAt: number;
}
/** Value returned by `useEventStream`. */
interface UseEventStreamReturn {
  /** Per-phase state; `undefined` until an event has populated it. */
  phases: undefined | PhaseState[];
  /** `true` after the stream reports `open`, `false` after it reports an error. */
  connected: boolean;
  /** The last event that was handled, or `null` if none has arrived yet. */
  latest: null | LatestSseEvent;
  /** All events received this session, newest-last. Capped at 200. */
  history: Array<LatestSseEvent>;
  /**
   * User-facing connection error message; non-null when retries are exhausted
   * or the event stream could not be opened at all.
   */
  connectionError: null | string;
}
/** Maximum number of entries retained in the hook's `history` array. */
const MAX_HISTORY = 200;

/**
 * Backoff schedule (milliseconds) used when the SSE connection drops with a
 * suspected rate-limit (429). After exhausting these delays we surface a
 * connection-error message and stop retrying — the user can refresh manually.
 * Native EventSource auto-reconnect (~3s) handles non-429 transient errors.
 */
const RATE_LIMIT_BACKOFF_MS: readonly number[] = [5_000, 15_000, 60_000];
/**
 * Resolve the status a phase should hold after an incoming transition.
 *
 * Terminal states are sticky:
 * - a "failed" phase never changes;
 * - a "completed" phase is not regressed by a late "running" or "pending"
 *   signal (it can still be moved to "failed").
 *
 * Every other transition simply adopts `incoming`.
 *
 * @param prev - Status currently stored for the phase, if any.
 * @param incoming - Status requested by the event being applied.
 * @returns The status to store.
 */
function nextStatus(prev: PhaseStatus | undefined, incoming: PhaseStatus): PhaseStatus {
  if (prev === "failed") return "failed";
  if (prev === "completed" && (incoming === "running" || incoming === "pending")) {
    return "completed";
  }
  return incoming;
}
/**
 * Fold one named SSE event into the phase array and return the new array.
 *
 * The input array is never mutated: a shallow copy is taken and the touched
 * entries are replaced with fresh objects. When `current` is missing or does
 * not have one entry per `PHASE_NAMES` item, a scaffold of "pending" phases is
 * created first.
 *
 * Every phase located before the event's phase is marked "completed" (subject
 * to the sticky rules in `nextStatus`); the event's own phase is then updated
 * according to the event type.
 *
 * @param current - Phase array from the previous reduction, if any.
 * @param type - Name of the SSE event being applied.
 * @param data - Parsed event payload; merged into the phase's `details`.
 * @returns The updated phase array. When the event type has no phase mapping,
 *   or the mapped index lies outside the array, the (possibly freshly
 *   scaffolded) array is returned without further changes.
 */
function applyEvent(
  current: PhaseState[] | undefined,
  type: AnySseEventType,
  data: Record<string, unknown>,
): PhaseState[] {
  const base: PhaseState[] =
    current && current.length === PHASE_NAMES.length
      ? [...current]
      : PHASE_NAMES.map((name) => ({ name, status: "pending" as PhaseStatus }));

  const idx = SSE_TO_PHASE_INDEX[type];
  // An index outside the scaffold would dereference `undefined` below.
  if (idx === undefined || idx < 0 || idx >= base.length) return base;

  const now = new Date().toISOString();

  // Phases strictly before the active one become "completed".
  for (let i = 0; i < idx; i += 1) {
    const earlier = base[i];
    base[i] = {
      ...earlier,
      status: nextStatus(earlier.status, "completed"),
      completedAt: earlier.completedAt ?? now,
    };
  }

  // The loop above never touches `base[idx]`, so this snapshot is current.
  const active = base[idx];
  const mergedDetails = { ...(active.details ?? {}), ...data };
  const readSubPhases = (): Record<string, PhaseStatus> =>
    (active.details?.subPhases as Record<string, PhaseStatus> | undefined) ?? {};

  /** Active phase moved to "running", stamping `startedAt` on first sight. */
  const asRunning = (): PhaseState => ({
    ...active,
    status: nextStatus(active.status, "running"),
    startedAt: active.startedAt ?? now,
    details: mergedDetails,
  });

  /**
   * Active phase moved to "completed" with `completedAt` set to now.
   * `stampStart` additionally back-fills `startedAt` when it is missing.
   */
  const asCompleted = (stampStart: boolean): PhaseState => ({
    ...active,
    status: nextStatus(active.status, "completed"),
    ...(stampStart ? { startedAt: active.startedAt ?? now } : {}),
    completedAt: now,
    details: mergedDetails,
  });

  switch (type) {
    case "event.created":
      // The event row is freshly created with a placeholder title — RSS poll
      // + Haiku scoring are still happening in the background. Show the phase
      // as running; `event.updated` flips it to completed once the real title
      // and scoring metadata land.
      base[idx] = asRunning();
      break;

    case "event.updated":
      // Real title + sources + scoring metadata arrived: complete the phase
      // and merge the new data so the page can re-render the title.
      base[idx] = asCompleted(true);
      break;

    case "auction.opened":
    case "bid.submitted":
    case "builder_fee.accrued":
      base[idx] = asRunning();
      break;

    case "auction.settled":
      base[idx] = asCompleted(true);
      break;

    case "translation.completed": {
      // The phase stays "running" while the debate sub-phases (critic /
      // moderator / refine) fire; it is only completed by `refine.completed`.
      // If those events never arrive, a later event for a subsequent phase
      // completes it through the "earlier phases" loop above.
      const prevSub = readSubPhases();
      base[idx] = {
        ...asRunning(),
        details: {
          ...mergedDetails,
          subPhases: {
            ...prevSub,
            "L1 Analysts": "completed",
            "L2 Translators": "completed",
            "L3 Critics": prevSub["L3 Critics"] ?? "running",
          },
        },
      };
      break;
    }

    case "critic.completed":
    case "moderator.verdict":
    case "refine.completed": {
      const nextSub: Record<string, PhaseStatus> = { ...readSubPhases() };
      // "running" goes through the sticky rule so that a replayed or
      // out-of-order event cannot regress a sub-phase that already finished.
      const startUnlessDone = (label: string): void => {
        nextSub[label] = nextStatus(nextSub[label], "running");
      };

      if (type === "critic.completed") {
        nextSub["L1 Analysts"] = "completed";
        nextSub["L2 Translators"] = "completed";
        nextSub["L3 Critics"] = "completed";
        startUnlessDone("L4 Moderator");
      } else if (type === "moderator.verdict") {
        nextSub["L3 Critics"] = "completed";
        nextSub["L4 Moderator"] = "completed";
        startUnlessDone("L5 Refine");
      } else {
        nextSub["L4 Moderator"] = "completed";
        nextSub["L5 Refine"] = "completed";
      }

      const phaseCompleted = type === "refine.completed";
      base[idx] = {
        ...active,
        status: nextStatus(active.status, phaseCompleted ? "completed" : "running"),
        startedAt: active.startedAt ?? now,
        completedAt: phaseCompleted ? now : active.completedAt,
        details: { ...mergedDetails, subPhases: nextSub },
      };
      break;
    }

    case "quality.verdict": {
      const verdict = String(data.verdict ?? "");
      const lifecycleRejected = verdict !== "" && verdict !== "PASS";

      // The judge phase produced a verdict, so the phase itself is
      // "completed" regardless of which way the verdict went.
      base[idx] = {
        ...active,
        status: "completed",
        completedAt: now,
        details: mergedDetails,
      };

      // On rejection, pre-mark every downstream phase "failed". The sticky
      // failed-wins rule in `nextStatus` then keeps them failed when the
      // follow-up events for those phases arrive.
      if (lifecycleRejected) {
        for (let i = idx + 1; i < base.length; i += 1) {
          base[i] = { ...base[i], status: "failed", completedAt: now };
        }
      }
      break;
    }

    case "onchain.committed":
    case "polymarket.submitted":
    case "event.finalized":
      // Completes the phase unless it was already marked failed, in which
      // case the sticky rule in `nextStatus` keeps it failed.
      base[idx] = asCompleted(false);
      break;

    default:
      break;
  }

  return base;
}
/**
 * Subscribe to the backend SSE stream and reduce its named lifecycle events
 * into a per-phase state array. Also surfaces `latest` and `history` so
 * trigger flows can display progress labels driven by named events.
 *
 * Backend uses sse-starlette which emits **named** events via the `event:`
 * field; the default `message` handler only catches anonymous messages, so
 * listeners are registered for each named type explicitly.
 *
 * Reconnection: while the `EventSource` is not CLOSED the browser reconnects
 * on its own. Once it is CLOSED the hook probes for HTTP 429 and either walks
 * the `RATE_LIMIT_BACKOFF_MS` schedule (surfacing `connectionError` when it is
 * exhausted) or reconnects after a short fixed delay.
 *
 * @param eventId - Optional event id. It is sent as the `event_id` query
 *   parameter and also used to drop events whose payload carries a different
 *   `event_id`. Changing it re-opens the stream; previously collected state
 *   is kept.
 * @returns Phase state, connection flag, latest event, capped history and the
 *   user-facing connection error (if any).
 */
export function useEventStream(eventId?: string): UseEventStreamReturn {
  const [phases, setPhases] = useState<PhaseState[] | undefined>(undefined);
  const [connected, setConnected] = useState(false);
  const [latest, setLatest] = useState<LatestSseEvent | null>(null);
  const [history, setHistory] = useState<LatestSseEvent[]>([]);
  const [connectionError, setConnectionError] = useState<string | null>(null);

  const filterEventId = useRef<string | undefined>(eventId);
  filterEventId.current = eventId;

  useEffect(() => {
    // EventSource only exists in the browser.
    if (typeof window === "undefined") return;

    const url = eventId
      ? `${API_BASE}/sse/events?event_id=${encodeURIComponent(eventId)}`
      : `${API_BASE}/sse/events`;

    let cancelled = false;
    let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
    let backoffIndex = 0;
    // True when the stream opened successfully since the last closed-connection
    // error; consumed (and reset) when choosing the next reconnect delay.
    let hadSuccessfulOpen = false;
    // Tears down the EventSource that is currently live, if any.
    let cleanup: (() => void) | null = null;

    const allTypes: Array<AnySseEventType | "hello" | "heartbeat"> = [
      ...SSE_EVENT_TYPES,
      "hello",
      "heartbeat",
    ];

    /** Parse one named event and fold it into `latest`, `history` and `phases`. */
    const handle = (type: AnySseEventType | "hello" | "heartbeat", raw: string): void => {
      let parsed: unknown;
      try {
        parsed = JSON.parse(raw || "{}");
      } catch {
        // Malformed JSON — ignore the event.
        return;
      }
      // JSON.parse can legally yield null, arrays or primitives. Property
      // access on null would throw, so treat any non-object payload as empty.
      const data: Record<string, unknown> =
        typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)
          ? (parsed as Record<string, unknown>)
          : {};

      // Optional eventId filter — backend may broadcast all events on one stream.
      const wanted = filterEventId.current;
      if (wanted && data.event_id !== undefined && String(data.event_id) !== String(wanted)) {
        return;
      }

      const entry: LatestSseEvent = { type, data, receivedAt: Date.now() };
      setLatest(entry);
      setHistory((prev) => {
        const next = [...prev, entry];
        return next.length > MAX_HISTORY ? next.slice(next.length - MAX_HISTORY) : next;
      });
      if (type !== "hello" && type !== "heartbeat") {
        setPhases((prev) => applyEvent(prev, type, data));
      }
    };

    /**
     * Probe the SSE URL with a HEAD request to detect 429 Too Many Requests.
     * EventSource doesn't expose the HTTP status code on error, so without
     * this probe a rate limit is indistinguishable from a transient drop.
     * Any probe failure is reported as "not rate limited".
     */
    const isRateLimited = async (): Promise<boolean> => {
      try {
        const res = await fetch(url, {
          method: "HEAD",
          // Prevent the browser caching a previous 200 — we want the live status.
          cache: "no-store",
        });
        return res.status === 429;
      } catch {
        return false;
      }
    };

    /** (Re)arm the single reconnect timer; any pending timer is replaced. */
    const scheduleReconnect = (delayMs: number): void => {
      if (cancelled) return;
      if (reconnectTimer) clearTimeout(reconnectTimer);
      reconnectTimer = setTimeout(() => {
        reconnectTimer = null;
        if (cancelled) return;
        open();
      }, delayMs);
    };

    /** Create a new EventSource and wire up all listeners. */
    const open = (): void => {
      if (cancelled) return;

      let created: EventSource | null = null;
      try {
        created = new EventSource(url);
      } catch {
        // Synchronous EventSource ctor failure — give up immediately.
        setConnected(false);
        setConnectionError("Unable to open event stream. Please refresh the page.");
        return;
      }
      const es = created;

      // Every listener goes through `listen` so teardown can remove them all.
      const registered: Array<[string, EventListener]> = [];
      const listen = (name: string, fn: EventListener): void => {
        es.addEventListener(name, fn);
        registered.push([name, fn]);
      };
      const teardownListeners = (): void => {
        for (const [name, fn] of registered) es.removeEventListener(name, fn);
        registered.length = 0;
      };

      cleanup = () => {
        teardownListeners();
        es.close();
      };

      listen("open", () => {
        if (cancelled) return;
        hadSuccessfulOpen = true;
        backoffIndex = 0;
        setConnected(true);
        setConnectionError(null);
      });

      const onError = async (): Promise<void> => {
        if (cancelled) return;
        setConnected(false);

        // While readyState is not CLOSED the browser reconnects by itself.
        // CLOSED means it has given up and we must reconnect manually.
        if (es.readyState !== EventSource.CLOSED) return;

        // Drop listeners on the dead source so handlers are not leaked.
        teardownListeners();
        cleanup = null;

        const reopenedCleanly = hadSuccessfulOpen;
        hadSuccessfulOpen = false;

        const rateLimited = await isRateLimited();
        if (cancelled) return;

        if (rateLimited) {
          const delay = RATE_LIMIT_BACKOFF_MS[backoffIndex];
          if (delay === undefined) {
            // Backoff schedule exhausted — stop retrying.
            setConnectionError(
              "Connection lost — too many reconnect attempts. Please refresh the page.",
            );
            return;
          }
          backoffIndex += 1;
          scheduleReconnect(delay);
          return;
        }

        // Non-429 closed connection: retry quickly if the stream had been
        // healthy, otherwise back off a little to avoid hammering the server
        // during a hard outage.
        scheduleReconnect(reopenedCleanly ? 1_000 : 3_000);
      };
      listen("error", () => {
        void onError();
      });

      for (const t of allTypes) {
        listen(t, (ev) => handle(t, (ev as MessageEvent<string>).data ?? ""));
      }

      // Fallback for anonymous `message` events (legacy shape with {phase, phases}).
      listen("message", (ev) => {
        let payload: unknown;
        try {
          payload = JSON.parse((ev as MessageEvent<string>).data ?? "{}");
        } catch {
          return;
        }
        if (typeof payload !== "object" || payload === null) return;

        const { phase, phases: snapshot } = payload as {
          phase?: PhaseState;
          phases?: PhaseState[];
        };
        if (Array.isArray(snapshot)) {
          setPhases(snapshot);
          return;
        }
        if (phase && typeof phase === "object") {
          setPhases((prev) => {
            const list = prev ? [...prev] : [];
            const at = list.findIndex((p) => p.name === phase.name);
            if (at >= 0) list[at] = phase;
            else list.push(phase);
            return list;
          });
        }
      });
    };

    open();

    return () => {
      cancelled = true;
      if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
      }
      cleanup?.();
      cleanup = null;
      // The stream is gone; do not keep reporting it as connected while a
      // replacement (after an eventId change) is still opening.
      setConnected(false);
    };
  }, [eventId]);

  return useMemo(
    () => ({ phases, connected, latest, history, connectionError }),
    [phases, connected, latest, history, connectionError],
  );
}