Spaces:
Sleeping
Sleeping
| """ | |
| Workflow runner tool - Primary orchestration tool for complex multi-step tasks | |
| """ | |
| from .base import BaseAgentTool | |
| from workflows.schema import WorkflowDefinition | |
| from workflows.executor import WorkflowExecutor | |
| import json | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class WorkflowRunnerTool(BaseAgentTool): | |
| """ | |
| Execute DAG workflows with parallel task execution. | |
| This is the primary tool for complex multi-step tasks that require | |
| orchestration across multiple tools with dependency management. | |
| """ | |
| name = "execute_workflow" | |
| description = """ | |
| Execute a DAG workflow with multiple tasks and dependencies. | |
| Workflow JSON format: | |
| { | |
| "name": "workflow_name", | |
| "description": "Optional description", | |
| "tasks": [ | |
| { | |
| "id": "task1", | |
| "tool": "web_search", | |
| "args": {"query": "python tutorials", "max_results": 5}, | |
| "depends_on": [] | |
| }, | |
| { | |
| "id": "task2", | |
| "tool": "python_eval", | |
| "args": {"expression": "len(${task1.result})"}, | |
| "depends_on": ["task1"] | |
| } | |
| ], | |
| "final_task": "task2", | |
| "max_parallel": 3, | |
| "timeout_seconds": 600 | |
| } | |
| Task arguments can reference previous task results using "${task_id}" or "${task_id.field}". | |
| Args: | |
| workflow_json (str): JSON string defining the workflow | |
| Returns: | |
| str: JSON with workflow result, execution trace, or error | |
| """ | |
| inputs = { | |
| "workflow_json": {"type": "string", "description": "Workflow definition as JSON"} | |
| } | |
| output_type = "string" | |
| cacheable = False # Workflows should not be cached | |
| def __init__(self, executor: WorkflowExecutor): | |
| """ | |
| Initialize workflow runner tool. | |
| Args: | |
| executor: WorkflowExecutor instance | |
| """ | |
| super().__init__() | |
| self.executor = executor | |
| def forward(self, workflow_json: str) -> str: | |
| """Execute workflow from JSON definition.""" | |
| try: | |
| # Parse workflow JSON | |
| workflow_dict = json.loads(workflow_json) | |
| workflow = WorkflowDefinition.from_dict(workflow_dict) | |
| logger.info(f"Executing workflow: {workflow.name}") | |
| logger.info(f"Tasks: {len(workflow.tasks)}, Final: {workflow.final_task}") | |
| # Execute workflow | |
| result = self.executor.execute(workflow) | |
| # Format response | |
| if result["success"]: | |
| return self._format_success( | |
| result["result"], | |
| metadata={ | |
| "workflow": workflow.name, | |
| "execution_time": result.get("execution_time"), | |
| "tasks_executed": len(result.get("all_results", {})), | |
| "trace": result.get("trace", []) | |
| } | |
| ) | |
| else: | |
| return self._format_error( | |
| ValueError(result["error"]), | |
| recovery_hint="Check workflow definition and task dependencies", | |
| fallback_action="Try simpler workflow or break into individual tool calls" | |
| ) | |
| except json.JSONDecodeError as e: | |
| return self._format_error( | |
| e, | |
| recovery_hint="Workflow must be valid JSON", | |
| fallback_action="Check JSON syntax and try again" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Workflow execution failed: {e}", exc_info=True) | |
| return self._format_error( | |
| e, | |
| recovery_hint="Check workflow definition is valid", | |
| fallback_action="Try individual tool calls instead of workflow" | |
| ) | |