Spaces:
Running
Running
| import { create } from "zustand"; | |
| import { api } from "@/lib/api"; | |
| import type { | |
| StreamEvent, | |
| AgentExecutionStatus, | |
| RunnerConfig, | |
| LLMProviderConfig, | |
| RunHistoryEntry, | |
| } from "@/types/execution"; | |
/** Lifecycle phase of the run currently tracked by the execution store. */
type RunStatus =
  | "idle"
  | "running"
  | "completed"
  | "error"
  | "cancelled";
/**
 * State and actions of the execution store: one tracked run (streamed over a
 * WebSocket) plus the list of past runs.
 */
interface ExecutionStore {
  // --- Current run -------------------------------------------------------

  /** Server-assigned id of the tracked run; `null` when no run has been started or after `clearRun`. */
  runId: string | null;
  /** Lifecycle phase of the tracked run. */
  status: RunStatus;
  /** Every stream event recorded for the run, in arrival order. */
  events: StreamEvent[];
  /** Latest known execution status per agent id. */
  agentStatuses: Record<string, AgentExecutionStatus>;
  /** Tokens accumulated per agent id from `agent_output` events. */
  tokenUsage: Record<string, number>;
  /** Agent that most recently started and has not produced output yet, if any. */
  currentAgent: string | null;
  /** Final answer taken from a `run_end` or `early_stop` event. */
  finalAnswer: string | null;
  /** Total token count as reported by the `run_end` event. */
  totalTokens: number;
  /** Total time as reported by the `run_end` event (units are not visible here). */
  totalTime: number;
  /** Human-readable failure description, or `null` when there is none. */
  error: string | null;

  // --- WebSocket ---------------------------------------------------------

  /** Socket streaming events for the tracked run, or `null` when none is attached. */
  ws: WebSocket | null;

  // --- History -----------------------------------------------------------

  /** Past runs as last loaded by `fetchHistory`. */
  runHistory: RunHistoryEntry[];

  // --- Actions -----------------------------------------------------------

  /**
   * Starts a run on the server and begins streaming its events.
   * The request always carries `taskQuery`; each other argument is sent only when truthy.
   */
  startExecution: (
    graphId: string | null,
    graphData: unknown,
    taskQuery: string,
    config?: RunnerConfig | null,
    llmProvider?: LLMProviderConfig | null,
  ) => Promise<void>;
  /** Requests cancellation of the tracked run and marks it as cancelled locally. */
  cancelExecution: () => void;
  /** Records one stream event and folds it into the derived run state. */
  addEvent: (event: StreamEvent) => void;
  /** Resets all run-scoped fields back to the idle defaults. */
  clearRun: () => void;
  /** Reloads `runHistory` from the server. */
  fetchHistory: () => Promise<void>;
  /** Fetches the stored record of a single run by id. */
  getRunDetail: (runId: string) => Promise<RunHistoryEntry>;
}
/** Run-scoped slice of the store: everything wiped when a run starts or is cleared. */
type RunScopedState = Pick<
  ExecutionStore,
  | "events"
  | "agentStatuses"
  | "tokenUsage"
  | "currentAgent"
  | "finalAnswer"
  | "totalTokens"
  | "totalTime"
  | "error"
>;

/** Fields read from `GET /execution/{runId}` by the REST fallback. */
type RunSnapshot = {
  status: string;
  events?: StreamEvent[] | null;
  result: unknown;
};

/** Returns fresh default values for every run-scoped field. */
function blankRunState(): RunScopedState {
  return {
    events: [],
    agentStatuses: {},
    tokenUsage: {},
    currentAgent: null,
    finalAnswer: null,
    totalTokens: 0,
    totalTime: 0,
    error: null,
  };
}

/**
 * Detaches all handlers from `socket` and closes it, so a socket belonging to
 * an earlier run can no longer write into the store. No-op for `null`.
 */
function releaseSocket(socket: WebSocket | null): void {
  if (!socket) return;
  socket.onmessage = null;
  socket.onclose = null;
  socket.onerror = null;
  socket.close();
}

/**
 * Pure reducer: computes the partial state update produced by one stream event.
 * The event is always appended to `events`; the remaining fields depend on
 * `event.event_type`. Unrecognised event types are recorded without side effects.
 */
function reduceStreamEvent(
  state: ExecutionStore,
  event: StreamEvent,
): Partial<ExecutionStore> {
  const updates: Partial<ExecutionStore> = { events: [...state.events, event] };

  switch (event.event_type) {
    case "run_start":
      // Mark every agent listed in the execution order as pending.
      if (event.execution_order) {
        const pending: Record<string, AgentExecutionStatus> = {};
        event.execution_order.forEach((id) => {
          pending[id] = "pending";
        });
        updates.agentStatuses = { ...state.agentStatuses, ...pending };
      }
      break;

    case "agent_start":
      if (event.agent_id) {
        updates.agentStatuses = {
          ...state.agentStatuses,
          [event.agent_id]: "running",
        };
        updates.currentAgent = event.agent_id;
      }
      break;

    case "agent_output":
      if (event.agent_id) {
        updates.agentStatuses = {
          ...state.agentStatuses,
          [event.agent_id]: "completed",
        };
        if (event.tokens_used) {
          updates.tokenUsage = {
            ...state.tokenUsage,
            [event.agent_id]:
              (state.tokenUsage[event.agent_id] || 0) + event.tokens_used,
          };
        }
        updates.currentAgent = null;
      }
      break;

    case "agent_error":
      if (event.agent_id) {
        // An agent that will be retried is still considered running.
        updates.agentStatuses = {
          ...state.agentStatuses,
          [event.agent_id]: event.will_retry ? "running" : "error",
        };
      }
      break;

    case "run_end":
      updates.status = event.success ? "completed" : "error";
      updates.finalAnswer = event.final_answer || null;
      updates.totalTokens = event.total_tokens || 0;
      updates.totalTime = event.total_time || 0;
      break;

    case "error":
      updates.status = "error";
      updates.error = event.error || event.error_message || "Unknown error";
      break;

    case "cancelled":
      updates.status = "cancelled";
      break;

    case "early_stop":
      updates.status = "completed";
      // `reason` is read through a narrow structural cast instead of `any`.
      updates.finalAnswer =
        (event as { reason?: string }).reason || event.content || "Early stopped";
      break;
  }

  return updates;
}

/**
 * Zustand store that starts agent runs, streams their events over a
 * WebSocket, and keeps the derived run state plus the run history.
 *
 * Streaming rules implemented by `startExecution`:
 * - A socket left over from an earlier run is detached and closed first, so
 *   its late events cannot leak into the new run.
 * - Handlers ignore events once the store no longer tracks their run id.
 * - If the socket fails before delivering any event, the run is recovered
 *   once through `GET /execution/{runId}`. A failed connection fires both
 *   `error` and `close`, so the fallback is guarded against running twice
 *   (which would replay, and therefore duplicate, every event).
 */
export const useExecutionStore = create<ExecutionStore>((set, get) => ({
  runId: null,
  status: "idle",
  ...blankRunState(),
  ws: null,
  runHistory: [],

  startExecution: async (graphId, graphData, taskQuery, config, llmProvider) => {
    // Drop the previous run's socket before resetting, otherwise its close
    // event could mark the new run as completed.
    releaseSocket(get().ws);
    set({ status: "running", ws: null, ...blankRunState() });

    try {
      const body: Record<string, unknown> = { task_query: taskQuery };
      if (graphId) body.graph_id = graphId;
      if (graphData) body.graph = graphData;
      if (config) body.config = config;
      if (llmProvider) body.llm_provider = llmProvider;

      const { run_id: runId } = await api.post<{ run_id: string }>(
        "/execution/run",
        body,
      );
      set({ runId });

      /** True while the store is still tracking the run this call started. */
      const isCurrentRun = (): boolean => get().runId === runId;
      let receivedEvents = false;
      let fallbackStarted = false;

      /**
       * Recovers the run through the REST API, at most once per run.
       * `failureMessage` becomes the store error if the request itself fails.
       */
      const recoverViaRest = async (failureMessage: string): Promise<void> => {
        if (fallbackStarted) return;
        fallbackStarted = true;
        try {
          const detail = await api.get<RunSnapshot>(`/execution/${runId}`);
          if (!isCurrentRun()) return;
          detail.events?.forEach((ev) => get().addEvent(ev));
          if (detail.status === "error") {
            set({ status: "error" });
          } else if (get().status === "running") {
            // Only finalise a run the replayed events left open, so a
            // replayed "cancelled"/"error" outcome is not overwritten.
            set({ status: "completed" });
          }
        } catch {
          if (isCurrentRun()) {
            set({ error: failureMessage, status: "error" });
          }
        }
      };

      // Another start may have attached a socket while the request was in flight.
      releaseSocket(get().ws);

      const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
      const ws = new WebSocket(
        `${protocol}//${window.location.host}/ws/execution/${runId}`,
      );

      ws.onmessage = (message) => {
        if (!isCurrentRun()) return;
        try {
          const parsed: unknown = JSON.parse(message.data);
          // A non-object payload (e.g. `null`) is not a stream event.
          if (typeof parsed !== "object" || parsed === null) return;
          receivedEvents = true;
          get().addEvent(parsed as StreamEvent);
        } catch {
          // Malformed frames are skipped; the stream keeps going.
        }
      };

      ws.onclose = (closeEvent) => {
        // Only clear the slot if it still holds this socket.
        if (get().ws === ws) set({ ws: null });
        if (!isCurrentRun() || get().status !== "running") return;

        // A normal close (code 1000), or any close after events were
        // received, counts as completion.
        if (closeEvent.code === 1000 || receivedEvents) {
          set({ status: "completed" });
        } else {
          void recoverViaRest("Connection lost");
        }
      };

      ws.onerror = () => {
        // Only a failure before any event arrived is treated as a connection error.
        if (!receivedEvents && isCurrentRun() && get().status === "running") {
          void recoverViaRest("WebSocket connection error");
        }
      };

      set({ ws });
    } catch (e) {
      set({ error: String(e), status: "error" });
    }
  },

  cancelExecution: () => {
    const { ws, runId } = get();
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ action: "cancel" }));
    }
    if (runId) {
      // Best-effort: the local state is marked cancelled even if this request fails.
      void api.delete(`/execution/${runId}`).catch(() => {});
    }
    set({ status: "cancelled" });
  },

  addEvent: (event) => {
    set((state) => reduceStreamEvent(state, event));
  },

  clearRun: () => set({ runId: null, status: "idle", ...blankRunState() }),

  fetchHistory: async () => {
    try {
      const history = await api.get<RunHistoryEntry[]>("/execution/history/list");
      set({ runHistory: history });
    } catch {
      // Best-effort refresh: on failure the previously loaded history is kept.
    }
  },

  getRunDetail: async (runId: string) => {
    return api.get<RunHistoryEntry>(`/execution/${runId}`);
  },
}));