# general-reasoning-agent — tools/workflow_runner.py
# (commit 4454066: "feat: add production refinements (Phase 1-3)")
"""
Workflow runner tool - Primary orchestration tool for complex multi-step tasks
"""
from .base import BaseAgentTool
from workflows.schema import WorkflowDefinition
from workflows.executor import WorkflowExecutor
import json
import logging
logger = logging.getLogger(__name__)
class WorkflowRunnerTool(BaseAgentTool):
    """
    Execute DAG workflows with parallel task execution.

    This is the primary tool for complex multi-step tasks that require
    orchestration across multiple tools with dependency management.
    """
    name = "execute_workflow"
    description = """
    Execute a DAG workflow with multiple tasks and dependencies.
    Workflow JSON format:
    {
    "name": "workflow_name",
    "description": "Optional description",
    "tasks": [
    {
    "id": "task1",
    "tool": "web_search",
    "args": {"query": "python tutorials", "max_results": 5},
    "depends_on": []
    },
    {
    "id": "task2",
    "tool": "python_eval",
    "args": {"expression": "len(${task1.result})"},
    "depends_on": ["task1"]
    }
    ],
    "final_task": "task2",
    "max_parallel": 3,
    "timeout_seconds": 600
    }
    Task arguments can reference previous task results using "${task_id}" or "${task_id.field}".
    Args:
    workflow_json (str): JSON string defining the workflow
    Returns:
    str: JSON with workflow result, execution trace, or error
    """
    inputs = {
        "workflow_json": {"type": "string", "description": "Workflow definition as JSON"}
    }
    output_type = "string"
    cacheable = False  # Workflows should not be cached

    def __init__(self, executor: WorkflowExecutor):
        """
        Initialize workflow runner tool.

        Args:
            executor: WorkflowExecutor instance used to run parsed workflows.
        """
        super().__init__()
        self.executor = executor

    def forward(self, workflow_json: str) -> str:
        """
        Execute a workflow from a JSON definition.

        Args:
            workflow_json: JSON string matching the format described in
                the tool's ``description``.

        Returns:
            str: A formatted success payload (result + execution metadata)
            or a formatted error payload with recovery hints. Never raises;
            all failures are converted to error strings via ``_format_error``.
        """
        try:
            # Parse workflow JSON into a validated definition object.
            workflow_dict = json.loads(workflow_json)
            workflow = WorkflowDefinition.from_dict(workflow_dict)
            # Lazy %-style args: formatting is skipped when INFO is disabled.
            logger.info("Executing workflow: %s", workflow.name)
            logger.info("Tasks: %d, Final: %s", len(workflow.tasks), workflow.final_task)
            # Execute workflow via the injected executor.
            result = self.executor.execute(workflow)
            # Format response
            if result["success"]:
                return self._format_success(
                    result["result"],
                    metadata={
                        "workflow": workflow.name,
                        "execution_time": result.get("execution_time"),
                        "tasks_executed": len(result.get("all_results", {})),
                        "trace": result.get("trace", [])
                    }
                )
            else:
                # Executor reported a controlled failure; wrap its message.
                return self._format_error(
                    ValueError(result["error"]),
                    recovery_hint="Check workflow definition and task dependencies",
                    fallback_action="Try simpler workflow or break into individual tool calls"
                )
        except json.JSONDecodeError as e:
            # Narrow handler first: malformed input JSON.
            return self._format_error(
                e,
                recovery_hint="Workflow must be valid JSON",
                fallback_action="Check JSON syntax and try again"
            )
        except Exception as e:
            # Tool boundary: log full traceback, then return a recoverable error.
            logger.error("Workflow execution failed: %s", e, exc_info=True)
            return self._format_error(
                e,
                recovery_hint="Check workflow definition is valid",
                fallback_action="Try individual tool calls instead of workflow"
            )