tritan-api / api /execute.py
Madras1's picture
Upload 17 files
d6815ad verified
"""
Workflow execution endpoints
"""
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from models.workflow import Workflow
from models.execution import ExecutionState, ExecutionResult, ExecutionStatus, NodeResult
from core.engine import WorkflowEngine
from typing import AsyncGenerator
import json
router = APIRouter()
engine = WorkflowEngine()
@router.post("/")
async def execute_workflow(workflow: Workflow):
"""Execute a workflow and return results"""
try:
result = await engine.execute(workflow)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/stream")
async def execute_workflow_stream(workflow: Workflow):
"""Execute a workflow with streaming results"""
async def generate() -> AsyncGenerator[str, None]:
async for event in engine.execute_stream(workflow):
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@router.post("/node/{node_type}")
async def execute_single_node(node_type: str, config: dict):
"""Execute a single node (for testing)"""
try:
result = await engine.execute_node(node_type, config)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))