Spaces:
Runtime error
Runtime error
| # workflow_engine.py | |
| from typing import Dict, Any, List | |
| import importlib | |
| # Placeholder for agent runner modules. These will be dynamically loaded. | |
| # The actual runners are imported and registered in workbench.py's initialize_workbench function. | |
| # This file primarily defines the WorkflowEngine class that uses the TaskRouter. | |
| class WorkflowEngine: | |
| def __init__(self, task_router, memory_bridge): | |
| self.task_router = task_router | |
| self.memory_bridge = memory_bridge | |
| def execute_pipeline(self, pipeline_name: str, initial_task_description: str, initial_kwargs: Dict[str, Any] = None): | |
| """ | |
| Executes a predefined workflow pipeline. | |
| pipeline_name: The name of the pipeline (e.g., "Coding Pipeline"). | |
| initial_task_description: The starting description for the first task in the pipeline. | |
| initial_kwargs: Arguments to pass to the first task. | |
| """ | |
| print(f" | |
| --- Executing Pipeline: {pipeline_name} ---") | |
| if initial_kwargs is None: | |
| initial_kwargs = {} | |
| # Pipeline Definitions | |
| # These definitions map agent types to their tasks and how their inputs/outputs are chained. | |
| pipelines = { | |
| "Coding Pipeline": { | |
| "steps": [ | |
| # Step 1: ARCHITECT - Decomposes the initial task. | |
| {"agent_type": "ARCHITECT", "task_desc": initial_task_description, "kwargs": {"task_input_args": {"task_description": initial_task_description, "codebase_path": "/app/synesthesia"}}}, | |
| # Step 2: RESONANCE - Implements the first sub-task from ARCHITECT. | |
| {"agent_type": "RESONANCE", | |
| "task_desc": lambda arch_res, kw: arch_res.get("sub_tasks", ["Implement coding task"])[0] if arch_res and arch_res.get("status") == "completed" else "Implement coding task", | |
| "kwargs": {"task_input_args": {"task_description": lambda res: res.get("sub_tasks", ["Implement coding task"])[0] if res and res.get("status") == "completed" else "Implement coding task", "training_script": "ML_Pipeline/train.py"}}}, # Placeholder for training params | |
| # Step 3: PARALLEL-DEBUGGER - Evaluates the code generated by RESONANCE. | |
| {"agent_type": "PARALLEL-DEBUGGER", | |
| "task_desc": "Evaluate generated code for implementation task", | |
| "kwargs": {"task_input_args": {"code_to_evaluate": lambda res: res.get("code_snippet", "print('Placeholder code: needs actual code from Resonance')"), "expected_behavior": "Simulated code for debugging"}}}, | |
| # Step 4: SENTINEL - Performs quality checks on the implemented code. | |
| {"agent_type": "SENTINEL", "task_desc": "Perform code review on implemented task", "kwargs": {"task_input_args": {"codebase_path": "/app/synesthesia", "task_description": "Code review for new feature"}}} | |
| ], | |
| "success_message": "Coding pipeline completed successfully." | |
| }, | |
| "Debug Pipeline": { | |
| "steps": [ | |
| # Step 1: PULSE-DEBUG - Analyzes runtime issues. | |
| {"agent_type": "PULSE-DEBUG", "task_desc": initial_task_description, "kwargs": {"task_input_args": {"runtime_id": "runtime-xyz"}}}, | |
| # Step 2: PARALLEL-DEBUGGER - Debugs issues identified by PULSE-DEBUG. | |
| {"agent_type": "PARALLEL-DEBUGGER", | |
| "task_desc": "Debug runtime issues based on PulseDebug findings", | |
| "kwargs": {"task_input_args": {"code_to_evaluate": lambda res: res.get("recommendations", ["No specific recommendations found, simulating general debug code"])[0] if res and res.get("status") == "completed" else "print('Debugging simulated code based on PulseDebug findings')", "expected_behavior": "Debugging simulated code based on PulseDebug findings"}}}, | |
| # Step 3: SENTINEL - Validates the debug fix. | |
| {"agent_type": "SENTINEL", "task_desc": "Validate debug fix", "kwargs": {"task_input_args": {"codebase_path": "/app/synesthesia", "task_description": "Review debug fix"}}} | |
| ], | |
| "success_message": "Debug pipeline completed successfully." | |
| }, | |
| "UI Validation Pipeline": { | |
| "steps": [ | |
| # Step 1: PLAYWRIGHT-TESTER - Runs UI tests. | |
| {"agent_type": "PLAYWRIGHT-TESTER", "task_desc": "Run UI tests", "kwargs": {"task_input_args": {"query": "Test UI at app/streamlit_app.py"}}}, | |
| # Step 2: PULSE-DEBUG - Analyzes runtime based on test results. | |
| {"agent_type": "PULSE-DEBUG", | |
| "task_desc": "Analyze UI test results for performance issues", | |
| "kwargs": {"task_input_args": {"runtime_id": "workbench-ui-runtime", "task_description": "Analyze runtime"}}} | |
| ], | |
| "success_message": "UI validation pipeline completed successfully." | |
| } | |
| } | |
| pipeline = pipelines.get(pipeline_name) | |
| if not pipeline: | |
| return {"status": "failed", "message": f"Unknown pipeline: {pipeline_name}"} | |
| # Store results from previous steps to potentially pass to next steps | |
| step_results_context = {} | |
| current_pipeline_result = None | |
| for i, step in enumerate(pipeline["steps"]): | |
| agent_type = step["agent_type"] | |
| task_desc_raw = step["task_desc"] | |
| step_kwargs_defs = step.get("kwargs", {}) | |
| # Resolve task description dynamically | |
| current_task_desc = "" | |
| if callable(task_desc_raw): | |
| try: | |
| # Pass relevant context to the lambda function for task description resolution. | |
| # 'arch_res', 'resonance_res', etc., are results from specific preceding agents. | |
| # 'kw' provides initial pipeline arguments. | |
| current_task_desc = task_desc_raw( | |
| arch_res=step_results_context.get("ARCHITECT"), | |
| resonance_res=step_results_context.get("RESONANCE"), # Example for Coding Pipeline | |
| pulse_debug_res=step_results_context.get("PULSE-DEBUG"), # Example for Debug Pipeline | |
| kw=initial_kwargs # Pass initial kwargs if needed | |
| ) | |
| except Exception as e: | |
| print(f"WorkflowEngine Error: Failed to resolve task description for {agent_type} (Step {i+1}): {e}") | |
| current_task_desc = f"Error resolving task description for {agent_type}" | |
| else: | |
| current_task_desc = task_desc_raw | |
| # Resolve kwargs dynamically | |
| current_step_kwargs = {} | |
| for key, value_def in step_kwargs_defs.items(): | |
| if callable(value_def): | |
| try: | |
| # Pass relevant context to the lambda function for kwargs resolution. | |
| # This mapping needs to be careful about what each agent needs from previous steps. | |
| current_step_kwargs[key] = value_def( | |
| arch_res=step_results_context.get("ARCHITECT"), | |
| resonance_res=step_results_context.get("RESONANCE"), | |
| pulse_debug_res=step_results_context.get("PULSE-DEBUG"), | |
| current_result=current_pipeline_result, # Pass result of the *previous* step | |
| kw=initial_kwargs | |
| ) | |
| except Exception as e: | |
| print(f"WorkflowEngine Error: Failed to resolve kwarg '{key}' for {agent_type} (Step {i+1}): {e}") | |
| current_step_kwargs[key] = None # Handle error gracefully | |
| else: | |
| current_step_kwargs[key] = value_def | |
| # Route the task | |
| print(f"WorkflowEngine: Routing step {i+1}/{len(pipeline['steps'])} to {agent_type} with task: '{current_task_desc}' and kwargs: {current_step_kwargs}") | |
| routing_result = self.task_router.route_task( | |
| task_description=current_task_desc, | |
| agent_type=agent_type, | |
| **current_step_kwargs | |
| ) | |
| # Store result and check for failures | |
| current_pipeline_result = routing_result | |
| step_results_context[agent_type] = current_pipeline_result # Store result for next steps if needed | |
| if current_pipeline_result.get("status") == "failed": | |
| error_message = f"Pipeline '{pipeline_name}' failed at step {i+1} ({agent_type}): {current_pipeline_result.get('message', 'Unknown error')}" | |
| print(error_message) | |
| self.memory_bridge.record_conclusion(error_message) | |
| return {"status": "failed", "message": error_message, "failed_step": i+1, "agent": agent_type, "error_details": current_pipeline_result} | |
| print(f"Step {i+1} ({agent_type}) completed with status: {current_pipeline_result.get('status')}") | |
| # Pipeline completed successfully | |
| final_message = pipeline["success_message"] | |
| print(final_message) | |
| self.memory_bridge.record_observation(final_message) | |
| return {"status": "pipeline_completed", "final_result": current_pipeline_result, "pipeline": pipeline_name} | |
| def run_single_agent_task(self, agent_type: str, task_description: str, task_input_args: dict): | |
| """ | |
| Executes a single task using a specific agent. | |
| """ | |
| print(f" | |
| --- Executing Single Agent Task: {agent_type} ---") | |
| result = self.task_router.route_task( | |
| task_description=task_description, | |
| agent_type=agent_type, | |
| task_input_args=task_input_args | |
| ) | |
| if result.get("status") == "failed": | |
| error_message = f"Single agent task failed ({agent_type}): {result.get('message')}" | |
| print(error_message) | |
| self.memory_bridge.record_conclusion(error_message) | |
| return {"status": "failed", "message": error_message, "agent": agent_type, "error_details": result} | |
| else: | |
| print(f"Single agent task ({agent_type}) completed.") | |
| self.memory_bridge.record_observation(f"Single agent task ({agent_type}) completed: {result.get('message', 'Success')}") | |
| return result | |