Spaces:
Sleeping
Sleeping
| """Multi-model orchestrator for managing multi-agent analysis workflow.""" | |
| import asyncio | |
| from typing import List, Dict, Any, Optional | |
| from .multi_client import MultiModelClient | |
| from .multi_agent import MultiAgent | |
| from .tavily_search import TavilySearcher | |
| from rich.console import Console | |
| from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn | |
| console = Console() | |
| class MultiOrchestrator: | |
| """Manages the multi-agent analysis workflow with model selection per role.""" | |
| def __init__( | |
| self, | |
| client: MultiModelClient, | |
| config: Dict[str, Any], | |
| orchestrator_model: str = "claude-4.5-sonnet", | |
| agent_model: str = "claude-4.5-sonnet", | |
| synthesizer_model: str = "claude-4.5-sonnet", | |
| tavily_searcher: Optional[TavilySearcher] = None | |
| ): | |
| """Initialize the orchestrator. | |
| Args: | |
| client: MultiModelClient instance | |
| config: Configuration dictionary | |
| orchestrator_model: Model for question generation | |
| agent_model: Model for agent analysis | |
| synthesizer_model: Model for synthesis | |
| tavily_searcher: Optional Tavily searcher for web research | |
| """ | |
| self.client = client | |
| self.config = config | |
| self.orchestrator_model = orchestrator_model | |
| self.agent_model = agent_model | |
| self.synthesizer_model = synthesizer_model | |
| self.tavily_searcher = tavily_searcher | |
| self.num_agents = config.get('orchestrator', {}).get('num_agents', 4) | |
| self.verbose = config.get('output', {}).get('verbose', True) | |
| self.show_agent_thoughts = config.get('output', {}).get('show_agent_thoughts', True) | |
| async def process_query(self, query: str) -> str: | |
| """Process a query through the multi-agent system. | |
| Args: | |
| query: User's query to analyze | |
| Returns: | |
| Synthesized comprehensive response | |
| """ | |
| if self.verbose: | |
| console.print(f"\n[bold cyan]Processing query:[/bold cyan] {query}\n") | |
| console.print(f"[dim]Using: Orchestrator={self.orchestrator_model}, Agents={self.agent_model}, Synthesizer={self.synthesizer_model}[/dim]\n") | |
| # Step 1: Generate specialized research questions | |
| if self.verbose: | |
| console.print("[bold yellow]Step 1:[/bold yellow] Generating specialized research questions...") | |
| questions = await self._generate_questions(query) | |
| if self.verbose: | |
| console.print(f"\n[bold green]Generated {len(questions)} specialized questions:[/bold green]") | |
| for i, q in enumerate(questions, 1): | |
| console.print(f" {i}. {q}") | |
| console.print() | |
| # Step 2: Execute agents in parallel | |
| if self.verbose: | |
| console.print("[bold yellow]Step 2:[/bold yellow] Deploying agents for parallel analysis...\n") | |
| agent_results = await self._execute_agents_parallel(query, questions) | |
| # Display agent results | |
| if self.verbose and self.show_agent_thoughts: | |
| console.print("\n[bold green]Agent Analysis Results:[/bold green]\n") | |
| for result in agent_results: | |
| if result['success']: | |
| console.print(f"[bold cyan]Agent {result['agent_id']} ({result['model']}) - {result['question']}[/bold cyan]") | |
| console.print(f"{result['analysis']}\n") | |
| else: | |
| console.print(f"[bold red]Agent {result['agent_id']} ({result['model']}) failed: {result['error']}[/bold red]\n") | |
| # Step 3: Synthesize results | |
| if self.verbose: | |
| console.print("[bold yellow]Step 3:[/bold yellow] Synthesizing comprehensive response...\n") | |
| final_response = await self._synthesize_results(query, agent_results) | |
| return final_response | |
| async def _generate_questions(self, query: str) -> List[str]: | |
| """Generate specialized research questions from the original query. | |
| Args: | |
| query: Original user query | |
| Returns: | |
| List of specialized questions | |
| """ | |
| prompt = f"""<OrchestratorRequest num_agents="{self.num_agents}"> | |
| <OriginalQuery><![CDATA[{query}]]></OriginalQuery> | |
| <DesignPrinciples> | |
| <Principle>Each question must focus on a distinct perspective or sub-problem.</Principle> | |
| <Principle>Questions should be concrete, scoped, and answerable via detailed analysis.</Principle> | |
| <Principle>Avoid redundancy and ensure collective coverage of the original query.</Principle> | |
| <Principle>Prefer clarity and testable language over vague brainstorming.</Principle> | |
| </DesignPrinciples> | |
| <OutputFormat> | |
| <![CDATA[ | |
| <OrchestratorPlan> | |
| <QuestionRationale>Briefly note how the set covers the query holistically.</QuestionRationale> | |
| <NumberedQuestions> | |
| 1. Question text | |
| 2. Question text | |
| </NumberedQuestions> | |
| </OrchestratorPlan> | |
| ]]> | |
| </OutputFormat> | |
| <Instructions> | |
| Populate <NumberedQuestions> with exactly {self.num_agents} entries using the numbering pattern shown so downstream parsers can extract them. | |
| </Instructions> | |
| </OrchestratorRequest>""" | |
| messages = [ | |
| {"role": "user", "content": prompt} | |
| ] | |
| response = await self.client.async_chat( | |
| messages, | |
| model=self.orchestrator_model, | |
| temperature=0.8 | |
| ) | |
| # Parse questions from response | |
| questions = self._parse_questions(response) | |
| # Ensure we have the right number | |
| if len(questions) < self.num_agents: | |
| while len(questions) < self.num_agents: | |
| questions.append(f"What are additional considerations for: {query}") | |
| return questions[:self.num_agents] | |
| def _parse_questions(self, response: str) -> List[str]: | |
| """Parse questions from model response.""" | |
| questions = [] | |
| lines = response.strip().split('\n') | |
| for line in lines: | |
| line = line.strip() | |
| if line and (line[0].isdigit() or line.startswith('-') or line.startswith('•')): | |
| question = line | |
| for prefix in ['1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.', '-', '•', '*']: | |
| if question.startswith(prefix): | |
| question = question[len(prefix):].strip() | |
| break | |
| if question: | |
| questions.append(question) | |
| return questions | |
| async def _execute_agents_parallel( | |
| self, | |
| original_query: str, | |
| questions: List[str] | |
| ) -> List[Dict[str, Any]]: | |
| """Execute multiple agents in parallel. | |
| Args: | |
| original_query: Original user query | |
| questions: List of specialized questions | |
| Returns: | |
| List of agent results | |
| """ | |
| # Create agents (all using the same agent_model) | |
| agents = [ | |
| MultiAgent(i, self.client, self.agent_model, self.config, self.tavily_searcher) | |
| for i in range(len(questions)) | |
| ] | |
| # Create tasks for parallel execution | |
| tasks = [ | |
| agent.analyze(question, original_query) | |
| for agent, question in zip(agents, questions) | |
| ] | |
| # Execute with progress bar | |
| if self.verbose: | |
| with Progress( | |
| SpinnerColumn(), | |
| TextColumn("[progress.description]{task.description}"), | |
| BarColumn(), | |
| TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), | |
| console=console | |
| ) as progress: | |
| task = progress.add_task( | |
| "[cyan]Agents analyzing...", | |
| total=len(tasks) | |
| ) | |
| results = [] | |
| for coro in asyncio.as_completed(tasks): | |
| result = await coro | |
| results.append(result) | |
| progress.update(task, advance=1) | |
| return results | |
| else: | |
| return await asyncio.gather(*tasks) | |
| async def _synthesize_results( | |
| self, | |
| original_query: str, | |
| agent_results: List[Dict[str, Any]] | |
| ) -> str: | |
| """Synthesize agent results into a comprehensive response. | |
| Args: | |
| original_query: Original user query | |
| agent_results: Results from all agents | |
| Returns: | |
| Synthesized comprehensive response | |
| """ | |
| # Build synthesis prompt | |
| agent_analysis_blocks = [] | |
| for result in agent_results: | |
| if result['success']: | |
| agent_analysis_blocks.append( | |
| f""" <AgentAnalysis id="{result['agent_id']}" model="{result['model']}"> | |
| <Question><![CDATA[{result['question']}]]></Question> | |
| <Analysis><![CDATA[{result['analysis']}]]></Analysis> | |
| </AgentAnalysis>""" | |
| ) | |
| agent_analyses = "\n".join(agent_analysis_blocks) if agent_analysis_blocks else " <AgentAnalysisList />" | |
| synthesis_prompt = f"""<SynthesisRequest> | |
| <OriginalQuery><![CDATA[{original_query}]]></OriginalQuery> | |
| <Guidelines> | |
| <Item>Integrate agent insights into a unified, well-structured response.</Item> | |
| <Item>Identify complementary themes and resolve contradictions.</Item> | |
| <Item>Address risks, limitations, and actionable recommendations.</Item> | |
| <Item>Keep the response self-contained so the user can act without reading raw agent logs.</Item> | |
| </Guidelines> | |
| <AgentAnalyses> | |
| {agent_analyses} | |
| </AgentAnalyses> | |
| <ResponseFormat> | |
| <![CDATA[ | |
| <SynthesisResponse> | |
| <ExecutiveSummary>...</ExecutiveSummary> | |
| <KeyInsights> | |
| <Insight>...</Insight> | |
| </KeyInsights> | |
| <ContradictionsOrTensions> | |
| <Item>...</Item> | |
| </ContradictionsOrTensions> | |
| <Recommendations>...</Recommendations> | |
| <RisksAndMitigations>...</RisksAndMitigations> | |
| <NextIterationNotes>...</NextIterationNotes> | |
| </SynthesisResponse> | |
| ]]> | |
| </ResponseFormat> | |
| <Instructions> | |
| Populate every section even if it requires stating "None" explicitly. | |
| </Instructions> | |
| </SynthesisRequest>""" | |
| messages = [ | |
| {"role": "user", "content": synthesis_prompt} | |
| ] | |
| response = await self.client.async_chat( | |
| messages, | |
| model=self.synthesizer_model, | |
| temperature=0.5, | |
| max_tokens=6000 | |
| ) | |
| return response | |