# tritan-api/core/nodes.py
"""
Node Executors - Handle execution of different node types
"""
import json
import os
from datetime import datetime
from typing import Any

import httpx
from dotenv import load_dotenv
load_dotenv()
class NodeExecutor:
    """
    Executes individual workflow nodes based on their type.

    Supported types: trigger, action, condition, loop, llm, http, code,
    transform.  ``execute`` dispatches to ``_execute_<type>``.

    Node configs may carry settings either flat (``config["url"]``) or
    nested (``config["config"]["url"]``); executors accept both where
    noted.  ``${name}`` placeholders in strings are substituted from the
    ``variables`` dict (accumulated outputs of earlier nodes).
    """

    def __init__(self):
        # LLM provider registry: OpenAI-compatible base URL (or Gemini's
        # REST root), the env var holding the API key, and a few known
        # model names.  The model list is informational only — whatever
        # model string a node config supplies is passed through as-is.
        self.llm_providers = {
            "groq": {
                "base_url": "https://api.groq.com/openai/v1",
                "api_key_env": "GROQ_API_KEY",
                "models": ["llama-3.3-70b-versatile", "llama-3.1-8b-instant", "mixtral-8x7b-32768"]
            },
            "openrouter": {
                "base_url": "https://openrouter.ai/api/v1",
                "api_key_env": "OPENROUTER_API_KEY",
                "models": ["anthropic/claude-3.5-sonnet", "openai/gpt-4o", "google/gemini-2.0-flash-exp:free"]
            },
            "cerebras": {
                "base_url": "https://api.cerebras.ai/v1",
                "api_key_env": "CEREBRAS_API_KEY",
                "models": ["llama3.1-8b", "llama3.1-70b"]
            },
            "gemini": {
                "base_url": "https://generativelanguage.googleapis.com/v1beta",
                "api_key_env": "GEMINI_API_KEY",
                "models": ["gemini-2.0-flash-exp", "gemini-1.5-pro"]
            }
        }

    async def execute(self, node_type: str, config: dict, variables: dict) -> Any:
        """Dispatch to the ``_execute_<node_type>`` handler and return its output.

        Raises:
            ValueError: if no handler exists for ``node_type``.
        """
        executor = getattr(self, f"_execute_{node_type}", None)
        if executor is None:
            raise ValueError(f"Unknown node type: {node_type}")
        return await executor(config, variables)

    async def _execute_trigger(self, config: dict, variables: dict) -> dict:
        """Trigger node - marks the start of the workflow run."""
        return {
            "triggered": True,
            "label": config.get("label", "Start"),
            "timestamp": datetime.now().isoformat()
        }

    async def _execute_action(self, config: dict, variables: dict) -> dict:
        """Generic action node; currently only the ``log`` action does work."""
        action_type = config.get("config", {}).get("action_type", "log")
        if action_type == "log":
            message = config.get("config", {}).get("message", "Action executed")
            print(f"[ACTION] {message}")
            return {"logged": message}
        # Unknown action types are acknowledged but not executed.
        return {"action": action_type, "executed": True}

    async def _execute_condition(self, config: dict, variables: dict) -> dict:
        """Condition node - evaluates a boolean expression.

        ``${var}`` references are substituted with values from ``variables``
        before evaluation.  Evaluation errors are reported in the result
        rather than raised, with ``result`` falling back to False.

        WARNING: this uses eval(); stripping ``__builtins__`` is NOT a real
        sandbox — do not feed it untrusted expressions in production.
        """
        condition = config.get("condition") or config.get("config", {}).get("condition", "true")
        try:
            # Substitute ${var} placeholders with their current values.
            for var_name, var_value in variables.items():
                condition = condition.replace(f"${{{var_name}}}", str(var_value))
            # Alias JS-style literals so the default "true" (and "false"/
            # "null" from frontend payloads) evaluate instead of raising
            # NameError under the emptied builtins.
            eval_globals = {"__builtins__": {}, "true": True, "false": False, "null": None}
            result = eval(condition, eval_globals, variables)
            return {"condition": condition, "result": bool(result)}
        except Exception as e:
            return {"condition": condition, "result": False, "error": str(e)}

    async def _execute_loop(self, config: dict, variables: dict) -> dict:
        """Loop node - resolves the list of items to iterate over.

        ``items`` may be a literal list or a ``${var}`` reference into
        ``variables``.  ``count`` is 0 when the resolved value is not a list.
        """
        items = config.get("config", {}).get("items", [])
        if isinstance(items, str) and items.startswith("${") and items.endswith("}"):
            # Resolve the variable reference; missing vars become [].
            items = variables.get(items[2:-1], [])
        return {"items": items, "count": len(items) if isinstance(items, list) else 0}

    async def _execute_llm(self, config: dict, variables: dict) -> dict:
        """LLM node - sends a prompt to the configured AI provider.

        ``${var}`` placeholders in the prompt are substituted first.  When
        the provider's API-key env var is unset, a mock response is returned
        so workflows remain runnable without credentials.

        Raises:
            ValueError: for a provider not present in ``self.llm_providers``.
        """
        provider = config.get("provider", "groq")
        model = config.get("model", "llama-3.3-70b-versatile")
        prompt = config.get("prompt", "Hello!")
        temperature = config.get("temperature", 0.7)
        # Substitute ${var} placeholders (str() covers dicts and scalars).
        for var_name, var_value in variables.items():
            prompt = prompt.replace(f"${{{var_name}}}", str(var_value))
        provider_config = self.llm_providers.get(provider)
        if not provider_config:
            raise ValueError(f"Unknown LLM provider: {provider}")
        api_key = os.getenv(provider_config["api_key_env"])
        if not api_key:
            # No credentials: return a mock response instead of failing.
            return {
                "provider": provider,
                "model": model,
                "response": f"[Mock response - set {provider_config['api_key_env']} for real responses]",
                "prompt": prompt
            }
        # Gemini has its own REST shape; everything else is OpenAI-compatible.
        if provider == "gemini":
            response = await self._call_gemini(api_key, model, prompt, temperature)
        else:
            response = await self._call_openai_compatible(
                provider_config["base_url"],
                api_key,
                model,
                prompt,
                temperature
            )
        return {
            "provider": provider,
            "model": model,
            "response": response,
            "prompt": prompt
        }

    async def _call_openai_compatible(
        self,
        base_url: str,
        api_key: str,
        model: str,
        prompt: str,
        temperature: float
    ) -> str:
        """Call an OpenAI-compatible chat API (Groq, OpenRouter, Cerebras).

        Returns the first choice's message content.

        Raises:
            httpx.HTTPStatusError: on non-2xx responses.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{base_url}/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": temperature
                },
                timeout=60.0
            )
            response.raise_for_status()
            data = response.json()
            return data["choices"][0]["message"]["content"]

    async def _call_gemini(
        self,
        api_key: str,
        model: str,
        prompt: str,
        temperature: float
    ) -> str:
        """Call the Gemini ``generateContent`` API and return the first candidate's text.

        Raises:
            httpx.HTTPStatusError: on non-2xx responses.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent",
                params={"key": api_key},
                json={
                    "contents": [{"parts": [{"text": prompt}]}],
                    "generationConfig": {"temperature": temperature}
                },
                timeout=60.0
            )
            response.raise_for_status()
            data = response.json()
            return data["candidates"][0]["content"]["parts"][0]["text"]

    async def _execute_http(self, config: dict, variables: dict) -> dict:
        """HTTP request node.

        Accepts both flat (frontend) and nested (``config["config"]``)
        settings.  ``${var}`` placeholders in the URL are substituted and a
        missing scheme defaults to https.  Returns the status code plus the
        parsed JSON body (or raw text when the body is not JSON).

        Raises:
            ValueError: when no URL is configured.
        """
        method = config.get("method") or config.get("config", {}).get("method", "GET")
        url = config.get("url") or config.get("config", {}).get("url", "")
        headers = config.get("headers") or config.get("config", {}).get("headers", {})
        body = config.get("body") or config.get("config", {}).get("body", None)
        # Substitute ${var} placeholders in the URL.
        for var_name, var_value in variables.items():
            url = url.replace(f"${{{var_name}}}", str(var_value))
        # Auto-add https:// if the protocol is missing.
        if url and not url.startswith(('http://', 'https://')):
            url = 'https://' + url
        if not url:
            raise ValueError("HTTP node requires a URL")
        async with httpx.AsyncClient() as client:
            response = await client.request(
                method=method,
                url=url,
                headers=headers,
                json=body if body else None,
                timeout=30.0
            )
            try:
                data = response.json()
            except ValueError:
                # Body is not JSON; fall back to the raw text.
                data = response.text
            return {
                "status_code": response.status_code,
                "data": data
            }

    async def _execute_code(self, config: dict, variables: dict) -> dict:
        """Custom code execution node (best-effort sandbox).

        The configured code must be a single Python *expression*; a leading
        ``return `` is stripped so snippets like ``return x + 1`` work (the
        default config ``"return {}"`` relies on this).  Node outputs are
        exposed both via ``variables`` and as top-level names keyed by node
        id, plus convenience aliases (``http_data``, ``llm_response``,
        ``last_response``, ``last_result``).  Errors are reported in the
        result rather than raised.

        WARNING: exec() with a trimmed builtins dict is NOT a real sandbox;
        use RestrictedPython or an external sandbox in production.
        """
        code = (config.get("code") or config.get("config", {}).get("code", "return {}")).strip()
        # "return <expr>" would be a SyntaxError inside "result = ...";
        # strip the keyword so expression-style snippets execute.
        if code.startswith("return "):
            code = code[len("return "):]
        # Whitelisted builtins available to user code.
        safe_builtins = {
            'str': str, 'int': int, 'float': float, 'bool': bool,
            'len': len, 'range': range, 'list': list, 'dict': dict,
            'sum': sum, 'min': min, 'max': max, 'abs': abs, 'round': round,
            'sorted': sorted, 'enumerate': enumerate, 'zip': zip,
            'True': True, 'False': False, 'None': None,
            'print': print, 'dir': dir, 'type': type, 'isinstance': isinstance,
        }
        # Flatten variables for easier access in user code.
        flat_vars = {"variables": variables, "result": None}
        for node_id, output in variables.items():
            flat_vars[node_id] = output
            if isinstance(output, dict):
                # Only set http_data from actual HTTP responses (have status_code).
                if "status_code" in output and "data" in output:
                    flat_vars["http_data"] = output["data"]
                if "response" in output:
                    flat_vars["llm_response"] = output["response"]
                    flat_vars["last_response"] = output["response"]
                if "result" in output:
                    flat_vars["last_result"] = output["result"]
        try:
            exec(f"result = {code}", {"__builtins__": safe_builtins}, flat_vars)
            return {"result": flat_vars.get("result")}
        except Exception as e:
            return {"error": str(e)}

    async def _execute_transform(self, config: dict, variables: dict) -> dict:
        """Data transformation node.

        Input may be a literal value or a ``${var}`` reference.  Supported
        transforms: passthrough (default), json_parse, uppercase, lowercase;
        unknown transform names pass the input through unchanged.
        """
        transform_type = config.get("transform") or config.get("config", {}).get("transform", "passthrough")
        input_data = config.get("input") or config.get("config", {}).get("input", None)
        # Resolve a ${var} reference against the workflow variables.
        if isinstance(input_data, str) and input_data.startswith("${") and input_data.endswith("}"):
            input_data = variables.get(input_data[2:-1])
        if transform_type == "passthrough":
            return {"data": input_data}
        elif transform_type == "json_parse":
            return {"data": json.loads(input_data) if isinstance(input_data, str) else input_data}
        elif transform_type == "uppercase":
            return {"data": str(input_data).upper()}
        elif transform_type == "lowercase":
            return {"data": str(input_data).lower()}
        return {"data": input_data}