gMAS / web_ui /frontend /src /stores /executionStore.ts
Артём Боярских
feat: added dynamic topologies, new templates and conditional edges
5cdde73
import { create } from "zustand";
import { api } from "@/lib/api";
import type {
StreamEvent,
AgentExecutionStatus,
RunnerConfig,
LLMProviderConfig,
RunHistoryEntry,
} from "@/types/execution";
type RunStatus = "idle" | "running" | "completed" | "error" | "cancelled";
interface ExecutionStore {
runId: string | null;
status: RunStatus;
events: StreamEvent[];
agentStatuses: Record<string, AgentExecutionStatus>;
tokenUsage: Record<string, number>;
currentAgent: string | null;
finalAnswer: string | null;
totalTokens: number;
totalTime: number;
error: string | null;
// WebSocket
ws: WebSocket | null;
// History
runHistory: RunHistoryEntry[];
// Actions
startExecution: (
graphId: string | null,
graphData: unknown,
taskQuery: string,
config?: RunnerConfig | null,
llmProvider?: LLMProviderConfig | null,
) => Promise<void>;
cancelExecution: () => void;
addEvent: (event: StreamEvent) => void;
clearRun: () => void;
fetchHistory: () => Promise<void>;
getRunDetail: (runId: string) => Promise<RunHistoryEntry>;
}
export const useExecutionStore = create<ExecutionStore>((set, get) => ({
runId: null,
status: "idle",
events: [],
agentStatuses: {},
tokenUsage: {},
currentAgent: null,
finalAnswer: null,
totalTokens: 0,
totalTime: 0,
error: null,
ws: null,
runHistory: [],
startExecution: async (graphId, graphData, taskQuery, config, llmProvider) => {
// Reset state
set({
status: "running",
events: [],
agentStatuses: {},
tokenUsage: {},
currentAgent: null,
finalAnswer: null,
totalTokens: 0,
totalTime: 0,
error: null,
});
try {
const body: Record<string, unknown> = { task_query: taskQuery };
if (graphId) body.graph_id = graphId;
if (graphData) body.graph = graphData;
if (config) body.config = config;
if (llmProvider) body.llm_provider = llmProvider;
const res = await api.post<{ run_id: string }>("/execution/run", body);
const runId = res.run_id;
set({ runId });
// Connect WebSocket
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const wsUrl = `${protocol}//${window.location.host}/ws/execution/${runId}`;
const ws = new WebSocket(wsUrl);
let receivedEvents = false;
ws.onmessage = (event) => {
try {
const data: StreamEvent = JSON.parse(event.data);
receivedEvents = true;
get().addEvent(data);
} catch {
// ignore parse errors
}
};
ws.onclose = (event) => {
const s = get();
if (s.status === "running") {
// Normal close (code 1000) or after receiving events = completed
if (event.code === 1000 || receivedEvents) {
set({ status: "completed" });
} else {
// Fetch run status from REST API as fallback
api.get<{ status: string; events: StreamEvent[]; result: unknown }>(`/execution/${runId}`)
.then((detail) => {
if (detail.events) {
detail.events.forEach((ev) => get().addEvent(ev));
}
set({ status: detail.status === "error" ? "error" : "completed" });
})
.catch(() => {
set({ error: "Connection lost", status: "error" });
});
}
}
set({ ws: null });
};
ws.onerror = () => {
// Only set error if we never got any events (true connection failure)
if (!receivedEvents && get().status === "running") {
// Try REST API fallback before reporting error
api.get<{ status: string; events: StreamEvent[]; result: unknown }>(`/execution/${runId}`)
.then((detail) => {
if (detail.events) {
detail.events.forEach((ev) => get().addEvent(ev));
}
set({ status: detail.status === "error" ? "error" : "completed" });
})
.catch(() => {
set({ error: "WebSocket connection error", status: "error" });
});
}
};
set({ ws });
} catch (e) {
set({ error: String(e), status: "error" });
}
},
cancelExecution: () => {
const { ws, runId } = get();
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ action: "cancel" }));
}
if (runId) {
api.delete(`/execution/${runId}`).catch(() => {});
}
set({ status: "cancelled" });
},
addEvent: (event) => {
set((s) => {
const events = [...s.events, event];
const updates: Partial<ExecutionStore> = { events };
switch (event.event_type) {
case "agent_start":
if (event.agent_id) {
updates.agentStatuses = {
...s.agentStatuses,
[event.agent_id]: "running",
};
updates.currentAgent = event.agent_id;
}
break;
case "agent_output":
if (event.agent_id) {
updates.agentStatuses = {
...s.agentStatuses,
[event.agent_id]: "completed",
};
if (event.tokens_used) {
updates.tokenUsage = {
...s.tokenUsage,
[event.agent_id]: (s.tokenUsage[event.agent_id] || 0) + event.tokens_used,
};
}
updates.currentAgent = null;
}
break;
case "agent_error":
if (event.agent_id) {
updates.agentStatuses = {
...s.agentStatuses,
[event.agent_id]: event.will_retry ? "running" : "error",
};
}
break;
case "run_end":
updates.status = event.success ? "completed" : "error";
updates.finalAnswer = event.final_answer || null;
updates.totalTokens = event.total_tokens || 0;
updates.totalTime = event.total_time || 0;
break;
case "error":
updates.status = "error";
updates.error = event.error || event.error_message || "Unknown error";
break;
case "cancelled":
updates.status = "cancelled";
break;
case "early_stop":
updates.status = "completed";
updates.finalAnswer = (event as any).reason || event.content || "Early stopped";
break;
case "run_start":
// Mark all agents in execution order as pending
if (event.execution_order) {
const statuses: Record<string, AgentExecutionStatus> = {};
event.execution_order.forEach((id) => {
statuses[id] = "pending";
});
updates.agentStatuses = { ...s.agentStatuses, ...statuses };
}
break;
}
return updates;
});
},
clearRun: () =>
set({
runId: null,
status: "idle",
events: [],
agentStatuses: {},
tokenUsage: {},
currentAgent: null,
finalAnswer: null,
totalTokens: 0,
totalTime: 0,
error: null,
}),
fetchHistory: async () => {
try {
const history = await api.get<RunHistoryEntry[]>("/execution/history/list");
set({ runHistory: history });
} catch {
// ignore
}
},
getRunDetail: async (runId: string) => {
return api.get<RunHistoryEntry>(`/execution/${runId}`);
},
}));