Spaces:
Sleeping
Sleeping
| """ | |
| Orchestrator Module | |
| ==================== | |
| Coordinates multi-agent pipelines and manages task execution. | |
| """ | |
| from dataclasses import dataclass, field | |
| from typing import Any | |
| from datetime import datetime, timezone | |
| import logging | |
| from agents.research_agent import ResearchAgent | |
| from agents.summarizer_agent import SummarizerAgent | |
| from ledger.merkle import hash_leaf | |
# Runs at import time: logging.basicConfig is called with level INFO.
| logging.basicConfig(level=logging.INFO) | |
# Module-level logger named "Orchestrator"; it is not referenced by the code visible in this file.
| logger = logging.getLogger("Orchestrator") | |
@dataclass
class ActionLog:
    """Represents a single action/step in the pipeline.

    The class must be a dataclass: it declares only annotated fields and is
    constructed with keyword arguments, so the generated ``__init__`` is
    required for instantiation to work.

    Attributes:
        step: Number of the step within a task run.
        agent: Name of the agent that performed the step.
        input: Payload that was handed to the agent.
        output: Payload the agent returned.
        timestamp: Time of the step, stored as a string.
        hash: Hash string recorded for the step; empty when not supplied.
    """

    step: int
    agent: str
    input: dict[str, Any]
    output: dict[str, Any]
    timestamp: str
    hash: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Return the log entry as a plain dictionary.

        The ``input`` and ``output`` values are the same objects held by the
        instance (no copy is made).

        Returns:
            A dict with the keys ``step``, ``agent``, ``input``, ``output``,
            ``timestamp`` and ``hash``.
        """
        return {
            "step": self.step,
            "agent": self.agent,
            "input": self.input,
            "output": self.output,
            "timestamp": self.timestamp,
            "hash": self.hash,
        }
@dataclass
class Orchestrator:
    """
    Orchestrates multi-agent pipelines.

    Coordinates the execution of multiple agents in sequence,
    logging each step and computing a hash per step for the audit trail.

    The class must be a dataclass: ``action_logs`` relies on
    ``field(default_factory=list)`` and the agents and logger are created in
    ``__post_init__``, neither of which takes effect without the decorator.

    Attributes:
        name: Label for this orchestrator; used in the logger name.
        action_logs: Log entries of the most recent ``run_task`` call.
    """

    name: str = "MainOrchestrator"
    action_logs: list[ActionLog] = field(default_factory=list)

    def __post_init__(self):
        """Create the agents and the per-instance logger."""
        self.research_agent = ResearchAgent()
        self.summarizer_agent = SummarizerAgent()
        self.logger = logging.getLogger(f"Orchestrator-{self.name}")

    async def run_task(self, task: dict[str, Any]) -> dict[str, Any]:
        """
        Execute a complete task through the agent pipeline.

        The research agent runs first with ``{"query": ...}``; the
        ``"results"`` entry of its output (an empty list when absent) is then
        passed to the summarizer agent as ``"documents"``.

        Args:
            task: Dictionary containing task parameters.
                Expected keys: 'query' for research tasks. When missing,
                the literal "default query" is used.

        Returns:
            Dictionary with the keys ``task`` (the argument as given),
            ``steps`` (one dict per executed step), ``final_output`` (the
            summarizer output) and ``total_steps``.
        """
        self.action_logs = []  # Reset logs for new task

        query = task.get("query", "default query")
        self.logger.info(f"Starting task: {query}")

        # Step 1: Research Agent
        research_output = await self._run_step(
            step=1,
            agent_name="research",
            agent=self.research_agent,
            payload={"query": query},
        )

        # Step 2: Summarizer Agent
        summarizer_output = await self._run_step(
            step=2,
            agent_name="summarizer",
            agent=self.summarizer_agent,
            payload={"documents": research_output.get("results", [])},
        )

        steps = [log.to_dict() for log in self.action_logs]
        step_count = len(steps)
        self.logger.info(f"Task completed with {step_count} steps")

        return {
            "task": task,
            "steps": steps,
            "final_output": summarizer_output,
            "total_steps": step_count,
        }

    async def _run_step(
        self,
        step: int,
        agent_name: str,
        agent: Any,
        payload: dict[str, Any],
    ) -> dict[str, Any]:
        """Run one agent, record its action log, and return the agent output."""
        result = await agent.run(payload)
        self.action_logs.append(
            self._create_action_log(
                step=step,
                agent=agent_name,
                input=payload,
                output=result,
            )
        )
        return result

    def _create_action_log(
        self,
        step: int,
        agent: str,
        input: dict[str, Any],
        output: dict[str, Any]
    ) -> ActionLog:
        """Create an action log entry with timestamp and hash.

        Args:
            step: Number of the step within the task.
            agent: Name of the agent that produced ``output``.
            input: Payload that was passed to the agent.
            output: Payload the agent returned.

        Returns:
            An ``ActionLog`` stamped with the current UTC time in ISO-8601
            form and the value returned by ``hash_leaf``.
        """
        timestamp = datetime.now(timezone.utc).isoformat()

        # Create hash of the action for audit trail.
        # NOTE(review): the hashed string covers step, agent, timestamp and
        # str(output); the input payload is not part of it.
        hash_content = f"{step}:{agent}:{timestamp}:{str(output)}"
        action_hash = hash_leaf(hash_content)

        return ActionLog(
            step=step,
            agent=agent,
            input=input,
            output=output,
            timestamp=timestamp,
            hash=action_hash,
        )

    def get_action_hashes(self) -> list[str]:
        """Get all action hashes for merkle tree computation.

        Returns:
            The ``hash`` of every entry in ``action_logs``, in order.
        """
        return [log.hash for log in self.action_logs]