"""
Workflow runner tool - Primary orchestration tool for complex multi-step tasks
"""

from .base import BaseAgentTool
from workflows.schema import WorkflowDefinition
from workflows.executor import WorkflowExecutor
import json
import logging

logger = logging.getLogger(__name__)


class WorkflowRunnerTool(BaseAgentTool):
    """
    Execute DAG workflows with parallel task execution.

    This is the primary tool for complex multi-step tasks that require
    orchestration across multiple tools with dependency management.

    The tool itself only parses the workflow JSON, hands the resulting
    WorkflowDefinition to the injected executor, and formats the outcome
    with the base class helpers.
    """

    name = "execute_workflow"
    description = """
    Execute a DAG workflow with multiple tasks and dependencies.

    Workflow JSON format:
    {
        "name": "workflow_name",
        "description": "Optional description",
        "tasks": [
            {
                "id": "task1",
                "tool": "web_search",
                "args": {"query": "python tutorials", "max_results": 5},
                "depends_on": []
            },
            {
                "id": "task2",
                "tool": "python_eval",
                "args": {"expression": "len(${task1.result})"},
                "depends_on": ["task1"]
            }
        ],
        "final_task": "task2",
        "max_parallel": 3,
        "timeout_seconds": 600
    }

    Task arguments can reference previous task results using "${task_id}" or "${task_id.field}".

    Args:
        workflow_json (str): JSON string defining the workflow

    Returns:
        str: JSON with workflow result, execution trace, or error
    """
    inputs = {
        "workflow_json": {"type": "string", "description": "Workflow definition as JSON"}
    }
    output_type = "string"
    cacheable = False  # Workflows should not be cached

    def __init__(self, executor: WorkflowExecutor) -> None:
        """
        Initialize workflow runner tool.

        Args:
            executor: WorkflowExecutor instance used by ``forward`` to run
                every workflow passed to this tool.
        """
        super().__init__()
        self.executor = executor

    def forward(self, workflow_json: str) -> str:
        """
        Execute a workflow from its JSON definition.

        Args:
            workflow_json: JSON text whose top-level value is an object
                describing the workflow (see ``description`` for the format).

        Returns:
            The value produced by ``_format_success`` when the executor
            reports success, otherwise the value produced by
            ``_format_error``. No exception derived from ``Exception``
            escapes this method; each one is converted into an error
            response.
        """
        try:
            # Parse workflow JSON
            workflow_dict = json.loads(workflow_json)

            # json.loads accepts any JSON value; reject non-object roots
            # here so the caller gets a targeted message instead of whatever
            # from_dict happens to raise for a list, string or number.
            if not isinstance(workflow_dict, dict):
                return self._format_error(
                    ValueError(
                        "Workflow JSON must be an object, "
                        f"got {type(workflow_dict).__name__}"
                    ),
                    recovery_hint="Workflow must be a JSON object (see the format in the tool description)",
                    fallback_action="Wrap the workflow definition in a JSON object and try again",
                )

            workflow = WorkflowDefinition.from_dict(workflow_dict)

            logger.info("Executing workflow: %s", workflow.name)
            logger.info(
                "Tasks: %s, Final: %s", len(workflow.tasks), workflow.final_task
            )

            # Execute workflow
            result = self.executor.execute(workflow)

            # Format response
            if result["success"]:
                return self._format_success(
                    result["result"],
                    metadata={
                        "workflow": workflow.name,
                        "execution_time": result.get("execution_time"),
                        "tasks_executed": len(result.get("all_results", {})),
                        "trace": result.get("trace", []),
                    },
                )

            # A failed result without an "error" entry must still yield a
            # readable message rather than a KeyError on the missing key.
            error_message = result.get("error") or "Workflow failed without an error message"
            return self._format_error(
                ValueError(error_message),
                recovery_hint="Check workflow definition and task dependencies",
                fallback_action="Try simpler workflow or break into individual tool calls",
            )

        except json.JSONDecodeError as e:
            return self._format_error(
                e,
                recovery_hint="Workflow must be valid JSON",
                fallback_action="Check JSON syntax and try again",
            )
        except Exception as e:
            # Tool boundary: log the traceback and report the failure to the
            # caller instead of letting it propagate.
            logger.exception("Workflow execution failed: %s", e)
            return self._format_error(
                e,
                recovery_hint="Check workflow definition is valid",
                fallback_action="Try individual tool calls instead of workflow",
            )