File size: 5,787 Bytes
e82d9c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fd1472e
e82d9c9
9639483
e82d9c9
11888fc
9639483
e82d9c9
 
 
11888fc
 
 
e82d9c9
 
 
 
 
 
 
 
fd1472e
 
e82d9c9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9639483
e82d9c9
 
 
 
 
 
11888fc
e82d9c9
 
11888fc
 
 
 
fd1472e
11888fc
 
 
 
 
 
fd1472e
11888fc
 
 
fd1472e
 
e82d9c9
11888fc
 
 
e82d9c9
 
 
 
 
 
 
 
 
 
 
 
9639483
e0c585c
e82d9c9
 
 
 
 
 
 
 
 
11888fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e82d9c9
 
11888fc
 
e82d9c9
11888fc
e82d9c9
11888fc
 
 
 
 
e82d9c9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""Hierarchical Orchestrator using middleware and sub-teams.

This orchestrator implements a hierarchical team pattern where sub-teams
can be composed and coordinated through middleware. It provides more
granular control over the research workflow compared to the simple
orchestrator.

Design Patterns:
- Composite: Teams can contain sub-teams
- Chain of Responsibility: Middleware processes requests in sequence
- Template Method: SubIterationMiddleware defines the iteration skeleton
"""

import asyncio
from collections.abc import AsyncGenerator

import structlog

from src.agents.judge_agent_llm import LLMSubIterationJudge
from src.agents.magentic_agents import create_search_agent
from src.config.domain import ResearchDomain
from src.middleware.sub_iteration import SubIterationMiddleware, SubIterationTeam
from src.orchestrators.base import OrchestratorProtocol
from src.state import init_magentic_state
from src.utils.models import AgentEvent, OrchestratorConfig
from src.utils.service_loader import get_embedding_service_if_available

# Module-level structlog logger; HierarchicalOrchestrator.run uses it to
# report workflow start, timeout and failure.
logger = structlog.get_logger()

# Default timeout for hierarchical orchestrator (5 minutes), in seconds.
# Used as the default for HierarchicalOrchestrator's ``timeout_seconds``.
DEFAULT_TIMEOUT_SECONDS = 300.0


class ResearchTeam(SubIterationTeam):
    """Sub-iteration team backed by the search agent.

    Wraps the agent produced by ``create_search_agent`` so that it can be
    driven through the ``SubIterationTeam`` interface (a single async
    ``execute`` call that maps a task string to a text answer).
    """

    def __init__(self, domain: ResearchDomain | str | None = None) -> None:
        """Build the underlying search agent.

        Args:
            domain: Research domain forwarded to ``create_search_agent``
        """
        self.agent = create_search_agent(domain=domain)

    async def execute(self, task: str) -> str:
        """Run the agent on ``task`` and return its latest assistant text.

        Args:
            task: The research task description

        Returns:
            The text of the most recent message whose role is
            ``"assistant"`` and whose text is non-empty, or the fixed
            fallback string when no such message exists.
        """
        fallback = "No response from agent."

        reply = await self.agent.run(task)
        history = reply.messages
        if not history:
            return fallback

        # Scan newest-to-oldest so the final assistant answer wins.
        latest_text = next(
            (
                entry.text
                for entry in reversed(history)
                if entry.role == "assistant" and entry.text
            ),
            None,
        )
        return fallback if latest_text is None else str(latest_text)


class HierarchicalOrchestrator(OrchestratorProtocol):
    """Orchestrator that uses hierarchical teams and sub-iterations.

    This orchestrator provides:
    - Sub-iteration middleware for fine-grained control
    - LLM-based judge for sub-iteration decisions
    - Event-driven architecture for UI updates
    - Configurable iterations and timeout
    """

    def __init__(
        self,
        config: OrchestratorConfig | None = None,
        timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
        domain: ResearchDomain | str | None = None,
    ) -> None:
        """Initialize the hierarchical orchestrator.

        Args:
            config: Optional configuration (uses defaults if not provided)
            timeout_seconds: Maximum workflow duration (default: 5 minutes)
            domain: Research domain for customization
        """
        self.config = config or OrchestratorConfig()
        self._timeout_seconds = timeout_seconds
        self.domain = domain
        self.team = ResearchTeam(domain=domain)
        self.judge = LLMSubIterationJudge()
        self.middleware = SubIterationMiddleware(
            self.team, self.judge, max_iterations=self.config.max_iterations
        )

    async def run(self, query: str) -> AsyncGenerator[AgentEvent, None]:
        """Run the hierarchical workflow.

        The middleware runs in a background task and reports progress via a
        callback that feeds an internal queue; this generator relays those
        events to the caller as they arrive. Whenever the generator stops
        streaming (normal completion, timeout, failure, or the consumer
        closing the generator early) any still-running background task is
        cancelled so no work is left orphaned.

        Args:
            query: User's research question

        Yields:
            AgentEvent objects for real-time UI updates: one ``started``
            event, the events emitted by the middleware, then either a
            ``complete`` event (result or timeout notice) or an ``error``
            event.
        """
        logger.info("Starting hierarchical orchestrator", query=query)

        service = get_embedding_service_if_available()
        init_magentic_state(query, service)

        yield AgentEvent(type="started", message=f"Starting research: {query}")

        queue: asyncio.Queue[AgentEvent | None] = asyncio.Queue()

        async def event_callback(event: AgentEvent) -> None:
            await queue.put(event)

        # Track the deadline explicitly instead of wrapping the yields in
        # ``asyncio.timeout``: a cancel scope held open across ``yield`` can
        # deliver its cancellation while the consumer's code is running.
        loop = asyncio.get_running_loop()
        deadline = (
            None if self._timeout_seconds is None else loop.time() + self._timeout_seconds
        )

        try:
            task_future = asyncio.create_task(self.middleware.run(query, event_callback))
            get_event: asyncio.Task[AgentEvent | None] | None = None

            try:
                while not task_future.done():
                    remaining = None if deadline is None else deadline - loop.time()
                    if remaining is not None and remaining <= 0:
                        raise TimeoutError

                    get_event = asyncio.create_task(queue.get())
                    done, _ = await asyncio.wait(
                        {task_future, get_event},
                        timeout=remaining,
                        return_when=asyncio.FIRST_COMPLETED,
                    )

                    if get_event in done:
                        event = get_event.result()
                        if event is not None:
                            yield event
                    else:
                        # Either the workflow finished or the wait timed out;
                        # in both cases this getter is no longer needed.
                        get_event.cancel()
                        if task_future not in done:
                            raise TimeoutError

                # Process remaining events queued just before the task ended
                while not queue.empty():
                    ev = queue.get_nowait()
                    if ev is not None:
                        yield ev

                # The task is done here, so this returns (or re-raises the
                # workflow's exception) without waiting.
                result, assessment = await task_future
            finally:
                # Runs before any terminal event is yielded, so the workflow
                # never keeps running behind a "timed out"/"error" message.
                if not task_future.done():
                    task_future.cancel()
                if get_event is not None and not get_event.done():
                    get_event.cancel()

            assessment_text = assessment.reasoning if assessment else "None"
            yield AgentEvent(
                type="complete",
                message=(
                    f"Research complete.\n\nResult:\n{result}\n\nAssessment:\n{assessment_text}"
                ),
                data={"assessment": assessment.model_dump() if assessment else None},
            )

        except TimeoutError:
            logger.warning("Hierarchical workflow timed out", query=query)
            yield AgentEvent(
                type="complete",
                message="Research timed out. Results may be incomplete.",
                data={"reason": "timeout"},
            )

        except Exception as e:
            logger.error(
                "Orchestrator failed",
                error=str(e),
                error_type=type(e).__name__,
            )
            yield AgentEvent(type="error", message=f"Orchestrator failed: {e}")