# Synesthesia / workbench / workflow_engine.py
# Ashiedu's picture
# Sync unified workbench
# 0490201 verified
# workflow_engine.py
from typing import Dict, Any, List
import importlib
# Placeholder for agent runner modules. These will be dynamically loaded.
# The actual runners are imported and registered in workbench.py's initialize_workbench function.
# This file primarily defines the WorkflowEngine class that uses the TaskRouter.
class WorkflowEngine:
    """
    Runs predefined multi-step agent pipelines, or a single agent task, through a task router.

    The engine owns no agents itself: every step is handed to
    task_router.route_task(), and outcomes are reported to memory_bridge
    (record_conclusion on failure, record_observation on success).

    Dynamic step values are "resolvers": callables that take ONE argument, a
    context dict, and return the concrete value. Context keys:
        arch_res:         latest ARCHITECT result in this run, or None.
        resonance_res:    latest RESONANCE result in this run, or None.
        pulse_debug_res:  latest PULSE-DEBUG result in this run, or None.
        current_result:   result of the immediately preceding step, or None.
        kw:               the caller's initial_kwargs dict.
    Resolvers may appear as a task description or anywhere inside a step's
    kwargs, including inside nested dicts and lists.
    """

    def __init__(self, task_router, memory_bridge):
        """
        task_router: object exposing route_task(task_description=..., agent_type=..., **kwargs).
        memory_bridge: object exposing record_conclusion(str) and record_observation(str).
        """
        self.task_router = task_router
        self.memory_bridge = memory_bridge

    @staticmethod
    def _first_item(result, key, if_missing, if_incomplete):
        """
        Return the first entry of result[key] for a completed step.

        if_incomplete is returned when result is not a dict with status "completed";
        if_missing is returned when the step completed but result[key] is absent,
        empty, or not a list/tuple.
        """
        if not isinstance(result, dict) or result.get("status") != "completed":
            return if_incomplete
        items = result.get(key)
        if isinstance(items, (list, tuple)) and items:
            return items[0]
        return if_missing

    @staticmethod
    def _field(result, key, default):
        """Return result[key] when result is a dict containing key, otherwise default."""
        if isinstance(result, dict):
            return result.get(key, default)
        return default

    @classmethod
    def _resolve(cls, value, context, what, fallback=None):
        """
        Replace every resolver found in value with its result for context.

        Dicts and lists are walked recursively so resolvers nested inside
        "task_input_args" are evaluated too. A resolver that raises is reported
        on stdout and replaced by fallback (best effort: one bad field must not
        abort the whole pipeline). 'what' names the value in that report.
        """
        if callable(value):
            try:
                return value(context)
            except Exception as e:  # deliberate best-effort boundary around user-supplied resolvers
                print(f"WorkflowEngine Error: Failed to resolve {what}: {e}")
                return fallback
        if isinstance(value, dict):
            return {
                k: cls._resolve(v, context, f"'{k}' in {what}")
                for k, v in value.items()
            }
        if isinstance(value, list):
            return [
                cls._resolve(v, context, f"item {idx} in {what}")
                for idx, v in enumerate(value)
            ]
        return value

    @classmethod
    def _build_pipelines(cls, initial_task_description):
        """
        Return the table of known pipelines, keyed by pipeline name.

        Each pipeline is {"steps": [...], "success_message": str}; each step is
        {"agent_type": str, "task_desc": str | resolver, "kwargs": dict}.
        """

        def first_sub_task(ctx):
            # First sub-task produced by ARCHITECT, or a generic description.
            return cls._first_item(
                ctx["arch_res"], "sub_tasks",
                "Implement coding task", "Implement coding task",
            )

        def generated_code(ctx):
            # Code emitted by RESONANCE, or a placeholder snippet.
            return cls._field(
                ctx["resonance_res"], "code_snippet",
                "print('Placeholder code: needs actual code from Resonance')",
            )

        def first_recommendation(ctx):
            # First PULSE-DEBUG recommendation; the two fallbacks are intentionally different.
            return cls._first_item(
                ctx["pulse_debug_res"], "recommendations",
                "No specific recommendations found, simulating general debug code",
                "print('Debugging simulated code based on PulseDebug findings')",
            )

        return {
            "Coding Pipeline": {
                "steps": [
                    # Step 1: ARCHITECT - Decomposes the initial task.
                    {
                        "agent_type": "ARCHITECT",
                        "task_desc": initial_task_description,
                        "kwargs": {"task_input_args": {
                            "task_description": initial_task_description,
                            "codebase_path": "/app/synesthesia",
                        }},
                    },
                    # Step 2: RESONANCE - Implements the first sub-task from ARCHITECT.
                    {
                        "agent_type": "RESONANCE",
                        "task_desc": first_sub_task,
                        "kwargs": {"task_input_args": {
                            "task_description": first_sub_task,
                            "training_script": "ML_Pipeline/train.py",  # Placeholder for training params
                        }},
                    },
                    # Step 3: PARALLEL-DEBUGGER - Evaluates the code generated by RESONANCE.
                    {
                        "agent_type": "PARALLEL-DEBUGGER",
                        "task_desc": "Evaluate generated code for implementation task",
                        "kwargs": {"task_input_args": {
                            "code_to_evaluate": generated_code,
                            "expected_behavior": "Simulated code for debugging",
                        }},
                    },
                    # Step 4: SENTINEL - Performs quality checks on the implemented code.
                    {
                        "agent_type": "SENTINEL",
                        "task_desc": "Perform code review on implemented task",
                        "kwargs": {"task_input_args": {
                            "codebase_path": "/app/synesthesia",
                            "task_description": "Code review for new feature",
                        }},
                    },
                ],
                "success_message": "Coding pipeline completed successfully.",
            },
            "Debug Pipeline": {
                "steps": [
                    # Step 1: PULSE-DEBUG - Analyzes runtime issues.
                    {
                        "agent_type": "PULSE-DEBUG",
                        "task_desc": initial_task_description,
                        "kwargs": {"task_input_args": {"runtime_id": "runtime-xyz"}},
                    },
                    # Step 2: PARALLEL-DEBUGGER - Debugs issues identified by PULSE-DEBUG.
                    {
                        "agent_type": "PARALLEL-DEBUGGER",
                        "task_desc": "Debug runtime issues based on PulseDebug findings",
                        "kwargs": {"task_input_args": {
                            "code_to_evaluate": first_recommendation,
                            "expected_behavior": "Debugging simulated code based on PulseDebug findings",
                        }},
                    },
                    # Step 3: SENTINEL - Validates the debug fix.
                    {
                        "agent_type": "SENTINEL",
                        "task_desc": "Validate debug fix",
                        "kwargs": {"task_input_args": {
                            "codebase_path": "/app/synesthesia",
                            "task_description": "Review debug fix",
                        }},
                    },
                ],
                "success_message": "Debug pipeline completed successfully.",
            },
            "UI Validation Pipeline": {
                "steps": [
                    # Step 1: PLAYWRIGHT-TESTER - Runs UI tests.
                    {
                        "agent_type": "PLAYWRIGHT-TESTER",
                        "task_desc": "Run UI tests",
                        "kwargs": {"task_input_args": {"query": "Test UI at app/streamlit_app.py"}},
                    },
                    # Step 2: PULSE-DEBUG - Analyzes runtime based on test results.
                    {
                        "agent_type": "PULSE-DEBUG",
                        "task_desc": "Analyze UI test results for performance issues",
                        "kwargs": {"task_input_args": {
                            "runtime_id": "workbench-ui-runtime",
                            "task_description": "Analyze runtime",
                        }},
                    },
                ],
                "success_message": "UI validation pipeline completed successfully.",
            },
        }

    def execute_pipeline(self, pipeline_name: str, initial_task_description: str, initial_kwargs: Dict[str, Any] = None):
        """
        Executes a predefined workflow pipeline, step by step, stopping at the first failure.

        pipeline_name: The name of the pipeline (e.g., "Coding Pipeline").
        initial_task_description: The starting description for the first task in the pipeline.
        initial_kwargs: Optional dict exposed to step resolvers as context["kw"].
            It is NOT forwarded to the router directly.

        Returns a dict:
            unknown pipeline -> {"status": "failed", "message": ...}
            a step failed    -> {"status": "failed", "message", "failed_step", "agent", "error_details"}
            all steps ran    -> {"status": "pipeline_completed", "final_result", "pipeline"}
        A step counts as failed when the router result has status "failed" or is not a dict.
        """
        print(f"\n--- Executing Pipeline: {pipeline_name} ---")
        if initial_kwargs is None:
            initial_kwargs = {}

        pipeline = self._build_pipelines(initial_task_description).get(pipeline_name)
        if not pipeline:
            return {"status": "failed", "message": f"Unknown pipeline: {pipeline_name}"}

        steps = pipeline["steps"]
        # Latest result per agent type, so later steps can consume earlier outputs.
        step_results_context = {}
        current_pipeline_result = None

        for step_number, step in enumerate(steps, start=1):
            agent_type = step["agent_type"]
            where = f"{agent_type} (Step {step_number})"
            context = {
                "arch_res": step_results_context.get("ARCHITECT"),
                "resonance_res": step_results_context.get("RESONANCE"),
                "pulse_debug_res": step_results_context.get("PULSE-DEBUG"),
                "current_result": current_pipeline_result,  # result of the *previous* step
                "kw": initial_kwargs,
            }

            current_task_desc = self._resolve(
                step["task_desc"], context,
                f"task description for {where}",
                fallback=f"Error resolving task description for {agent_type}",
            )
            current_step_kwargs = {
                key: self._resolve(value_def, context, f"kwarg '{key}' for {where}")
                for key, value_def in step.get("kwargs", {}).items()
            }

            # Route the task
            print(f"WorkflowEngine: Routing step {step_number}/{len(steps)} to {agent_type} with task: '{current_task_desc}' and kwargs: {current_step_kwargs}")
            current_pipeline_result = self.task_router.route_task(
                task_description=current_task_desc,
                agent_type=agent_type,
                **current_step_kwargs
            )
            step_results_context[agent_type] = current_pipeline_result

            # A router that returns None (or any non-dict) is reported as a failed step
            # rather than crashing on .get().
            if isinstance(current_pipeline_result, dict):
                step_failed = current_pipeline_result.get("status") == "failed"
                reason = current_pipeline_result.get("message", "Unknown error")
                status = current_pipeline_result.get("status")
            else:
                step_failed = True
                reason = f"router returned a non-dict result: {current_pipeline_result!r}"
                status = None

            if step_failed:
                error_message = f"Pipeline '{pipeline_name}' failed at step {step_number} ({agent_type}): {reason}"
                print(error_message)
                self.memory_bridge.record_conclusion(error_message)
                return {
                    "status": "failed",
                    "message": error_message,
                    "failed_step": step_number,
                    "agent": agent_type,
                    "error_details": current_pipeline_result,
                }
            print(f"Step {step_number} ({agent_type}) completed with status: {status}")

        # Pipeline completed successfully
        final_message = pipeline["success_message"]
        print(final_message)
        self.memory_bridge.record_observation(final_message)
        return {"status": "pipeline_completed", "final_result": current_pipeline_result, "pipeline": pipeline_name}

    def run_single_agent_task(self, agent_type: str, task_description: str, task_input_args: dict):
        """
        Executes a single task using a specific agent.

        agent_type: Agent identifier forwarded to the router.
        task_description: Description forwarded to the router.
        task_input_args: Forwarded to the router as the task_input_args keyword.

        Returns the router's result dict unchanged on success. On failure (status
        "failed", or a non-dict router result) returns
        {"status": "failed", "message", "agent", "error_details"}.
        """
        print(f"\n--- Executing Single Agent Task: {agent_type} ---")
        result = self.task_router.route_task(
            task_description=task_description,
            agent_type=agent_type,
            task_input_args=task_input_args
        )

        if isinstance(result, dict):
            task_failed = result.get("status") == "failed"
            reason = result.get("message")
        else:
            # Guard against routers that return None / non-dict values.
            task_failed = True
            reason = f"router returned a non-dict result: {result!r}"

        if task_failed:
            error_message = f"Single agent task failed ({agent_type}): {reason}"
            print(error_message)
            self.memory_bridge.record_conclusion(error_message)
            return {"status": "failed", "message": error_message, "agent": agent_type, "error_details": result}

        print(f"Single agent task ({agent_type}) completed.")
        self.memory_bridge.record_observation(f"Single agent task ({agent_type}) completed: {result.get('message', 'Success')}")
        return result