Spaces:
Runtime error
Runtime error
File size: 10,558 Bytes
0490201 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# workflow_engine.py
import importlib
import inspect
from typing import Dict, Any, List
# Placeholder for agent runner modules. These will be dynamically loaded.
# The actual runners are imported and registered in workbench.py's initialize_workbench function.
# This file primarily defines the WorkflowEngine class that uses the TaskRouter.
class WorkflowEngine:
    """Run predefined multi-agent pipelines and one-off agent tasks.

    Every task is dispatched through ``task_router.route_task``. Outcomes are
    reported to ``memory_bridge``: successes via ``record_observation`` and
    failures via ``record_conclusion``.
    """

    def __init__(self, task_router, memory_bridge):
        """Keep references to the task router and the memory bridge."""
        self.task_router = task_router
        self.memory_bridge = memory_bridge

    @staticmethod
    def _first_item(result, key, missing_default, incomplete_default):
        """Return the first element of ``result[key]`` with safe fallbacks.

        ``incomplete_default`` is returned when ``result`` is empty/``None``
        or its status is not ``"completed"``; ``missing_default`` is returned
        when the key is absent or holds an empty sequence (which would
        otherwise raise ``IndexError``).
        """
        if not result or result.get("status") != "completed":
            return incomplete_default
        items = result.get(key)
        return items[0] if items else missing_default

    @classmethod
    def _build_pipelines(cls, initial_task_description: str) -> Dict[str, Any]:
        """Return the step definitions of every known pipeline.

        A ``task_desc`` or any (possibly nested) ``kwargs`` value may be a
        callable; it is resolved right before its step runs. The callable's
        parameter names select what it receives: ``res`` / ``current_result``
        (result of the previous step), ``arch_res``, ``resonance_res``,
        ``pulse_debug_res`` (results of those agents) and ``kw`` (the
        pipeline's initial kwargs).
        """
        first_item = cls._first_item
        default_coding_task = "Implement coding task"
        return {
            "Coding Pipeline": {
                "steps": [
                    # Step 1: ARCHITECT - Decomposes the initial task.
                    {
                        "agent_type": "ARCHITECT",
                        "task_desc": initial_task_description,
                        "kwargs": {
                            "task_input_args": {
                                "task_description": initial_task_description,
                                "codebase_path": "/app/synesthesia",
                            },
                        },
                    },
                    # Step 2: RESONANCE - Implements the first sub-task from ARCHITECT.
                    {
                        "agent_type": "RESONANCE",
                        "task_desc": lambda arch_res: first_item(
                            arch_res, "sub_tasks",
                            default_coding_task, default_coding_task,
                        ),
                        "kwargs": {
                            "task_input_args": {
                                "task_description": lambda res: first_item(
                                    res, "sub_tasks",
                                    default_coding_task, default_coding_task,
                                ),
                                # Placeholder for training params
                                "training_script": "ML_Pipeline/train.py",
                            },
                        },
                    },
                    # Step 3: PARALLEL-DEBUGGER - Evaluates the code generated by RESONANCE.
                    {
                        "agent_type": "PARALLEL-DEBUGGER",
                        "task_desc": "Evaluate generated code for implementation task",
                        "kwargs": {
                            "task_input_args": {
                                "code_to_evaluate": lambda res: (res or {}).get(
                                    "code_snippet",
                                    "print('Placeholder code: needs actual code from Resonance')",
                                ),
                                "expected_behavior": "Simulated code for debugging",
                            },
                        },
                    },
                    # Step 4: SENTINEL - Performs quality checks on the implemented code.
                    {
                        "agent_type": "SENTINEL",
                        "task_desc": "Perform code review on implemented task",
                        "kwargs": {
                            "task_input_args": {
                                "codebase_path": "/app/synesthesia",
                                "task_description": "Code review for new feature",
                            },
                        },
                    },
                ],
                "success_message": "Coding pipeline completed successfully.",
            },
            "Debug Pipeline": {
                "steps": [
                    # Step 1: PULSE-DEBUG - Analyzes runtime issues.
                    {
                        "agent_type": "PULSE-DEBUG",
                        "task_desc": initial_task_description,
                        "kwargs": {"task_input_args": {"runtime_id": "runtime-xyz"}},
                    },
                    # Step 2: PARALLEL-DEBUGGER - Debugs issues identified by PULSE-DEBUG.
                    {
                        "agent_type": "PARALLEL-DEBUGGER",
                        "task_desc": "Debug runtime issues based on PulseDebug findings",
                        "kwargs": {
                            "task_input_args": {
                                "code_to_evaluate": lambda res: first_item(
                                    res, "recommendations",
                                    "No specific recommendations found, simulating general debug code",
                                    "print('Debugging simulated code based on PulseDebug findings')",
                                ),
                                "expected_behavior": "Debugging simulated code based on PulseDebug findings",
                            },
                        },
                    },
                    # Step 3: SENTINEL - Validates the debug fix.
                    {
                        "agent_type": "SENTINEL",
                        "task_desc": "Validate debug fix",
                        "kwargs": {
                            "task_input_args": {
                                "codebase_path": "/app/synesthesia",
                                "task_description": "Review debug fix",
                            },
                        },
                    },
                ],
                "success_message": "Debug pipeline completed successfully.",
            },
            "UI Validation Pipeline": {
                "steps": [
                    # Step 1: PLAYWRIGHT-TESTER - Runs UI tests.
                    {
                        "agent_type": "PLAYWRIGHT-TESTER",
                        "task_desc": "Run UI tests",
                        "kwargs": {"task_input_args": {"query": "Test UI at app/streamlit_app.py"}},
                    },
                    # Step 2: PULSE-DEBUG - Analyzes runtime based on test results.
                    {
                        "agent_type": "PULSE-DEBUG",
                        "task_desc": "Analyze UI test results for performance issues",
                        "kwargs": {
                            "task_input_args": {
                                "runtime_id": "workbench-ui-runtime",
                                "task_description": "Analyze runtime",
                            },
                        },
                    },
                ],
                "success_message": "UI validation pipeline completed successfully.",
            },
        }

    @staticmethod
    def _call_with_context(func, context: Dict[str, Any]):
        """Call ``func`` with only the context entries it declares as parameters.

        Step callables have differing signatures (``lambda res: ...``,
        ``lambda arch_res: ...``); passing the full context as keywords would
        raise ``TypeError`` for any name a callable does not accept.
        """
        params = inspect.signature(func).parameters
        if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
            return func(**context)
        accepted = {name: context[name] for name in params if name in context}
        return func(**accepted)

    def _resolve_value(self, value, context: Dict[str, Any], path: str, where: str):
        """Recursively replace callables inside ``value`` with their results.

        ``path`` names the kwarg being resolved and ``where`` identifies the
        step; both are used only for the error message. A callable that
        raises is logged and replaced by ``None`` so the pipeline can go on.
        """
        if isinstance(value, dict):
            return {
                key: self._resolve_value(item, context, f"{path}.{key}", where)
                for key, item in value.items()
            }
        if not callable(value):
            return value
        try:
            return self._call_with_context(value, context)
        except Exception as e:  # best-effort: user-supplied callables may fail arbitrarily
            print(f"WorkflowEngine Error: Failed to resolve kwarg '{path}' for {where}: {e}")
            return None

    def execute_pipeline(self, pipeline_name: str, initial_task_description: str, initial_kwargs: Dict[str, Any] = None):
        """Execute a predefined workflow pipeline step by step.

        Args:
            pipeline_name: The name of the pipeline (e.g., "Coding Pipeline").
            initial_task_description: The starting description for the first
                task in the pipeline.
            initial_kwargs: Extra arguments exposed to dynamic step
                definitions as ``kw``; defaults to an empty dict.

        Returns:
            On success ``{"status": "pipeline_completed", "final_result": ...,
            "pipeline": pipeline_name}``. If the pipeline is unknown or a step
            reports ``"failed"``, a dict with ``"status": "failed"`` and a
            ``"message"`` (plus ``failed_step``, ``agent`` and
            ``error_details`` for step failures).
        """
        print(f"\n--- Executing Pipeline: {pipeline_name} ---")
        if initial_kwargs is None:
            initial_kwargs = {}

        pipeline = self._build_pipelines(initial_task_description).get(pipeline_name)
        if not pipeline:
            return {"status": "failed", "message": f"Unknown pipeline: {pipeline_name}"}

        steps = pipeline["steps"]
        # Results keyed by agent type, so later steps can read earlier output.
        step_results_context = {}
        current_pipeline_result = None

        for i, step in enumerate(steps):
            agent_type = step["agent_type"]
            task_desc_raw = step["task_desc"]
            where = f"{agent_type} (Step {i+1})"
            context = {
                "arch_res": step_results_context.get("ARCHITECT"),
                "resonance_res": step_results_context.get("RESONANCE"),
                "pulse_debug_res": step_results_context.get("PULSE-DEBUG"),
                # Both names refer to the result of the *previous* step.
                "current_result": current_pipeline_result,
                "res": current_pipeline_result,
                "kw": initial_kwargs,
            }

            # Resolve task description dynamically
            if callable(task_desc_raw):
                try:
                    current_task_desc = self._call_with_context(task_desc_raw, context)
                except Exception as e:  # best-effort, mirrors kwarg resolution
                    print(f"WorkflowEngine Error: Failed to resolve task description for {where}: {e}")
                    current_task_desc = f"Error resolving task description for {agent_type}"
            else:
                current_task_desc = task_desc_raw

            # Resolve kwargs dynamically, including callables nested in dicts
            current_step_kwargs = {
                key: self._resolve_value(value_def, context, key, where)
                for key, value_def in step.get("kwargs", {}).items()
            }

            # Route the task
            print(f"WorkflowEngine: Routing step {i+1}/{len(steps)} to {agent_type} with task: '{current_task_desc}' and kwargs: {current_step_kwargs}")
            current_pipeline_result = self.task_router.route_task(
                task_description=current_task_desc,
                agent_type=agent_type,
                **current_step_kwargs
            )
            step_results_context[agent_type] = current_pipeline_result

            # Abort on the first failed step
            if current_pipeline_result.get("status") == "failed":
                error_message = f"Pipeline '{pipeline_name}' failed at step {i+1} ({agent_type}): {current_pipeline_result.get('message', 'Unknown error')}"
                print(error_message)
                self.memory_bridge.record_conclusion(error_message)
                return {"status": "failed", "message": error_message, "failed_step": i+1, "agent": agent_type, "error_details": current_pipeline_result}
            print(f"Step {i+1} ({agent_type}) completed with status: {current_pipeline_result.get('status')}")

        # Pipeline completed successfully
        final_message = pipeline["success_message"]
        print(final_message)
        self.memory_bridge.record_observation(final_message)
        return {"status": "pipeline_completed", "final_result": current_pipeline_result, "pipeline": pipeline_name}

    def run_single_agent_task(self, agent_type: str, task_description: str, task_input_args: dict):
        """Execute a single task using a specific agent.

        Returns the router's result unchanged on success. When the result's
        status is ``"failed"``, returns a dict with ``status``, ``message``,
        ``agent`` and ``error_details`` instead.
        """
        print(f"\n--- Executing Single Agent Task: {agent_type} ---")
        result = self.task_router.route_task(
            task_description=task_description,
            agent_type=agent_type,
            task_input_args=task_input_args
        )
        if result.get("status") == "failed":
            error_message = f"Single agent task failed ({agent_type}): {result.get('message')}"
            print(error_message)
            self.memory_bridge.record_conclusion(error_message)
            return {"status": "failed", "message": error_message, "agent": agent_type, "error_details": result}

        print(f"Single agent task ({agent_type}) completed.")
        self.memory_bridge.record_observation(f"Single agent task ({agent_type}) completed: {result.get('message', 'Success')}")
        return result
|