|
|
"""Hierarchical Orchestrator using middleware and sub-teams. |
|
|
|
|
|
This orchestrator implements a hierarchical team pattern where sub-teams |
|
|
can be composed and coordinated through middleware. It provides more |
|
|
granular control over the research workflow compared to the simple |
|
|
orchestrator. |
|
|
|
|
|
Design Patterns: |
|
|
- Composite: Teams can contain sub-teams |
|
|
- Chain of Responsibility: Middleware processes requests in sequence |
|
|
- Template Method: SubIterationMiddleware defines the iteration skeleton |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from collections.abc import AsyncGenerator |
|
|
|
|
|
import structlog |
|
|
|
|
|
from src.agents.judge_agent_llm import LLMSubIterationJudge |
|
|
from src.agents.magentic_agents import create_search_agent |
|
|
from src.config.domain import ResearchDomain |
|
|
from src.middleware.sub_iteration import SubIterationMiddleware, SubIterationTeam |
|
|
from src.orchestrators.base import OrchestratorProtocol |
|
|
from src.state import init_magentic_state |
|
|
from src.utils.models import AgentEvent, OrchestratorConfig |
|
|
from src.utils.service_loader import get_embedding_service_if_available |
|
|
|
|
|
# Module-level structlog logger shared by everything in this file.
logger = structlog.get_logger()


# Default upper bound, in seconds, for one orchestrator run (5 minutes).
DEFAULT_TIMEOUT_SECONDS: float = 300.0
|
|
|
|
|
|
|
|
class ResearchTeam(SubIterationTeam):
    """Search-agent adapter exposing the SubIterationTeam interface.

    Wraps the agent returned by ``create_search_agent`` so that it can be
    driven by the sub-iteration middleware through ``execute``.
    """

    def __init__(
        self,
        domain: ResearchDomain | str | None = None,
        api_key: str | None = None,
    ) -> None:
        """Build the underlying search agent.

        Args:
            domain: Research domain forwarded to ``create_search_agent``
            api_key: Optional API key forwarded to ``create_search_agent``
        """
        self.agent = create_search_agent(domain=domain, api_key=api_key)

    async def execute(self, task: str) -> str:
        """Run the agent on a task and return its latest assistant text.

        Args:
            task: The research task description

        Returns:
            The text of the most recent message whose role is "assistant"
            and whose text is non-empty, or a fixed fallback string when no
            such message exists.
        """
        reply = await self.agent.run(task)

        # Walk newest-to-oldest so the final assistant answer wins.
        for message in reversed(reply.messages or ()):
            from_assistant = message.role == "assistant"
            if from_assistant and message.text:
                return str(message.text)

        return "No response from agent."
|
|
|
|
|
|
|
|
class HierarchicalOrchestrator(OrchestratorProtocol):
    """Orchestrator that uses hierarchical teams and sub-iterations.

    This orchestrator provides:
    - Sub-iteration middleware for fine-grained control
    - LLM-based judge for sub-iteration decisions
    - Event-driven architecture for UI updates
    - Configurable iterations and timeout
    """

    def __init__(
        self,
        config: OrchestratorConfig | None = None,
        timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
        domain: ResearchDomain | str | None = None,
        api_key: str | None = None,
    ) -> None:
        """Initialize the hierarchical orchestrator.

        Args:
            config: Optional configuration (uses defaults if not provided)
            timeout_seconds: Maximum workflow duration (default: 5 minutes)
            domain: Research domain for customization
            api_key: Optional BYOK key (auto-detects provider from prefix)
        """
        self.config = config or OrchestratorConfig()
        self._timeout_seconds = timeout_seconds
        self.domain = domain
        self._api_key = api_key
        self.team = ResearchTeam(domain=domain, api_key=api_key)
        self.judge = LLMSubIterationJudge(api_key=api_key)
        self.middleware = SubIterationMiddleware(
            self.team, self.judge, max_iterations=self.config.max_iterations
        )

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """Run the hierarchical workflow.

        The middleware runs as a background task and reports progress through
        a callback that feeds an internal queue; this generator relays those
        events to the caller and finishes with a single terminal event.

        Args:
            query: User's research question

        Yields:
            AgentEvent objects for real-time UI updates: a "started" event,
            the events emitted by the middleware, then either a "complete"
            event (success or timeout) or an "error" event.
        """
        logger.info("Starting hierarchical orchestrator", query=query)

        service = get_embedding_service_if_available(api_key=self._api_key)
        init_magentic_state(query, service)

        yield AgentEvent(type="started", message=f"Starting research: {query}")

        queue: asyncio.Queue[AgentEvent | None] = asyncio.Queue()

        async def event_callback(event: AgentEvent) -> None:
            await queue.put(event)

        try:
            # NOTE(review): events are yielded while this timeout scope is
            # active, so a deadline that expires while the consumer is busy
            # between iterations cancels the consumer's await rather than
            # code in this generator - confirm callers tolerate that.
            async with asyncio.timeout(self._timeout_seconds):
                task_future = asyncio.create_task(self.middleware.run(query, event_callback))
                get_event: asyncio.Task[AgentEvent | None] | None = None

                try:
                    # Relay events as they arrive until the workflow finishes.
                    while not task_future.done():
                        get_event = asyncio.create_task(queue.get())
                        done, _ = await asyncio.wait(
                            {task_future, get_event}, return_when=asyncio.FIRST_COMPLETED
                        )

                        if get_event in done:
                            event = get_event.result()
                            if event is not None:
                                yield event
                        else:
                            # Workflow finished first; drop the idle getter.
                            get_event.cancel()

                    # Flush events queued after the last successful get().
                    while not queue.empty():
                        ev = queue.get_nowait()
                        if ev is not None:
                            yield ev

                    # Re-raises any exception the middleware task ended with.
                    result, assessment = await task_future
                finally:
                    # asyncio.wait() never cancels the tasks it waits on, so on
                    # timeout, cancellation, or early close of this generator
                    # the workflow would otherwise keep running unobserved.
                    for pending in (task_future, get_event):
                        if pending is not None and not pending.done():
                            pending.cancel()

                assessment_text = assessment.reasoning if assessment else "None"
                yield AgentEvent(
                    type="complete",
                    message=(
                        f"Research complete.\n\nResult:\n{result}\n\nAssessment:\n{assessment_text}"
                    ),
                    data={"assessment": assessment.model_dump() if assessment else None},
                )

        except TimeoutError:
            logger.warning("Hierarchical workflow timed out", query=query)
            yield AgentEvent(
                type="complete",
                message="Research timed out. Results may be incomplete.",
                data={"reason": "timeout"},
            )

        except Exception as e:
            logger.error(
                "Orchestrator failed",
                error=str(e),
                error_type=type(e).__name__,
            )
            yield AgentEvent(type="error", message=f"Orchestrator failed: {e}")
|
|
|