""" Node Executors - Handle execution of different node types """ from typing import Any import httpx import os from dotenv import load_dotenv load_dotenv() class NodeExecutor: """ Executes individual nodes based on their type. Supports: trigger, action, condition, loop, llm, http, code, transform """ def __init__(self): # LLM provider configurations self.llm_providers = { "groq": { "base_url": "https://api.groq.com/openai/v1", "api_key_env": "GROQ_API_KEY", "models": ["llama-3.3-70b-versatile", "llama-3.1-8b-instant", "mixtral-8x7b-32768"] }, "openrouter": { "base_url": "https://openrouter.ai/api/v1", "api_key_env": "OPENROUTER_API_KEY", "models": ["anthropic/claude-3.5-sonnet", "openai/gpt-4o", "google/gemini-2.0-flash-exp:free"] }, "cerebras": { "base_url": "https://api.cerebras.ai/v1", "api_key_env": "CEREBRAS_API_KEY", "models": ["llama3.1-8b", "llama3.1-70b"] }, "gemini": { "base_url": "https://generativelanguage.googleapis.com/v1beta", "api_key_env": "GEMINI_API_KEY", "models": ["gemini-2.0-flash-exp", "gemini-1.5-pro"] } } async def execute(self, node_type: str, config: dict, variables: dict) -> Any: """Execute a node and return its output""" executor = getattr(self, f"_execute_{node_type}", None) if not executor: raise ValueError(f"Unknown node type: {node_type}") return await executor(config, variables) async def _execute_trigger(self, config: dict, variables: dict) -> dict: """Trigger node - starts the workflow""" return { "triggered": True, "label": config.get("label", "Start"), "timestamp": __import__("datetime").datetime.now().isoformat() } async def _execute_action(self, config: dict, variables: dict) -> dict: """Generic action node""" action_type = config.get("config", {}).get("action_type", "log") if action_type == "log": message = config.get("config", {}).get("message", "Action executed") print(f"[ACTION] {message}") return {"logged": message} return {"action": action_type, "executed": True} async def _execute_condition(self, config: dict, variables: dict) -> dict: """Condition node - evaluates a condition""" condition = config.get("condition") or config.get("config", {}).get("condition", "true") # Simple condition evaluation (be careful with eval in production!) try: # Replace variable references with actual values for var_name, var_value in variables.items(): condition = condition.replace(f"${{{var_name}}}", str(var_value)) result = eval(condition, {"__builtins__": {}}, variables) return {"condition": condition, "result": bool(result)} except Exception as e: return {"condition": condition, "result": False, "error": str(e)} async def _execute_loop(self, config: dict, variables: dict) -> dict: """Loop node - iterates over data""" items = config.get("config", {}).get("items", []) if isinstance(items, str) and items.startswith("${"): # Reference to variable var_name = items[2:-1] items = variables.get(var_name, []) return {"items": items, "count": len(items) if isinstance(items, list) else 0} async def _execute_llm(self, config: dict, variables: dict) -> dict: """LLM node - calls AI model""" provider = config.get("provider", "groq") model = config.get("model", "llama-3.3-70b-versatile") prompt = config.get("prompt", "Hello!") temperature = config.get("temperature", 0.7) # Replace variables in prompt for var_name, var_value in variables.items(): if isinstance(var_value, dict): var_value = str(var_value) prompt = prompt.replace(f"${{{var_name}}}", str(var_value)) provider_config = self.llm_providers.get(provider) if not provider_config: raise ValueError(f"Unknown LLM provider: {provider}") api_key = os.getenv(provider_config["api_key_env"]) if not api_key: # Return mock response if no API key return { "provider": provider, "model": model, "response": f"[Mock response - set {provider_config['api_key_env']} for real responses]", "prompt": prompt } # Call LLM API if provider == "gemini": response = await self._call_gemini(api_key, model, prompt, temperature) else: response = await self._call_openai_compatible( provider_config["base_url"], api_key, model, prompt, temperature ) return { "provider": provider, "model": model, "response": response, "prompt": prompt } async def _call_openai_compatible( self, base_url: str, api_key: str, model: str, prompt: str, temperature: float ) -> str: """Call OpenAI-compatible API (Groq, OpenRouter, Cerebras)""" async with httpx.AsyncClient() as client: response = await client.post( f"{base_url}/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json={ "model": model, "messages": [{"role": "user", "content": prompt}], "temperature": temperature }, timeout=60.0 ) response.raise_for_status() data = response.json() return data["choices"][0]["message"]["content"] async def _call_gemini( self, api_key: str, model: str, prompt: str, temperature: float ) -> str: """Call Gemini API""" async with httpx.AsyncClient() as client: response = await client.post( f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent", params={"key": api_key}, json={ "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"temperature": temperature} }, timeout=60.0 ) response.raise_for_status() data = response.json() return data["candidates"][0]["content"]["parts"][0]["text"] async def _execute_http(self, config: dict, variables: dict) -> dict: """HTTP request node""" # Support both flat (from frontend) and nested (config.config.*) formats method = config.get("method") or config.get("config", {}).get("method", "GET") url = config.get("url") or config.get("config", {}).get("url", "") headers = config.get("headers") or config.get("config", {}).get("headers", {}) body = config.get("body") or config.get("config", {}).get("body", None) # Replace variables for var_name, var_value in variables.items(): url = url.replace(f"${{{var_name}}}", str(var_value)) # Auto-add https:// if protocol is missing if url and not url.startswith(('http://', 'https://')): url = 'https://' + url # Validate URL if not url: raise ValueError("HTTP node requires a URL") async with httpx.AsyncClient() as client: response = await client.request( method=method, url=url, headers=headers, json=body if body else None, timeout=30.0 ) try: data = response.json() except: data = response.text return { "status_code": response.status_code, "data": data } async def _execute_code(self, config: dict, variables: dict) -> dict: """Custom code execution node (sandboxed)""" code = config.get("code") or config.get("config", {}).get("code", "return {}") # Very basic sandboxed execution # In production, use a proper sandbox like RestrictedPython safe_builtins = { 'str': str, 'int': int, 'float': float, 'bool': bool, 'len': len, 'range': range, 'list': list, 'dict': dict, 'sum': sum, 'min': min, 'max': max, 'abs': abs, 'round': round, 'sorted': sorted, 'enumerate': enumerate, 'zip': zip, 'True': True, 'False': False, 'None': None, 'print': print, 'dir': dir, 'type': type, 'isinstance': isinstance, } # Flatten variables for easier access in code flat_vars = {"variables": variables, "result": None} # Add all node outputs directly for node_id, output in variables.items(): flat_vars[node_id] = output # Extract common fields if isinstance(output, dict): # Only set http_data from actual HTTP responses (have status_code) if "status_code" in output and "data" in output: flat_vars["http_data"] = output["data"] if "response" in output: flat_vars["llm_response"] = output["response"] flat_vars["last_response"] = output["response"] if "result" in output: flat_vars["last_result"] = output["result"] try: exec(f"result = {code}", {"__builtins__": safe_builtins}, flat_vars) return {"result": flat_vars.get("result")} except Exception as e: return {"error": str(e)} async def _execute_transform(self, config: dict, variables: dict) -> dict: """Data transformation node""" transform_type = config.get("transform") or config.get("config", {}).get("transform", "passthrough") input_data = config.get("input") or config.get("config", {}).get("input", None) # Resolve input from variables if isinstance(input_data, str) and input_data.startswith("${"): var_name = input_data[2:-1] input_data = variables.get(var_name) if transform_type == "passthrough": return {"data": input_data} elif transform_type == "json_parse": import json return {"data": json.loads(input_data) if isinstance(input_data, str) else input_data} elif transform_type == "uppercase": return {"data": str(input_data).upper()} elif transform_type == "lowercase": return {"data": str(input_data).lower()} return {"data": input_data}