# workflow_engine.py from typing import Dict, Any, List import importlib # Placeholder for agent runner modules. These will be dynamically loaded. # The actual runners are imported and registered in workbench.py's initialize_workbench function. # This file primarily defines the WorkflowEngine class that uses the TaskRouter. class WorkflowEngine: def __init__(self, task_router, memory_bridge): self.task_router = task_router self.memory_bridge = memory_bridge def execute_pipeline(self, pipeline_name: str, initial_task_description: str, initial_kwargs: Dict[str, Any] = None): """ Executes a predefined workflow pipeline. pipeline_name: The name of the pipeline (e.g., "Coding Pipeline"). initial_task_description: The starting description for the first task in the pipeline. initial_kwargs: Arguments to pass to the first task. """ print(f" --- Executing Pipeline: {pipeline_name} ---") if initial_kwargs is None: initial_kwargs = {} # Pipeline Definitions # These definitions map agent types to their tasks and how their inputs/outputs are chained. pipelines = { "Coding Pipeline": { "steps": [ # Step 1: ARCHITECT - Decomposes the initial task. {"agent_type": "ARCHITECT", "task_desc": initial_task_description, "kwargs": {"task_input_args": {"task_description": initial_task_description, "codebase_path": "/app/synesthesia"}}}, # Step 2: RESONANCE - Implements the first sub-task from ARCHITECT. {"agent_type": "RESONANCE", "task_desc": lambda arch_res, kw: arch_res.get("sub_tasks", ["Implement coding task"])[0] if arch_res and arch_res.get("status") == "completed" else "Implement coding task", "kwargs": {"task_input_args": {"task_description": lambda res: res.get("sub_tasks", ["Implement coding task"])[0] if res and res.get("status") == "completed" else "Implement coding task", "training_script": "ML_Pipeline/train.py"}}}, # Placeholder for training params # Step 3: PARALLEL-DEBUGGER - Evaluates the code generated by RESONANCE. {"agent_type": "PARALLEL-DEBUGGER", "task_desc": "Evaluate generated code for implementation task", "kwargs": {"task_input_args": {"code_to_evaluate": lambda res: res.get("code_snippet", "print('Placeholder code: needs actual code from Resonance')"), "expected_behavior": "Simulated code for debugging"}}}, # Step 4: SENTINEL - Performs quality checks on the implemented code. {"agent_type": "SENTINEL", "task_desc": "Perform code review on implemented task", "kwargs": {"task_input_args": {"codebase_path": "/app/synesthesia", "task_description": "Code review for new feature"}}} ], "success_message": "Coding pipeline completed successfully." }, "Debug Pipeline": { "steps": [ # Step 1: PULSE-DEBUG - Analyzes runtime issues. {"agent_type": "PULSE-DEBUG", "task_desc": initial_task_description, "kwargs": {"task_input_args": {"runtime_id": "runtime-xyz"}}}, # Step 2: PARALLEL-DEBUGGER - Debugs issues identified by PULSE-DEBUG. {"agent_type": "PARALLEL-DEBUGGER", "task_desc": "Debug runtime issues based on PulseDebug findings", "kwargs": {"task_input_args": {"code_to_evaluate": lambda res: res.get("recommendations", ["No specific recommendations found, simulating general debug code"])[0] if res and res.get("status") == "completed" else "print('Debugging simulated code based on PulseDebug findings')", "expected_behavior": "Debugging simulated code based on PulseDebug findings"}}}, # Step 3: SENTINEL - Validates the debug fix. {"agent_type": "SENTINEL", "task_desc": "Validate debug fix", "kwargs": {"task_input_args": {"codebase_path": "/app/synesthesia", "task_description": "Review debug fix"}}} ], "success_message": "Debug pipeline completed successfully." }, "UI Validation Pipeline": { "steps": [ # Step 1: PLAYWRIGHT-TESTER - Runs UI tests. {"agent_type": "PLAYWRIGHT-TESTER", "task_desc": "Run UI tests", "kwargs": {"task_input_args": {"query": "Test UI at app/streamlit_app.py"}}}, # Step 2: PULSE-DEBUG - Analyzes runtime based on test results. {"agent_type": "PULSE-DEBUG", "task_desc": "Analyze UI test results for performance issues", "kwargs": {"task_input_args": {"runtime_id": "workbench-ui-runtime", "task_description": "Analyze runtime"}}} ], "success_message": "UI validation pipeline completed successfully." } } pipeline = pipelines.get(pipeline_name) if not pipeline: return {"status": "failed", "message": f"Unknown pipeline: {pipeline_name}"} # Store results from previous steps to potentially pass to next steps step_results_context = {} current_pipeline_result = None for i, step in enumerate(pipeline["steps"]): agent_type = step["agent_type"] task_desc_raw = step["task_desc"] step_kwargs_defs = step.get("kwargs", {}) # Resolve task description dynamically current_task_desc = "" if callable(task_desc_raw): try: # Pass relevant context to the lambda function for task description resolution. # 'arch_res', 'resonance_res', etc., are results from specific preceding agents. # 'kw' provides initial pipeline arguments. current_task_desc = task_desc_raw( arch_res=step_results_context.get("ARCHITECT"), resonance_res=step_results_context.get("RESONANCE"), # Example for Coding Pipeline pulse_debug_res=step_results_context.get("PULSE-DEBUG"), # Example for Debug Pipeline kw=initial_kwargs # Pass initial kwargs if needed ) except Exception as e: print(f"WorkflowEngine Error: Failed to resolve task description for {agent_type} (Step {i+1}): {e}") current_task_desc = f"Error resolving task description for {agent_type}" else: current_task_desc = task_desc_raw # Resolve kwargs dynamically current_step_kwargs = {} for key, value_def in step_kwargs_defs.items(): if callable(value_def): try: # Pass relevant context to the lambda function for kwargs resolution. # This mapping needs to be careful about what each agent needs from previous steps. current_step_kwargs[key] = value_def( arch_res=step_results_context.get("ARCHITECT"), resonance_res=step_results_context.get("RESONANCE"), pulse_debug_res=step_results_context.get("PULSE-DEBUG"), current_result=current_pipeline_result, # Pass result of the *previous* step kw=initial_kwargs ) except Exception as e: print(f"WorkflowEngine Error: Failed to resolve kwarg '{key}' for {agent_type} (Step {i+1}): {e}") current_step_kwargs[key] = None # Handle error gracefully else: current_step_kwargs[key] = value_def # Route the task print(f"WorkflowEngine: Routing step {i+1}/{len(pipeline['steps'])} to {agent_type} with task: '{current_task_desc}' and kwargs: {current_step_kwargs}") routing_result = self.task_router.route_task( task_description=current_task_desc, agent_type=agent_type, **current_step_kwargs ) # Store result and check for failures current_pipeline_result = routing_result step_results_context[agent_type] = current_pipeline_result # Store result for next steps if needed if current_pipeline_result.get("status") == "failed": error_message = f"Pipeline '{pipeline_name}' failed at step {i+1} ({agent_type}): {current_pipeline_result.get('message', 'Unknown error')}" print(error_message) self.memory_bridge.record_conclusion(error_message) return {"status": "failed", "message": error_message, "failed_step": i+1, "agent": agent_type, "error_details": current_pipeline_result} print(f"Step {i+1} ({agent_type}) completed with status: {current_pipeline_result.get('status')}") # Pipeline completed successfully final_message = pipeline["success_message"] print(final_message) self.memory_bridge.record_observation(final_message) return {"status": "pipeline_completed", "final_result": current_pipeline_result, "pipeline": pipeline_name} def run_single_agent_task(self, agent_type: str, task_description: str, task_input_args: dict): """ Executes a single task using a specific agent. """ print(f" --- Executing Single Agent Task: {agent_type} ---") result = self.task_router.route_task( task_description=task_description, agent_type=agent_type, task_input_args=task_input_args ) if result.get("status") == "failed": error_message = f"Single agent task failed ({agent_type}): {result.get('message')}" print(error_message) self.memory_bridge.record_conclusion(error_message) return {"status": "failed", "message": error_message, "agent": agent_type, "error_details": result} else: print(f"Single agent task ({agent_type}) completed.") self.memory_bridge.record_observation(f"Single agent task ({agent_type}) completed: {result.get('message', 'Success')}") return result