Heavy / src /multi_orchestrator.py
justinhew
Deploy to HF Spaces
ea81a05
"""Multi-model orchestrator for managing multi-agent analysis workflow."""
import asyncio
from typing import List, Dict, Any, Optional
from .multi_client import MultiModelClient
from .multi_agent import MultiAgent
from .tavily_search import TavilySearcher
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
console = Console()
class MultiOrchestrator:
"""Manages the multi-agent analysis workflow with model selection per role."""
def __init__(
self,
client: MultiModelClient,
config: Dict[str, Any],
orchestrator_model: str = "claude-4.5-sonnet",
agent_model: str = "claude-4.5-sonnet",
synthesizer_model: str = "claude-4.5-sonnet",
tavily_searcher: Optional[TavilySearcher] = None
):
"""Initialize the orchestrator.
Args:
client: MultiModelClient instance
config: Configuration dictionary
orchestrator_model: Model for question generation
agent_model: Model for agent analysis
synthesizer_model: Model for synthesis
tavily_searcher: Optional Tavily searcher for web research
"""
self.client = client
self.config = config
self.orchestrator_model = orchestrator_model
self.agent_model = agent_model
self.synthesizer_model = synthesizer_model
self.tavily_searcher = tavily_searcher
self.num_agents = config.get('orchestrator', {}).get('num_agents', 4)
self.verbose = config.get('output', {}).get('verbose', True)
self.show_agent_thoughts = config.get('output', {}).get('show_agent_thoughts', True)
async def process_query(self, query: str) -> str:
"""Process a query through the multi-agent system.
Args:
query: User's query to analyze
Returns:
Synthesized comprehensive response
"""
if self.verbose:
console.print(f"\n[bold cyan]Processing query:[/bold cyan] {query}\n")
console.print(f"[dim]Using: Orchestrator={self.orchestrator_model}, Agents={self.agent_model}, Synthesizer={self.synthesizer_model}[/dim]\n")
# Step 1: Generate specialized research questions
if self.verbose:
console.print("[bold yellow]Step 1:[/bold yellow] Generating specialized research questions...")
questions = await self._generate_questions(query)
if self.verbose:
console.print(f"\n[bold green]Generated {len(questions)} specialized questions:[/bold green]")
for i, q in enumerate(questions, 1):
console.print(f" {i}. {q}")
console.print()
# Step 2: Execute agents in parallel
if self.verbose:
console.print("[bold yellow]Step 2:[/bold yellow] Deploying agents for parallel analysis...\n")
agent_results = await self._execute_agents_parallel(query, questions)
# Display agent results
if self.verbose and self.show_agent_thoughts:
console.print("\n[bold green]Agent Analysis Results:[/bold green]\n")
for result in agent_results:
if result['success']:
console.print(f"[bold cyan]Agent {result['agent_id']} ({result['model']}) - {result['question']}[/bold cyan]")
console.print(f"{result['analysis']}\n")
else:
console.print(f"[bold red]Agent {result['agent_id']} ({result['model']}) failed: {result['error']}[/bold red]\n")
# Step 3: Synthesize results
if self.verbose:
console.print("[bold yellow]Step 3:[/bold yellow] Synthesizing comprehensive response...\n")
final_response = await self._synthesize_results(query, agent_results)
return final_response
async def _generate_questions(self, query: str) -> List[str]:
"""Generate specialized research questions from the original query.
Args:
query: Original user query
Returns:
List of specialized questions
"""
prompt = f"""<OrchestratorRequest num_agents="{self.num_agents}">
<OriginalQuery><![CDATA[{query}]]></OriginalQuery>
<DesignPrinciples>
<Principle>Each question must focus on a distinct perspective or sub-problem.</Principle>
<Principle>Questions should be concrete, scoped, and answerable via detailed analysis.</Principle>
<Principle>Avoid redundancy and ensure collective coverage of the original query.</Principle>
<Principle>Prefer clarity and testable language over vague brainstorming.</Principle>
</DesignPrinciples>
<OutputFormat>
<![CDATA[
<OrchestratorPlan>
<QuestionRationale>Briefly note how the set covers the query holistically.</QuestionRationale>
<NumberedQuestions>
1. Question text
2. Question text
</NumberedQuestions>
</OrchestratorPlan>
]]>
</OutputFormat>
<Instructions>
Populate <NumberedQuestions> with exactly {self.num_agents} entries using the numbering pattern shown so downstream parsers can extract them.
</Instructions>
</OrchestratorRequest>"""
messages = [
{"role": "user", "content": prompt}
]
response = await self.client.async_chat(
messages,
model=self.orchestrator_model,
temperature=0.8
)
# Parse questions from response
questions = self._parse_questions(response)
# Ensure we have the right number
if len(questions) < self.num_agents:
while len(questions) < self.num_agents:
questions.append(f"What are additional considerations for: {query}")
return questions[:self.num_agents]
def _parse_questions(self, response: str) -> List[str]:
"""Parse questions from model response."""
questions = []
lines = response.strip().split('\n')
for line in lines:
line = line.strip()
if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')):
question = line
for prefix in ['1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.', '-', '•', '*']:
if question.startswith(prefix):
question = question[len(prefix):].strip()
break
if question:
questions.append(question)
return questions
async def _execute_agents_parallel(
self,
original_query: str,
questions: List[str]
) -> List[Dict[str, Any]]:
"""Execute multiple agents in parallel.
Args:
original_query: Original user query
questions: List of specialized questions
Returns:
List of agent results
"""
# Create agents (all using the same agent_model)
agents = [
MultiAgent(i, self.client, self.agent_model, self.config, self.tavily_searcher)
for i in range(len(questions))
]
# Create tasks for parallel execution
tasks = [
agent.analyze(question, original_query)
for agent, question in zip(agents, questions)
]
# Execute with progress bar
if self.verbose:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
console=console
) as progress:
task = progress.add_task(
"[cyan]Agents analyzing...",
total=len(tasks)
)
results = []
for coro in asyncio.as_completed(tasks):
result = await coro
results.append(result)
progress.update(task, advance=1)
return results
else:
return await asyncio.gather(*tasks)
async def _synthesize_results(
self,
original_query: str,
agent_results: List[Dict[str, Any]]
) -> str:
"""Synthesize agent results into a comprehensive response.
Args:
original_query: Original user query
agent_results: Results from all agents
Returns:
Synthesized comprehensive response
"""
# Build synthesis prompt
agent_analysis_blocks = []
for result in agent_results:
if result['success']:
agent_analysis_blocks.append(
f""" <AgentAnalysis id="{result['agent_id']}" model="{result['model']}">
<Question><![CDATA[{result['question']}]]></Question>
<Analysis><![CDATA[{result['analysis']}]]></Analysis>
</AgentAnalysis>"""
)
agent_analyses = "\n".join(agent_analysis_blocks) if agent_analysis_blocks else " <AgentAnalysisList />"
synthesis_prompt = f"""<SynthesisRequest>
<OriginalQuery><![CDATA[{original_query}]]></OriginalQuery>
<Guidelines>
<Item>Integrate agent insights into a unified, well-structured response.</Item>
<Item>Identify complementary themes and resolve contradictions.</Item>
<Item>Address risks, limitations, and actionable recommendations.</Item>
<Item>Keep the response self-contained so the user can act without reading raw agent logs.</Item>
</Guidelines>
<AgentAnalyses>
{agent_analyses}
</AgentAnalyses>
<ResponseFormat>
<![CDATA[
<SynthesisResponse>
<ExecutiveSummary>...</ExecutiveSummary>
<KeyInsights>
<Insight>...</Insight>
</KeyInsights>
<ContradictionsOrTensions>
<Item>...</Item>
</ContradictionsOrTensions>
<Recommendations>...</Recommendations>
<RisksAndMitigations>...</RisksAndMitigations>
<NextIterationNotes>...</NextIterationNotes>
</SynthesisResponse>
]]>
</ResponseFormat>
<Instructions>
Populate every section even if it requires stating "None" explicitly.
</Instructions>
</SynthesisRequest>"""
messages = [
{"role": "user", "content": synthesis_prompt}
]
response = await self.client.async_chat(
messages,
model=self.synthesizer_model,
temperature=0.5,
max_tokens=6000
)
return response