import React, { useState, useCallback, useMemo, useContext, createContext } from 'react'; import api from './api'; // Import the centralized api import ReactFlow, { MiniMap, Controls, Background, useNodesState, useEdgesState, addEdge, } from 'reactflow'; import CustomNode from './CustomNode'; import InputNode from './InputNode'; import OutputNode from './OutputNode'; import RunNode from './RunNode'; import LogPanel from './LogPanel'; import { LogContext } from './LogContext'; import { HuggingfaceTokenProvider } from './HuggingfaceTokenContext'; import TokenInput from './TokenInput'; import 'reactflow/dist/style.css'; import './CustomNode.css'; import './InputNode.css'; import './OutputNode.css'; import './RunNode.css'; import './LogPanel.css'; import './TokenInput.css'; export const WorkflowContext = createContext(); const initialNodes = []; const initialEdges = []; let nodeIdCounter = 0; function App() { const [nodes, setNodes, onNodesChange] = useNodesState(initialNodes); const [edges, setEdges, onEdgesChange] = useEdgesState(initialEdges); const [isRunning, setIsRunning] = useState(false); const { addLog } = useContext(LogContext); const nodeTypes = useMemo(() => ({ custom: CustomNode, inputNode: InputNode, outputNode: OutputNode, runNode: RunNode, }), []); const onConnect = useCallback( (params) => { const { source, sourceHandle, target, targetHandle } = params; const sourceNode = nodes.find(n => n.id === source); const targetNode = nodes.find(n => n.id === target); addLog(`Attempting to connect ${sourceNode.type} ('${source}') to ${targetNode.type} ('${target}')`); if (sourceNode.type === 'runNode') { const newEdge = { ...params, animated: false, type: 'step', style: { stroke: '#28a745', strokeWidth: 2 } }; setEdges((eds) => addEdge(newEdge, eds)); addLog(`Connected Run trigger to ${targetNode.data.label || targetNode.id}`, 'SUCCESS'); return; } if (targetNode.type === 'outputNode') { const sourceCompId = sourceHandle.replace('output-', ''); const sourceOutput = sourceNode.data.outputs.find(o => String(o.id) === sourceCompId); if (sourceOutput) { // Get selected JSON path if this is a JSON output const isJsonOutput = sourceOutput.type.toLowerCase() === 'json'; const jsonPath = isJsonOutput ? sourceNode.data.jsonPaths?.[sourceCompId] : null; const newEdge = { ...params, animated: true, type: 'smoothstep', data: isJsonOutput ? { jsonPath } : undefined }; setEdges((eds) => addEdge(newEdge, eds)); setNodes((nds) => nds.map((node) => node.id === target ? { ...node, data: { ...node.data, connectedType: sourceOutput.type, label: `Result: ${sourceOutput.label}${jsonPath ? ` (${jsonPath})` : ''}` } } : node ) ); const logMsg = isJsonOutput && jsonPath ? `Connected ${sourceNode.data.label}'s output to Output Node using JSON path: ${jsonPath}` : `Connected ${sourceNode.data.label}'s output to Output Node.`; addLog(logMsg, 'SUCCESS'); } return; } if (sourceNode.type === 'outputNode' && targetNode.type === 'custom') { // Mark this OutputNode as being used as a source setNodes((nds) => nds.map((node) => node.id === source ? { ...node, data: { ...node.data, isSource: true } } : node ) ); const targetCompId = targetHandle.replace('input-', ''); const targetInput = targetNode.data.inputs.find(i => String(i.id) === targetCompId); const sourceOutputType = sourceNode.data.connectedType; // Enhanced connection validation - allow image to connect to text inputs const isTypeMatch = sourceOutputType === targetInput.type; const isImageToTextPrompt = sourceOutputType === 'image' && ['textbox', 'string', 'text'].includes(targetInput.type.toLowerCase()) && (targetInput.name === 'prompt' || targetInput.label.toLowerCase().includes('prompt')); if (targetInput && sourceOutputType && (isTypeMatch || isImageToTextPrompt)) { const newEdge = { ...params, animated: true, type: 'smoothstep', data: { sourceType: sourceOutputType, // Track the original source type conversionType: isImageToTextPrompt ? 'imageToText' : null } }; setEdges((eds) => addEdge(newEdge, eds)); setNodes((nds) => nds.map((node) => { if (node.id === target) { const newConnections = { ...node.data.connections, targets: [...(node.data.connections?.targets || []), targetCompId] }; return { ...node, data: { ...node.data, connections: newConnections } }; } return node; }) ); if (isImageToTextPrompt) { addLog(`Connected Output Node with image to ${targetNode.data.label}'s '${targetInput.label}' input. The image URL will be used as prompt.`, 'INFO'); } else { addLog(`Connected Output Node to ${targetNode.data.label}'s input '${targetInput.label}'.`, 'SUCCESS'); } } else { addLog(`Connection failed: Incompatible types between Output (${sourceOutputType}) and Input (${targetInput?.type}).`, 'WARN'); } return; } if (sourceNode.type === 'custom' && targetNode.type === 'custom') { const sourceCompId = sourceHandle.replace('output-', ''); const targetCompId = targetHandle.replace('input-', ''); const sourceOutput = sourceNode.data.outputs.find(o => String(o.id) === sourceCompId); const targetInput = targetNode.data.inputs.find(i => String(i.id) === targetCompId); if (sourceOutput && targetInput && sourceOutput.type === targetInput.type) { // Get selected JSON path if this is a JSON output const isJsonOutput = sourceOutput.type.toLowerCase() === 'json'; const jsonPath = isJsonOutput ? sourceNode.data.jsonPaths?.[sourceCompId] : null; const newEdge = { ...params, animated: true, type: 'smoothstep', data: isJsonOutput ? { jsonPath } : undefined }; setEdges((eds) => addEdge(newEdge, eds)); setNodes((nds) => nds.map((node) => { if (node.id === target) { const newConnections = { ...node.data.connections, targets: [...(node.data.connections?.targets || []), targetCompId] }; return { ...node, data: { ...node.data, connections: newConnections } }; } return node; }) ); const logMsg = isJsonOutput && jsonPath ? `Connected ${sourceNode.data.label} to ${targetNode.data.label} using JSON path: ${jsonPath}` : `Connected ${sourceNode.data.label} to ${targetNode.data.label}.`; addLog(logMsg, 'SUCCESS'); } else { addLog(`Connection failed: Incompatible types between ${sourceOutput?.type} and ${targetInput?.type}.`, 'WARN'); } } }, [nodes, setEdges, setNodes, addLog], ); // Helper to check if an output node is used as a source const isOutputNodeUsedAsSource = useCallback((nodeId) => { return edges.some(edge => edge.source === nodeId && nodes.find(n => n.id === nodeId)?.type === 'outputNode'); }, [edges, nodes]); // Helper to extract image URL from various formats const extractImageUrl = (data) => { if (typeof data === 'string') { return data; } else if (Array.isArray(data) && typeof data[0] === 'string') { return data[0]; } else if (typeof data === 'object' && data !== null) { return data.url || data.path || data.src || ''; } return ''; }; const addNode = (type) => { const newId = `node-${nodeIdCounter++}`; let newNode; if (type === 'input') { newNode = { id: newId, type: 'inputNode', position: { x: Math.random() * 200, y: Math.random() * 200 }, data: { onLoad: getSpaceDetails } }; addLog("Added new Space Node. Please provide a Hugging Face Space ID.", 'INFO'); } else if (type === 'output') { newNode = { id: newId, type: 'outputNode', position: { x: Math.random() * 200 + 500, y: Math.random() * 200 }, data: { label: 'Output' } }; addLog("Added new Output Node.", 'INFO'); } else { newNode = { id: newId, type: 'runNode', position: { x: 50, y: Math.random() * 200 }, data: {} }; // No more onRun prop addLog("Added new Run Node. Connect it to start your workflow.", 'INFO'); } setNodes((nds) => nds.concat(newNode)); }; const handleEndpointChange = useCallback((nodeId, newApiName) => { setNodes((nds) => nds.map((node) => { if (node.id === nodeId) { const newEndpoint = node.data.endpoints.find(e => e.api_name === newApiName); if (newEndpoint) { addLog(`[${node.data.label}] Changed main API to: ${newApiName}`, 'INFO'); const newData = { ...node.data, apiName: newEndpoint.api_name, inputs: newEndpoint.inputs, outputs: newEndpoint.outputs, connections: { targets: [] }, status: null, result: null, jsonPaths: {}, // Reset JSON path selections on endpoint change }; return { ...node, data: newData }; } } return node; }) ); setEdges((eds) => eds.filter(edge => edge.target !== nodeId)); }, [addLog, setNodes, setEdges]); // Handle JSON path selection changes const handleOutputPathChange = useCallback((nodeId, outputId, path) => { setNodes((nds) => nds.map((node) => { if (node.id === nodeId) { // Store the selected path in the node data const jsonPaths = { ...(node.data.jsonPaths || {}), [outputId]: path }; addLog(`[${node.data.label}] Set JSON path for output ${outputId} to: ${path}`, 'INFO'); // Update the edges that use this output setEdges((eds) => eds.map(edge => { if (edge.source === nodeId && edge.sourceHandle === `output-${outputId}`) { return { ...edge, data: { ...edge.data, jsonPath: path } }; } return edge; }) ); return { ...node, data: { ...node.data, jsonPaths } }; } return node; }) ); }, [addLog, setNodes, setEdges]); const getSpaceDetails = async (nodeIdToReplace, spaceId) => { addLog(`Fetching details for Space ID: ${spaceId}...`); setNodes(nds => nds.map(n => n.id === nodeIdToReplace ? { ...n, data: { ...n.data, error: null } } : n)); try { const response = await api.get(`/space/?space_id=${spaceId}`); const { endpoints } = response.data; const preferredEndpoint = endpoints.find(e => e.is_preferred) || endpoints[0]; if (preferredEndpoint) { const originalNode = nodes.find(n => n.id === nodeIdToReplace); const loadedNode = { id: nodeIdToReplace, type: 'custom', position: originalNode?.position || { x: 100, y: 100 }, data: { label: spaceId, apiName: preferredEndpoint.api_name, inputs: preferredEndpoint.inputs, outputs: preferredEndpoint.outputs, connections: { targets: [] }, endpoints: endpoints, onEndpointChange: handleEndpointChange, onOutputPathChange: handleOutputPathChange, // Add the new callback jsonPaths: {}, // Initialize empty JSON paths }, }; setNodes((nds) => nds.map((node) => (node.id === nodeIdToReplace ? loadedNode : node))); addLog(`Successfully loaded Space: ${spaceId}. Main API: ${preferredEndpoint.api_name}`, 'SUCCESS'); } else { throw new Error("No usable API endpoints found for this space."); } } catch (err) { const errorMessage = err.response?.data?.error || err.message || 'Failed to fetch space details.'; addLog(`Failed to load Space: ${errorMessage}`, 'ERROR'); setNodes(nds => nds.map(n => n.id === nodeIdToReplace ? { ...n, data: { ...n.data, error: errorMessage, onLoad: getSpaceDetails } } : n)); } }; // Find all nodes reachable from a starting node (for isolated workflow execution) const findReachableNodes = useCallback((startNodeId) => { const reachableNodes = new Set([startNodeId]); const nodesToProcess = [startNodeId]; while (nodesToProcess.length > 0) { const currentNodeId = nodesToProcess.shift(); // Find all edges where the current node is the source edges.forEach(edge => { if (edge.source === currentNodeId && !reachableNodes.has(edge.target)) { reachableNodes.add(edge.target); nodesToProcess.push(edge.target); } }); } return Array.from(reachableNodes); }, [edges]); const resetNodeStatuses = () => { setNodes(nds => nds.map(n => { // If this is an OutputNode being used as a source, preserve its result if (n.type === 'outputNode' && isOutputNodeUsedAsSource(n.id)) { // Preserve the result but clear status and error const { status, error, ...restData } = n.data; return { ...n, data: { ...restData, isSource: true } }; } // For all other nodes, clear status, result and error if (n.data && ('status' in n.data || 'result' in n.data || 'error' in n.data)) { const { status, result, error, ...restData } = n.data; return { ...n, data: restData }; } return n; })); }; const pollForResult = async (nodeId, jobId, executionResults, nodesMap) => { const node = nodesMap.get(nodeId); addLog(`[${node.data.label}] Polling for result (Job ID: ${jobId})...`); const processLogs = (logs) => { if (logs && logs.length > 0) { logs.forEach(logMsg => { addLog(`[BACKEND] ${logMsg}`, 'INFO'); }); } }; const poll = async () => { try { const response = await api.get(`/result/${jobId}`); // Always process logs, whether the job is running or complete processLogs(response.data.logs); if (response.data.status === 'processing') { setTimeout(poll, 2000); // Continue polling } else if (response.data.status === 'completed') { addLog(`[${node.data.label}] Job completed successfully.`, 'SUCCESS'); executionResults.set(nodeId, response.data.result); setNodes(nds => nds.map(n => n.id === nodeId ? { ...n, data: { ...n.data, status: 'success', result: response.data.result } } : n)); await processNextNodes(nodeId, executionResults, nodesMap); } else if (response.data.status === 'error' || response.data.status === 'failed' || response.data.status === 'cancelled') { throw new Error(response.data.error || `Job ended with status: ${response.data.status}`); } } catch (e) { addLog(`[${node.data.label}] Polling failed: ${e.message}`, 'ERROR'); setNodes(nds => nds.map(n => n.id === nodeId ? { ...n, data: { ...n.data, status: 'error', error: e.message } } : n)); } }; poll(); }; // Helper function to get a value from a JSON object using a path string const getValueByJsonPath = (obj, path) => { if (!path || path === '(entire object)') return obj; return path.split('.').reduce((prev, curr) => { return prev && prev[curr] !== undefined ? prev[curr] : undefined; }, obj); }; const processNextNodes = async (currentNodeId, executionResults, nodesMap) => { const adj = new Map(nodes.map(n => [n.id, []])); edges.forEach(edge => { if (!adj.has(edge.source)) adj.set(edge.source, []); adj.get(edge.source).push({ nodeId: edge.target, edge }); }); const nextConnections = adj.get(currentNodeId) || []; addLog(`Processing next nodes for '${nodesMap.get(currentNodeId).data.label || currentNodeId}': ${nextConnections.length} found.`); for (const { nodeId, edge } of nextConnections) { await executeNode(nodesMap.get(nodeId), executionResults, nodesMap, edge); } }; const executeNode = async (node, executionResults, nodesMap, incomingEdge = null) => { if (!node || executionResults.has(node.id)) return; addLog(`Executing node: ${node.data.label || node.type} (ID: ${node.id})`); setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'running' } } : n)); try { if (node.type === 'runNode') { setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'success' } } : n)); await processNextNodes(node.id, executionResults, nodesMap); } else if (node.type === 'inputNode') { addLog(`Node '${node.id}' is an unconfigured Space Node. Please load a Space ID to use it.`, 'WARN'); setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'error', error: 'Unconfigured Node' } } : n)); return; } else if (node.type === 'custom') { const apiInputs = await Promise.all(node.data.inputs.map(async (input) => { const incomingEdge = edges.find(e => e.target === node.id && e.targetHandle === `input-${input.id}`); if (incomingEdge) { const sourceNode = nodesMap.get(incomingEdge.source); // Special handling for OutputNode sources - get the result directly from the node data or executionResults if (sourceNode && sourceNode.type === 'outputNode') { // Get either the stored result or the latest execution result const sourceResult = sourceNode.data.result || executionResults.get(incomingEdge.source); if (sourceResult !== undefined) { const sourceOutputType = sourceNode.data.connectedType; addLog(`[${node.data.label}] Input '${input.label}' from Output Node (type: ${sourceOutputType})`, 'INFO'); // Check if we need to convert an image to text (for prompt inputs) const isImageToTextConversion = incomingEdge.data?.conversionType === 'imageToTextPrompt' || (sourceOutputType === 'image' && ['textbox', 'string', 'text'].includes(input.type.toLowerCase()) && (input.name === 'prompt' || input.label.toLowerCase().includes('prompt'))); if (isImageToTextConversion) { // Extract the image URL and use it as the prompt text const imageUrl = extractImageUrl(sourceResult); addLog(`[${node.data.label}] Converting image to URL for prompt input: ${imageUrl}`, 'INFO'); return imageUrl; } return sourceResult; } } else if (sourceNode && sourceNode.type === 'custom') { const sourceResult = executionResults.get(incomingEdge.source); if (sourceResult !== undefined) { const outputId = incomingEdge.sourceHandle.replace('output-', ''); let outputValue = Array.isArray(sourceResult) ? sourceResult[sourceNode.data.outputs.findIndex(o => String(o.id) === outputId)] : sourceResult; // Check if this is a JSON type with a path selection const output = sourceNode.data.outputs.find(o => String(o.id) === outputId); const isJsonOutput = output && output.type.toLowerCase() === 'json'; const jsonPath = incomingEdge.data?.jsonPath; if (isJsonOutput && jsonPath && typeof outputValue === 'object') { outputValue = getValueByJsonPath(outputValue, jsonPath); addLog(`[${node.data.label}] Using JSON path '${jsonPath}' to extract from input '${input.label}'`); } addLog(`[${node.data.label}] Input '${input.label}' from Custom Node, using value: ${JSON.stringify(outputValue)}`, 'INFO'); // If the value is a file path from an output node, we need to re-upload it for the next node. if (typeof outputValue === 'string' && (outputValue.startsWith('http') || outputValue.startsWith('/media'))) { try { addLog(`[${node.data.label}] Detected output file URL. Fetching and re-uploading for next stage...`); const response = await fetch(outputValue); const blob = await response.blob(); const fileName = outputValue.substring(outputValue.lastIndexOf('/') + 1); const file = new File([blob], fileName); const formData = new FormData(); formData.append('file', file); const uploadResponse = await api.post('/upload/', formData, { headers: { 'Content-Type': 'multipart/form-data' }, }); addLog(`[${node.data.label}] File re-uploaded. New path: ${uploadResponse.data.path}`); return uploadResponse.data.path; } catch (e) { addLog(`[${node.data.label}] Failed to re-upload file: ${e.message}`, 'ERROR'); return null; // Skip this input if re-upload fails } } return outputValue; } } } // If no connection or the connection doesn't have a value, use the form value const controlId = `node-${node.id}-input-${input.id}`; const element = document.getElementById(controlId); if (!element) { addLog(`[${node.data.label}] Input '${input.label}' has no form element. Skipping.`, 'WARN'); return null; } const value = element.type === 'checkbox' ? element.checked : element.value; addLog(`[${node.data.label}] Input '${input.label}' from form, using value: ${value}`, 'INFO'); return value; })); const filteredInputs = apiInputs.filter(input => input !== null); // Check for required fields before sending the request const missingRequiredInputs = node.data.inputs .filter(input => input.required && !filteredInputs.some((_, index) => index === node.data.inputs.indexOf(input))) .map(input => input.name || input.label); if (missingRequiredInputs.length > 0) { throw new Error(`Missing required inputs: ${missingRequiredInputs.join(', ')}`); } addLog(`[${node.data.label}] Sending request to /predict/ with inputs: ${JSON.stringify(filteredInputs)}`); const response = await api.post('/predict/', { space_id: node.data.label, api_name: node.data.apiName, inputs: filteredInputs, }); if (response.data.error) throw new Error(response.data.error); if (response.data.job_id) { setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'polling' } } : n)); await pollForResult(node.id, response.data.job_id, executionResults, nodesMap); } else { addLog(`[${node.data.label}] Request successful, no job ID returned.`, 'SUCCESS'); executionResults.set(node.id, response.data.result); setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'success', result: response.data.result } } : n)); await processNextNodes(node.id, executionResults, nodesMap); } } else if (node.type === 'outputNode') { // If this OutputNode is being used as a source and already has content, // we should preserve its content and just add it to execution results if (node.data.isSource && node.data.result !== undefined && node.data.result !== null) { addLog(`[OutputNode: ${node.data.label}] Using existing content as source for downstream nodes`, 'INFO'); // Add this node's result to executionResults so downstream nodes can use it executionResults.set(node.id, node.data.result); // Mark it as success without changing its content setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'success' } } : n)); } else { // Original logic for displaying output from an incoming connection const incomingEdge = edges.find(e => e.target === node.id); if (incomingEdge) { const sourceNode = nodesMap.get(incomingEdge.source); if (sourceNode && sourceNode.type === 'custom') { const outputId = incomingEdge.sourceHandle.replace('output-', ''); const sourceResult = executionResults.get(incomingEdge.source); let displayResult = Array.isArray(sourceResult) ? sourceResult[sourceNode.data.outputs.findIndex(o => `output-${o.id}` === incomingEdge.sourceHandle)] : sourceResult; // Check if this is a JSON type with a path selection const output = sourceNode.data.outputs.find(o => String(o.id) === outputId); const isJsonOutput = output && output.type.toLowerCase() === 'json'; const jsonPath = incomingEdge.data?.jsonPath; if (isJsonOutput && jsonPath && typeof displayResult === 'object') { displayResult = getValueByJsonPath(displayResult, jsonPath); addLog(`[OutputNode] Using JSON path '${jsonPath}' to extract result`, 'INFO'); } addLog(`[OutputNode] Displaying result: ${JSON.stringify(displayResult)}`, 'SUCCESS'); // Store the result in node data and execution results setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'success', result: displayResult } } : n)); executionResults.set(node.id, displayResult); } } } // Always process next nodes regardless of whether we're displaying or forwarding content await processNextNodes(node.id, executionResults, nodesMap); } } catch (e) { addLog(`Execution failed at node ${node.data.label || node.id}: ${e.message}`, 'ERROR'); setNodes(nds => nds.map(n => n.id === node.id ? { ...n, data: { ...n.data, status: 'error', error: e.message } } : n)); } }; // Function to run workflow from a specific run node const handleRunFromNode = async (startNodeId) => { if (isRunning) { addLog("A workflow is already running. Please wait for it to complete.", 'WARN'); return; } setIsRunning(true); try { addLog(`================== WORKFLOW RUNNING FROM NODE ${startNodeId} ==================`, 'INFO'); resetNodeStatuses(); // Get only the nodes reachable from this specific Run Node const reachableNodeIds = findReachableNodes(startNodeId); addLog(`This workflow includes ${reachableNodeIds.length} node(s)`, 'INFO'); // Map of all nodes for easier access const nodesMap = new Map(nodes.map(n => [n.id, n])); // Track execution results const executionResults = new Map(); // Start execution from the run node await executeNode(nodesMap.get(startNodeId), executionResults, nodesMap); addLog(`================== WORKFLOW COMPLETED ==================`, 'SUCCESS'); } catch (error) { addLog(`Workflow execution failed: ${error.message}`, 'ERROR'); } finally { setIsRunning(false); } }; // For backward compatibility or general run const handleRun = async () => { // Find all run nodes const runNodes = nodes.filter(node => node.type === 'runNode'); if (runNodes.length > 0) { // Run from the first run node found (for backward compatibility) await handleRunFromNode(runNodes[0].id); } else { addLog("No Run Nodes found in the workflow. Please add a Run Node.", 'WARN'); } }; return (
); } export default App;