import { create } from "zustand"; import { api } from "@/lib/api"; import type { StreamEvent, AgentExecutionStatus, RunnerConfig, LLMProviderConfig, RunHistoryEntry, } from "@/types/execution"; type RunStatus = "idle" | "running" | "completed" | "error" | "cancelled"; interface ExecutionStore { runId: string | null; status: RunStatus; events: StreamEvent[]; agentStatuses: Record; tokenUsage: Record; currentAgent: string | null; finalAnswer: string | null; totalTokens: number; totalTime: number; error: string | null; // WebSocket ws: WebSocket | null; // History runHistory: RunHistoryEntry[]; // Actions startExecution: ( graphId: string | null, graphData: unknown, taskQuery: string, config?: RunnerConfig | null, llmProvider?: LLMProviderConfig | null, ) => Promise; cancelExecution: () => void; addEvent: (event: StreamEvent) => void; clearRun: () => void; fetchHistory: () => Promise; getRunDetail: (runId: string) => Promise; } export const useExecutionStore = create((set, get) => ({ runId: null, status: "idle", events: [], agentStatuses: {}, tokenUsage: {}, currentAgent: null, finalAnswer: null, totalTokens: 0, totalTime: 0, error: null, ws: null, runHistory: [], startExecution: async (graphId, graphData, taskQuery, config, llmProvider) => { // Reset state set({ status: "running", events: [], agentStatuses: {}, tokenUsage: {}, currentAgent: null, finalAnswer: null, totalTokens: 0, totalTime: 0, error: null, }); try { const body: Record = { task_query: taskQuery }; if (graphId) body.graph_id = graphId; if (graphData) body.graph = graphData; if (config) body.config = config; if (llmProvider) body.llm_provider = llmProvider; const res = await api.post<{ run_id: string }>("/execution/run", body); const runId = res.run_id; set({ runId }); // Connect WebSocket const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; const wsUrl = `${protocol}//${window.location.host}/ws/execution/${runId}`; const ws = new WebSocket(wsUrl); let receivedEvents = false; ws.onmessage = (event) => { try { const data: StreamEvent = JSON.parse(event.data); receivedEvents = true; get().addEvent(data); } catch { // ignore parse errors } }; ws.onclose = (event) => { const s = get(); if (s.status === "running") { // Normal close (code 1000) or after receiving events = completed if (event.code === 1000 || receivedEvents) { set({ status: "completed" }); } else { // Fetch run status from REST API as fallback api.get<{ status: string; events: StreamEvent[]; result: unknown }>(`/execution/${runId}`) .then((detail) => { if (detail.events) { detail.events.forEach((ev) => get().addEvent(ev)); } set({ status: detail.status === "error" ? "error" : "completed" }); }) .catch(() => { set({ error: "Connection lost", status: "error" }); }); } } set({ ws: null }); }; ws.onerror = () => { // Only set error if we never got any events (true connection failure) if (!receivedEvents && get().status === "running") { // Try REST API fallback before reporting error api.get<{ status: string; events: StreamEvent[]; result: unknown }>(`/execution/${runId}`) .then((detail) => { if (detail.events) { detail.events.forEach((ev) => get().addEvent(ev)); } set({ status: detail.status === "error" ? "error" : "completed" }); }) .catch(() => { set({ error: "WebSocket connection error", status: "error" }); }); } }; set({ ws }); } catch (e) { set({ error: String(e), status: "error" }); } }, cancelExecution: () => { const { ws, runId } = get(); if (ws && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({ action: "cancel" })); } if (runId) { api.delete(`/execution/${runId}`).catch(() => {}); } set({ status: "cancelled" }); }, addEvent: (event) => { set((s) => { const events = [...s.events, event]; const updates: Partial = { events }; switch (event.event_type) { case "agent_start": if (event.agent_id) { updates.agentStatuses = { ...s.agentStatuses, [event.agent_id]: "running", }; updates.currentAgent = event.agent_id; } break; case "agent_output": if (event.agent_id) { updates.agentStatuses = { ...s.agentStatuses, [event.agent_id]: "completed", }; if (event.tokens_used) { updates.tokenUsage = { ...s.tokenUsage, [event.agent_id]: (s.tokenUsage[event.agent_id] || 0) + event.tokens_used, }; } updates.currentAgent = null; } break; case "agent_error": if (event.agent_id) { updates.agentStatuses = { ...s.agentStatuses, [event.agent_id]: event.will_retry ? "running" : "error", }; } break; case "run_end": updates.status = event.success ? "completed" : "error"; updates.finalAnswer = event.final_answer || null; updates.totalTokens = event.total_tokens || 0; updates.totalTime = event.total_time || 0; break; case "error": updates.status = "error"; updates.error = event.error || event.error_message || "Unknown error"; break; case "cancelled": updates.status = "cancelled"; break; case "early_stop": updates.status = "completed"; updates.finalAnswer = (event as any).reason || event.content || "Early stopped"; break; case "run_start": // Mark all agents in execution order as pending if (event.execution_order) { const statuses: Record = {}; event.execution_order.forEach((id) => { statuses[id] = "pending"; }); updates.agentStatuses = { ...s.agentStatuses, ...statuses }; } break; } return updates; }); }, clearRun: () => set({ runId: null, status: "idle", events: [], agentStatuses: {}, tokenUsage: {}, currentAgent: null, finalAnswer: null, totalTokens: 0, totalTime: 0, error: null, }), fetchHistory: async () => { try { const history = await api.get("/execution/history/list"); set({ runHistory: history }); } catch { // ignore } }, getRunDetail: async (runId: string) => { return api.get(`/execution/${runId}`); }, }));