Spaces:
Build error
Build error
| """ | |
| Workflow Orchestration Engine | |
| Provides workflow management using LangGraph for complex agent workflows | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from typing import Dict, List, Any, Optional, Callable, Union | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from datetime import datetime, timedelta | |
| import uuid | |
| from contextlib import asynccontextmanager | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.prebuilt import ToolExecutor | |
| from langchain_core.tools import BaseTool | |
| from langchain_core.messages import BaseMessage, HumanMessage, AIMessage | |
| logger = logging.getLogger(__name__) | |
| # ============================= | |
| # Workflow Types | |
| # ============================= | |
class WorkflowStatus(Enum):
    """Lifecycle states of a workflow execution."""

    PENDING = "pending"        # created, not yet started
    RUNNING = "running"        # currently executing
    COMPLETED = "completed"    # finished without error
    FAILED = "failed"          # aborted by an unhandled error
    CANCELLED = "cancelled"    # stopped on request
    TIMEOUT = "timeout"        # exceeded its time budget
class WorkflowType(Enum):
    """Supported topologies for wiring workflow steps together."""

    SEQUENTIAL = "sequential"      # steps run one after another
    PARALLEL = "parallel"          # every step wired straight to END
    CONDITIONAL = "conditional"    # routing driven by per-step conditions
    LOOP = "loop"                  # steps loop back on themselves
    FAN_OUT = "fan_out"            # NOTE: no edge wiring implemented in WorkflowEngine yet
    FAN_IN = "fan_in"              # NOTE: no edge wiring implemented in WorkflowEngine yet
@dataclass  # BUG FIX: decorator was missing; field(default_factory=...) requires @dataclass
class WorkflowStep:
    """A single step in a workflow.

    Exactly one of `agent_id` / `tool_name` is normally set; when neither
    is, the engine runs the step as a custom step.
    """
    step_id: str
    name: str
    description: str
    agent_id: Optional[str] = None          # run via a registered agent
    tool_name: Optional[str] = None         # run via a registered tool
    # step input key -> "input.<key>" | "output.<step_id>.<key>" | literal value
    input_mapping: Dict[str, str] = field(default_factory=dict)
    # result key -> "output.<key>" (workflow output) or a step-result key
    output_mapping: Dict[str, str] = field(default_factory=dict)
    timeout: Optional[float] = None
    retry_count: int = 3                    # max retries on step failure
    retry_delay: float = 1.0                # base delay; engine scales it per attempt
    dependencies: List[str] = field(default_factory=list)
    condition: Optional[str] = None         # Python expression for conditional routing
    parallel: bool = False
@dataclass  # BUG FIX: decorator was missing; field(default_factory=...) requires @dataclass
class WorkflowDefinition:
    """Complete, registerable description of a workflow.

    `workflow_type` selects how the engine wires the `steps` together.
    """
    workflow_id: str
    name: str
    description: str
    workflow_type: WorkflowType
    steps: List[WorkflowStep]
    input_schema: Dict[str, Any] = field(default_factory=dict)
    output_schema: Dict[str, Any] = field(default_factory=dict)
    timeout: Optional[float] = None         # seconds; None disables the timeout
    max_retries: int = 3
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass  # BUG FIX: decorator was missing; field(default_factory=...) requires @dataclass
class WorkflowExecution:
    """Record of one workflow run, stored in WorkflowEngine.executions."""
    execution_id: str
    workflow_id: str
    status: WorkflowStatus
    input_data: Dict[str, Any]
    output_data: Optional[Dict[str, Any]] = None
    step_results: Dict[str, Any] = field(default_factory=dict)   # step_id -> result record
    error_message: Optional[str] = None
    start_time: datetime = field(default_factory=datetime.now)
    end_time: Optional[datetime] = None                          # set on completion/failure
    metadata: Dict[str, Any] = field(default_factory=dict)
| # ============================= | |
| # Workflow State | |
| # ============================= | |
@dataclass  # BUG FIX: decorator was missing; field(default_factory=...) requires @dataclass
class WorkflowState:
    """Mutable state threaded through the workflow graph nodes."""
    execution_id: str
    workflow_id: str
    current_step: str
    step_results: Dict[str, Any] = field(default_factory=dict)   # step_id -> result record
    input_data: Dict[str, Any] = field(default_factory=dict)
    output_data: Dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    step_history: List[str] = field(default_factory=list)        # step_ids in execution order
    retry_count: Dict[str, int] = field(default_factory=dict)    # step_id -> attempts so far
    # BUG FIX: added — the engine's timeout check reads state.start_time,
    # which this class never defined.
    start_time: datetime = field(default_factory=datetime.now)
| # ============================= | |
| # Workflow Engine | |
| # ============================= | |
| class WorkflowEngine: | |
| """Main workflow orchestration engine""" | |
| def __init__(self): | |
| self.workflows: Dict[str, WorkflowDefinition] = {} | |
| self.executions: Dict[str, WorkflowExecution] = {} | |
| self.agents: Dict[str, Any] = {} # Agent registry | |
| self.tools: Dict[str, BaseTool] = {} # Tool registry | |
| self._lock = asyncio.Lock() | |
| async def register_workflow(self, workflow: WorkflowDefinition) -> bool: | |
| """Register a new workflow definition""" | |
| async with self._lock: | |
| self.workflows[workflow.workflow_id] = workflow | |
| logger.info(f"Registered workflow: {workflow.name} ({workflow.workflow_id})") | |
| return True | |
| async def unregister_workflow(self, workflow_id: str) -> bool: | |
| """Unregister a workflow definition""" | |
| async with self._lock: | |
| if workflow_id in self.workflows: | |
| del self.workflows[workflow_id] | |
| logger.info(f"Unregistered workflow: {workflow_id}") | |
| return True | |
| return False | |
| async def register_agent(self, agent_id: str, agent: Any) -> None: | |
| """Register an agent for workflow execution""" | |
| self.agents[agent_id] = agent | |
| async def register_tool(self, tool_name: str, tool: BaseTool) -> None: | |
| """Register a tool for workflow execution""" | |
| self.tools[tool_name] = tool | |
| async def execute_workflow(self, workflow_id: str, input_data: Dict[str, Any], | |
| execution_id: Optional[str] = None) -> WorkflowExecution: | |
| """Execute a workflow""" | |
| if workflow_id not in self.workflows: | |
| raise ValueError(f"Workflow {workflow_id} not found") | |
| workflow = self.workflows[workflow_id] | |
| execution_id = execution_id or str(uuid.uuid4()) | |
| # Create execution instance | |
| execution = WorkflowExecution( | |
| execution_id=execution_id, | |
| workflow_id=workflow_id, | |
| status=WorkflowStatus.PENDING, | |
| input_data=input_data | |
| ) | |
| self.executions[execution_id] = execution | |
| try: | |
| # Update status to running | |
| execution.status = WorkflowStatus.RUNNING | |
| # Create workflow graph | |
| graph = await self._create_workflow_graph(workflow) | |
| # Execute workflow | |
| initial_state = WorkflowState( | |
| execution_id=execution_id, | |
| workflow_id=workflow_id, | |
| current_step="start", | |
| input_data=input_data | |
| ) | |
| # Run the workflow | |
| final_state = await self._run_workflow(graph, initial_state, workflow) | |
| # Update execution with results | |
| execution.output_data = final_state.output_data | |
| execution.step_results = final_state.step_results | |
| execution.status = WorkflowStatus.COMPLETED | |
| execution.end_time = datetime.now() | |
| logger.info(f"Workflow execution completed: {execution_id}") | |
| except Exception as e: | |
| execution.status = WorkflowStatus.FAILED | |
| execution.error_message = str(e) | |
| execution.end_time = datetime.now() | |
| logger.error(f"Workflow execution failed: {execution_id}, error: {e}") | |
| return execution | |
| async def _create_workflow_graph(self, workflow: WorkflowDefinition) -> StateGraph: | |
| """Create LangGraph state graph from workflow definition""" | |
| workflow_graph = StateGraph(WorkflowState) | |
| # Add nodes for each step | |
| for step in workflow.steps: | |
| workflow_graph.add_node(step.step_id, self._create_step_node(step)) | |
| # Add edges based on workflow type | |
| if workflow.workflow_type == WorkflowType.SEQUENTIAL: | |
| await self._add_sequential_edges(workflow_graph, workflow.steps) | |
| elif workflow.workflow_type == WorkflowType.PARALLEL: | |
| await self._add_parallel_edges(workflow_graph, workflow.steps) | |
| elif workflow.workflow_type == WorkflowType.CONDITIONAL: | |
| await self._add_conditional_edges(workflow_graph, workflow.steps) | |
| elif workflow.workflow_type == WorkflowType.LOOP: | |
| await self._add_loop_edges(workflow_graph, workflow.steps) | |
| # Set entry point | |
| if workflow.steps: | |
| workflow_graph.set_entry_point(workflow.steps[0].step_id) | |
| return workflow_graph.compile() | |
| def _create_step_node(self, step: WorkflowStep) -> Callable: | |
| """Create a node function for a workflow step""" | |
| async def step_node(state: WorkflowState) -> WorkflowState: | |
| try: | |
| logger.info(f"Executing step: {step.name} ({step.step_id})") | |
| # Update current step | |
| state.current_step = step.step_id | |
| state.step_history.append(step.step_id) | |
| # Prepare input for the step | |
| step_input = self._prepare_step_input(step, state) | |
| # Execute the step | |
| if step.agent_id: | |
| result = await self._execute_agent_step(step, step_input) | |
| elif step.tool_name: | |
| result = await self._execute_tool_step(step, step_input) | |
| else: | |
| result = await self._execute_custom_step(step, step_input) | |
| # Process output | |
| self._process_step_output(step, result, state) | |
| # Update step results | |
| state.step_results[step.step_id] = { | |
| "status": "success", | |
| "result": result, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"Step execution failed: {step.step_id}, error: {e}") | |
| # Handle retries | |
| retry_count = state.retry_count.get(step.step_id, 0) | |
| if retry_count < step.retry_count: | |
| state.retry_count[step.step_id] = retry_count + 1 | |
| await asyncio.sleep(step.retry_delay * (retry_count + 1)) | |
| # Retry the step | |
| return await step_node(state) | |
| else: | |
| # Max retries exceeded | |
| state.error = str(e) | |
| state.step_results[step.step_id] = { | |
| "status": "failed", | |
| "error": str(e), | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| return state | |
| return step_node | |
| def _prepare_step_input(self, step: WorkflowStep, state: WorkflowState) -> Dict[str, Any]: | |
| """Prepare input data for a workflow step""" | |
| step_input = {} | |
| for input_key, source_path in step.input_mapping.items(): | |
| if source_path.startswith("input."): | |
| # Map from workflow input | |
| key = source_path.split(".", 1)[1] | |
| step_input[input_key] = state.input_data.get(key) | |
| elif source_path.startswith("output."): | |
| # Map from previous step output | |
| step_id, output_key = source_path.split(".", 2)[1:] | |
| if step_id in state.step_results: | |
| step_input[input_key] = state.step_results[step_id].get("result", {}).get(output_key) | |
| else: | |
| # Direct value | |
| step_input[input_key] = source_path | |
| return step_input | |
| async def _execute_agent_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]: | |
| """Execute a step using an agent""" | |
| if step.agent_id not in self.agents: | |
| raise ValueError(f"Agent {step.agent_id} not found") | |
| agent = self.agents[step.agent_id] | |
| # Create message for the agent | |
| message = HumanMessage(content=json.dumps(step_input)) | |
| # Execute agent | |
| response = await agent.ainvoke([message]) | |
| return { | |
| "agent_id": step.agent_id, | |
| "response": response.content, | |
| "metadata": response.additional_kwargs | |
| } | |
| async def _execute_tool_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]: | |
| """Execute a step using a tool""" | |
| if step.tool_name not in self.tools: | |
| raise ValueError(f"Tool {step.tool_name} not found") | |
| tool = self.tools[step.tool_name] | |
| # Execute tool | |
| result = await tool.ainvoke(step_input) | |
| return { | |
| "tool_name": step.tool_name, | |
| "result": result, | |
| "metadata": {} | |
| } | |
| async def _execute_custom_step(self, step: WorkflowStep, step_input: Dict[str, Any]) -> Dict[str, Any]: | |
| """Execute a custom step (placeholder for custom logic)""" | |
| # This would be implemented based on custom step types | |
| return { | |
| "step_type": "custom", | |
| "input": step_input, | |
| "result": {"status": "completed"} | |
| } | |
| def _process_step_output(self, step: WorkflowStep, result: Dict[str, Any], state: WorkflowState): | |
| """Process output from a workflow step""" | |
| for output_key, target_path in step.output_mapping.items(): | |
| if target_path.startswith("output."): | |
| # Map to workflow output | |
| key = target_path.split(".", 1)[1] | |
| state.output_data[key] = result.get(output_key) | |
| else: | |
| # Store in step results | |
| state.step_results[step.step_id][output_key] = result.get(output_key) | |
| async def _add_sequential_edges(self, graph: StateGraph, steps: List[WorkflowStep]): | |
| """Add edges for sequential workflow""" | |
| for i in range(len(steps) - 1): | |
| current_step = steps[i] | |
| next_step = steps[i + 1] | |
| graph.add_edge(current_step.step_id, next_step.step_id) | |
| # Add final edge to END | |
| if steps: | |
| graph.add_edge(steps[-1].step_id, END) | |
| async def _add_parallel_edges(self, graph: StateGraph, steps: List[WorkflowStep]): | |
| """Add edges for parallel workflow""" | |
| # All steps can run in parallel | |
| for step in steps: | |
| graph.add_edge(step.step_id, END) | |
| async def _add_conditional_edges(self, graph: StateGraph, steps: List[WorkflowStep]): | |
| """Add edges for conditional workflow""" | |
| # This would implement conditional routing based on step conditions | |
| for step in steps: | |
| if step.condition: | |
| # Add conditional edge | |
| graph.add_conditional_edges( | |
| step.step_id, | |
| self._create_condition_function(step.condition) | |
| ) | |
| else: | |
| graph.add_edge(step.step_id, END) | |
| async def _add_loop_edges(self, graph: StateGraph, steps: List[WorkflowStep]): | |
| """Add edges for loop workflow""" | |
| # This would implement loop logic | |
| for step in steps: | |
| graph.add_edge(step.step_id, step.step_id) # Loop back to same step | |
| def _create_condition_function(self, condition: str) -> Callable: | |
| """Create a condition function for conditional routing""" | |
| def condition_func(state: WorkflowState) -> str: | |
| # Simple condition evaluation (would be more sophisticated in practice) | |
| try: | |
| # Evaluate condition against state | |
| return "continue" if eval(condition, {"state": state}) else "end" | |
| except: | |
| return "end" | |
| return condition_func | |
| async def _run_workflow(self, graph: StateGraph, initial_state: WorkflowState, | |
| workflow: WorkflowDefinition) -> WorkflowState: | |
| """Run the workflow graph""" | |
| # Execute the graph | |
| final_state = await graph.ainvoke(initial_state) | |
| # Check for timeout | |
| if workflow.timeout: | |
| execution_time = (datetime.now() - initial_state.start_time).total_seconds() | |
| if execution_time > workflow.timeout: | |
| raise TimeoutError(f"Workflow execution timed out after {workflow.timeout} seconds") | |
| return final_state | |
| async def get_execution_status(self, execution_id: str) -> Optional[WorkflowExecution]: | |
| """Get the status of a workflow execution""" | |
| return self.executions.get(execution_id) | |
| async def cancel_execution(self, execution_id: str) -> bool: | |
| """Cancel a workflow execution""" | |
| if execution_id in self.executions: | |
| execution = self.executions[execution_id] | |
| if execution.status == WorkflowStatus.RUNNING: | |
| execution.status = WorkflowStatus.CANCELLED | |
| execution.end_time = datetime.now() | |
| return True | |
| return False | |
| async def get_workflow_definitions(self) -> List[WorkflowDefinition]: | |
| """Get all workflow definitions""" | |
| return list(self.workflows.values()) | |
| async def get_execution_history(self, workflow_id: Optional[str] = None) -> List[WorkflowExecution]: | |
| """Get execution history""" | |
| executions = list(self.executions.values()) | |
| if workflow_id: | |
| executions = [e for e in executions if e.workflow_id == workflow_id] | |
| return executions | |
| # ============================= | |
| # Workflow Builder | |
| # ============================= | |
class WorkflowBuilder:
    """Fluent builder that assembles a WorkflowDefinition step by step.

    Every mutator returns `self`, so calls can be chained before `build()`.
    """

    def __init__(self, name: str, description: str = ""):
        self.workflow_id = str(uuid.uuid4())
        self.name = name
        self.description = description
        self.workflow_type = WorkflowType.SEQUENTIAL
        self.steps: List[WorkflowStep] = []
        self.input_schema: Dict[str, Any] = {}
        self.output_schema: Dict[str, Any] = {}
        self.timeout: Optional[float] = None
        self.max_retries: int = 3
        self.metadata: Dict[str, Any] = {}

    def set_type(self, workflow_type: WorkflowType) -> 'WorkflowBuilder':
        """Choose how the steps will be wired together."""
        self.workflow_type = workflow_type
        return self

    def add_step(self, step: WorkflowStep) -> 'WorkflowBuilder':
        """Append an already-constructed step."""
        self.steps.append(step)
        return self

    def _append_generated_step(self, **step_kwargs) -> 'WorkflowBuilder':
        # Internal: build a step with a fresh id and append it.
        return self.add_step(WorkflowStep(step_id=str(uuid.uuid4()), **step_kwargs))

    def add_agent_step(self, name: str, agent_id: str, description: str = "",
                       input_mapping: Optional[Dict[str, str]] = None,
                       output_mapping: Optional[Dict[str, str]] = None) -> 'WorkflowBuilder':
        """Append a step executed by a registered agent."""
        return self._append_generated_step(
            name=name,
            description=description,
            agent_id=agent_id,
            input_mapping=input_mapping or {},
            output_mapping=output_mapping or {},
        )

    def add_tool_step(self, name: str, tool_name: str, description: str = "",
                      input_mapping: Optional[Dict[str, str]] = None,
                      output_mapping: Optional[Dict[str, str]] = None) -> 'WorkflowBuilder':
        """Append a step executed by a registered tool."""
        return self._append_generated_step(
            name=name,
            description=description,
            tool_name=tool_name,
            input_mapping=input_mapping or {},
            output_mapping=output_mapping or {},
        )

    def set_input_schema(self, schema: Dict[str, Any]) -> 'WorkflowBuilder':
        """Declare the expected shape of the workflow input."""
        self.input_schema = schema
        return self

    def set_output_schema(self, schema: Dict[str, Any]) -> 'WorkflowBuilder':
        """Declare the expected shape of the workflow output."""
        self.output_schema = schema
        return self

    def set_timeout(self, timeout: float) -> 'WorkflowBuilder':
        """Set the overall workflow timeout in seconds."""
        self.timeout = timeout
        return self

    def set_max_retries(self, max_retries: int) -> 'WorkflowBuilder':
        """Set the workflow-level retry budget."""
        self.max_retries = max_retries
        return self

    def add_metadata(self, key: str, value: Any) -> 'WorkflowBuilder':
        """Attach a free-form metadata entry."""
        self.metadata[key] = value
        return self

    def build(self) -> WorkflowDefinition:
        """Materialize the accumulated configuration into a definition."""
        return WorkflowDefinition(
            workflow_id=self.workflow_id,
            name=self.name,
            description=self.description,
            workflow_type=self.workflow_type,
            steps=self.steps,
            input_schema=self.input_schema,
            output_schema=self.output_schema,
            timeout=self.timeout,
            max_retries=self.max_retries,
            metadata=self.metadata
        )
# =============================
# Global Workflow Engine
# =============================
# Module-level singleton used by the convenience wrappers below; callers
# should import this instance (or the wrappers) rather than constructing
# their own engine, so all registrations land in one registry.
workflow_engine = WorkflowEngine()
| # ============================= | |
| # Utility Functions | |
| # ============================= | |
async def register_workflow(workflow: WorkflowDefinition) -> bool:
    """Convenience wrapper: register *workflow* on the module-level engine."""
    return await workflow_engine.register_workflow(workflow)


async def execute_workflow(workflow_id: str, input_data: Dict[str, Any],
                           execution_id: Optional[str] = None) -> WorkflowExecution:
    """Convenience wrapper: run a registered workflow on the module-level engine."""
    return await workflow_engine.execute_workflow(workflow_id, input_data, execution_id)


async def get_execution_status(execution_id: str) -> Optional[WorkflowExecution]:
    """Convenience wrapper: look up an execution record on the module-level engine."""
    return await workflow_engine.get_execution_status(execution_id)


def create_workflow_builder(name: str, description: str = "") -> WorkflowBuilder:
    """Start a fluent WorkflowBuilder for a brand-new workflow."""
    return WorkflowBuilder(name, description)