"""
Orchestrator Module
====================
Coordinates multi-agent pipelines and manages task execution.
"""

from dataclasses import dataclass, field
from typing import Any
from datetime import datetime, timezone
import logging

from agents.research_agent import ResearchAgent
from agents.summarizer_agent import SummarizerAgent
from ledger.merkle import hash_leaf

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Orchestrator")


@dataclass
class ActionLog:
    """Represents a single action/step in the pipeline."""

    step: int  # 1-based position of the step within a task run
    agent: str  # label of the agent that ran ("research", "summarizer")
    input: dict[str, Any]  # payload handed to the agent
    output: dict[str, Any]  # value the agent returned
    timestamp: str  # ISO-8601 UTC timestamp taken when the log was created
    hash: str = ""  # audit hash; stays empty unless the creator supplies one

    def to_dict(self) -> dict[str, Any]:
        """Return the log entry as a plain dictionary.

        The conversion is shallow: the ``input`` and ``output`` values in
        the result are the same objects held by this log, not copies.
        """
        return {
            "step": self.step,
            "agent": self.agent,
            "input": self.input,
            "output": self.output,
            "timestamp": self.timestamp,
            "hash": self.hash,
        }


@dataclass
class Orchestrator:
    """
    Orchestrates multi-agent pipelines.

    Runs the research agent followed by the summarizer agent, recording an
    ``ActionLog`` (with a hash produced by ``hash_leaf``) for every step so
    the hashes can later be collected via ``get_action_hashes``.
    """

    name: str = "MainOrchestrator"
    action_logs: list[ActionLog] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Create the agents and a logger named after this orchestrator."""
        self.research_agent = ResearchAgent()
        self.summarizer_agent = SummarizerAgent()
        self.logger = logging.getLogger(f"Orchestrator-{self.name}")

    async def run_task(self, task: dict[str, Any]) -> dict[str, Any]:
        """
        Execute a complete task through the agent pipeline.

        Args:
            task: Dictionary containing task parameters.
                Expected keys: 'query' for research tasks. When 'query' is
                absent, the literal "default query" is used.

        Returns:
            Dictionary with the keys 'task' (the argument as given),
            'steps' (one dict per executed step, in order),
            'final_output' (the summarizer's output) and 'total_steps'.

        Exceptions raised by an agent are not caught here. Logs recorded
        for steps that completed before the failure remain in
        ``self.action_logs``.
        """
        # Each run starts from a fresh list; logs of a previous task are
        # dropped from this instance.
        self.action_logs = []

        query = task.get("query", "default query")
        self.logger.info("Starting task: %s", query)

        # Step 1: Research Agent
        research_log = await self._run_step(
            step=1,
            agent_name="research",
            agent=self.research_agent,
            payload={"query": query},
        )

        # Step 2: Summarizer Agent, fed with the research results (an empty
        # list when the research output has no "results" key).
        summarizer_log = await self._run_step(
            step=2,
            agent_name="summarizer",
            agent=self.summarizer_agent,
            payload={"documents": research_log.output.get("results", [])},
        )

        steps = [research_log.to_dict(), summarizer_log.to_dict()]
        self.logger.info("Task completed with %s steps", len(steps))

        return {
            "task": task,
            "steps": steps,
            "final_output": summarizer_log.output,
            "total_steps": len(steps),
        }

    async def _run_step(
        self,
        step: int,
        agent_name: str,
        agent: Any,
        payload: dict[str, Any],
    ) -> ActionLog:
        """Run one agent, record its ActionLog on this instance, and return it.

        Args:
            step: Step number to store in the log.
            agent_name: Label stored in the log's ``agent`` field.
            agent: Object exposing an awaitable ``run(payload)`` method.
            payload: Input passed to the agent and stored in the log.
        """
        result = await agent.run(payload)
        entry = self._create_action_log(
            step=step,
            agent=agent_name,
            input=payload,
            output=result,
        )
        self.action_logs.append(entry)
        return entry

    def _create_action_log(
        self,
        step: int,
        agent: str,
        input: dict[str, Any],
        output: dict[str, Any],
    ) -> ActionLog:
        """Create an action log entry with timestamp and hash.

        The timestamp is the current UTC time in ISO-8601 form. The hash is
        ``hash_leaf`` applied to "<step>:<agent>:<timestamp>:<str(output)>".
        """
        timestamp = datetime.now(timezone.utc).isoformat()

        # Create hash of the action for audit trail.
        # NOTE(review): `input` is not part of the hashed content, so a
        # change to a stored input would not alter the hash -- confirm this
        # is intended before relying on the hash to cover inputs.
        # NOTE(review): str(output) is the dict's repr; presumably any
        # re-verification must reproduce that exact repr -- verify against
        # whatever consumes these hashes.
        hash_content = f"{step}:{agent}:{timestamp}:{str(output)}"
        action_hash = hash_leaf(hash_content)

        return ActionLog(
            step=step,
            agent=agent,
            input=input,
            output=output,
            timestamp=timestamp,
            hash=action_hash,
        )

    def get_action_hashes(self) -> list[str]:
        """Get all action hashes for merkle tree computation.

        Returns the ``hash`` of every entry in ``self.action_logs``, in the
        order the steps were recorded.
        """
        return [log.hash for log in self.action_logs]