Spaces:
Sleeping
Sleeping
| """ | |
| Node Executors - Handle execution of different node types | |
| """ | |
| from typing import Any | |
| import httpx | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| class NodeExecutor: | |
| """ | |
| Executes individual nodes based on their type. | |
| Supports: trigger, action, condition, loop, llm, http, code, transform | |
| """ | |
| def __init__(self): | |
| # LLM provider configurations | |
| self.llm_providers = { | |
| "groq": { | |
| "base_url": "https://api.groq.com/openai/v1", | |
| "api_key_env": "GROQ_API_KEY", | |
| "models": ["llama-3.3-70b-versatile", "llama-3.1-8b-instant", "mixtral-8x7b-32768"] | |
| }, | |
| "openrouter": { | |
| "base_url": "https://openrouter.ai/api/v1", | |
| "api_key_env": "OPENROUTER_API_KEY", | |
| "models": ["anthropic/claude-3.5-sonnet", "openai/gpt-4o", "google/gemini-2.0-flash-exp:free"] | |
| }, | |
| "cerebras": { | |
| "base_url": "https://api.cerebras.ai/v1", | |
| "api_key_env": "CEREBRAS_API_KEY", | |
| "models": ["llama3.1-8b", "llama3.1-70b"] | |
| }, | |
| "gemini": { | |
| "base_url": "https://generativelanguage.googleapis.com/v1beta", | |
| "api_key_env": "GEMINI_API_KEY", | |
| "models": ["gemini-2.0-flash-exp", "gemini-1.5-pro"] | |
| } | |
| } | |
| async def execute(self, node_type: str, config: dict, variables: dict) -> Any: | |
| """Execute a node and return its output""" | |
| executor = getattr(self, f"_execute_{node_type}", None) | |
| if not executor: | |
| raise ValueError(f"Unknown node type: {node_type}") | |
| return await executor(config, variables) | |
| async def _execute_trigger(self, config: dict, variables: dict) -> dict: | |
| """Trigger node - starts the workflow""" | |
| return { | |
| "triggered": True, | |
| "label": config.get("label", "Start"), | |
| "timestamp": __import__("datetime").datetime.now().isoformat() | |
| } | |
| async def _execute_action(self, config: dict, variables: dict) -> dict: | |
| """Generic action node""" | |
| action_type = config.get("config", {}).get("action_type", "log") | |
| if action_type == "log": | |
| message = config.get("config", {}).get("message", "Action executed") | |
| print(f"[ACTION] {message}") | |
| return {"logged": message} | |
| return {"action": action_type, "executed": True} | |
| async def _execute_condition(self, config: dict, variables: dict) -> dict: | |
| """Condition node - evaluates a condition""" | |
| condition = config.get("condition") or config.get("config", {}).get("condition", "true") | |
| # Simple condition evaluation (be careful with eval in production!) | |
| try: | |
| # Replace variable references with actual values | |
| for var_name, var_value in variables.items(): | |
| condition = condition.replace(f"${{{var_name}}}", str(var_value)) | |
| result = eval(condition, {"__builtins__": {}}, variables) | |
| return {"condition": condition, "result": bool(result)} | |
| except Exception as e: | |
| return {"condition": condition, "result": False, "error": str(e)} | |
| async def _execute_loop(self, config: dict, variables: dict) -> dict: | |
| """Loop node - iterates over data""" | |
| items = config.get("config", {}).get("items", []) | |
| if isinstance(items, str) and items.startswith("${"): | |
| # Reference to variable | |
| var_name = items[2:-1] | |
| items = variables.get(var_name, []) | |
| return {"items": items, "count": len(items) if isinstance(items, list) else 0} | |
| async def _execute_llm(self, config: dict, variables: dict) -> dict: | |
| """LLM node - calls AI model""" | |
| provider = config.get("provider", "groq") | |
| model = config.get("model", "llama-3.3-70b-versatile") | |
| prompt = config.get("prompt", "Hello!") | |
| temperature = config.get("temperature", 0.7) | |
| # Replace variables in prompt | |
| for var_name, var_value in variables.items(): | |
| if isinstance(var_value, dict): | |
| var_value = str(var_value) | |
| prompt = prompt.replace(f"${{{var_name}}}", str(var_value)) | |
| provider_config = self.llm_providers.get(provider) | |
| if not provider_config: | |
| raise ValueError(f"Unknown LLM provider: {provider}") | |
| api_key = os.getenv(provider_config["api_key_env"]) | |
| if not api_key: | |
| # Return mock response if no API key | |
| return { | |
| "provider": provider, | |
| "model": model, | |
| "response": f"[Mock response - set {provider_config['api_key_env']} for real responses]", | |
| "prompt": prompt | |
| } | |
| # Call LLM API | |
| if provider == "gemini": | |
| response = await self._call_gemini(api_key, model, prompt, temperature) | |
| else: | |
| response = await self._call_openai_compatible( | |
| provider_config["base_url"], | |
| api_key, | |
| model, | |
| prompt, | |
| temperature | |
| ) | |
| return { | |
| "provider": provider, | |
| "model": model, | |
| "response": response, | |
| "prompt": prompt | |
| } | |
| async def _call_openai_compatible( | |
| self, | |
| base_url: str, | |
| api_key: str, | |
| model: str, | |
| prompt: str, | |
| temperature: float | |
| ) -> str: | |
| """Call OpenAI-compatible API (Groq, OpenRouter, Cerebras)""" | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| f"{base_url}/chat/completions", | |
| headers={ | |
| "Authorization": f"Bearer {api_key}", | |
| "Content-Type": "application/json" | |
| }, | |
| json={ | |
| "model": model, | |
| "messages": [{"role": "user", "content": prompt}], | |
| "temperature": temperature | |
| }, | |
| timeout=60.0 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| return data["choices"][0]["message"]["content"] | |
| async def _call_gemini( | |
| self, | |
| api_key: str, | |
| model: str, | |
| prompt: str, | |
| temperature: float | |
| ) -> str: | |
| """Call Gemini API""" | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent", | |
| params={"key": api_key}, | |
| json={ | |
| "contents": [{"parts": [{"text": prompt}]}], | |
| "generationConfig": {"temperature": temperature} | |
| }, | |
| timeout=60.0 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| return data["candidates"][0]["content"]["parts"][0]["text"] | |
| async def _execute_http(self, config: dict, variables: dict) -> dict: | |
| """HTTP request node""" | |
| # Support both flat (from frontend) and nested (config.config.*) formats | |
| method = config.get("method") or config.get("config", {}).get("method", "GET") | |
| url = config.get("url") or config.get("config", {}).get("url", "") | |
| headers = config.get("headers") or config.get("config", {}).get("headers", {}) | |
| body = config.get("body") or config.get("config", {}).get("body", None) | |
| # Replace variables | |
| for var_name, var_value in variables.items(): | |
| url = url.replace(f"${{{var_name}}}", str(var_value)) | |
| # Auto-add https:// if protocol is missing | |
| if url and not url.startswith(('http://', 'https://')): | |
| url = 'https://' + url | |
| # Validate URL | |
| if not url: | |
| raise ValueError("HTTP node requires a URL") | |
| async with httpx.AsyncClient() as client: | |
| response = await client.request( | |
| method=method, | |
| url=url, | |
| headers=headers, | |
| json=body if body else None, | |
| timeout=30.0 | |
| ) | |
| try: | |
| data = response.json() | |
| except: | |
| data = response.text | |
| return { | |
| "status_code": response.status_code, | |
| "data": data | |
| } | |
| async def _execute_code(self, config: dict, variables: dict) -> dict: | |
| """Custom code execution node (sandboxed)""" | |
| code = config.get("code") or config.get("config", {}).get("code", "return {}") | |
| # Very basic sandboxed execution | |
| # In production, use a proper sandbox like RestrictedPython | |
| safe_builtins = { | |
| 'str': str, 'int': int, 'float': float, 'bool': bool, | |
| 'len': len, 'range': range, 'list': list, 'dict': dict, | |
| 'sum': sum, 'min': min, 'max': max, 'abs': abs, 'round': round, | |
| 'sorted': sorted, 'enumerate': enumerate, 'zip': zip, | |
| 'True': True, 'False': False, 'None': None, | |
| 'print': print, 'dir': dir, 'type': type, 'isinstance': isinstance, | |
| } | |
| # Flatten variables for easier access in code | |
| flat_vars = {"variables": variables, "result": None} | |
| # Add all node outputs directly | |
| for node_id, output in variables.items(): | |
| flat_vars[node_id] = output | |
| # Extract common fields | |
| if isinstance(output, dict): | |
| # Only set http_data from actual HTTP responses (have status_code) | |
| if "status_code" in output and "data" in output: | |
| flat_vars["http_data"] = output["data"] | |
| if "response" in output: | |
| flat_vars["llm_response"] = output["response"] | |
| flat_vars["last_response"] = output["response"] | |
| if "result" in output: | |
| flat_vars["last_result"] = output["result"] | |
| try: | |
| exec(f"result = {code}", {"__builtins__": safe_builtins}, flat_vars) | |
| return {"result": flat_vars.get("result")} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| async def _execute_transform(self, config: dict, variables: dict) -> dict: | |
| """Data transformation node""" | |
| transform_type = config.get("transform") or config.get("config", {}).get("transform", "passthrough") | |
| input_data = config.get("input") or config.get("config", {}).get("input", None) | |
| # Resolve input from variables | |
| if isinstance(input_data, str) and input_data.startswith("${"): | |
| var_name = input_data[2:-1] | |
| input_data = variables.get(var_name) | |
| if transform_type == "passthrough": | |
| return {"data": input_data} | |
| elif transform_type == "json_parse": | |
| import json | |
| return {"data": json.loads(input_data) if isinstance(input_data, str) else input_data} | |
| elif transform_type == "uppercase": | |
| return {"data": str(input_data).upper()} | |
| elif transform_type == "lowercase": | |
| return {"data": str(input_data).lower()} | |
| return {"data": input_data} | |