import { useCallback, useEffect, useRef } from 'react'; import { useAgentStore, type PlanItem } from '@/store/agentStore'; import { useSessionStore } from '@/store/sessionStore'; import { useLayoutStore } from '@/store/layoutStore'; import { getWebSocketUrl } from '@/utils/api'; import { logger } from '@/utils/logger'; import type { AgentEvent } from '@/types/events'; import type { Message, TraceLog } from '@/types/agent'; const WS_RECONNECT_DELAY = 1000; const WS_MAX_RECONNECT_DELAY = 30000; const WS_MAX_RETRIES = 5; interface UseAgentWebSocketOptions { sessionId: string | null; onReady?: () => void; onError?: (error: string) => void; onSessionDead?: (sessionId: string) => void; } export function useAgentWebSocket({ sessionId, onReady, onError, onSessionDead, }: UseAgentWebSocketOptions) { const wsRef = useRef(null); const reconnectTimeoutRef = useRef(null); const reconnectDelayRef = useRef(WS_RECONNECT_DELAY); const retriesRef = useRef(0); const { addMessage, updateMessage, appendToMessage, setProcessing, setConnected, setError, addTraceLog, updateTraceLog, clearTraceLogs, setPanelContent, setPanelTab, setActivePanelTab, clearPanelTabs, setPlan, setCurrentTurnMessageId, updateCurrentTurnTrace, removeLastTurn, } = useAgentStore(); const { setRightPanelOpen, setLeftSidebarOpen } = useLayoutStore(); const { setSessionActive } = useSessionStore(); const handleEvent = useCallback( (event: AgentEvent) => { if (!sessionId) return; switch (event.event_type) { case 'ready': setConnected(true); setProcessing(false); setSessionActive(sessionId, true); onReady?.(); break; case 'processing': setProcessing(true); clearTraceLogs(); // Don't clear panel tabs here - they should persist during approval flow // Tabs will be cleared when a new tool_call sets up new content setCurrentTurnMessageId(null); // Start a new turn break; // ── Streaming: individual token chunks ────────────────── case 'assistant_chunk': { const delta = (event.data?.content as string) || ''; if (!delta) break; const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId; if (currentTurnMsgId) { // Append delta to the existing streaming message appendToMessage(sessionId, currentTurnMsgId, delta); } else { // First chunk — create the message (with pending traces if any) const currentTrace = useAgentStore.getState().traceLogs; const messageId = `msg_${Date.now()}`; const segments: Array<{ type: 'text' | 'tools'; content?: string; tools?: typeof currentTrace }> = []; if (currentTrace.length > 0) { segments.push({ type: 'tools', tools: [...currentTrace] }); clearTraceLogs(); } segments.push({ type: 'text', content: delta }); const message: Message = { id: messageId, role: 'assistant', content: delta, timestamp: new Date().toISOString(), segments, }; addMessage(sessionId, message); setCurrentTurnMessageId(messageId); } break; } // ── Streaming ended (text is already rendered via chunks) ─ case 'assistant_stream_end': // Nothing to do — chunks already built the message. // This event is just a signal that the stream is complete. break; // ── Legacy non-streaming full message (kept for backwards compat) case 'assistant_message': { const content = (event.data?.content as string) || ''; const currentTrace = useAgentStore.getState().traceLogs; const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId; if (currentTurnMsgId) { // Update existing message - add segments chronologically const messages = useAgentStore.getState().getMessages(sessionId); const existingMsg = messages.find(m => m.id === currentTurnMsgId); if (existingMsg) { const segments = existingMsg.segments ? [...existingMsg.segments] : []; // If there are pending traces, add them as a tools segment first if (currentTrace.length > 0) { segments.push({ type: 'tools', tools: [...currentTrace] }); clearTraceLogs(); } // Add the new text segment if (content) { segments.push({ type: 'text', content }); } updateMessage(sessionId, currentTurnMsgId, { content: existingMsg.content + '\n\n' + content, segments, }); } } else { // Create new message const messageId = `msg_${Date.now()}`; const segments: Array<{ type: 'text' | 'tools'; content?: string; tools?: typeof currentTrace }> = []; // Add any pending traces first if (currentTrace.length > 0) { segments.push({ type: 'tools', tools: [...currentTrace] }); clearTraceLogs(); } // Add the text if (content) { segments.push({ type: 'text', content }); } const message: Message = { id: messageId, role: 'assistant', content, timestamp: new Date().toISOString(), segments, }; addMessage(sessionId, message); setCurrentTurnMessageId(messageId); } break; } case 'tool_call': { const toolName = (event.data?.tool as string) || 'unknown'; const toolCallId = (event.data?.tool_call_id as string) || ''; const args = (event.data?.arguments as Record) || {}; // Don't display plan_tool in trace logs (it shows up elsewhere in the UI) if (toolName !== 'plan_tool') { const log: TraceLog = { id: `tool_${Date.now()}_${toolCallId}`, toolCallId, type: 'call', text: `Agent is executing ${toolName}...`, tool: toolName, timestamp: new Date().toISOString(), completed: false, args, }; addTraceLog(log); // If no assistant message exists for this turn, create one now // so the ToolCallGroup renders immediately in the chat flow. const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId; if (!currentTurnMsgId) { const messageId = `msg_${Date.now()}`; const currentTrace = useAgentStore.getState().traceLogs; addMessage(sessionId, { id: messageId, role: 'assistant', content: '', timestamp: new Date().toISOString(), segments: [{ type: 'tools', tools: [...currentTrace] }], }); setCurrentTurnMessageId(messageId); clearTraceLogs(); } else { updateCurrentTurnTrace(sessionId); } } // Auto-expand Right Panel for specific tools if (toolName === 'hf_jobs' && (args.operation === 'run' || args.operation === 'scheduled run') && args.script) { // Clear any existing tabs from previous jobs before setting new script clearPanelTabs(); // Use tab system for jobs - add script tab immediately setPanelTab({ id: 'script', title: 'Script', content: args.script, language: 'python', parameters: args }); setActivePanelTab('script'); setRightPanelOpen(true); setLeftSidebarOpen(false); } else if (toolName === 'hf_repo_files' && args.operation === 'upload' && args.content) { setPanelContent({ title: `File Upload: ${args.path || 'unnamed'}`, content: args.content, parameters: args, language: args.path?.endsWith('.py') ? 'python' : undefined }); setRightPanelOpen(true); setLeftSidebarOpen(false); } logger.log('Tool call:', toolName, args); break; } case 'tool_output': { const toolName = (event.data?.tool as string) || 'unknown'; const toolCallId = (event.data?.tool_call_id as string) || ''; const output = (event.data?.output as string) || ''; const success = event.data?.success as boolean; // Mark the corresponding trace log as completed and store the output. // If it had a pending approval, mark it as approved (tool_output means it ran). const prevLog = useAgentStore.getState().traceLogs.find( (l) => l.toolCallId === toolCallId ); const wasApproval = prevLog?.approvalStatus === 'pending'; updateTraceLog(toolCallId, toolName, { completed: true, output, success, ...(wasApproval ? { approvalStatus: 'approved' as const } : {}), }); updateCurrentTurnTrace(sessionId); // For hf_jobs: parse job output and enrich the TraceLog with job info if (toolName === 'hf_jobs' && output) { const updates: Partial = { approvalStatus: 'approved' as const }; // Parse job URL const urlMatch = output.match(/\*\*View at:\*\*\s*(https:\/\/[^\s\n]+)/); if (urlMatch) updates.jobUrl = urlMatch[1]; // Parse job status const statusMatch = output.match(/\*\*Final Status:\*\*\s*([^\n]+)/); if (statusMatch) updates.jobStatus = statusMatch[1].trim(); // Parse logs if (output.includes('**Logs:**')) { const parts = output.split('**Logs:**'); if (parts.length > 1) { const codeBlockMatch = parts[1].trim().match(/```([\s\S]*?)```/); if (codeBlockMatch) updates.jobLogs = codeBlockMatch[1].trim(); } } updateTraceLog(toolCallId, toolName, updates); updateCurrentTurnTrace(sessionId); // Add output tab so the user can see results (especially errors) setPanelTab({ id: 'output', title: 'Output', content: output, language: 'markdown', }); // Auto-switch to output tab on failure so errors are immediately visible if (!success) { setActivePanelTab('output'); } } // Don't create message bubbles for tool outputs - they only show in trace logs logger.log('Tool output:', toolName, success); break; } case 'tool_log': { const toolName = (event.data?.tool as string) || 'unknown'; const log = (event.data?.log as string) || ''; if (toolName === 'hf_jobs') { const currentTabs = useAgentStore.getState().panelTabs; const logsTab = currentTabs.find(t => t.id === 'logs'); // Append to existing logs tab or create new one const newContent = logsTab ? logsTab.content + '\n' + log : '--- Job execution started ---\n' + log; setPanelTab({ id: 'logs', title: 'Logs', content: newContent, language: 'text' }); // Auto-switch to logs tab when logs start streaming setActivePanelTab('logs'); if (!useLayoutStore.getState().isRightPanelOpen) { setRightPanelOpen(true); } } break; } case 'plan_update': { const plan = (event.data?.plan as PlanItem[]) || []; setPlan(plan); if (!useLayoutStore.getState().isRightPanelOpen) { setRightPanelOpen(true); } break; } case 'approval_required': { const tools = event.data?.tools as Array<{ tool: string; arguments: Record; tool_call_id: string; }>; // Create or update trace logs for approval tools. // The backend only sends tool_call events for non-approval tools, // so we must create TraceLogs here for approval-requiring tools. if (tools) { for (const t of tools) { // Check if a TraceLog already exists (shouldn't, but be safe) const existing = useAgentStore.getState().traceLogs.find( (log) => log.toolCallId === t.tool_call_id ); if (!existing) { addTraceLog({ id: `tool_${Date.now()}_${t.tool_call_id}`, toolCallId: t.tool_call_id, type: 'call', text: `Approval required for ${t.tool}`, tool: t.tool, timestamp: new Date().toISOString(), completed: false, args: t.arguments as Record, approvalStatus: 'pending', }); } else { updateTraceLog(t.tool_call_id, t.tool, { approvalStatus: 'pending', args: t.arguments as Record, }); } } // Ensure there's a message to render the approval UI in const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId; if (!currentTurnMsgId) { const messageId = `msg_${Date.now()}`; const currentTrace = useAgentStore.getState().traceLogs; addMessage(sessionId, { id: messageId, role: 'assistant', content: '', timestamp: new Date().toISOString(), segments: [{ type: 'tools', tools: [...currentTrace] }], }); setCurrentTurnMessageId(messageId); clearTraceLogs(); } else { updateCurrentTurnTrace(sessionId); } } // Show the first tool's content in the panel if (tools && tools.length > 0) { const firstTool = tools[0]; const args = firstTool.arguments as Record; clearPanelTabs(); if (firstTool.tool === 'hf_jobs' && args.script) { setPanelTab({ id: 'script', title: 'Script', content: args.script, language: 'python', parameters: args }); setActivePanelTab('script'); } else if (firstTool.tool === 'hf_repo_files' && args.content) { const filename = args.path || 'file'; const isPython = filename.endsWith('.py'); setPanelTab({ id: 'content', title: filename.split('/').pop() || 'Content', content: args.content, language: isPython ? 'python' : 'text', parameters: args }); setActivePanelTab('content'); } else { setPanelTab({ id: 'args', title: firstTool.tool, content: JSON.stringify(args, null, 2), language: 'json', parameters: args }); setActivePanelTab('args'); } setRightPanelOpen(true); setLeftSidebarOpen(false); } setProcessing(false); break; } case 'turn_complete': setProcessing(false); setCurrentTurnMessageId(null); // Clear the current turn break; case 'compacted': { const oldTokens = event.data?.old_tokens as number; const newTokens = event.data?.new_tokens as number; logger.log(`Context compacted: ${oldTokens} -> ${newTokens} tokens`); break; } case 'error': { const errorMsg = (event.data?.error as string) || 'Unknown error'; setError(errorMsg); setProcessing(false); onError?.(errorMsg); break; } case 'shutdown': setConnected(false); setProcessing(false); break; case 'interrupted': setProcessing(false); break; case 'undo_complete': if (sessionId) { removeLastTurn(sessionId); } setProcessing(false); break; default: logger.log('Unknown event:', event); } }, // Zustand setters are stable, so we don't need them in deps // eslint-disable-next-line react-hooks/exhaustive-deps [sessionId, onReady, onError, onSessionDead] ); const connect = useCallback(() => { if (!sessionId) return; // Don't connect if already connected or connecting if (wsRef.current?.readyState === WebSocket.OPEN || wsRef.current?.readyState === WebSocket.CONNECTING) { return; } // Build WebSocket URL (centralized in utils/api.ts) const wsUrl = getWebSocketUrl(sessionId); logger.log('Connecting to WebSocket:', wsUrl); const ws = new WebSocket(wsUrl); ws.onopen = () => { logger.log('WebSocket connected'); setConnected(true); reconnectDelayRef.current = WS_RECONNECT_DELAY; retriesRef.current = 0; // Reset retry counter on successful connect }; ws.onmessage = (event) => { try { const data = JSON.parse(event.data) as AgentEvent; handleEvent(data); } catch (e) { logger.error('Failed to parse WebSocket message:', e); } }; ws.onerror = (error) => { logger.error('WebSocket error:', error); }; ws.onclose = (event) => { logger.log('WebSocket closed', event.code, event.reason); setConnected(false); // Don't reconnect if: // - Normal closure (1000) // - Session not found (4004) — session was deleted or backend restarted // - Auth failed (4001) or access denied (4003) — won't succeed on retry // - No session ID const noRetryCodes = [1000, 4001, 4003, 4004]; if (!noRetryCodes.includes(event.code) && sessionId) { retriesRef.current += 1; if (retriesRef.current > WS_MAX_RETRIES) { logger.warn(`WebSocket: max retries (${WS_MAX_RETRIES}) reached, giving up.`); onSessionDead?.(sessionId); return; } // Attempt to reconnect with exponential backoff if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current); } reconnectTimeoutRef.current = window.setTimeout(() => { reconnectDelayRef.current = Math.min( reconnectDelayRef.current * 2, WS_MAX_RECONNECT_DELAY ); connect(); }, reconnectDelayRef.current); } else if (event.code === 4004 && sessionId) { // Session not found — remove it from the store (lazy cleanup) logger.warn(`Session ${sessionId} no longer exists on backend, removing.`); onSessionDead?.(sessionId); } else if (noRetryCodes.includes(event.code) && event.code !== 1000) { logger.warn(`WebSocket permanently closed: ${event.code} ${event.reason}`); } }; wsRef.current = ws; }, [sessionId, handleEvent]); const disconnect = useCallback(() => { if (reconnectTimeoutRef.current) { clearTimeout(reconnectTimeoutRef.current); reconnectTimeoutRef.current = null; } if (wsRef.current) { wsRef.current.close(); wsRef.current = null; } setConnected(false); }, []); const sendPing = useCallback(() => { if (wsRef.current?.readyState === WebSocket.OPEN) { wsRef.current.send(JSON.stringify({ type: 'ping' })); } }, []); // Connect when sessionId changes (with a small delay to ensure session is ready) useEffect(() => { if (!sessionId) { disconnect(); return; } // Reset retry state for new session retriesRef.current = 0; reconnectDelayRef.current = WS_RECONNECT_DELAY; // Small delay to ensure session is fully created on backend const timeoutId = setTimeout(() => { connect(); }, 100); return () => { clearTimeout(timeoutId); disconnect(); }; // eslint-disable-next-line react-hooks/exhaustive-deps }, [sessionId]); // Heartbeat useEffect(() => { const interval = setInterval(sendPing, 30000); return () => clearInterval(interval); }, [sendPing]); return { isConnected: wsRef.current?.readyState === WebSocket.OPEN, connect, disconnect, }; }