AgentMask / src /orchestrator.py
b2230765034
stage1: multi-agent basic implementation + tests
c34b33b
"""
Orchestrator Module
====================
Coordinates multi-agent pipelines and manages task execution.
"""
from dataclasses import dataclass, field
from typing import Any
from datetime import datetime, timezone
import logging
from agents.research_agent import ResearchAgent
from agents.summarizer_agent import SummarizerAgent
from ledger.merkle import hash_leaf
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Orchestrator")
@dataclass
class ActionLog:
"""Represents a single action/step in the pipeline."""
step: int
agent: str
input: dict[str, Any]
output: dict[str, Any]
timestamp: str
hash: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"step": self.step,
"agent": self.agent,
"input": self.input,
"output": self.output,
"timestamp": self.timestamp,
"hash": self.hash
}
@dataclass
class Orchestrator:
"""
Orchestrates multi-agent pipelines.
Coordinates the execution of multiple agents in sequence,
logging each step and computing merkle hashes for audit trail.
"""
name: str = "MainOrchestrator"
action_logs: list[ActionLog] = field(default_factory=list)
def __post_init__(self):
self.research_agent = ResearchAgent()
self.summarizer_agent = SummarizerAgent()
self.logger = logging.getLogger(f"Orchestrator-{self.name}")
async def run_task(self, task: dict[str, Any]) -> dict[str, Any]:
"""
Execute a complete task through the agent pipeline.
Args:
task: Dictionary containing task parameters
Expected keys: 'query' for research tasks
Returns:
Dictionary with execution results and step logs
"""
self.action_logs = [] # Reset logs for new task
steps = []
step_count = 0
query = task.get("query", "default query")
self.logger.info(f"Starting task: {query}")
# Step 1: Research Agent
step_count += 1
research_input = {"query": query}
research_output = await self.research_agent.run(research_input)
research_log = self._create_action_log(
step=step_count,
agent="research",
input=research_input,
output=research_output
)
self.action_logs.append(research_log)
steps.append(research_log.to_dict())
# Step 2: Summarizer Agent
step_count += 1
summarizer_input = {"documents": research_output.get("results", [])}
summarizer_output = await self.summarizer_agent.run(summarizer_input)
summarizer_log = self._create_action_log(
step=step_count,
agent="summarizer",
input=summarizer_input,
output=summarizer_output
)
self.action_logs.append(summarizer_log)
steps.append(summarizer_log.to_dict())
self.logger.info(f"Task completed with {step_count} steps")
return {
"task": task,
"steps": steps,
"final_output": summarizer_output,
"total_steps": step_count
}
def _create_action_log(
self,
step: int,
agent: str,
input: dict[str, Any],
output: dict[str, Any]
) -> ActionLog:
"""Create an action log entry with timestamp and hash."""
timestamp = datetime.now(timezone.utc).isoformat()
# Create hash of the action for audit trail
hash_content = f"{step}:{agent}:{timestamp}:{str(output)}"
action_hash = hash_leaf(hash_content)
return ActionLog(
step=step,
agent=agent,
input=input,
output=output,
timestamp=timestamp,
hash=action_hash
)
def get_action_hashes(self) -> list[str]:
"""Get all action hashes for merkle tree computation."""
return [log.hash for log in self.action_logs]