Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Synthesizer Agent for GAIA Agent System | |
| Combines results from multiple agents and produces final answers | |
| """ | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from statistics import mean | |
| from agents.state import GAIAAgentState, AgentRole, AgentResult | |
| from models.qwen_client import QwenClient, ModelTier | |
| logger = logging.getLogger(__name__) | |
| class SynthesizerAgent: | |
| """ | |
| Synthesizer agent that combines multiple agent results into a final answer | |
| """ | |
| def __init__(self, llm_client: QwenClient): | |
| self.llm_client = llm_client | |
| def process(self, state: GAIAAgentState) -> GAIAAgentState: | |
| """ | |
| Synthesize final answer from multiple agent results | |
| """ | |
| logger.info("Synthesizer: Starting result synthesis") | |
| state.add_processing_step("Synthesizer: Analyzing agent results") | |
| try: | |
| # Check if we have any agent results to synthesize | |
| if not state.agent_results: | |
| error_msg = "No agent results available for synthesis" | |
| state.add_error(error_msg) | |
| state.final_answer = "Unable to process question - no agent results available" | |
| state.final_confidence = 0.0 | |
| state.final_reasoning = error_msg | |
| state.is_complete = True | |
| return state | |
| # Determine synthesis strategy based on available results | |
| synthesis_strategy = self._determine_synthesis_strategy(state) | |
| state.add_processing_step(f"Synthesizer: Using {synthesis_strategy} strategy") | |
| # Execute synthesis based on strategy | |
| if synthesis_strategy == "single_agent": | |
| final_result = self._synthesize_single_agent(state) | |
| elif synthesis_strategy == "multi_agent_consensus": | |
| final_result = self._synthesize_multi_agent_consensus(state) | |
| elif synthesis_strategy == "confidence_weighted": | |
| final_result = self._synthesize_confidence_weighted(state) | |
| elif synthesis_strategy == "llm_synthesis": | |
| final_result = self._synthesize_with_llm(state) | |
| else: | |
| final_result = self._synthesize_fallback(state) | |
| # Update state with final results | |
| state.final_answer = final_result["answer"] | |
| state.final_confidence = final_result["confidence"] | |
| state.final_reasoning = final_result["reasoning"] | |
| state.answer_source = final_result["source"] | |
| state.is_complete = True | |
| # Check if confidence threshold is met | |
| state.confidence_threshold_met = state.final_confidence >= 0.7 | |
| # Determine if human review is needed | |
| state.requires_human_review = ( | |
| state.final_confidence < 0.5 or | |
| len(state.error_messages) > 0 or | |
| state.difficulty_level >= 3 | |
| ) | |
| logger.info(f"✅ Synthesis complete: confidence={state.final_confidence:.2f}") | |
| state.add_processing_step(f"Synthesizer: Final answer generated (confidence: {state.final_confidence:.2f})") | |
| return state | |
| except Exception as e: | |
| error_msg = f"Synthesis failed: {str(e)}" | |
| state.add_error(error_msg) | |
| logger.error(error_msg) | |
| # Provide fallback answer | |
| state.final_answer = "Processing failed due to synthesis error" | |
| state.final_confidence = 0.0 | |
| state.final_reasoning = error_msg | |
| state.answer_source = "error_fallback" | |
| state.is_complete = True | |
| state.requires_human_review = True | |
| return state | |
| def _determine_synthesis_strategy(self, state: GAIAAgentState) -> str: | |
| """Determine the best synthesis strategy based on available results""" | |
| successful_results = [r for r in state.agent_results.values() if r.success] | |
| if len(successful_results) == 0: | |
| return "fallback" | |
| elif len(successful_results) == 1: | |
| return "single_agent" | |
| elif len(successful_results) == 2: | |
| return "confidence_weighted" | |
| elif all(r.confidence > 0.6 for r in successful_results): | |
| return "multi_agent_consensus" | |
| else: | |
| return "llm_synthesis" | |
| def _synthesize_single_agent(self, state: GAIAAgentState) -> Dict[str, Any]: | |
| """Synthesize result from a single agent""" | |
| successful_results = [r for r in state.agent_results.values() if r.success] | |
| if not successful_results: | |
| return self._create_fallback_result("No successful agent results") | |
| best_result = max(successful_results, key=lambda r: r.confidence) | |
| return { | |
| "answer": best_result.result, | |
| "confidence": best_result.confidence, | |
| "reasoning": f"Single agent result from {best_result.agent_role.value}: {best_result.reasoning}", | |
| "source": best_result.agent_role.value | |
| } | |
| def _synthesize_multi_agent_consensus(self, state: GAIAAgentState) -> Dict[str, Any]: | |
| """Synthesize results when multiple agents agree (high confidence)""" | |
| successful_results = [r for r in state.agent_results.values() if r.success] | |
| high_confidence_results = [r for r in successful_results if r.confidence > 0.6] | |
| if not high_confidence_results: | |
| return self._synthesize_confidence_weighted(state) | |
| # Use the highest confidence result as primary | |
| primary_result = max(high_confidence_results, key=lambda r: r.confidence) | |
| # Calculate consensus confidence | |
| avg_confidence = mean([r.confidence for r in high_confidence_results]) | |
| consensus_confidence = min(0.95, avg_confidence * 1.1) # Boost for consensus | |
| # Create reasoning summary | |
| agent_summaries = [] | |
| for result in high_confidence_results: | |
| agent_summaries.append(f"{result.agent_role.value} (conf: {result.confidence:.2f})") | |
| reasoning = f"Consensus from {len(high_confidence_results)} agents: {', '.join(agent_summaries)}. Primary result: {primary_result.reasoning}" | |
| return { | |
| "answer": primary_result.result, | |
| "confidence": consensus_confidence, | |
| "reasoning": reasoning, | |
| "source": f"consensus_{len(high_confidence_results)}_agents" | |
| } | |
| def _synthesize_confidence_weighted(self, state: GAIAAgentState) -> Dict[str, Any]: | |
| """Synthesize results using confidence weighting""" | |
| successful_results = [r for r in state.agent_results.values() if r.success] | |
| if not successful_results: | |
| return self._create_fallback_result("No successful results for confidence weighting") | |
| # Weight by confidence | |
| total_weight = sum(r.confidence for r in successful_results) | |
| if total_weight == 0: | |
| return self._synthesize_single_agent(state) | |
| # Select primary result (highest confidence) | |
| primary_result = max(successful_results, key=lambda r: r.confidence) | |
| # Calculate weighted confidence | |
| weighted_confidence = sum(r.confidence ** 2 for r in successful_results) / total_weight | |
| # Create reasoning | |
| result_summaries = [] | |
| for result in successful_results: | |
| weight = result.confidence / total_weight | |
| result_summaries.append(f"{result.agent_role.value} (weight: {weight:.2f})") | |
| reasoning = f"Confidence-weighted synthesis: {', '.join(result_summaries)}. Primary: {primary_result.reasoning}" | |
| return { | |
| "answer": primary_result.result, | |
| "confidence": min(0.9, weighted_confidence), | |
| "reasoning": reasoning, | |
| "source": f"weighted_{len(successful_results)}_agents" | |
| } | |
| def _synthesize_with_llm(self, state: GAIAAgentState) -> Dict[str, Any]: | |
| """Use LLM to synthesize conflicting or complex results""" | |
| successful_results = [r for r in state.agent_results.values() if r.success] | |
| # Prepare synthesis prompt | |
| agent_results_text = [] | |
| for i, result in enumerate(successful_results, 1): | |
| agent_results_text.append(f""" | |
| Agent {i} ({result.agent_role.value}): | |
| - Answer: {result.result} | |
| - Confidence: {result.confidence:.2f} | |
| - Reasoning: {result.reasoning} | |
| """) | |
| synthesis_prompt = f""" | |
| Question: {state.question} | |
| Multiple agents have provided different answers/insights. Please synthesize these into a single, coherent final answer: | |
| {chr(10).join(agent_results_text)} | |
| Please provide: | |
| 1. A clear, direct final answer | |
| 2. Your confidence level (0.0 to 1.0) | |
| 3. Brief reasoning explaining how you synthesized the results | |
| Focus on accuracy and be direct in your response. | |
| """ | |
| # Use complex model for synthesis | |
| model_tier = ModelTier.COMPLEX if state.should_use_complex_model() else ModelTier.MAIN | |
| llm_result = self.llm_client.generate(synthesis_prompt, tier=model_tier, max_tokens=400) | |
| if llm_result.success: | |
| # Parse LLM response for structured output | |
| llm_answer = llm_result.response | |
| # Extract confidence if mentioned in response | |
| confidence_match = re.search(r'confidence[:\s]*([0-9.]+)', llm_answer.lower()) | |
| llm_confidence = float(confidence_match.group(1)) if confidence_match else 0.7 | |
| # Adjust confidence based on input quality | |
| avg_input_confidence = mean([r.confidence for r in successful_results]) | |
| final_confidence = min(0.85, (llm_confidence + avg_input_confidence) / 2) | |
| return { | |
| "answer": llm_answer, | |
| "confidence": final_confidence, | |
| "reasoning": f"LLM synthesis of {len(successful_results)} agent results using {llm_result.model_used}", | |
| "source": "llm_synthesis" | |
| } | |
| else: | |
| # Fallback to confidence weighted if LLM fails | |
| return self._synthesize_confidence_weighted(state) | |
| def _synthesize_fallback(self, state: GAIAAgentState) -> Dict[str, Any]: | |
| """Fallback synthesis when other strategies fail""" | |
| # Try to get any result, even if not successful | |
| all_results = list(state.agent_results.values()) | |
| if all_results: | |
| # Use the result with highest confidence, even if failed | |
| best_attempt = max(all_results, key=lambda r: r.confidence if r.success else 0.0) | |
| if best_attempt.success: | |
| return { | |
| "answer": best_attempt.result, | |
| "confidence": max(0.3, best_attempt.confidence * 0.8), # Reduce confidence for fallback | |
| "reasoning": f"Fallback result from {best_attempt.agent_role.value}: {best_attempt.reasoning}", | |
| "source": f"fallback_{best_attempt.agent_role.value}" | |
| } | |
| else: | |
| return { | |
| "answer": f"Processing encountered difficulties: {best_attempt.result}", | |
| "confidence": 0.2, | |
| "reasoning": f"Fallback from failed attempt by {best_attempt.agent_role.value}", | |
| "source": "failed_fallback" | |
| } | |
| else: | |
| return self._create_fallback_result("No agent results available") | |
| def _create_fallback_result(self, reason: str) -> Dict[str, Any]: | |
| """Create a fallback result when synthesis is impossible""" | |
| return { | |
| "answer": f"Unable to process question: {reason}", | |
| "confidence": 0.0, | |
| "reasoning": f"Synthesis failed: {reason}", | |
| "source": "synthesis_failure" | |
| } | |
| # Import regex for LLM response parsing | |
| import re |