Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Web Research Agent for GAIA Agent System | |
| Handles Wikipedia and web search questions with intelligent search strategies | |
| """ | |
| import re | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from urllib.parse import urlparse | |
| from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult | |
| from models.qwen_client import QwenClient, ModelTier | |
| from tools.wikipedia_tool import WikipediaTool | |
| from tools.web_search_tool import WebSearchTool | |
| logger = logging.getLogger(__name__) | |
| class WebResearchAgent: | |
| """ | |
| Specialized agent for web research tasks | |
| Uses Wikipedia and web search tools with intelligent routing | |
| """ | |
    def __init__(self, llm_client: QwenClient):
        """Store the shared LLM client and create this agent's research tools.

        llm_client -- QwenClient used for all analysis/synthesis generations.
        """
        self.llm_client = llm_client
        # Tools used by the research methods below
        self.wikipedia_tool = WikipediaTool()
        self.web_search_tool = WebSearchTool()
    def process(self, state: GAIAAgentState) -> GAIAAgentState:
        """
        Enhanced multi-step research processing with systematic problem decomposition

        Pipeline: build a research plan (from the router's analysis when present,
        otherwise independently), execute it, retry once with a refined plan when
        confidence is below 0.4, and always attach an AgentResult to the state —
        even on exception — so downstream agents keep running.
        """
        logger.info(f"Web researcher processing: {state.question[:100]}...")
        state.add_processing_step("Web Researcher: Starting enhanced multi-step research")
        try:
            # Step 1: Analyze router's decomposition if available
            # (router_analysis is an optional attribute, hence getattr with default)
            router_analysis = getattr(state, 'router_analysis', None)
            if router_analysis:
                state.add_processing_step("Web Researcher: Using router analysis")
                research_plan = self._build_research_plan_from_router(state.question, router_analysis)
            else:
                state.add_processing_step("Web Researcher: Creating independent research plan")
                research_plan = self._create_independent_research_plan(state.question)
            # Step 2: Execute research plan with iterative refinement
            results = self._execute_research_plan(state, research_plan)
            # Step 3: Evaluate results and refine if needed
            # (0.4 is the minimum confidence considered usable here)
            if not results or results.confidence < 0.4:
                logger.info("Initial research insufficient, attempting refinement")
                state.add_processing_step("Web Researcher: Refining research approach")
                refined_plan = self._refine_research_plan(state.question, research_plan, results)
                results = self._execute_research_plan(state, refined_plan)
            # Step 4: Finalize results
            # Guard against helpers returning None or a non-AgentResult value
            if not results or not isinstance(results, AgentResult):
                results = self._create_basic_response(state, "Multi-step research completed with limited results")
            # Add result to state
            state.add_agent_result(results)
            state.add_processing_step(f"Web Researcher: Completed with confidence {results.confidence:.2f}")
            return state
        except Exception as e:
            error_msg = f"Enhanced web research failed: {str(e)}"
            state.add_error(error_msg)
            logger.error(error_msg)
            # Create failure result but ensure system continues
            failure_result = AgentResult(
                agent_role=AgentRole.WEB_RESEARCHER,
                success=False,
                result=f"Research encountered difficulties: {str(e)}",
                confidence=0.1,
                reasoning=f"Exception during enhanced web research: {str(e)}",
                tools_used=[],
                model_used="error",
                processing_time=0.0,
                cost_estimate=0.0
            )
            state.add_agent_result(failure_result)
            return state
| def _build_research_plan_from_router(self, question: str, router_analysis: Dict[str, Any]) -> Dict[str, Any]: | |
| """Build research plan using router's structural analysis""" | |
| structural = router_analysis.get('structural', {}) | |
| requirements = router_analysis.get('requirements', {}) | |
| strategy = router_analysis.get('strategy', {}) | |
| plan = { | |
| 'question_type': structural.get('type', 'unknown'), | |
| 'primary_need': requirements.get('primary_need', 'factual_lookup'), | |
| 'data_sources': structural.get('data_sources', []), | |
| 'approach': strategy.get('approach', 'sequential'), | |
| 'steps': [], | |
| 'fallback_strategies': [] | |
| } | |
| # Build step-by-step research plan | |
| if plan['question_type'] == 'quantitative': | |
| plan['steps'] = [ | |
| {'action': 'identify_entity', 'details': 'Extract the main subject/entity'}, | |
| {'action': 'gather_data', 'details': 'Find relevant numerical data'}, | |
| {'action': 'verify_timeframe', 'details': 'Ensure data matches time constraints'}, | |
| {'action': 'extract_count', 'details': 'Extract specific count/quantity'} | |
| ] | |
| elif plan['question_type'] == 'identification': | |
| plan['steps'] = [ | |
| {'action': 'parse_subject', 'details': 'Identify what/who to find'}, | |
| {'action': 'context_search', 'details': 'Search for relevant context'}, | |
| {'action': 'verify_identity', 'details': 'Confirm identity from sources'} | |
| ] | |
| else: | |
| plan['steps'] = [ | |
| {'action': 'decompose_query', 'details': 'Break down complex question'}, | |
| {'action': 'research_components', 'details': 'Research each component'}, | |
| {'action': 'synthesize_findings', 'details': 'Combine results'} | |
| ] | |
| # Add fallback strategies | |
| plan['fallback_strategies'] = [ | |
| 'broaden_search_terms', | |
| 'try_alternative_sources', | |
| 'use_partial_information' | |
| ] | |
| return plan | |
| def _create_independent_research_plan(self, question: str) -> Dict[str, Any]: | |
| """Create research plan when router analysis isn't available""" | |
| # Analyze question independently | |
| plan = { | |
| 'question_type': 'general_research', | |
| 'primary_need': 'factual_lookup', | |
| 'data_sources': [], | |
| 'approach': 'sequential', | |
| 'steps': [], | |
| 'fallback_strategies': [] | |
| } | |
| question_lower = question.lower() | |
| # Determine research approach based on question patterns | |
| if any(term in question_lower for term in ['how many', 'count', 'number']): | |
| plan['question_type'] = 'quantitative' | |
| plan['steps'] = [ | |
| {'action': 'extract_entity', 'details': 'Find the main subject'}, | |
| {'action': 'search_entity_data', 'details': 'Search for subject information'}, | |
| {'action': 'extract_quantities', 'details': 'Find numerical data'}, | |
| {'action': 'apply_constraints', 'details': 'Apply time/condition filters'} | |
| ] | |
| elif any(term in question_lower for term in ['who', 'name', 'identity']): | |
| plan['question_type'] = 'identification' | |
| plan['steps'] = [ | |
| {'action': 'parse_context', 'details': 'Understand context clues'}, | |
| {'action': 'search_individuals', 'details': 'Search for people/entities'}, | |
| {'action': 'verify_match', 'details': 'Confirm identity match'} | |
| ] | |
| elif any(term in question_lower for term in ['wikipedia', 'article']): | |
| plan['question_type'] = 'wikipedia_specific' | |
| plan['data_sources'] = ['wikipedia'] | |
| plan['steps'] = [ | |
| {'action': 'extract_topic', 'details': 'Identify Wikipedia topic'}, | |
| {'action': 'search_wikipedia', 'details': 'Search Wikipedia directly'}, | |
| {'action': 'extract_metadata', 'details': 'Get article details'} | |
| ] | |
| else: | |
| plan['steps'] = [ | |
| {'action': 'analyze_question', 'details': 'Break down question components'}, | |
| {'action': 'multi_source_search', 'details': 'Search multiple sources'}, | |
| {'action': 'consolidate_results', 'details': 'Combine findings'} | |
| ] | |
| # Standard fallback strategies | |
| plan['fallback_strategies'] = [ | |
| 'simplify_search_terms', | |
| 'try_broader_keywords', | |
| 'search_related_topics' | |
| ] | |
| return plan | |
| def _execute_research_plan(self, state: GAIAAgentState, plan: Dict[str, Any]) -> AgentResult: | |
| """Execute the research plan step by step""" | |
| logger.info(f"Executing research plan: {plan['question_type']} with {len(plan['steps'])} steps") | |
| accumulated_results = [] | |
| total_processing_time = 0.0 | |
| total_cost = 0.0 | |
| for i, step in enumerate(plan['steps'], 1): | |
| logger.info(f"Step {i}/{len(plan['steps'])}: {step['action']} - {step['details']}") | |
| state.add_processing_step(f"Web Research Step {i}: {step['action']}") | |
| try: | |
| step_result = self._execute_research_step(state, step, plan, accumulated_results) | |
| if step_result: | |
| accumulated_results.append(step_result) | |
| total_processing_time += getattr(step_result, 'execution_time', 0.0) | |
| total_cost += getattr(step_result, 'cost_estimate', 0.0) | |
| except Exception as e: | |
| logger.warning(f"Step {i} failed: {e}, continuing with next step") | |
| state.add_processing_step(f"Web Research Step {i}: Failed - {str(e)}") | |
| continue | |
| # Synthesize accumulated results | |
| if accumulated_results: | |
| return self._synthesize_research_results(state, accumulated_results, plan, total_processing_time, total_cost) | |
| else: | |
| return self._create_failure_result("All research steps failed") | |
| def _execute_research_step(self, state: GAIAAgentState, step: Dict[str, Any], | |
| plan: Dict[str, Any], previous_results: List) -> Any: | |
| """Execute a single research step""" | |
| action = step['action'] | |
| if action == 'extract_entity' or action == 'identify_entity': | |
| return self._extract_main_entity(state.question) | |
| elif action == 'search_entity_data' or action == 'gather_data': | |
| entity = self._get_entity_from_results(previous_results) | |
| return self._search_entity_information(entity, state.question) | |
| elif action == 'extract_quantities' or action == 'extract_count': | |
| return self._extract_numerical_data(previous_results, state.question) | |
| elif action == 'search_wikipedia': | |
| topic = self._extract_wikipedia_topic(state.question) | |
| return self.wikipedia_tool.execute(topic) | |
| elif action == 'multi_source_search': | |
| search_terms = self._extract_search_terms(state.question) | |
| return self._research_multi_source_enhanced(state, search_terms) | |
| else: | |
| # Default: general web search | |
| search_terms = self._extract_search_terms(state.question) | |
| return self.web_search_tool.execute(search_terms) | |
| def _extract_main_entity(self, question: str) -> Dict[str, Any]: | |
| """Extract the main entity/subject from the question""" | |
| # Use simple heuristics and patterns to extract main entity | |
| import re | |
| # Look for quoted entities | |
| quoted = re.findall(r'"([^"]+)"', question) | |
| if quoted: | |
| return {'type': 'quoted_entity', 'entity': quoted[0], 'confidence': 0.9} | |
| # Look for proper nouns (capitalized words) | |
| words = question.split() | |
| proper_nouns = [] | |
| for word in words: | |
| clean_word = re.sub(r'[^\w]', '', word) | |
| if clean_word and clean_word[0].isupper() and len(clean_word) > 1: | |
| proper_nouns.append(clean_word) | |
| if proper_nouns: | |
| entity = ' '.join(proper_nouns[:3]) # Take first few proper nouns | |
| return {'type': 'proper_noun', 'entity': entity, 'confidence': 0.7} | |
| # Fallback: use question keywords | |
| keywords = self._extract_search_terms(question, max_length=50) | |
| return {'type': 'keywords', 'entity': keywords, 'confidence': 0.5} | |
| def _search_entity_information(self, entity_data: Dict[str, Any], question: str) -> Any: | |
| """Search for information about the extracted entity""" | |
| if not entity_data or 'entity' not in entity_data: | |
| return None | |
| entity = entity_data['entity'] | |
| # Try Wikipedia first for entities | |
| wiki_result = self.wikipedia_tool.execute(entity) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| return wiki_result | |
| # Fallback to web search | |
| search_query = f"{entity} {self._extract_search_terms(question, max_length=30)}" | |
| return self.web_search_tool.execute(search_query) | |
| def _extract_numerical_data(self, previous_results: List, question: str) -> Dict[str, Any]: | |
| """Extract numerical data from previous search results""" | |
| numerical_data = { | |
| 'numbers_found': [], | |
| 'context': [], | |
| 'confidence': 0.0 | |
| } | |
| for result in previous_results: | |
| if hasattr(result, 'result') and result.result: | |
| text = str(result.result) | |
| # Extract numbers with context | |
| import re | |
| number_patterns = [ | |
| r'\b(\d+)\s*(albums?|songs?|tracks?|releases?)\b', | |
| r'\b(\d+)\s*(studio|live|compilation)\s*(albums?)\b', | |
| r'\bbetween\s*(\d{4})\s*and\s*(\d{4})\b', | |
| r'\b(\d+)\b' # Any number as fallback | |
| ] | |
| for pattern in number_patterns: | |
| matches = re.findall(pattern, text, re.IGNORECASE) | |
| for match in matches: | |
| if isinstance(match, tuple): | |
| numerical_data['numbers_found'].extend(match) | |
| else: | |
| numerical_data['numbers_found'].append(match) | |
| if numerical_data['numbers_found']: | |
| numerical_data['confidence'] = 0.8 | |
| return numerical_data | |
| def _get_entity_from_results(self, results: List) -> str: | |
| """Extract entity name from previous results""" | |
| for result in results: | |
| if isinstance(result, dict) and 'entity' in result: | |
| return result['entity'] | |
| return "" | |
| def _research_multi_source_enhanced(self, state: GAIAAgentState, search_terms: str) -> Any: | |
| """Enhanced multi-source research with systematic approach""" | |
| sources_tried = [] | |
| # Try Wikipedia first for factual information | |
| wiki_result = self.wikipedia_tool.execute(search_terms) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| sources_tried.append(('Wikipedia', wiki_result)) | |
| # Try web search for additional information | |
| web_result = self.web_search_tool.execute({ | |
| "query": search_terms, | |
| "action": "search", | |
| "limit": 3 | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| sources_tried.append(('Web', web_result)) | |
| return {'sources': sources_tried, 'primary_terms': search_terms} | |
    def _synthesize_research_results(self, state: GAIAAgentState, results: List, plan: Dict[str, Any],
                                     total_time: float, total_cost: float) -> AgentResult:
        """Synthesize results from multi-step research.

        Flattens every step result into text, asks the LLM for a direct answer
        from the combined findings, and falls back to the first step's raw
        output if the LLM call fails. Confidence is the mean of the step
        confidences, boosted by 0.1 (capped at 0.85) on successful synthesis.
        """
        # Combine information from all steps
        combined_info = []
        confidence_scores = []
        for result in results:
            if hasattr(result, 'result'):
                combined_info.append(str(result.result))
                if hasattr(result, 'confidence'):
                    confidence_scores.append(result.confidence)
            elif isinstance(result, dict):
                # Plain-dict step results (e.g. entity extraction) get a
                # neutral default confidence.
                combined_info.append(str(result))
                confidence_scores.append(0.5)  # Default confidence
        # Create synthesis prompt
        # chr(10) is '\n' — used because backslashes are not allowed inside
        # f-string expressions on older Python versions.
        synthesis_prompt = f"""
Based on multi-step research for this question, provide a direct answer:
Question: {state.question}
Research Plan Type: {plan['question_type']}
Research Findings:
{chr(10).join(f"Step {i+1}: {info}" for i, info in enumerate(combined_info))}
Please provide a direct, precise answer based on the research findings.
"""
        # Use appropriate model for synthesis
        # (more than two step results is treated as a complex synthesis task)
        model_tier = ModelTier.COMPLEX if len(results) > 2 else ModelTier.MAIN
        llm_result = self.llm_client.generate(synthesis_prompt, tier=model_tier, max_tokens=300)
        avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.5
        if llm_result.success:
            return AgentResult(
                agent_role=AgentRole.WEB_RESEARCHER,
                success=True,
                result=llm_result.response,
                confidence=min(0.85, avg_confidence + 0.1),  # Boost for multi-step research
                reasoning=f"Multi-step research completed with {len(results)} steps: {plan['question_type']}",
                tools_used=[],
                model_used=llm_result.model_used,
                processing_time=total_time + llm_result.response_time,
                cost_estimate=total_cost + llm_result.cost_estimate
            )
        else:
            # Fallback to best single result
            best_info = combined_info[0] if combined_info else "Multi-step research completed"
            return AgentResult(
                agent_role=AgentRole.WEB_RESEARCHER,
                success=True,
                result=best_info,
                confidence=avg_confidence,
                reasoning=f"Multi-step research completed, synthesis failed",
                tools_used=[],
                model_used="fallback",
                processing_time=total_time,
                cost_estimate=total_cost
            )
| def _refine_research_plan(self, question: str, original_plan: Dict[str, Any], | |
| previous_result: AgentResult) -> Dict[str, Any]: | |
| """Refine research plan when initial attempt yields poor results""" | |
| refined_plan = original_plan.copy() | |
| # Add refinement strategies based on why previous attempt failed | |
| if previous_result and previous_result.confidence < 0.3: | |
| # Very low confidence - try different approach | |
| refined_plan['steps'] = [ | |
| {'action': 'broaden_search', 'details': 'Use broader search terms'}, | |
| {'action': 'alternative_sources', 'details': 'Try different information sources'}, | |
| {'action': 'relaxed_matching', 'details': 'Accept partial matches'} | |
| ] | |
| elif not previous_result or not previous_result.success: | |
| # Complete failure - simplify approach | |
| refined_plan['steps'] = [ | |
| {'action': 'simple_search', 'details': 'Basic web search with key terms'}, | |
| {'action': 'extract_any_info', 'details': 'Extract any relevant information'} | |
| ] | |
| refined_plan['refinement_attempt'] = True | |
| return refined_plan | |
| def _determine_research_strategy(self, question: str, file_name: Optional[str] = None) -> str: | |
| """Determine the best research strategy for the question""" | |
| question_lower = question.lower() | |
| # Direct Wikipedia references | |
| if any(term in question_lower for term in ['wikipedia', 'featured article', 'promoted']): | |
| if 'search' in question_lower or 'find' in question_lower: | |
| return "wikipedia_search" | |
| else: | |
| return "wikipedia_direct" | |
| # YouTube video analysis | |
| if any(term in question_lower for term in ['youtube', 'video', 'watch?v=', 'youtu.be']): | |
| return "youtube_analysis" | |
| # URL content extraction | |
| urls = re.findall(r'https?://[^\s]+', question) | |
| if urls: | |
| return "url_extraction" | |
| # General web search for current events, news, recent information | |
| if any(term in question_lower for term in ['news', 'recent', 'latest', 'current', 'today', '2024', '2025']): | |
| return "web_search" | |
| # Multi-source research for complex questions | |
| if len(question.split()) > 20 or '?' in question and question.count('?') > 1: | |
| return "multi_source" | |
| # Default to Wikipedia search for informational questions | |
| return "wikipedia_search" | |
    def _research_wikipedia_direct(self, state: GAIAAgentState) -> AgentResult:
        """Research using direct Wikipedia lookup.

        Extracts a topic from the question, fetches its Wikipedia entry, and
        has the LLM answer from the article summary. Falls back to the raw
        summary (confidence 0.60) if the LLM fails, and to a general web
        search if no article is found.
        """
        # Extract topic from question
        topic = self._extract_wikipedia_topic(state.question)
        logger.info(f"Wikipedia direct research for: {topic}")
        # Search Wikipedia
        wiki_result = self.wikipedia_tool.execute(topic)
        if wiki_result.success and wiki_result.result.get('found'):
            wiki_data = wiki_result.result['result']
            # Use LLM to analyze and answer the question
            analysis_prompt = f"""
Based on this Wikipedia information about {topic}, please answer the following question:
Question: {state.question}
Wikipedia Summary: {wiki_data.get('summary', '')}
Wikipedia URL: {wiki_data.get('url', '')}
Please provide a direct, accurate answer based on the Wikipedia information.
"""
            # Use appropriate model tier
            model_tier = ModelTier.MAIN if state.complexity_assessment == "complex" else ModelTier.ROUTER
            llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)
            if llm_result.success:
                # An exact article-title match earns slightly higher confidence.
                confidence = 0.85 if wiki_data.get('title') == topic else 0.75
                return AgentResult(
                    agent_role=AgentRole.WEB_RESEARCHER,
                    success=True,
                    result=llm_result.response,
                    confidence=confidence,
                    reasoning=f"Found Wikipedia article for '{topic}' and analyzed content",
                    tools_used=[ToolResult(
                        tool_name="wikipedia",
                        success=True,
                        result=wiki_data,
                        execution_time=wiki_result.execution_time
                    )],
                    model_used=llm_result.model_used,
                    processing_time=wiki_result.execution_time + llm_result.response_time,
                    cost_estimate=llm_result.cost_estimate
                )
            else:
                # Return Wikipedia summary as fallback
                return AgentResult(
                    agent_role=AgentRole.WEB_RESEARCHER,
                    success=True,
                    result=wiki_data.get('summary', 'Wikipedia information found but analysis failed'),
                    confidence=0.60,
                    reasoning="Wikipedia found but LLM analysis failed",
                    tools_used=[ToolResult(
                        tool_name="wikipedia",
                        success=True,
                        result=wiki_data,
                        execution_time=wiki_result.execution_time
                    )],
                    model_used="fallback",
                    processing_time=wiki_result.execution_time,
                    cost_estimate=0.0
                )
        else:
            # Wikipedia not found, try web search as fallback
            return self._research_web_fallback(state, f"Wikipedia not found for '{topic}'")
| def _research_wikipedia_search(self, state: GAIAAgentState) -> AgentResult: | |
| """Research using Wikipedia search functionality""" | |
| # Extract search terms | |
| search_terms = self._extract_search_terms(state.question) | |
| logger.info(f"Wikipedia search for: {search_terms}") | |
| # Search Wikipedia | |
| search_query = {"query": search_terms, "action": "summary"} | |
| wiki_result = self.wikipedia_tool.execute(search_query) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| return self._analyze_wikipedia_result(state, wiki_result) | |
| else: | |
| # Try web search as fallback | |
| return self._research_web_fallback(state, f"Wikipedia search failed for '{search_terms}'") | |
| def _research_youtube(self, state: GAIAAgentState) -> AgentResult: | |
| """Research YouTube video information""" | |
| # Extract YouTube URL or search terms | |
| youtube_query = self._extract_youtube_info(state.question) | |
| logger.info(f"YouTube research for: {youtube_query}") | |
| # Use web search tool's YouTube functionality | |
| if youtube_query.startswith('http'): | |
| # Direct YouTube URL | |
| web_result = self.web_search_tool.execute({ | |
| "query": youtube_query, | |
| "action": "extract" | |
| }) | |
| else: | |
| # Search for YouTube videos | |
| web_result = self.web_search_tool.execute(f"site:youtube.com {youtube_query}") | |
| if web_result.success and web_result.result.get('found'): | |
| return self._analyze_youtube_result(state, web_result) | |
| else: | |
| return self._create_failure_result("YouTube research failed") | |
| def _research_web_general(self, state: GAIAAgentState) -> AgentResult: | |
| """General web search research""" | |
| search_terms = self._extract_search_terms(state.question) | |
| logger.info(f"Web search for: {search_terms}") | |
| # Perform web search | |
| web_result = self.web_search_tool.execute({ | |
| "query": search_terms, | |
| "action": "search", | |
| "limit": 5 | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| return self._analyze_web_search_result(state, web_result) | |
| else: | |
| return self._create_failure_result("Web search failed") | |
| def _research_url_content(self, state: GAIAAgentState) -> AgentResult: | |
| """Extract and analyze content from specific URLs""" | |
| urls = re.findall(r'https?://[^\s]+', state.question) | |
| if not urls: | |
| return self._create_failure_result("No URLs found in question") | |
| url = urls[0] # Use first URL | |
| logger.info(f"Extracting content from: {url}") | |
| # Extract content from URL | |
| web_result = self.web_search_tool.execute({ | |
| "query": url, | |
| "action": "extract" | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| return self._analyze_url_content_result(state, web_result) | |
| else: | |
| return self._create_failure_result(f"Failed to extract content from {url}") | |
| def _research_multi_source(self, state: GAIAAgentState) -> AgentResult: | |
| """Multi-source research combining Wikipedia and web search""" | |
| search_terms = self._extract_search_terms(state.question) | |
| logger.info(f"Multi-source research for: {search_terms}") | |
| sources = [] | |
| # Try Wikipedia first | |
| wiki_result = self.wikipedia_tool.execute(search_terms) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| sources.append(("Wikipedia", wiki_result.result['result'])) | |
| # Add web search results | |
| web_result = self.web_search_tool.execute({ | |
| "query": search_terms, | |
| "action": "search", | |
| "limit": 3 | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| for result in web_result.result['results'][:2]: # Use top 2 web results | |
| sources.append(("Web", result)) | |
| if sources: | |
| return self._analyze_multi_source_result(state, sources) | |
| else: | |
| return self._create_failure_result("All research sources failed") | |
| def _research_web_fallback(self, state: GAIAAgentState, reason: str) -> AgentResult: | |
| """Fallback to web search when other methods fail""" | |
| logger.info(f"Web search fallback: {reason}") | |
| search_terms = self._extract_search_terms(state.question) | |
| web_result = self.web_search_tool.execute(search_terms) | |
| if web_result.success and web_result.result.get('found'): | |
| result = self._analyze_web_search_result(state, web_result) | |
| result.reasoning = f"{reason}. Used web search fallback." | |
| result.confidence = max(0.3, result.confidence - 0.2) # Lower confidence for fallback | |
| return result | |
| else: | |
| return self._create_failure_result(f"Fallback failed: {reason}") | |
| def _research_fallback_strategy(self, state: GAIAAgentState, original_error: str) -> AgentResult: | |
| """Enhanced fallback strategy when primary research fails""" | |
| logger.info("Executing fallback research strategy") | |
| # Try simple web search as universal fallback | |
| try: | |
| search_terms = self._extract_search_terms(state.question) | |
| web_result = self.web_search_tool.execute(search_terms) | |
| if web_result.success and web_result.result.get('found'): | |
| # Analyze results with basic processing | |
| search_results = web_result.result.get('results', []) | |
| if search_results: | |
| first_result = search_results[0] | |
| fallback_answer = f"Based on web search: {first_result.get('snippet', 'Limited information available')}" | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=fallback_answer, | |
| confidence=0.4, # Lower confidence for fallback | |
| reasoning=f"Fallback web search after: {original_error}", | |
| tools_used=[ToolResult( | |
| tool_name="web_search_fallback", | |
| success=True, | |
| result={"summary": "Fallback search completed"}, | |
| execution_time=web_result.execution_time | |
| )], | |
| model_used="fallback", | |
| processing_time=web_result.execution_time, | |
| cost_estimate=0.0 | |
| ) | |
| except Exception as fallback_error: | |
| logger.warning(f"Web search fallback failed: {fallback_error}") | |
| # If all else fails, try basic text processing | |
| return self._create_basic_response(state, f"Fallback failed: {original_error}") | |
| def _create_basic_response(self, state: GAIAAgentState, error_context: str) -> AgentResult: | |
| """Create a basic response when all research methods fail""" | |
| # Try to extract any useful information from the question itself | |
| basic_analysis = f"Unable to conduct external research. Question analysis: {state.question[:100]}" | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=False, | |
| result=f"Processing encountered difficulties: {error_context}", | |
| confidence=0.1, | |
| reasoning=f"All research sources failed: {error_context}", | |
| tools_used=[], | |
| model_used="none", | |
| processing_time=0.0, | |
| cost_estimate=0.0 | |
| ) | |
| def _extract_wikipedia_topic(self, question: str) -> str: | |
| """Extract Wikipedia topic from question""" | |
| # Look for quoted terms | |
| quoted = re.findall(r'"([^"]+)"', question) | |
| if quoted: | |
| return quoted[0] | |
| # Look for specific patterns | |
| patterns = [ | |
| r'wikipedia article[s]?\s+(?:about|on|for)\s+([^?.,]+)', | |
| r'featured article[s]?\s+(?:about|on|for)\s+([^?.,]+)', | |
| r'(?:about|on)\s+([A-Z][^?.,]+)', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, question, re.IGNORECASE) | |
| if match: | |
| return match.group(1).strip() | |
| # Extract main nouns/entities | |
| words = question.split() | |
| topic_words = [] | |
| for word in words: | |
| if word[0].isupper() or len(word) > 6: # Likely important words | |
| topic_words.append(word) | |
| return ' '.join(topic_words[:3]) if topic_words else "topic" | |
| def _extract_search_terms(self, question: str, max_length: int = 100) -> str: | |
| """ | |
| Extract optimized search terms from question | |
| Prioritizes important terms while staying under length limits | |
| """ | |
| # Clean the question first | |
| clean_question = re.sub(r'[^\w\s\-]', ' ', question.lower()) | |
| words = clean_question.split() | |
| # Remove common stop words but keep question words | |
| stop_words = { | |
| 'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being', | |
| 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', | |
| 'should', 'may', 'might', 'must', 'shall', 'can', 'to', 'of', 'in', | |
| 'on', 'at', 'by', 'for', 'with', 'from', 'as', 'but', 'or', 'and', | |
| 'if', 'then', 'than', 'this', 'that', 'these', 'those', 'i', 'you', | |
| 'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them' | |
| } | |
| # Keep important question words | |
| question_words = {'who', 'what', 'when', 'where', 'why', 'how', 'which'} | |
| # Priority terms (always include if present) | |
| priority_terms = [] | |
| # Extract quoted phrases first | |
| quoted_phrases = re.findall(r'"([^"]*)"', question) | |
| for phrase in quoted_phrases: | |
| if len(phrase.strip()) > 0: | |
| priority_terms.append(phrase.strip()) | |
| # Extract years (4-digit numbers) - capture full years, not just prefixes | |
| years = re.findall(r'\b(?:19|20)\d{2}\b', question) # Changed from capturing group to full match | |
| # Extract proper nouns (capitalized words) - exclude numbers | |
| proper_nouns = [] | |
| for word in question.split(): | |
| clean_word = re.sub(r'[^\w]', '', word) | |
| if (clean_word and | |
| clean_word[0].isupper() and | |
| len(clean_word) > 1 and | |
| not clean_word.isdigit()): # Exclude pure numbers | |
| proper_nouns.append(clean_word) | |
| # Extract other meaningful numbers (but be very selective) | |
| # Only include numbers that are likely meaningful (dates, counts, etc.) | |
| meaningful_numbers = [] | |
| number_matches = re.findall(r'\b\d{1,4}\b', question) | |
| for num in number_matches: | |
| # Skip very common/meaningless numbers and years already captured | |
| if (num not in ['1', '2', '3', '4', '5', '10', '20', '19', '21', '22', '23', '24', '25'] and | |
| num not in years and | |
| len(num) > 1): # Require at least 2 digits for meaningful numbers | |
| # Only include if it appears in a meaningful context | |
| if any(context in question.lower() for context in [ | |
| f'{num} albums', f'{num} songs', f'{num} years', f'{num} people', | |
| f'{num} times', f'{num} days', f'{num} months', f'episode {num}', | |
| f'season {num}', f'volume {num}', f'part {num}' | |
| ]): | |
| meaningful_numbers.append(num) | |
| # Build search terms with priority | |
| search_terms = [] | |
| # Add quoted phrases (highest priority) | |
| search_terms.extend(priority_terms) | |
| # Add proper nouns (high priority) | |
| search_terms.extend(proper_nouns[:5]) # Limit to avoid duplication | |
| # Add question words if present | |
| for word in words: | |
| if word in question_words and word not in search_terms: | |
| search_terms.append(word) | |
| # Add years | |
| search_terms.extend(years[:2]) # Limit to 2 years max | |
| # Add other important terms | |
| for word in words: | |
| if (word not in stop_words and | |
| word not in search_terms and | |
| len(word) > 2 and | |
| not word.isdigit()): # Avoid random numbers | |
| search_terms.append(word) | |
| # Stop if we have enough terms | |
| if len(' '.join(search_terms)) > max_length - 20: | |
| break | |
| # Add a few important numbers if space allows | |
| if len(' '.join(search_terms)) < max_length - 10: | |
| search_terms.extend(meaningful_numbers[:2]) | |
| # Join and clean up | |
| search_query = ' '.join(search_terms) | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| unique_terms = [] | |
| for term in search_terms: | |
| if term.lower() not in seen: | |
| seen.add(term.lower()) | |
| unique_terms.append(term) | |
| # Final cleanup and length check | |
| final_query = ' '.join(unique_terms) | |
| if len(final_query) > max_length: | |
| # Truncate to fit | |
| truncated_terms = [] | |
| current_length = 0 | |
| for term in unique_terms: | |
| if current_length + len(term) + 1 <= max_length: | |
| truncated_terms.append(term) | |
| current_length += len(term) + 1 | |
| else: | |
| break | |
| final_query = ' '.join(truncated_terms) | |
| logger.info(f"📝 Optimized search terms: '{final_query}' from question: '{question[:50]}...'") | |
| return final_query | |
| def _extract_youtube_info(self, question: str) -> str: | |
| """Extract YouTube URL or search terms""" | |
| # Look for YouTube URLs | |
| youtube_urls = re.findall(r'https?://(?:www\.)?youtube\.com/[^\s]+', question) | |
| if youtube_urls: | |
| return youtube_urls[0] | |
| youtube_urls = re.findall(r'https?://youtu\.be/[^\s]+', question) | |
| if youtube_urls: | |
| return youtube_urls[0] | |
| # Extract search terms for YouTube | |
| return self._extract_search_terms(question) | |
def _analyze_wikipedia_result(self, state: GAIAAgentState, wiki_result: ToolResult) -> AgentResult:
    """Turn a Wikipedia tool result into an answer for the user's question.

    The article's title, summary, and URL are handed to the LLM, which is
    asked to answer the question directly. If the LLM call does not
    succeed, the raw article summary is returned as a lower-confidence
    fallback (still marked success=True, since the lookup itself worked).

    Args:
        state: Current agent state carrying the question text.
        wiki_result: Tool result whose ``result['result']`` dict holds the
            Wikipedia page data (title/summary/url).

    Returns:
        AgentResult with either the LLM's answer (confidence 0.80) or the
        raw summary fallback (confidence 0.60).
    """
    page = wiki_result.result['result']

    # Runtime prompt text — kept byte-identical to the established prompt.
    analysis_prompt = f"""
Based on this Wikipedia information, please answer the following question:
Question: {state.question}
Wikipedia Information:
Title: {page.get('title', '')}
Summary: {page.get('summary', '')}
URL: {page.get('url', '')}
Please provide a direct, accurate answer.
"""
    # Longer questions get the larger MAIN model; short ones use the
    # cheaper ROUTER tier.
    if len(state.question) > 100:
        tier = ModelTier.MAIN
    else:
        tier = ModelTier.ROUTER

    answer = self.llm_client.generate(analysis_prompt, tier=tier, max_tokens=300)

    if not answer.success:
        # LLM analysis failed — surface the raw summary instead.
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=page.get('summary', 'Information found'),
            confidence=0.60,
            reasoning="Wikipedia found but analysis failed",
            tools_used=[wiki_result],
            model_used="fallback",
            processing_time=wiki_result.execution_time,
            cost_estimate=0.0,
        )

    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=answer.response,
        confidence=0.80,
        reasoning="Analyzed Wikipedia information to answer question",
        tools_used=[wiki_result],
        model_used=answer.model_used,
        processing_time=wiki_result.execution_time + answer.response_time,
        cost_estimate=answer.cost_estimate,
    )
def _analyze_youtube_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
    """Wrap a YouTube research tool result in a fixed AgentResult.

    NOTE(review): genuine YouTube content analysis is not implemented yet —
    this returns a canned success result (confidence 0.70) so the pipeline
    can proceed; the tool result is passed through in ``tools_used``.

    Args:
        state: Current agent state (unused in this placeholder).
        web_result: The YouTube research tool result being wrapped.

    Returns:
        A static success AgentResult referencing ``web_result``.
    """
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        confidence=0.70,
        result="YouTube analysis completed",
        reasoning="Analyzed YouTube content",
        model_used="basic",
        tools_used=[web_result],
        processing_time=web_result.execution_time,
        cost_estimate=0.0,
    )
def _analyze_web_search_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
    """Answer the question from web search results via LLM analysis.

    The top three hits are summarized (title/URL/snippet/source) and sent
    to the COMPLEX-tier model for synthesis. Fallback ladder:
    LLM answer (0.80) -> first hit's snippet if the LLM call fails (0.50)
    -> failure result when the search itself failed or was empty (0.20).

    Args:
        state: Current agent state carrying the question text.
        web_result: Tool result whose ``result`` dict holds ``success``,
            ``results``, and optional ``query``/``source``/``count``/``note``.

    Returns:
        AgentResult reflecting the best information available.
    """
    search_data = web_result.result

    # Guard clause: bail out early on a failed or empty search.
    if not (search_data.get('success') and search_data.get('results')):
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=False,
            result="Web search returned no useful results",
            confidence=0.20,
            reasoning=f"Search failed or empty: {search_data.get('note', 'Unknown reason')}",
            tools_used=[web_result],
            model_used="none",
            processing_time=web_result.execution_time,
            cost_estimate=0.0,
        )

    hits = search_data['results']
    # Normalize WebSearchResult objects into plain dicts when needed.
    if hits and hasattr(hits[0], 'to_dict'):
        hits = [h.to_dict() for h in hits]

    # Summarize the top three hits for the LLM prompt.
    summary_lines = []
    for idx, hit in enumerate(hits[:3], 1):
        summary_lines.append(f"Result {idx}: {hit.get('title', 'No title')}")
        summary_lines.append(f"URL: {hit.get('url', 'No URL')}")
        summary_lines.append(f"Description: {hit.get('snippet', hit.get('content', 'No description'))[:200]}")
        summary_lines.append(f"Source: {hit.get('source', 'Unknown')}")
        summary_lines.append("")

    # Runtime prompt text — kept byte-identical to the established prompt.
    analysis_prompt = f"""
Based on these web search results, please answer the following question:
Question: {state.question}
Search Query: {search_data.get('query', 'N/A')}
Search Engine: {search_data.get('source', 'Unknown')}
Results Found: {search_data.get('count', len(hits))}
Search Results:
{chr(10).join(summary_lines)}
Please provide a direct answer based on the most relevant information.
"""
    # COMPLEX tier (72B) for better multi-result synthesis.
    answer = self.llm_client.generate(analysis_prompt, tier=ModelTier.COMPLEX, max_tokens=400)

    if answer.success:
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=answer.response,
            confidence=0.80,  # Higher confidence with better model
            reasoning=f"Analyzed {len(hits)} web search results using {search_data.get('source', 'search engine')}",
            tools_used=[web_result],
            model_used=answer.model_used,
            processing_time=web_result.execution_time + answer.response_time,
            cost_estimate=answer.cost_estimate,
        )

    # LLM analysis failed — surface the first hit's description instead.
    top_hit = hits[0] if hits else {}
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=top_hit.get('snippet', top_hit.get('content', 'Web search completed')),
        confidence=0.50,
        reasoning="Web search completed but analysis failed",
        tools_used=[web_result],
        model_used="fallback",
        processing_time=web_result.execution_time,
        cost_estimate=0.0,
    )
def _analyze_url_content_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
    """Answer the question from content scraped off a specific URL.

    The page title, URL, and the first 1000 characters of content are fed
    to the MAIN-tier model. If the LLM call fails, a 200-character content
    excerpt is returned as a lower-confidence fallback.

    Args:
        state: Current agent state carrying the question text.
        web_result: Tool result whose ``result`` dict holds
            ``title``/``url``/``content`` for the fetched page.

    Returns:
        AgentResult with the LLM answer (0.85) or the raw excerpt (0.60).
    """
    page = web_result.result

    # Runtime prompt text — kept byte-identical to the established prompt.
    analysis_prompt = f"""
Based on this web page content, please answer the following question:
Question: {state.question}
Page Title: {page.get('title', '')}
Page URL: {page.get('url', '')}
Content: {page.get('content', '')[:1000]}...
Please provide a direct answer based on the page content.
"""
    answer = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400)

    if not answer.success:
        # Analysis failed — return a short raw excerpt of the page.
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=page.get('content', 'Content extracted')[:200],
            confidence=0.60,
            reasoning="URL content extracted but analysis failed",
            tools_used=[web_result],
            model_used="fallback",
            processing_time=web_result.execution_time,
            cost_estimate=0.0,
        )

    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=answer.response,
        confidence=0.85,
        reasoning="Analyzed content from specific URL",
        tools_used=[web_result],
        model_used=answer.model_used,
        processing_time=web_result.execution_time + answer.response_time,
        cost_estimate=answer.cost_estimate,
    )
def _analyze_multi_source_result(self, state: GAIAAgentState, sources: List) -> AgentResult:
    """Synthesize an answer from several research sources.

    Each (source_type, source_data) pair is condensed to a 200-character
    excerpt — Wikipedia entries use their ``summary``, everything else its
    ``snippet`` — and the COMPLEX-tier model merges them into one answer.
    If synthesis fails, the first source's excerpt is returned instead.

    Args:
        state: Current agent state carrying the question text.
        sources: List of (source_type, source_data) tuples, where
            source_data is a dict-like object.

    Returns:
        AgentResult with the synthesized answer (0.85) or the
        first-source fallback (0.60).
    """
    # Condense every source to a one-line, 200-char excerpt.
    source_summaries = [
        f"Wikipedia: {data.get('summary', '')[:200]}"
        if kind == "Wikipedia"
        else f"Web: {data.get('snippet', '')[:200]}"
        for kind, data in sources
    ]

    # Runtime prompt text — kept byte-identical to the established prompt.
    analysis_prompt = f"""
Based on these multiple sources, please answer the following question:
Question: {state.question}
Sources:
{chr(10).join(source_summaries)}
Please synthesize the information and provide a comprehensive answer.
"""
    # Best model for cross-source synthesis.
    answer = self.llm_client.generate(analysis_prompt, tier=ModelTier.COMPLEX, max_tokens=500)

    if answer.success:
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=answer.response,
            confidence=0.85,
            reasoning=f"Synthesized information from {len(sources)} sources",
            tools_used=[],
            model_used=answer.model_used,
            processing_time=answer.response_time,
            cost_estimate=answer.cost_estimate,
        )

    # Synthesis failed — fall back to whatever the first source said.
    first_source = sources[0][1] if sources else {}
    content = first_source.get('summary') or first_source.get('snippet', 'Multi-source research completed')
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=content,
        confidence=0.60,
        reasoning="Multi-source research completed but synthesis failed",
        tools_used=[],
        model_used="fallback",
        processing_time=0.0,
        cost_estimate=0.0,
    )
def _create_failure_result(self, error_message: str) -> AgentResult:
    """Build a zero-confidence failure AgentResult for this agent.

    Args:
        error_message: Human-readable description of what went wrong;
            used as both the result text and the reasoning.

    Returns:
        AgentResult with ``success=False`` and ``confidence=0.0``.
    """
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=False,
        confidence=0.0,
        result=error_message,
        reasoning=error_message,
        model_used="error",
        processing_time=0.0,
        cost_estimate=0.0,
    )