File size: 5,787 Bytes
e82d9c9 fd1472e e82d9c9 9639483 e82d9c9 11888fc 9639483 e82d9c9 11888fc e82d9c9 fd1472e e82d9c9 9639483 e82d9c9 11888fc e82d9c9 11888fc fd1472e 11888fc fd1472e 11888fc fd1472e e82d9c9 11888fc e82d9c9 9639483 e0c585c e82d9c9 11888fc e82d9c9 11888fc e82d9c9 11888fc e82d9c9 11888fc e82d9c9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
"""Hierarchical Orchestrator using middleware and sub-teams.
This orchestrator implements a hierarchical team pattern where sub-teams
can be composed and coordinated through middleware. It provides more
granular control over the research workflow compared to the simple
orchestrator.
Design Patterns:
- Composite: Teams can contain sub-teams
- Chain of Responsibility: Middleware processes requests in sequence
- Template Method: SubIterationMiddleware defines the iteration skeleton
"""
import asyncio
from collections.abc import AsyncGenerator
import structlog
from src.agents.judge_agent_llm import LLMSubIterationJudge
from src.agents.magentic_agents import create_search_agent
from src.config.domain import ResearchDomain
from src.middleware.sub_iteration import SubIterationMiddleware, SubIterationTeam
from src.orchestrators.base import OrchestratorProtocol
from src.state import init_magentic_state
from src.utils.models import AgentEvent, OrchestratorConfig
from src.utils.service_loader import get_embedding_service_if_available
logger = structlog.get_logger()
# Default timeout for hierarchical orchestrator (5 minutes)
DEFAULT_TIMEOUT_SECONDS = 300.0
class ResearchTeam(SubIterationTeam):
"""Adapts ChatAgent to SubIterationTeam protocol.
This adapter allows the search agent to be used within the
sub-iteration middleware framework.
"""
def __init__(self, domain: ResearchDomain | str | None = None) -> None:
self.agent = create_search_agent(domain=domain)
async def execute(self, task: str) -> str:
"""Execute a research task.
Args:
task: The research task description
Returns:
Text response from the agent
"""
response = await self.agent.run(task)
if response.messages:
for msg in reversed(response.messages):
if msg.role == "assistant" and msg.text:
return str(msg.text)
return "No response from agent."
class HierarchicalOrchestrator(OrchestratorProtocol):
"""Orchestrator that uses hierarchical teams and sub-iterations.
This orchestrator provides:
- Sub-iteration middleware for fine-grained control
- LLM-based judge for sub-iteration decisions
- Event-driven architecture for UI updates
- Configurable iterations and timeout
"""
def __init__(
self,
config: OrchestratorConfig | None = None,
timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
domain: ResearchDomain | str | None = None,
) -> None:
"""Initialize the hierarchical orchestrator.
Args:
config: Optional configuration (uses defaults if not provided)
timeout_seconds: Maximum workflow duration (default: 5 minutes)
domain: Research domain for customization
"""
self.config = config or OrchestratorConfig()
self._timeout_seconds = timeout_seconds
self.domain = domain
self.team = ResearchTeam(domain=domain)
self.judge = LLMSubIterationJudge()
self.middleware = SubIterationMiddleware(
self.team, self.judge, max_iterations=self.config.max_iterations
)
async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
"""Run the hierarchical workflow.
Args:
query: User's research question
Yields:
AgentEvent objects for real-time UI updates
"""
logger.info("Starting hierarchical orchestrator", query=query)
service = get_embedding_service_if_available()
init_magentic_state(query, service)
yield AgentEvent(type="started", message=f"Starting research: {query}")
queue: asyncio.Queue[AgentEvent | None] = asyncio.Queue()
async def event_callback(event: AgentEvent) -> None:
await queue.put(event)
try:
async with asyncio.timeout(self._timeout_seconds):
task_future = asyncio.create_task(self.middleware.run(query, event_callback))
while not task_future.done():
get_event = asyncio.create_task(queue.get())
done, _ = await asyncio.wait(
{task_future, get_event}, return_when=asyncio.FIRST_COMPLETED
)
if get_event in done:
event = get_event.result()
if event:
yield event
else:
get_event.cancel()
# Process remaining events
while not queue.empty():
ev = queue.get_nowait()
if ev:
yield ev
result, assessment = await task_future
assessment_text = assessment.reasoning if assessment else "None"
yield AgentEvent(
type="complete",
message=(
f"Research complete.\n\nResult:\n{result}\n\nAssessment:\n{assessment_text}"
),
data={"assessment": assessment.model_dump() if assessment else None},
)
except TimeoutError:
logger.warning("Hierarchical workflow timed out", query=query)
yield AgentEvent(
type="complete",
message="Research timed out. Results may be incomplete.",
data={"reason": "timeout"},
)
except Exception as e:
logger.error(
"Orchestrator failed",
error=str(e),
error_type=type(e).__name__,
)
yield AgentEvent(type="error", message=f"Orchestrator failed: {e}")
|