| |
| """ |
| Synthesizer Agent for GAIA Agent System |
| Combines results from multiple agents and produces final answers |
| """ |
|
|
| import logging |
| from typing import Dict, List, Optional, Any |
| from statistics import mean |
|
|
| from agents.state import GAIAAgentState, AgentRole, AgentResult |
| from models.qwen_client import QwenClient, ModelTier |
|
|
| logger = logging.getLogger(__name__) |
|
|
class SynthesizerAgent:
    """
    Combine the results of several agents into one final answer.

    ``process`` is the entry point: it inspects ``state.agent_results``,
    picks a synthesis strategy based on how many agents succeeded and how
    confident they were, and writes the final answer, confidence, reasoning
    and answer source back onto the state object.
    """

    # Final confidence at or above this sets ``confidence_threshold_met``.
    CONFIDENCE_THRESHOLD = 0.7
    # Final answers below this confidence are flagged for human review.
    REVIEW_CONFIDENCE_FLOOR = 0.5
    # Questions at or above this difficulty level are flagged for human review.
    REVIEW_DIFFICULTY_LEVEL = 3
    # An agent result counts as "high confidence" strictly above this value.
    HIGH_CONFIDENCE = 0.6
    # Confidence assumed when the LLM response contains no usable number.
    DEFAULT_LLM_CONFIDENCE = 0.7
    # Kept as a plain string and only handed to ``re`` at call time: the
    # module binds ``re`` after this class body has executed, so compiling
    # the pattern here would fail at import.
    _CONFIDENCE_PATTERN = (
        r"confidence(?:\s+(?:level|score))?[\s:=*]*([0-9]+(?:\.[0-9]+)?|\.[0-9]+)"
    )

    def __init__(self, llm_client: "QwenClient"):
        """Store the LLM client used by the ``llm_synthesis`` strategy."""
        self.llm_client = llm_client

    def process(self, state: "GAIAAgentState") -> "GAIAAgentState":
        """
        Synthesize the final answer from the agent results held in ``state``.

        The state is mutated in place and also returned. Every exit path
        (success, no results, unexpected exception) marks the state as
        complete and fills in answer, confidence, reasoning and source.
        Exceptions raised during synthesis are recorded on the state and
        logged rather than propagated.
        """
        logger.info("Synthesizer: Starting result synthesis")
        state.add_processing_step("Synthesizer: Analyzing agent results")

        try:
            if not state.agent_results:
                error_msg = "No agent results available for synthesis"
                state.add_error(error_msg)
                state.final_answer = "Unable to process question - no agent results available"
                state.final_confidence = 0.0
                state.final_reasoning = error_msg
                # Keep this failure path consistent with the other ones:
                # always record a source and request a human review.
                state.answer_source = "synthesis_failure"
                state.is_complete = True
                state.requires_human_review = True
                return state

            synthesis_strategy = self._determine_synthesis_strategy(state)
            state.add_processing_step(f"Synthesizer: Using {synthesis_strategy} strategy")

            strategies = {
                "single_agent": self._synthesize_single_agent,
                "multi_agent_consensus": self._synthesize_multi_agent_consensus,
                "confidence_weighted": self._synthesize_confidence_weighted,
                "llm_synthesis": self._synthesize_with_llm,
            }
            # Any unknown strategy name (including "fallback") uses the fallback.
            synthesize = strategies.get(synthesis_strategy, self._synthesize_fallback)
            final_result = synthesize(state)

            state.final_answer = final_result["answer"]
            state.final_confidence = final_result["confidence"]
            state.final_reasoning = final_result["reasoning"]
            state.answer_source = final_result["source"]
            state.is_complete = True

            state.confidence_threshold_met = (
                state.final_confidence >= self.CONFIDENCE_THRESHOLD
            )

            # Low confidence, any recorded error, or a hard question all
            # warrant a human looking at the answer.
            state.requires_human_review = (
                state.final_confidence < self.REVIEW_CONFIDENCE_FLOOR
                or bool(state.error_messages)
                or state.difficulty_level >= self.REVIEW_DIFFICULTY_LEVEL
            )

            logger.info("✅ Synthesis complete: confidence=%.2f", state.final_confidence)
            state.add_processing_step(
                f"Synthesizer: Final answer generated (confidence: {state.final_confidence:.2f})"
            )
            return state

        except Exception as e:
            error_msg = f"Synthesis failed: {str(e)}"
            state.add_error(error_msg)
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(error_msg)

            state.final_answer = "Processing failed due to synthesis error"
            state.final_confidence = 0.0
            state.final_reasoning = error_msg
            state.answer_source = "error_fallback"
            state.is_complete = True
            state.requires_human_review = True
            return state

    @staticmethod
    def _successful_results(state: "GAIAAgentState") -> List[Any]:
        """Return the agent results whose ``success`` flag is truthy."""
        return [r for r in state.agent_results.values() if r.success]

    @classmethod
    def _parse_llm_confidence(cls, text: Optional[str]) -> float:
        """
        Extract a confidence value in ``[0.0, 1.0]`` from LLM output.

        Looks for the word "confidence" (optionally followed by "level" or
        "score") and the number right after it. Values greater than 1 are
        read as percentages (``85`` -> ``0.85``); the result is clamped to
        ``[0.0, 1.0]``. Returns ``DEFAULT_LLM_CONFIDENCE`` when the text is
        empty or contains no such number.
        """
        if not text:
            return cls.DEFAULT_LLM_CONFIDENCE

        match = re.search(cls._CONFIDENCE_PATTERN, text, re.IGNORECASE)
        if match is None:
            return cls.DEFAULT_LLM_CONFIDENCE

        # The pattern only captures well-formed decimals, so float() cannot
        # fail on trailing punctuation such as "0.8." or a lone ".".
        value = float(match.group(1))
        if value > 1.0:
            value /= 100.0
        return max(0.0, min(1.0, value))

    def _determine_synthesis_strategy(self, state: "GAIAAgentState") -> str:
        """
        Choose a strategy name from the number and confidence of successes.

        Returns ``"fallback"`` for zero successful results, ``"single_agent"``
        for one, ``"confidence_weighted"`` for two, and for three or more
        either ``"multi_agent_consensus"`` (all above ``HIGH_CONFIDENCE``) or
        ``"llm_synthesis"``.
        """
        successful = self._successful_results(state)
        count = len(successful)

        if count == 0:
            return "fallback"
        if count == 1:
            return "single_agent"
        if count == 2:
            return "confidence_weighted"
        if all(r.confidence > self.HIGH_CONFIDENCE for r in successful):
            return "multi_agent_consensus"
        return "llm_synthesis"

    def _synthesize_single_agent(self, state: "GAIAAgentState") -> Dict[str, Any]:
        """Return the highest-confidence successful result unchanged."""
        successful = self._successful_results(state)
        if not successful:
            return self._create_fallback_result("No successful agent results")

        best = max(successful, key=lambda r: r.confidence)
        return {
            "answer": best.result,
            "confidence": best.confidence,
            "reasoning": f"Single agent result from {best.agent_role.value}: {best.reasoning}",
            "source": best.agent_role.value,
        }

    def _synthesize_multi_agent_consensus(self, state: "GAIAAgentState") -> Dict[str, Any]:
        """
        Synthesize from the high-confidence successful results.

        The answer is taken from the single most confident result; the
        answers themselves are not compared with each other. The reported
        confidence is the mean of the high-confidence results boosted by
        10% and capped at 0.95. Falls back to confidence weighting when no
        result is above ``HIGH_CONFIDENCE``.
        """
        high_confidence = [
            r for r in self._successful_results(state)
            if r.confidence > self.HIGH_CONFIDENCE
        ]
        if not high_confidence:
            return self._synthesize_confidence_weighted(state)

        primary = max(high_confidence, key=lambda r: r.confidence)
        avg_confidence = mean(r.confidence for r in high_confidence)
        consensus_confidence = min(0.95, avg_confidence * 1.1)

        agent_summaries = ", ".join(
            f"{r.agent_role.value} (conf: {r.confidence:.2f})" for r in high_confidence
        )
        reasoning = (
            f"Consensus from {len(high_confidence)} agents: {agent_summaries}. "
            f"Primary result: {primary.reasoning}"
        )

        return {
            "answer": primary.result,
            "confidence": consensus_confidence,
            "reasoning": reasoning,
            "source": f"consensus_{len(high_confidence)}_agents",
        }

    def _synthesize_confidence_weighted(self, state: "GAIAAgentState") -> Dict[str, Any]:
        """
        Synthesize using each successful result's confidence as its weight.

        The answer comes from the most confident result. The reported
        confidence is the confidence-weighted mean of the confidences
        (``sum(c**2) / sum(c)``), capped at 0.9.
        """
        successful = self._successful_results(state)
        if not successful:
            return self._create_fallback_result("No successful results for confidence weighting")

        total_weight = sum(r.confidence for r in successful)
        if total_weight == 0:
            # Weights are undefined; just report the best single result.
            return self._synthesize_single_agent(state)

        primary = max(successful, key=lambda r: r.confidence)
        weighted_confidence = sum(r.confidence ** 2 for r in successful) / total_weight

        result_summaries = ", ".join(
            f"{r.agent_role.value} (weight: {r.confidence / total_weight:.2f})"
            for r in successful
        )
        reasoning = f"Confidence-weighted synthesis: {result_summaries}. Primary: {primary.reasoning}"

        return {
            "answer": primary.result,
            "confidence": min(0.9, weighted_confidence),
            "reasoning": reasoning,
            "source": f"weighted_{len(successful)}_agents",
        }

    def _synthesize_with_llm(self, state: "GAIAAgentState") -> Dict[str, Any]:
        """
        Ask the LLM to merge conflicting or mixed-confidence results.

        The full LLM response is used as the answer. Its self-reported
        confidence is averaged with the mean confidence of the input results
        and capped at 0.85. If the LLM call reports failure, confidence
        weighting is used instead.
        """
        successful = self._successful_results(state)
        if not successful:
            # mean() below would raise on an empty sequence.
            return self._create_fallback_result("No successful results for LLM synthesis")

        agent_results_text = [
            f"\nAgent {i} ({r.agent_role.value}):\n"
            f"- Answer: {r.result}\n"
            f"- Confidence: {r.confidence:.2f}\n"
            f"- Reasoning: {r.reasoning}\n"
            for i, r in enumerate(successful, 1)
        ]
        joined_results = "\n".join(agent_results_text)

        synthesis_prompt = (
            f"\nQuestion: {state.question}\n"
            "\n"
            "Multiple agents have provided different answers/insights. "
            "Please synthesize these into a single, coherent final answer:\n"
            "\n"
            f"{joined_results}\n"
            "\n"
            "Please provide:\n"
            "1. A clear, direct final answer\n"
            "2. Your confidence level (0.0 to 1.0)\n"
            "3. Brief reasoning explaining how you synthesized the results\n"
            "\n"
            "Focus on accuracy and be direct in your response.\n"
        )

        model_tier = ModelTier.COMPLEX if state.should_use_complex_model() else ModelTier.MAIN
        llm_result = self.llm_client.generate(synthesis_prompt, tier=model_tier, max_tokens=400)

        if not llm_result.success:
            return self._synthesize_confidence_weighted(state)

        llm_answer = llm_result.response
        llm_confidence = self._parse_llm_confidence(llm_answer)

        avg_input_confidence = mean(r.confidence for r in successful)
        final_confidence = min(0.85, (llm_confidence + avg_input_confidence) / 2)

        return {
            "answer": llm_answer,
            "confidence": final_confidence,
            "reasoning": f"LLM synthesis of {len(successful)} agent results using {llm_result.model_used}",
            "source": "llm_synthesis",
        }

    def _synthesize_fallback(self, state: "GAIAAgentState") -> Dict[str, Any]:
        """
        Produce a best-effort result when no regular strategy applies.

        Picks the most confident successful result if there is one (failed
        results rank as confidence 0.0); otherwise reports the first failed
        attempt with a fixed confidence of 0.2.
        """
        all_results = list(state.agent_results.values())
        if not all_results:
            return self._create_fallback_result("No agent results available")

        best_attempt = max(all_results, key=lambda r: r.confidence if r.success else 0.0)
        role = best_attempt.agent_role.value

        if best_attempt.success:
            return {
                "answer": best_attempt.result,
                # Discount by 20% but never report below 0.3.
                "confidence": max(0.3, best_attempt.confidence * 0.8),
                "reasoning": f"Fallback result from {role}: {best_attempt.reasoning}",
                "source": f"fallback_{role}",
            }

        return {
            "answer": f"Processing encountered difficulties: {best_attempt.result}",
            "confidence": 0.2,
            "reasoning": f"Fallback from failed attempt by {role}",
            "source": "failed_fallback",
        }

    def _create_fallback_result(self, reason: str) -> Dict[str, Any]:
        """Build the zero-confidence result used when synthesis is impossible."""
        return {
            "answer": f"Unable to process question: {reason}",
            "confidence": 0.0,
            "reasoning": f"Synthesis failed: {reason}",
            "source": "synthesis_failure",
        }
|
|
| |
| import re |