Qagents-workflows / orchestrators /orchestrator.py
Deminiko
1
b86397a
# Path: QAgents-workflos/orchestrators/orchestrator.py
# Relations: Uses agents, workflows, database modules
# Description: Orchestrators for Blackboard, Guided, and Naked execution modes
"""
Orchestrators Module: Workflow orchestration and execution.
Contains both Blackboard (free) and Guided (strict) orchestrators.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from datetime import datetime
import logging
import time
from agents import (
BaseAgent, AgentContext, AgentResult,
AgentState, create_all_agents
)
from workflows import (
WorkflowDefinition, WorkflowExecution,
WorkflowStatus, get_workflow
)
from database import get_database, LogEntry
logger = logging.getLogger(__name__)
@dataclass
class OrchestratorResult:
"""Result from orchestrator execution."""
success: bool
final_output: Any
execution_time_ms: float
steps_completed: int
total_steps: int
agent_results: Dict[str, AgentResult] = field(default_factory=dict)
errors: List[str] = field(default_factory=list)
class BaseOrchestrator(ABC):
"""Abstract base class for orchestrators."""
def __init__(self, name: str):
self.name = name
self.agents: Dict[str, BaseAgent] = {}
self.db = get_database()
def register_agent(self, agent: BaseAgent):
"""Register an agent with the orchestrator."""
self.agents[agent.agent_id] = agent
def log(self, level: str, message: str, workflow_id: str = None, data: Dict = None):
"""Log orchestrator activity."""
entry = LogEntry(
level=level,
message=message,
agent_id=self.name,
workflow_id=workflow_id,
data=data or {}
)
self.db.log(entry)
@abstractmethod
def run(self, goal: str, initial_context: Dict = None) -> OrchestratorResult:
"""Run the orchestrator to achieve the goal."""
pass
class BlackboardOrchestrator(BaseOrchestrator):
"""
Blackboard (Free) Orchestrator.
Uses a shared blackboard for agent communication.
Agents opportunistically activate when they can contribute.
Emergent workflow based on data availability.
"""
def __init__(self):
super().__init__("blackboard")
self.blackboard: Dict[str, Any] = {}
self.max_iterations = 20
def _reset_blackboard(self, goal: str, initial_context: Dict = None):
"""Initialize the blackboard with goal and context."""
# Ensure goal is a string
if isinstance(goal, list):
goal = goal[0] if goal else ""
goal = str(goal) if goal else ""
self.blackboard = {
"goal": goal,
"current_circuit": None,
"validation_passed": False,
"scores": None,
"completed": False,
**(initial_context or {})
}
def _build_context(self) -> AgentContext:
"""Build agent context from blackboard."""
return AgentContext(
goal=self.blackboard.get("goal", ""),
current_circuit=self.blackboard.get("current_circuit"),
history=self.blackboard.get("history", []),
constraints=self.blackboard.get("constraints", {}),
shared_data=self.blackboard
)
def _find_active_agent(self, context: AgentContext) -> Optional[BaseAgent]:
"""Find an agent that can handle the current state."""
# Priority order for agent selection - simplified for reliability
# First: generate circuit, then validate
priority_order = ["builder", "architect", "validator"]
for agent_id in priority_order:
agent = self.agents.get(agent_id)
if agent and agent.can_handle(context):
if agent.state == AgentState.IDLE:
return agent
return None
def _update_blackboard(self, agent_id: str, result: AgentResult):
"""Update blackboard with agent results."""
if not result.success:
return
data = result.data
if isinstance(data, dict):
# Extract QASM if present
if "qasm" in data:
qasm = data["qasm"]
# Handle list responses
if isinstance(qasm, list):
qasm = qasm[0] if qasm else None
self.blackboard["current_circuit"] = qasm
# Update validation status
if "valid" in data:
self.blackboard["validation_passed"] = data["valid"]
# Update scores
if "score" in data:
self.blackboard["scores"] = data["score"]
# Track history
if "history" not in self.blackboard:
self.blackboard["history"] = []
self.blackboard["history"].append({
"agent": agent_id,
"action": result.actions_taken,
"success": result.success,
"timestamp": datetime.now().isoformat()
})
def _check_completion(self) -> bool:
"""Check if the goal has been achieved."""
# Simple completion: we have a validated circuit
has_circuit = self.blackboard.get("current_circuit") is not None
is_validated = self.blackboard.get("validation_passed", False)
return has_circuit and is_validated
def run(self, goal: str, initial_context: Dict = None) -> OrchestratorResult:
"""Run blackboard orchestration."""
start_time = time.perf_counter()
self.log("INFO", f"Starting blackboard orchestration for: {goal}")
self._reset_blackboard(goal, initial_context)
# Ensure we have agents
if not self.agents:
self.agents = create_all_agents()
agent_results = {}
steps_completed = 0
errors = []
for iteration in range(self.max_iterations):
context = self._build_context()
# Find an agent that can work
agent = self._find_active_agent(context)
if agent is None:
self.log("INFO", "No active agent found, checking completion")
if self._check_completion():
break
# No agent and not complete - might be stuck
if iteration > 5: # Give it a few tries
errors.append("No agent could make progress")
break
continue
self.log("INFO", f"Activating agent: {agent.agent_id}")
# Agent decides and executes - with null safety
try:
action = agent.decide(context)
if action is None:
self.log("WARN", f"Agent {agent.agent_id} returned no action, continuing")
agent.reset()
continue
result = agent.execute(action, context)
if result is None:
self.log("WARN", f"Agent {agent.agent_id} returned no result, continuing")
agent.reset()
continue
agent_results[agent.agent_id] = result
steps_completed += 1
# Update blackboard
self._update_blackboard(agent.agent_id, result)
except Exception as e:
self.log("ERROR", f"Agent {agent.agent_id} failed: {e}")
errors.append(f"Agent {agent.agent_id} error: {str(e)}")
agent.reset()
continue
# Reset agent for next potential activation
agent.reset()
# Check completion
if self._check_completion():
self.log("INFO", "Goal achieved!")
break
elapsed = (time.perf_counter() - start_time) * 1000
return OrchestratorResult(
success=self._check_completion(),
final_output=self.blackboard.get("current_circuit"),
execution_time_ms=elapsed,
steps_completed=steps_completed,
total_steps=self.max_iterations,
agent_results=agent_results,
errors=errors
)
class GuidedOrchestrator(BaseOrchestrator):
"""
Guided (Strict) Orchestrator.
Follows a predefined workflow with explicit steps.
Central control over agent execution order.
Predictable, auditable execution path.
"""
def __init__(self, workflow_name: str = "build"):
super().__init__("guided")
self.workflow = get_workflow(workflow_name)
if self.workflow is None:
raise ValueError(f"Unknown workflow: {workflow_name}")
self.execution: Optional[WorkflowExecution] = None
def set_workflow(self, workflow_name: str):
"""Change the workflow."""
self.workflow = get_workflow(workflow_name)
if self.workflow is None:
raise ValueError(f"Unknown workflow: {workflow_name}")
def run(self, goal: str, initial_context: Dict = None) -> OrchestratorResult:
"""Run guided workflow orchestration."""
start_time = time.perf_counter()
# Ensure goal is a string
if isinstance(goal, list):
goal = goal[0] if goal else ""
goal = str(goal) if goal else ""
self.log("INFO", f"Starting guided workflow '{self.workflow.name}' for: {goal}")
# Initialize execution state
self.execution = WorkflowExecution(
workflow=self.workflow,
context={"goal": goal, **(initial_context or {})}
)
self.execution.status = WorkflowStatus.IN_PROGRESS
# Ensure we have agents
if not self.agents:
self.agents = create_all_agents()
agent_results = {}
# Execute each step in order
while self.execution.current_step is not None:
step = self.execution.current_step
self.log("INFO", f"Executing step: {step.name} ({step.agent_type})")
# Get the agent for this step
agent = self.agents.get(step.agent_type)
if agent is None:
if step.required:
self.execution.fail(f"Missing agent: {step.agent_type}")
break
else:
self.log("WARN", f"Skipping optional step: {step.name}")
self.execution.advance()
continue
# Build context for agent
context = AgentContext(
goal=self.execution.context.get("goal", ""),
current_circuit=self.execution.context.get("circuit_qasm"),
history=[],
constraints={},
shared_data=self.execution.context
)
# Agent decides and executes
action = agent.decide(context)
if action is None:
# Agent has nothing to do - might be okay for some steps
self.log("WARN", f"Agent {step.agent_type} returned no action")
self.execution.advance()
continue
result = agent.execute(action, context)
agent_results[step.name] = result
# Store outputs in execution context
if result.success and result.data:
for output_key in step.outputs:
if isinstance(result.data, dict):
if output_key in result.data:
self.execution.context[output_key] = result.data[output_key]
elif "qasm" in result.data:
qasm = result.data["qasm"]
# Handle list responses
if isinstance(qasm, list):
qasm = qasm[0] if qasm else None
self.execution.context["circuit_qasm"] = qasm
# Handle failure
if not result.success and step.required:
self.execution.fail(f"Step {step.name} failed: {result.message}")
break
# Reset agent and advance
agent.reset()
self.execution.advance()
elapsed = (time.perf_counter() - start_time) * 1000
return OrchestratorResult(
success=self.execution.status == WorkflowStatus.COMPLETED,
final_output=self.execution.context.get(self.workflow.final_output),
execution_time_ms=elapsed,
steps_completed=self.execution.current_step_index,
total_steps=len(self.workflow.steps),
agent_results=agent_results,
errors=self.execution.errors
)
class NakedOrchestrator(BaseOrchestrator):
"""
Naked (Baseline) Orchestrator.
Direct LLM-to-QASM generation with single call.
No multi-agent coordination, no structured workflow.
Uses ONE LLM call per problem for baseline comparison.
Purpose: Measure raw LLM capability at quantum circuit generation
without agentic overhead.
"""
def __init__(self):
super().__init__("naked")
self._llm = None
def _get_llm(self):
"""Lazy load LLM adapter."""
if self._llm is None:
from agents.llm_adapter import get_llm_adapter
from config import config
# Get API key dynamically (supports HF Spaces Secrets)
api_key = config.llm.get_api_key()
if not api_key:
raise ValueError(
"Missing API key! To use the Google AI API, provide api_key via:\n"
" 1. GOOGLE_API_KEY environment variable (HF Spaces Secrets)\n"
" 2. GENAI_API_KEY environment variable (fallback)\n"
" 3. Set in .env file (local development)"
)
self._llm = get_llm_adapter(
provider="gemini",
api_key=api_key,
enable_fallback=True
)
return self._llm
def run(self, goal: str, initial_context: Dict = None) -> OrchestratorResult:
"""
Run naked LLM execution - ONE LLM call per problem.
This is the baseline test: can a single LLM call generate
valid QASM for a quantum computing problem?
"""
start_time = time.perf_counter()
# Ensure goal is a string
if isinstance(goal, list):
goal = goal[0] if goal else ""
goal = str(goal) if goal else ""
self.log("INFO", f"Starting naked LLM execution for: {goal}")
from tools import invoke_tool
errors = []
circuit_qasm = None
llm_requests = 0
tokens_used = 0
# System prompt for direct QASM generation
system_prompt = """You are an expert quantum computing engineer.
Your task is to generate valid OpenQASM 2.0 code for the given quantum circuit problem.
RULES:
1. Output ONLY valid OpenQASM 2.0 code
2. Start with: OPENQASM 2.0; include "qelib1.inc";
3. Declare qubits with: qreg q[N];
4. Declare classical bits with: creg c[N];
5. Use standard gates: h, x, y, z, cx, cz, ccx, swap, t, s, rx, ry, rz
6. Add measurements with: measure q[i] -> c[i];
7. NO explanations, NO markdown, ONLY QASM code
EXAMPLE OUTPUT:
OPENQASM 2.0;
include "qelib1.inc";
qreg q[2];
creg c[2];
h q[0];
cx q[0], q[1];
measure q[0] -> c[0];
measure q[1] -> c[1];
"""
user_prompt = f"""Generate the OpenQASM 2.0 code for this quantum circuit problem:
{goal}
Output ONLY the QASM code, nothing else."""
try:
# Single LLM call - the naked baseline test
llm = self._get_llm()
response = llm.generate(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1, # Low temperature for deterministic output
max_tokens=1000
)
llm_requests = 1
tokens_used = response.tokens_used
# Extract QASM from response
raw_output = response.text.strip()
# Clean up common LLM artifacts
if "```" in raw_output:
# Extract from code block
lines = raw_output.split("\n")
in_block = False
qasm_lines = []
for line in lines:
if line.strip().startswith("```"):
if in_block:
break
in_block = True
continue
if in_block:
qasm_lines.append(line)
raw_output = "\n".join(qasm_lines)
# Ensure it starts with OPENQASM declaration
if "OPENQASM" in raw_output:
# Find the start of QASM
idx = raw_output.find("OPENQASM")
circuit_qasm = raw_output[idx:]
else:
# Try to use as-is if it looks like QASM
if "qreg" in raw_output or "include" in raw_output:
circuit_qasm = "OPENQASM 2.0;\ninclude \"qelib1.inc\";\n" + raw_output
else:
errors.append(f"LLM did not produce valid QASM: {raw_output[:100]}")
# Validate the generated QASM
if circuit_qasm:
validation = invoke_tool("validate_syntax", qasm=circuit_qasm)
if not validation.get("success") or not validation.get("valid", False):
error_msg = validation.get("error", "Unknown validation error")
errors.append(f"QASM validation failed: {error_msg}")
# Still keep the circuit for analysis
self.log("WARN", f"Generated QASM failed validation: {error_msg}")
except Exception as e:
errors.append(str(e))
self.log("ERROR", f"Naked LLM execution failed: {e}")
elapsed = (time.perf_counter() - start_time) * 1000
# Create a simple AgentResult-like dict for compatibility
from agents import AgentResult
naked_result = AgentResult(
success=circuit_qasm is not None and len(errors) == 0,
data={
"qasm": circuit_qasm,
"llm_requests": llm_requests,
"tokens_used": tokens_used
},
message=f"Generated QASM via naked LLM ({llm_requests} request, {tokens_used} tokens)"
)
return OrchestratorResult(
success=circuit_qasm is not None and len(errors) == 0,
final_output=circuit_qasm,
execution_time_ms=elapsed,
steps_completed=1 if llm_requests > 0 else 0,
total_steps=1,
agent_results={"naked_llm": naked_result},
errors=errors
)
# Factory function
def create_orchestrator(mode: str) -> BaseOrchestrator:
"""Create an orchestrator based on mode."""
if mode == "blackboard":
return BlackboardOrchestrator()
elif mode == "guided":
return GuidedOrchestrator()
elif mode == "naked":
return NakedOrchestrator()
elif mode == "quasar":
from .quasar_orchestrator import QuasarOrchestrator
return QuasarOrchestrator()
elif mode == "hybrid":
from .quasar_orchestrator import HybridOrchestrator
return HybridOrchestrator()
else:
raise ValueError(f"Unknown mode: {mode}. Use 'blackboard', 'guided', 'naked', 'quasar', or 'hybrid'")