import { useCallback, useEffect, useRef } from 'react';
import { useAgentStore } from '@/store/agentStore';
import { useLayoutStore } from '@/store/layoutStore';
import { useAuthStore } from '@/store/authStore';
import { useSessionStore } from '@/store/sessionStore';
import type { AgentEvent } from '@/types/events';
import type { Message, TraceLog } from '@/types/agent';

const API_BASE = import.meta.env.DEV ? 'http://127.0.0.1:7860' : '';

/**
 * Job statuses accepted from `job_status` events (matched case-insensitively).
 * Kept as a literal list rather than an object map so that lookups can never
 * resolve to inherited Object.prototype members (e.g. "constructor").
 */
const KNOWN_JOB_STATUSES = [
  'queued',
  'pending',
  'running',
  'completed',
  'failed',
  'canceled',
  'error',
] as const;

/** Statuses after which the active job is cleared from the store. */
const TERMINAL_JOB_STATUSES: readonly string[] = ['completed', 'failed', 'canceled', 'error'];

/**
 * SSE-based hook for receiving agent events.
 * Only connects when session phase is 'ready' - guaranteeing the backend session exists.
 *
 * Incoming events are JSON-parsed and dispatched on `event_type`; each case
 * updates the agent store (messages, trace logs, panel tabs, job state) and,
 * where relevant, the layout store (right panel / left sidebar visibility).
 *
 * @returns An object with `isConnected`, read from the agent store.
 */
export function useAgentEvents() {
  const eventSourceRef = useRef<EventSource | null>(null);
  const currentSessionIdRef = useRef<string | null>(null);

  // Get phase from session store
  const phase = useSessionStore((s) => s.phase);
  const markActive = useSessionStore((s) => s.markActive);
  const markError = useSessionStore((s) => s.markError);

  const {
    addMessage,
    updateMessage,
    setProcessing,
    setConnected,
    setPendingApprovals,
    setError,
    addTraceLog,
    updateTraceLog,
    clearTraceLogs,
    setPanelContent,
    setPanelTab,
    setActivePanelTab,
    clearPanelTabs,
    setPlan,
    setCurrentTurnMessageId,
    setActiveJob,
    updateJobStatus,
  } = useAgentStore();

  const { setRightPanelOpen, setLeftSidebarOpen } = useLayoutStore();

  // Dispatches a single parsed SSE event. State that can change between events
  // is always read through `getState()` at call time, never from the closure.
  const handleEvent = useCallback(
    (event: AgentEvent) => {
      switch (event.event_type) {
        case 'connected':
          setConnected(true);
          break;

        case 'ready':
          setConnected(true);
          setProcessing(false);
          break;

        case 'processing':
          // A new turn starts: reset the trace and detach from the previous turn's message.
          setProcessing(true);
          clearTraceLogs();
          setCurrentTurnMessageId(null);
          break;

        case 'assistant_message': {
          const content = (event.data?.content as string) || '';
          if (!content) break;

          const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId;
          if (currentTurnMsgId) {
            // Append to the message that already represents this turn.
            const messages = useAgentStore.getState().messages;
            const existingMsg = messages.find((m) => m.id === currentTurnMsgId);
            if (existingMsg) {
              const existingSegments = existingMsg.segments || [];
              const newContent = existingMsg.content + content;
              updateMessage(currentTurnMsgId, {
                content: newContent,
                segments: [...existingSegments, { type: 'text', content }],
              });
            }
          } else {
            // First output of the turn: create the message and remember its id.
            const messageId = `msg_${Date.now()}`;
            const message: Message = {
              id: messageId,
              role: 'assistant',
              content,
              timestamp: new Date().toISOString(),
              segments: [{ type: 'text', content }],
            };
            addMessage(message);
            setCurrentTurnMessageId(messageId);
          }
          break;
        }

        case 'tool_call': {
          const toolName = (event.data?.tool as string) || 'unknown';
          const args = (event.data?.arguments as Record<string, any>) || {};

          // plan_tool calls are not recorded in the trace or in message segments.
          if (toolName !== 'plan_tool') {
            const log: TraceLog = {
              id: `tool_${Date.now()}`,
              type: 'call',
              text: `Agent is executing ${toolName}...`,
              tool: toolName,
              timestamp: new Date().toISOString(),
              completed: false,
              args: toolName === 'hf_jobs' ? args : undefined,
            };
            addTraceLog(log);

            // Read after addTraceLog so the snapshot includes the entry just added.
            const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId;
            const currentTrace = useAgentStore.getState().traceLogs;

            if (currentTurnMsgId) {
              const messages = useAgentStore.getState().messages;
              const existingMsg = messages.find((m) => m.id === currentTurnMsgId);
              if (existingMsg) {
                const segments = existingMsg.segments ? [...existingMsg.segments] : [];
                const lastSegment = segments[segments.length - 1];
                if (lastSegment && lastSegment.type === 'tools') {
                  // The array copy above is shallow, so replace the segment with a
                  // new object instead of mutating the one still held by the store.
                  segments[segments.length - 1] = { ...lastSegment, tools: [...currentTrace] };
                } else {
                  segments.push({ type: 'tools', tools: [...currentTrace] });
                }
                updateMessage(currentTurnMsgId, { segments });
              }
            } else {
              const messageId = `msg_${Date.now()}`;
              const message: Message = {
                id: messageId,
                role: 'assistant',
                content: '',
                timestamp: new Date().toISOString(),
                segments: [{ type: 'tools', tools: [...currentTrace] }],
              };
              addMessage(message);
              setCurrentTurnMessageId(messageId);
            }
          }

          // Surface scripts and uploaded file contents in the right panel.
          if (
            toolName === 'hf_jobs' &&
            (args.operation === 'run' || args.operation === 'scheduled run') &&
            args.script
          ) {
            setActiveJob(null);
            clearPanelTabs();
            setPanelTab({
              id: 'script',
              title: 'Script',
              content: args.script,
              language: 'python',
              parameters: args,
            });
            setActivePanelTab('script');
            setRightPanelOpen(true);
            setLeftSidebarOpen(false);
          } else if (toolName === 'hf_repo_files' && args.operation === 'upload' && args.content) {
            setPanelContent({
              title: `File Upload: ${args.path || 'unnamed'}`,
              content: args.content,
              parameters: args,
              language: args.path?.endsWith('.py') ? 'python' : undefined,
            });
            setRightPanelOpen(true);
            setLeftSidebarOpen(false);
          }

          console.log('[SSE] Tool call:', toolName, args);
          break;
        }

        case 'tool_output': {
          const toolName = (event.data?.tool as string) || 'unknown';
          const output = (event.data?.output as string) || '';
          const success = event.data?.success as boolean;

          updateTraceLog(toolName, { completed: true, output, success });

          // Read after updateTraceLog so the segments reflect the completed entry.
          const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId;
          const currentTrace = useAgentStore.getState().traceLogs;

          if (currentTurnMsgId) {
            const messages = useAgentStore.getState().messages;
            const existingMsg = messages.find((m) => m.id === currentTurnMsgId);
            if (existingMsg && existingMsg.segments) {
              const segments = existingMsg.segments.map((seg) => {
                if (seg.type === 'tools') {
                  return { ...seg, tools: [...currentTrace] };
                }
                return seg;
              });
              updateMessage(currentTurnMsgId, { segments });
            }
          }

          if (toolName === 'hf_jobs') {
            // Attach job output to, in order of preference: the current turn's
            // message, the most recent message carrying an approval, or a newly
            // synthesized "approved" message built from the last hf_jobs trace entry.
            const turnMsgId = useAgentStore.getState().currentTurnMessageId;
            if (turnMsgId) {
              const messages = useAgentStore.getState().messages;
              const currentMsg = messages.find((m) => m.id === turnMsgId);
              const currentOutput = currentMsg?.toolOutput || '';
              const newOutput = currentOutput ? currentOutput + '\n\n' + output : output;
              updateMessage(turnMsgId, { toolOutput: newOutput });
            } else {
              const messages = useAgentStore.getState().messages;
              const jobMsg = [...messages].reverse().find((m) => m.approval);
              if (jobMsg) {
                const currentOutput = jobMsg.toolOutput || '';
                const newOutput = currentOutput ? currentOutput + '\n\n' + output : output;
                updateMessage(jobMsg.id, { toolOutput: newOutput });
              } else {
                const traceLogs = useAgentStore.getState().traceLogs;
                const jobTrace = [...traceLogs].reverse().find((t) => t.tool === 'hf_jobs');
                const traceArgs = jobTrace?.args || {};
                const messageId = `msg_${Date.now()}`;
                const autoExecMessage: Message = {
                  id: messageId,
                  role: 'assistant',
                  content: '',
                  timestamp: new Date().toISOString(),
                  approval: {
                    status: 'approved',
                    batch: {
                      tools: [
                        {
                          tool: toolName,
                          arguments: traceArgs,
                          tool_call_id: `auto_${Date.now()}`,
                        },
                      ],
                      count: 1,
                    },
                  },
                  toolOutput: output,
                };
                addMessage(autoExecMessage);
                setCurrentTurnMessageId(messageId);
              }
            }
          }

          console.log('[SSE] Tool output:', toolName, success);
          break;
        }

        case 'tool_log': {
          const toolName = (event.data?.tool as string) || 'unknown';
          const log = (event.data?.log as string) || '';

          if (toolName === 'hf_jobs') {
            const currentTabs = useAgentStore.getState().panelTabs;
            const logsTab = currentTabs.find((t) => t.id === 'logs');
            // 'job_started' creates the logs tab with empty content; only insert a
            // separator when there is existing text, so the log does not start
            // with a blank line.
            const newContent = logsTab?.content ? `${logsTab.content}\n${log}` : log;
            setPanelTab({ id: 'logs', title: 'Logs', content: newContent, language: 'text' });
            setActivePanelTab('logs');
            if (!useLayoutStore.getState().isRightPanelOpen) {
              setRightPanelOpen(true);
            }
          }
          break;
        }

        case 'job_started': {
          const jobId = (event.data?.job_id as string) || '';
          const url = (event.data?.url as string) || '';
          const hardware = (event.data?.hardware as string) || 'cpu-basic';
          const isGpu = (event.data?.is_gpu as boolean) || false;

          setActiveJob({
            jobId,
            url,
            status: isGpu ? 'queued' : 'pending',
            hardware,
            isGpu,
            submittedAt: new Date().toISOString(),
            statusMessage: isGpu ? 'Waiting for GPU resources...' : 'Starting job...',
          });

          // Start the job with an empty logs tab and bring it to the front.
          setPanelTab({ id: 'logs', title: 'Logs', content: '', language: 'text' });
          setActivePanelTab('logs');
          if (!useLayoutStore.getState().isRightPanelOpen) {
            setRightPanelOpen(true);
          }
          break;
        }

        case 'job_status': {
          const status = (event.data?.status as string) || '';
          const message = (event.data?.message as string) || '';

          // Unknown or missing statuses fall back to 'pending'.
          const normalizedStatus = status.toLowerCase();
          const mappedStatus =
            KNOWN_JOB_STATUSES.find((known) => known === normalizedStatus) ?? 'pending';
          updateJobStatus(mappedStatus, message);

          if (TERMINAL_JOB_STATUSES.includes(mappedStatus)) {
            // Clear the job after 3s, but only if it is still in a terminal
            // state by then (re-read from the store when the timer fires).
            setTimeout(() => {
              const currentJob = useAgentStore.getState().activeJob;
              if (currentJob && TERMINAL_JOB_STATUSES.includes(currentJob.status)) {
                setActiveJob(null);
              }
            }, 3000);
          }
          break;
        }

        case 'plan_update': {
          const plan = (event.data?.plan as any[]) || [];
          setPlan(plan);
          if (!useLayoutStore.getState().isRightPanelOpen) {
            setRightPanelOpen(true);
          }
          break;
        }

        case 'approval_required': {
          // `event.data` is optional, so default to an empty batch rather than
          // storing `undefined` in the approval.
          const tools =
            (event.data?.tools as Array<{
              tool: string;
              arguments: Record<string, any>;
              tool_call_id: string;
            }>) ?? [];
          const count = (event.data?.count as number) || 0;

          const currentTurnMsgId = useAgentStore.getState().currentTurnMessageId;
          const approvalData = { status: 'pending' as const, batch: { tools, count } };

          if (currentTurnMsgId) {
            updateMessage(currentTurnMsgId, { approval: approvalData });
          } else {
            const messageId = `msg_${Date.now()}`;
            const message: Message = {
              id: messageId,
              role: 'assistant',
              content: '',
              timestamp: new Date().toISOString(),
              approval: approvalData,
            };
            addMessage(message);
            setCurrentTurnMessageId(messageId);
          }

          // Preview the first tool of the batch in the right panel.
          if (tools.length > 0) {
            const firstTool = tools[0];
            const args = firstTool.arguments;
            clearPanelTabs();

            if (firstTool.tool === 'hf_jobs' && args.script) {
              setPanelTab({
                id: 'script',
                title: 'Script',
                content: args.script,
                language: 'python',
                parameters: { ...args, tool_call_id: firstTool.tool_call_id },
              });
              setActivePanelTab('script');
            } else if (firstTool.tool === 'hf_repo_files' && args.content) {
              const filename = args.path || 'file';
              const isPython = filename.endsWith('.py');
              setPanelTab({
                id: 'content',
                title: filename.split('/').pop() || 'Content',
                content: args.content,
                language: isPython ? 'python' : 'text',
                parameters: { ...args, tool_call_id: firstTool.tool_call_id },
              });
              setActivePanelTab('content');
            } else {
              setPanelTab({
                id: 'args',
                title: firstTool.tool,
                content: JSON.stringify(args, null, 2),
                language: 'json',
                parameters: { ...args, tool_call_id: firstTool.tool_call_id },
              });
              setActivePanelTab('args');
            }

            setRightPanelOpen(true);
            setLeftSidebarOpen(false);
          }

          setPendingApprovals(null);
          setProcessing(false);
          break;
        }

        case 'turn_complete':
          setProcessing(false);
          break;

        case 'compacted': {
          const oldTokens = event.data?.old_tokens as number;
          const newTokens = event.data?.new_tokens as number;
          console.log(`[SSE] Context compacted: ${oldTokens} -> ${newTokens} tokens`);
          break;
        }

        case 'error': {
          const errorMsg = (event.data?.error as string) || 'Unknown error';
          setError(errorMsg);
          setProcessing(false);
          break;
        }

        case 'shutdown':
          setConnected(false);
          setProcessing(false);
          break;

        case 'interrupted':
          setProcessing(false);
          break;

        case 'undo_complete':
          break;

        case 'server_shutdown': {
          const message = event.data?.message as string;
          console.warn('[SSE] Server shutdown:', message);
          setError(message || 'Server is shutting down');
          setConnected(false);
          break;
        }

        default:
          console.log('[SSE] Unknown event:', event);
      }
    },
    // NOTE(review): empty deps assume the store actions captured above keep a
    // stable identity across renders — confirm against the store definitions.
    []
  );

  // Main effect: connect SSE when phase is 'ready', keep alive when 'active'
  useEffect(() => {
    const isReadyOrActive = phase.status === 'ready' || phase.status === 'active';
    const sessionId =
      phase.status === 'ready'
        ? phase.sessionId
        : phase.status === 'active'
          ? phase.sessionId
          : null;

    // If not ready/active, close any existing connection
    if (!isReadyOrActive) {
      if (eventSourceRef.current) {
        console.log('[SSE] Closing connection (phase:', phase.status, ')');
        eventSourceRef.current.close();
        eventSourceRef.current = null;
        currentSessionIdRef.current = null;
        setConnected(false);
      }
      return;
    }

    // Already connected to this session - do nothing
    if (currentSessionIdRef.current === sessionId && eventSourceRef.current) {
      return;
    }

    // Only initiate new connection when phase is 'ready' (not 'active')
    if (phase.status !== 'ready') {
      return;
    }

    // Close existing connection if connecting to different session
    if (eventSourceRef.current) {
      console.log('[SSE] Closing previous connection');
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }

    // The auth token, when present, is sent as a URL-encoded query parameter.
    const token = useAuthStore.getState().token;
    const tokenParam = token ? `?token=${encodeURIComponent(token)}` : '';
    const sseUrl = `${API_BASE}/api/events/${sessionId}${tokenParam}`;

    console.log('[SSE] Connecting to session:', sessionId);
    const eventSource = new EventSource(sseUrl);

    eventSource.onopen = () => {
      console.log('[SSE] Connected successfully');
      currentSessionIdRef.current = sessionId;
      setConnected(true);
      markActive(); // Transition: ready -> active
    };

    eventSource.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data) as AgentEvent;
        handleEvent(data);
      } catch (e) {
        console.error('[SSE] Failed to parse message:', e);
      }
    };

    eventSource.onerror = (error) => {
      console.error('[SSE] Connection error:', error);
      setConnected(false);
      // If connection failed and we never opened, mark as error
      if (eventSource.readyState === EventSource.CLOSED) {
        markError('Failed to connect to session');
        currentSessionIdRef.current = null;
      }
    };

    eventSourceRef.current = eventSource;

    // Only cleanup on unmount, not on phase changes
    // Phase changes are handled at the top of the effect
  }, [phase, handleEvent, setConnected, markActive, markError]);

  // Separate cleanup effect for unmount only
  useEffect(() => {
    return () => {
      if (eventSourceRef.current) {
        console.log('[SSE] Unmount: closing connection');
        eventSourceRef.current.close();
        eventSourceRef.current = null;
        currentSessionIdRef.current = null;
      }
    };
  }, []);

  return {
    isConnected: useAgentStore((s) => s.isConnected),
  };
}