Spaces:
Sleeping
Sleeping
File size: 8,101 Bytes
d6815ad 63bfd20 d6815ad 63bfd20 d6815ad |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
"""
Workflow Execution Engine
"""
import asyncio
from collections import deque
from datetime import datetime
from typing import Any, AsyncGenerator

from models.execution import ExecutionState, ExecutionResult, ExecutionStatus, NodeResult
from models.workflow import Workflow, Node, NodeType

from .guardian import Guardian
from .nodes import NodeExecutor
class WorkflowEngine:
    """
    Main workflow execution engine.

    Nodes are run one at a time in topological order; branches of the graph
    are linearized rather than run concurrently. Each node's output is
    stored in the execution state's variables under the node's id and that
    variables mapping is passed to every subsequent node.
    """

    def __init__(self):
        self.node_executor = NodeExecutor()
        # Not referenced by any method visible in this class.
        self.guardian = Guardian()

    async def execute(self, workflow: Workflow) -> ExecutionResult:
        """Execute a workflow and return the final result.

        Execution stops at the first node whose result is FAILED. Any
        exception raised outside a node (for example an invalid graph) is
        recorded as a synthetic ``"_error"`` node result and the run is
        marked FAILED; this method does not propagate it.

        Args:
            workflow: The workflow whose nodes and edges define the run.

        Returns:
            An ExecutionResult with per-node results, overall status,
            final output and timing information.
        """
        start_time = datetime.now()

        # Initialize execution state
        state = ExecutionState(
            workflow_id=workflow.id,
            status=ExecutionStatus.RUNNING,
            started_at=start_time.isoformat()
        )

        try:
            # Build execution order (topological sort)
            execution_order = self._build_execution_order(workflow)

            # Execute nodes in order
            for node_id in execution_order:
                node = self._get_node_by_id(workflow, node_id)
                if node is None:
                    continue

                state.current_node = node_id
                node_result = await self._execute_node(node, state)
                state.node_results[node_id] = node_result

                # Stop at the first failure; later nodes are not attempted.
                if node_result.status == ExecutionStatus.FAILED:
                    state.status = ExecutionStatus.FAILED
                    break

            if state.status != ExecutionStatus.FAILED:
                state.status = ExecutionStatus.COMPLETED
        except Exception as e:
            state.status = ExecutionStatus.FAILED
            state.node_results["_error"] = NodeResult(
                node_id="_error",
                status=ExecutionStatus.FAILED,
                error=str(e)
            )

        end_time = datetime.now()
        state.completed_at = end_time.isoformat()

        return ExecutionResult(
            execution_id=state.execution_id,
            workflow_id=workflow.id,
            status=state.status,
            node_results=list(state.node_results.values()),
            final_output=self._get_final_output(state),
            total_duration_ms=(end_time - start_time).total_seconds() * 1000,
            started_at=state.started_at,
            completed_at=state.completed_at
        )

    async def execute_stream(self, workflow: Workflow) -> AsyncGenerator[dict, None]:
        """Execute workflow with streaming events.

        Yields dict events in this order: one ``"start"``; for every
        executed node a ``"node_start"`` followed by a ``"node_complete"``;
        an ``"error"`` event if an exception escapes the loop; and always a
        final ``"complete"`` event carrying the overall status and duration.

        Args:
            workflow: The workflow whose nodes and edges define the run.
        """
        start_time = datetime.now()
        yield {"type": "start", "workflow_id": workflow.id, "timestamp": start_time.isoformat()}

        state = ExecutionState(
            workflow_id=workflow.id,
            status=ExecutionStatus.RUNNING,
            started_at=start_time.isoformat()
        )

        try:
            execution_order = self._build_execution_order(workflow)

            for node_id in execution_order:
                node = self._get_node_by_id(workflow, node_id)
                if node is None:
                    continue

                # Track the active node exactly as execute() does.
                state.current_node = node_id
                yield {"type": "node_start", "node_id": node_id, "node_type": node.type}

                node_result = await self._execute_node(node, state)
                state.node_results[node_id] = node_result

                yield {
                    "type": "node_complete",
                    "node_id": node_id,
                    "status": node_result.status,
                    "output": node_result.output,
                    "error": node_result.error
                }

                if node_result.status == ExecutionStatus.FAILED:
                    state.status = ExecutionStatus.FAILED
                    break

            if state.status != ExecutionStatus.FAILED:
                state.status = ExecutionStatus.COMPLETED
        except Exception as e:
            state.status = ExecutionStatus.FAILED
            yield {"type": "error", "message": str(e)}

        end_time = datetime.now()
        yield {
            "type": "complete",
            "status": state.status,
            "duration_ms": (end_time - start_time).total_seconds() * 1000
        }

    async def execute_node(self, node_type: str, config: dict) -> dict:
        """Execute a single node (for testing).

        The node is run with an empty variables mapping, i.e. without any
        outputs from other nodes.
        """
        return await self.node_executor.execute(node_type, config, {})

    async def _execute_node(self, node: Node, state: ExecutionState) -> NodeResult:
        """Execute a single node and return its result.

        On success the output is also written to ``state.variables`` under
        the node's id. Exceptions raised by the executor are not propagated;
        they are converted into a FAILED NodeResult carrying ``str(exc)``.
        """
        start_time = datetime.now()
        try:
            # Convert enum to string value if needed
            node_type = node.type.value if hasattr(node.type, 'value') else str(node.type)

            output = await self.node_executor.execute(
                node_type,
                node.data.model_dump(),
                state.variables
            )
            end_time = datetime.now()

            # Store output in variables for next nodes
            state.variables[node.id] = output

            return NodeResult(
                node_id=node.id,
                status=ExecutionStatus.COMPLETED,
                output=output,
                started_at=start_time.isoformat(),
                completed_at=end_time.isoformat(),
                duration_ms=(end_time - start_time).total_seconds() * 1000
            )
        except Exception as e:
            end_time = datetime.now()
            return NodeResult(
                node_id=node.id,
                status=ExecutionStatus.FAILED,
                error=str(e),
                started_at=start_time.isoformat(),
                completed_at=end_time.isoformat(),
                duration_ms=(end_time - start_time).total_seconds() * 1000
            )

    def _build_execution_order(self, workflow: Workflow) -> list[str]:
        """Build a topological order of node ids for execution.

        Nodes with no incoming edges are scheduled first, in the order they
        appear in ``workflow.nodes``; a node is scheduled once all of its
        predecessors have been scheduled.

        Raises:
            ValueError: If an edge references a node id that is not in the
                workflow, or if the graph contains a cycle (in which case
                some nodes could never be scheduled).
        """
        # Build adjacency list
        graph: dict[str, list[str]] = {node.id: [] for node in workflow.nodes}
        in_degree: dict[str, int] = {node.id: 0 for node in workflow.nodes}

        for edge in workflow.edges:
            if edge.source not in graph or edge.target not in graph:
                raise ValueError(
                    f"Edge {edge.source!r} -> {edge.target!r} references a node "
                    "that is not part of the workflow"
                )
            graph[edge.source].append(edge.target)
            in_degree[edge.target] += 1

        # Start from trigger nodes (in_degree = 0); deque gives O(1) pops.
        queue = deque(node_id for node_id, degree in in_degree.items() if degree == 0)
        result: list[str] = []

        while queue:
            node_id = queue.popleft()
            result.append(node_id)
            for neighbor in graph[node_id]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        # Nodes on (or downstream of) a cycle never reach in_degree 0.
        # Report them instead of silently running a partial workflow.
        if len(result) != len(graph):
            unscheduled = [node_id for node_id, degree in in_degree.items() if degree > 0]
            raise ValueError(
                "Workflow graph contains a cycle; nodes that cannot be "
                f"scheduled: {unscheduled}"
            )

        return result

    def _get_node_by_id(self, workflow: Workflow, node_id: str) -> Node | None:
        """Return the first node in the workflow with the given id, or None."""
        for node in workflow.nodes:
            if node.id == node_id:
                return node
        return None

    def _get_final_output(self, state: ExecutionState) -> Any:
        """Return the output of the most recently recorded completed node.

        Results are scanned from last recorded to first. Results that are
        not COMPLETED, or whose output is falsy (``None``, empty, zero),
        are skipped. Returns None when nothing qualifies.
        """
        for result in reversed(list(state.node_results.values())):
            if result.status == ExecutionStatus.COMPLETED and result.output:
                return result.output
        return None
|