|
|
import { useState, useEffect, useRef, useCallback, useMemo } from 'react'; |
|
|
import { |
|
|
streamAgent, |
|
|
getAgentStatus, |
|
|
stopAgent, |
|
|
AgentRun, |
|
|
getMessages, |
|
|
} from '@/lib/api'; |
|
|
import { toast } from 'sonner'; |
|
|
import { |
|
|
UnifiedMessage, |
|
|
ParsedContent, |
|
|
ParsedMetadata, |
|
|
} from '@/components/thread/types'; |
|
|
import { safeJsonParse } from '@/components/thread/utils'; |
|
|
|
|
|
interface ApiMessageType { |
|
|
message_id?: string; |
|
|
thread_id?: string; |
|
|
type: string; |
|
|
is_llm_message?: boolean; |
|
|
content: string; |
|
|
metadata?: string; |
|
|
created_at?: string; |
|
|
updated_at?: string; |
|
|
agent_id?: string; |
|
|
agents?: { |
|
|
name: string; |
|
|
avatar?: string; |
|
|
avatar_color?: string; |
|
|
}; |
|
|
} |
|
|
|
|
|
|
|
|
export interface UseAgentStreamResult { |
|
|
status: string; |
|
|
textContent: string; |
|
|
toolCall: ParsedContent | null; |
|
|
error: string | null; |
|
|
agentRunId: string | null; |
|
|
startStreaming: (runId: string) => void; |
|
|
stopStreaming: () => Promise<void>; |
|
|
} |
|
|
|
|
|
|
|
|
export interface AgentStreamCallbacks { |
|
|
onMessage: (message: UnifiedMessage) => void; |
|
|
onStatusChange?: (status: string) => void; |
|
|
onError?: (error: string) => void; |
|
|
onClose?: (finalStatus: string) => void; |
|
|
onAssistantStart?: () => void; |
|
|
onAssistantChunk?: (chunk: { content: string }) => void; |
|
|
} |
|
|
|
|
|
|
|
|
const mapApiMessagesToUnified = ( |
|
|
messagesData: ApiMessageType[] | null | undefined, |
|
|
currentThreadId: string, |
|
|
): UnifiedMessage[] => { |
|
|
return (messagesData || []) |
|
|
.filter((msg) => msg.type !== 'status') |
|
|
.map((msg: ApiMessageType) => ({ |
|
|
message_id: msg.message_id || null, |
|
|
thread_id: msg.thread_id || currentThreadId, |
|
|
type: (msg.type || 'system') as UnifiedMessage['type'], |
|
|
is_llm_message: Boolean(msg.is_llm_message), |
|
|
content: msg.content || '', |
|
|
metadata: msg.metadata || '{}', |
|
|
created_at: msg.created_at || new Date().toISOString(), |
|
|
updated_at: msg.updated_at || new Date().toISOString(), |
|
|
agent_id: (msg as any).agent_id, |
|
|
agents: (msg as any).agents, |
|
|
})); |
|
|
}; |
|
|
|
|
|
export function useAgentStream( |
|
|
callbacks: AgentStreamCallbacks, |
|
|
threadId: string, |
|
|
setMessages: (messages: UnifiedMessage[]) => void, |
|
|
): UseAgentStreamResult { |
|
|
const [agentRunId, setAgentRunId] = useState<string | null>(null); |
|
|
const [status, setStatus] = useState<string>('idle'); |
|
|
const [textContent, setTextContent] = useState< |
|
|
{ content: string; sequence?: number }[] |
|
|
>([]); |
|
|
const [toolCall, setToolCall] = useState<ParsedContent | null>(null); |
|
|
const [error, setError] = useState<string | null>(null); |
|
|
|
|
|
const streamCleanupRef = useRef<(() => void) | null>(null); |
|
|
const isMountedRef = useRef<boolean>(true); |
|
|
const currentRunIdRef = useRef<string | null>(null); |
|
|
const threadIdRef = useRef(threadId); |
|
|
const setMessagesRef = useRef(setMessages); |
|
|
|
|
|
const orderedTextContent = useMemo(() => { |
|
|
return textContent |
|
|
.sort((a, b) => (a.sequence ?? 0) - (b.sequence ?? 0)) |
|
|
.reduce((acc, curr) => acc + curr.content, ''); |
|
|
}, [textContent]); |
|
|
|
|
|
|
|
|
useEffect(() => { |
|
|
threadIdRef.current = threadId; |
|
|
}, [threadId]); |
|
|
|
|
|
useEffect(() => { |
|
|
setMessagesRef.current = setMessages; |
|
|
}, [setMessages]); |
|
|
|
|
|
|
|
|
const mapAgentStatus = (backendStatus: string): string => { |
|
|
switch (backendStatus) { |
|
|
case 'completed': |
|
|
return 'completed'; |
|
|
case 'stopped': |
|
|
return 'stopped'; |
|
|
case 'failed': |
|
|
return 'failed'; |
|
|
default: |
|
|
return 'error'; |
|
|
} |
|
|
}; |
|
|
|
|
|
|
|
|
const updateStatus = useCallback( |
|
|
(newStatus: string) => { |
|
|
if (isMountedRef.current) { |
|
|
setStatus(newStatus); |
|
|
callbacks.onStatusChange?.(newStatus); |
|
|
if (newStatus === 'error' && error) { |
|
|
callbacks.onError?.(error); |
|
|
} |
|
|
if ( |
|
|
[ |
|
|
'completed', |
|
|
'stopped', |
|
|
'failed', |
|
|
'error', |
|
|
'agent_not_running', |
|
|
].includes(newStatus) |
|
|
) { |
|
|
callbacks.onClose?.(newStatus); |
|
|
} |
|
|
} |
|
|
}, |
|
|
[callbacks, error], |
|
|
); |
|
|
|
|
|
|
|
|
const finalizeStream = useCallback( |
|
|
(finalStatus: string, runId: string | null = agentRunId) => { |
|
|
if (!isMountedRef.current) return; |
|
|
|
|
|
const currentThreadId = threadIdRef.current; |
|
|
const currentSetMessages = setMessagesRef.current; |
|
|
|
|
|
console.log( |
|
|
`[useAgentStream] Finalizing stream for ${runId} on thread ${currentThreadId} with status: ${finalStatus}`, |
|
|
); |
|
|
|
|
|
if (streamCleanupRef.current) { |
|
|
streamCleanupRef.current(); |
|
|
streamCleanupRef.current = null; |
|
|
} |
|
|
|
|
|
|
|
|
setTextContent([]); |
|
|
setToolCall(null); |
|
|
|
|
|
|
|
|
updateStatus(finalStatus); |
|
|
setAgentRunId(null); |
|
|
currentRunIdRef.current = null; |
|
|
|
|
|
|
|
|
|
|
|
const terminalStatuses = [ |
|
|
'completed', |
|
|
'stopped', |
|
|
'failed', |
|
|
'error', |
|
|
'agent_not_running', |
|
|
]; |
|
|
if (currentThreadId && terminalStatuses.includes(finalStatus)) { |
|
|
console.log( |
|
|
`[useAgentStream] Refetching messages for thread ${currentThreadId} after finalization with status ${finalStatus}.`, |
|
|
); |
|
|
getMessages(currentThreadId) |
|
|
.then((messagesData: ApiMessageType[]) => { |
|
|
if (isMountedRef.current && messagesData) { |
|
|
console.log( |
|
|
`[useAgentStream] Refetched ${messagesData.length} messages for thread ${currentThreadId}.`, |
|
|
); |
|
|
const unifiedMessages = mapApiMessagesToUnified( |
|
|
messagesData, |
|
|
currentThreadId, |
|
|
); |
|
|
currentSetMessages(unifiedMessages); |
|
|
} else if (!isMountedRef.current) { |
|
|
console.log( |
|
|
`[useAgentStream] Component unmounted before messages could be set after refetch for thread ${currentThreadId}.`, |
|
|
); |
|
|
} |
|
|
}) |
|
|
.catch((err) => { |
|
|
console.error( |
|
|
`[useAgentStream] Error refetching messages for thread ${currentThreadId} after finalization:`, |
|
|
err, |
|
|
); |
|
|
|
|
|
toast.error(`Failed to refresh messages: ${err.message}`); |
|
|
}); |
|
|
} else { |
|
|
console.log( |
|
|
`[useAgentStream] Skipping message refetch for thread ${currentThreadId}. Final status: ${finalStatus}`, |
|
|
); |
|
|
} |
|
|
|
|
|
|
|
|
if ( |
|
|
runId && |
|
|
(finalStatus === 'completed' || |
|
|
finalStatus === 'stopped' || |
|
|
finalStatus === 'agent_not_running') |
|
|
) { |
|
|
getAgentStatus(runId).catch((err) => { |
|
|
console.log( |
|
|
`[useAgentStream] Post-finalization status check for ${runId} failed (this might be expected if not found): ${err.message}`, |
|
|
); |
|
|
}); |
|
|
} |
|
|
}, |
|
|
[agentRunId, updateStatus], |
|
|
); |
|
|
|
|
|
|
|
|
|
|
|
const handleStreamMessage = useCallback( |
|
|
(rawData: string) => { |
|
|
if (!isMountedRef.current) return; |
|
|
(window as any).lastStreamMessage = Date.now(); |
|
|
|
|
|
let processedData = rawData; |
|
|
if (processedData.startsWith('data: ')) { |
|
|
processedData = processedData.substring(6).trim(); |
|
|
} |
|
|
if (!processedData) return; |
|
|
|
|
|
|
|
|
if ( |
|
|
processedData === |
|
|
'{"type": "status", "status": "completed", "message": "Agent run completed successfully"}' |
|
|
) { |
|
|
console.log( |
|
|
'[useAgentStream] Received final completion status message', |
|
|
); |
|
|
finalizeStream('completed', currentRunIdRef.current); |
|
|
return; |
|
|
} |
|
|
if ( |
|
|
processedData.includes('Run data not available for streaming') || |
|
|
processedData.includes('Stream ended with status: completed') |
|
|
) { |
|
|
console.log( |
|
|
`[useAgentStream] Detected final completion message: "${processedData}", finalizing.`, |
|
|
); |
|
|
finalizeStream('completed', currentRunIdRef.current); |
|
|
return; |
|
|
} |
|
|
|
|
|
|
|
|
try { |
|
|
const jsonData = JSON.parse(processedData); |
|
|
if (jsonData.status === 'error') { |
|
|
console.error('[useAgentStream] Received error status message:', jsonData); |
|
|
const errorMessage = jsonData.message || 'Unknown error occurred'; |
|
|
setError(errorMessage); |
|
|
toast.error(errorMessage, { duration: 15000 }); |
|
|
callbacks.onError?.(errorMessage); |
|
|
return; |
|
|
} |
|
|
} catch (jsonError) { |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
const message = safeJsonParse(processedData, null) as UnifiedMessage | null; |
|
|
if (!message) { |
|
|
console.warn( |
|
|
'[useAgentStream] Failed to parse streamed message:', |
|
|
processedData, |
|
|
); |
|
|
return; |
|
|
} |
|
|
|
|
|
const parsedContent = safeJsonParse<ParsedContent>(message.content, {}); |
|
|
const parsedMetadata = safeJsonParse<ParsedMetadata>( |
|
|
message.metadata, |
|
|
{}, |
|
|
); |
|
|
|
|
|
|
|
|
if (status !== 'streaming') updateStatus('streaming'); |
|
|
|
|
|
switch (message.type) { |
|
|
case 'assistant': |
|
|
console.log('[useAgentStream] test a:', parsedContent.content); |
|
|
console.log('[useAgentStream] test a1:', parsedMetadata); |
|
|
if ( |
|
|
parsedMetadata.stream_status === 'chunk' && |
|
|
parsedContent.content |
|
|
) { |
|
|
setTextContent((prev) => { |
|
|
return prev.concat({ |
|
|
sequence: message.sequence, |
|
|
content: parsedContent.content, |
|
|
}); |
|
|
}); |
|
|
callbacks.onAssistantChunk?.({ content: parsedContent.content }); |
|
|
} else if (parsedMetadata.stream_status === 'complete') { |
|
|
setTextContent([]); |
|
|
setToolCall(null); |
|
|
if (message.message_id) callbacks.onMessage(message); |
|
|
} else if (!parsedMetadata.stream_status) { |
|
|
|
|
|
callbacks.onAssistantStart?.(); |
|
|
if (message.message_id) callbacks.onMessage(message); |
|
|
} |
|
|
break; |
|
|
case 'tool': |
|
|
setToolCall(null); |
|
|
if (message.message_id) callbacks.onMessage(message); |
|
|
break; |
|
|
case 'status': |
|
|
switch (parsedContent.status_type) { |
|
|
case 'tool_started': |
|
|
setToolCall({ |
|
|
role: 'assistant', |
|
|
status_type: 'tool_started', |
|
|
name: parsedContent.function_name, |
|
|
arguments: parsedContent.arguments, |
|
|
xml_tag_name: parsedContent.xml_tag_name, |
|
|
tool_index: parsedContent.tool_index, |
|
|
}); |
|
|
break; |
|
|
case 'tool_completed': |
|
|
case 'tool_failed': |
|
|
case 'tool_error': |
|
|
if (toolCall?.tool_index === parsedContent.tool_index) { |
|
|
setToolCall(null); |
|
|
} |
|
|
break; |
|
|
case 'thread_run_end': |
|
|
console.log( |
|
|
'[useAgentStream] Received thread run end status, finalizing.', |
|
|
); |
|
|
break; |
|
|
case 'finish': |
|
|
|
|
|
console.log( |
|
|
'[useAgentStream] Received finish status:', |
|
|
parsedContent.finish_reason, |
|
|
); |
|
|
|
|
|
break; |
|
|
case 'error': |
|
|
console.error( |
|
|
'[useAgentStream] Received error status message:', |
|
|
parsedContent.message, |
|
|
); |
|
|
setError(parsedContent.message || 'Agent run failed'); |
|
|
finalizeStream('error', currentRunIdRef.current); |
|
|
break; |
|
|
|
|
|
default: |
|
|
|
|
|
break; |
|
|
} |
|
|
break; |
|
|
case 'user': |
|
|
case 'system': |
|
|
|
|
|
if (message.message_id) callbacks.onMessage(message); |
|
|
break; |
|
|
default: |
|
|
console.warn( |
|
|
'[useAgentStream] Unhandled message type:', |
|
|
message.type, |
|
|
); |
|
|
} |
|
|
}, |
|
|
[ |
|
|
threadId, |
|
|
setMessages, |
|
|
status, |
|
|
toolCall, |
|
|
callbacks, |
|
|
finalizeStream, |
|
|
updateStatus, |
|
|
], |
|
|
); |
|
|
|
|
|
const handleStreamError = useCallback( |
|
|
(err: Error | string | Event) => { |
|
|
if (!isMountedRef.current) return; |
|
|
|
|
|
|
|
|
let errorMessage = 'Unknown streaming error'; |
|
|
if (typeof err === 'string') { |
|
|
errorMessage = err; |
|
|
} else if (err instanceof Error) { |
|
|
errorMessage = err.message; |
|
|
} else if (err instanceof Event && err.type === 'error') { |
|
|
|
|
|
errorMessage = 'Stream connection error'; |
|
|
} |
|
|
|
|
|
console.error('[useAgentStream] Streaming error:', errorMessage, err); |
|
|
setError(errorMessage); |
|
|
|
|
|
|
|
|
toast.error(errorMessage, { duration: 15000 }); |
|
|
|
|
|
const runId = currentRunIdRef.current; |
|
|
if (!runId) { |
|
|
console.warn( |
|
|
'[useAgentStream] Stream error occurred but no agentRunId is active.', |
|
|
); |
|
|
finalizeStream('error'); |
|
|
return; |
|
|
} |
|
|
|
|
|
}, |
|
|
[finalizeStream], |
|
|
); |
|
|
|
|
|
const handleStreamClose = useCallback(() => { |
|
|
if (!isMountedRef.current) return; |
|
|
console.log('[useAgentStream] Stream connection closed by server.'); |
|
|
|
|
|
const runId = currentRunIdRef.current; |
|
|
if (!runId) { |
|
|
console.warn('[useAgentStream] Stream closed but no active agentRunId.'); |
|
|
|
|
|
if (status === 'streaming' || status === 'connecting') { |
|
|
finalizeStream('error'); |
|
|
} else if ( |
|
|
status !== 'idle' && |
|
|
status !== 'completed' && |
|
|
status !== 'stopped' && |
|
|
status !== 'agent_not_running' |
|
|
) { |
|
|
|
|
|
finalizeStream('idle'); |
|
|
} |
|
|
return; |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
getAgentStatus(runId) |
|
|
.then((agentStatus) => { |
|
|
if (!isMountedRef.current) return; |
|
|
|
|
|
console.log( |
|
|
`[useAgentStream] Agent status after stream close for ${runId}: ${agentStatus.status}`, |
|
|
); |
|
|
if (agentStatus.status === 'running') { |
|
|
console.warn( |
|
|
`[useAgentStream] Stream closed for ${runId}, but agent is still running. Finalizing with error.`, |
|
|
); |
|
|
setError('Stream closed unexpectedly while agent was running.'); |
|
|
finalizeStream('error', runId); |
|
|
toast.warning('Stream disconnected. Agent might still be running.'); |
|
|
} else { |
|
|
|
|
|
const finalStatus = mapAgentStatus(agentStatus.status); |
|
|
console.log( |
|
|
`[useAgentStream] Stream closed for ${runId}, agent status is ${agentStatus.status}. Finalizing stream as ${finalStatus}.`, |
|
|
); |
|
|
finalizeStream(finalStatus, runId); |
|
|
} |
|
|
}) |
|
|
.catch((err) => { |
|
|
if (!isMountedRef.current) return; |
|
|
|
|
|
const errorMessage = err instanceof Error ? err.message : String(err); |
|
|
console.error( |
|
|
`[useAgentStream] Error checking agent status for ${runId} after stream close: ${errorMessage}`, |
|
|
); |
|
|
|
|
|
const isNotFoundError = |
|
|
errorMessage.includes('not found') || |
|
|
errorMessage.includes('404') || |
|
|
errorMessage.includes('does not exist'); |
|
|
|
|
|
if (isNotFoundError) { |
|
|
console.log( |
|
|
`[useAgentStream] Agent run ${runId} not found after stream close. Finalizing.`, |
|
|
); |
|
|
|
|
|
finalizeStream('agent_not_running', runId); |
|
|
} else { |
|
|
|
|
|
finalizeStream('error', runId); |
|
|
} |
|
|
}); |
|
|
}, [status, finalizeStream]); |
|
|
|
|
|
|
|
|
useEffect(() => { |
|
|
isMountedRef.current = true; |
|
|
|
|
|
|
|
|
return () => { |
|
|
isMountedRef.current = false; |
|
|
console.log( |
|
|
'[useAgentStream] Unmounting or agentRunId changing. Cleaning up stream.', |
|
|
); |
|
|
if (streamCleanupRef.current) { |
|
|
streamCleanupRef.current(); |
|
|
streamCleanupRef.current = null; |
|
|
} |
|
|
|
|
|
setStatus('idle'); |
|
|
setTextContent([]); |
|
|
setToolCall(null); |
|
|
setError(null); |
|
|
setAgentRunId(null); |
|
|
currentRunIdRef.current = null; |
|
|
}; |
|
|
}, []); |
|
|
|
|
|
|
|
|
|
|
|
const startStreaming = useCallback( |
|
|
async (runId: string) => { |
|
|
if (!isMountedRef.current) return; |
|
|
console.log( |
|
|
`[useAgentStream] Received request to start streaming for ${runId}`, |
|
|
); |
|
|
|
|
|
|
|
|
if (streamCleanupRef.current) { |
|
|
console.log( |
|
|
'[useAgentStream] Cleaning up existing stream before starting new one.', |
|
|
); |
|
|
streamCleanupRef.current(); |
|
|
streamCleanupRef.current = null; |
|
|
} |
|
|
|
|
|
|
|
|
setTextContent([]); |
|
|
setToolCall(null); |
|
|
setError(null); |
|
|
updateStatus('connecting'); |
|
|
setAgentRunId(runId); |
|
|
currentRunIdRef.current = runId; |
|
|
|
|
|
try { |
|
|
|
|
|
const agentStatus = await getAgentStatus(runId); |
|
|
if (!isMountedRef.current) return; |
|
|
|
|
|
if (agentStatus.status !== 'running') { |
|
|
console.warn( |
|
|
`[useAgentStream] Agent run ${runId} is not in running state (status: ${agentStatus.status}). Cannot start stream.`, |
|
|
); |
|
|
setError(`Agent run is not running (status: ${agentStatus.status})`); |
|
|
finalizeStream( |
|
|
mapAgentStatus(agentStatus.status) || 'agent_not_running', |
|
|
runId, |
|
|
); |
|
|
return; |
|
|
} |
|
|
|
|
|
|
|
|
console.log( |
|
|
`[useAgentStream] Agent run ${runId} confirmed running. Setting up EventSource.`, |
|
|
); |
|
|
const cleanup = streamAgent(runId, { |
|
|
onMessage: handleStreamMessage, |
|
|
onError: handleStreamError, |
|
|
onClose: handleStreamClose, |
|
|
}); |
|
|
streamCleanupRef.current = cleanup; |
|
|
|
|
|
} catch (err) { |
|
|
if (!isMountedRef.current) return; |
|
|
|
|
|
const errorMessage = err instanceof Error ? err.message : String(err); |
|
|
console.error( |
|
|
`[useAgentStream] Error initiating stream for ${runId}: ${errorMessage}`, |
|
|
); |
|
|
setError(errorMessage); |
|
|
|
|
|
const isNotFoundError = |
|
|
errorMessage.includes('not found') || |
|
|
errorMessage.includes('404') || |
|
|
errorMessage.includes('does not exist'); |
|
|
|
|
|
finalizeStream(isNotFoundError ? 'agent_not_running' : 'error', runId); |
|
|
} |
|
|
}, |
|
|
[ |
|
|
updateStatus, |
|
|
finalizeStream, |
|
|
handleStreamMessage, |
|
|
handleStreamError, |
|
|
handleStreamClose, |
|
|
], |
|
|
); |
|
|
|
|
|
const stopStreaming = useCallback(async () => { |
|
|
if (!isMountedRef.current || !agentRunId) return; |
|
|
|
|
|
const runIdToStop = agentRunId; |
|
|
console.log( |
|
|
`[useAgentStream] Stopping stream for agent run ${runIdToStop}`, |
|
|
); |
|
|
|
|
|
|
|
|
finalizeStream('stopped', runIdToStop); |
|
|
|
|
|
try { |
|
|
await stopAgent(runIdToStop); |
|
|
toast.success('Agent stopped.'); |
|
|
|
|
|
} catch (err) { |
|
|
|
|
|
const errorMessage = err instanceof Error ? err.message : String(err); |
|
|
console.error( |
|
|
`[useAgentStream] Error sending stop request for ${runIdToStop}: ${errorMessage}`, |
|
|
); |
|
|
toast.error(`Failed to stop agent: ${errorMessage}`); |
|
|
} |
|
|
}, [agentRunId, finalizeStream]); |
|
|
|
|
|
return { |
|
|
status, |
|
|
textContent: orderedTextContent, |
|
|
toolCall, |
|
|
error, |
|
|
agentRunId, |
|
|
startStreaming, |
|
|
stopStreaming, |
|
|
}; |
|
|
} |