| |
| """ |
| Web Research Agent for GAIA Agent System |
| Handles Wikipedia and web search questions with intelligent search strategies |
| """ |
|
|
| import re |
| import logging |
| from typing import Dict, List, Optional, Any |
| from urllib.parse import urlparse |
|
|
| from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult |
| from models.qwen_client import QwenClient, ModelTier |
| from tools.wikipedia_tool import WikipediaTool |
| from tools.web_search_tool import WebSearchTool |
|
|
# Module-level logger named after this module; WebResearchAgent reports its
# progress, fallbacks and failures through it.
logger = logging.getLogger(__name__)
|
|
class WebResearchAgent:
    """
    Specialized agent for web research tasks.

    Chooses a research strategy from the wording of the question, gathers
    material with the Wikipedia and web search tools, and asks the LLM client
    to turn that material into an answer wrapped in an ``AgentResult``.
    """

    # Longest search query (in characters) built by _extract_search_terms.
    _MAX_QUERY_LENGTH = 100

    # Sentence punctuation that commonly trails a URL quoted in running text.
    _URL_TRAILING_PUNCTUATION = '.,;:!?\'"'

    # Question words, articles, prepositions and filler verbs/nouns that add
    # nothing to a search query.
    _STOP_WORDS = (
        'what', 'who', 'when', 'where', 'why', 'how', 'is', 'are', 'was',
        'were', 'do', 'does', 'did', 'can', 'could', 'would', 'should', 'will',
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
        'of', 'with', 'by', 'from', 'about',
        'please', 'you', 'tell', 'me', 'find', 'search', 'give', 'provide',
        'list', 'show',
        'information', 'details', 'data', 'facts', 'answer',
    )

    # Matches any whole stop word (case-insensitively) or a run of ?.,! marks.
    _NOISE_RE = re.compile(
        r'\b(?:' + '|'.join(_STOP_WORDS) + r')\b|[?.,!]+',
        re.IGNORECASE,
    )

    def __init__(self, llm_client: QwenClient):
        """Store the LLM client and create the Wikipedia and web search tools."""
        self.llm_client = llm_client
        self.wikipedia_tool = WikipediaTool()
        self.web_search_tool = WebSearchTool()

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def process(self, state: GAIAAgentState) -> GAIAAgentState:
        """
        Process a web research question using Wikipedia and web search.

        The chosen strategy runs first. If it raises, the fallback strategy
        runs; if that raises too, a basic failure response is produced. The
        resulting ``AgentResult`` is always added to ``state``.

        Args:
            state: Shared agent state; ``question`` and ``file_name`` are read,
                processing steps, errors and the result are written back.

        Returns:
            The same ``state`` object, updated in place.
        """
        logger.info(f"Web researcher processing: {state.question[:100]}...")
        state.add_processing_step("Web Researcher: Starting research")

        try:
            strategy = self._determine_research_strategy(state.question, state.file_name)
            state.add_processing_step(f"Web Researcher: Strategy = {strategy}")

            handlers = {
                "wikipedia_direct": self._research_wikipedia_direct,
                "wikipedia_search": self._research_wikipedia_search,
                "youtube_analysis": self._research_youtube,
                "web_search": self._research_web_general,
                "url_extraction": self._research_url_content,
            }
            # Any other strategy name (including "multi_source") uses the
            # combined Wikipedia + web lookup.
            handler = handlers.get(strategy, self._research_multi_source)

            result = None
            try:
                result = handler(state)
            except Exception as strategy_error:
                logger.warning(f"Strategy {strategy} failed: {strategy_error}, trying fallback")
                try:
                    result = self._research_fallback_strategy(state, str(strategy_error))
                except Exception as fallback_error:
                    logger.error(f"Fallback strategy also failed: {fallback_error}")
                    result = self._create_basic_response(state, f"Research failed: {fallback_error}")

            # Guard against a handler returning nothing or the wrong type.
            if not result or not isinstance(result, AgentResult):
                result = self._create_basic_response(state, "No research results available")

            state.add_agent_result(result)
            state.add_processing_step(
                f"Web Researcher: Completed with confidence {result.confidence:.2f}"
            )
            return state

        except Exception as e:
            error_msg = f"Web research failed: {str(e)}"
            state.add_error(error_msg)
            logger.error(error_msg)

            failure_result = AgentResult(
                agent_role=AgentRole.WEB_RESEARCHER,
                success=False,
                result=f"Research encountered difficulties: {str(e)}",
                confidence=0.1,
                reasoning=f"Exception during web research: {str(e)}",
                tools_used=[],
                model_used="error",
                processing_time=0.0,
                cost_estimate=0.0
            )
            state.add_agent_result(failure_result)
            return state

    # ------------------------------------------------------------------
    # Strategy selection
    # ------------------------------------------------------------------

    def _determine_research_strategy(self, question: str, file_name: Optional[str] = None) -> str:
        """
        Determine the best research strategy for the question.

        Checks are keyword based and applied in order: Wikipedia wording,
        YouTube/video wording, any http(s) URL, recency wording, then question
        length / number of question marks.

        Args:
            question: The question text.
            file_name: Accepted for interface compatibility; not used here.

        Returns:
            One of "wikipedia_search", "wikipedia_direct", "youtube_analysis",
            "url_extraction", "web_search" or "multi_source".
        """
        lowered = question.lower()

        def mentions(*terms: str) -> bool:
            return any(term in lowered for term in terms)

        if mentions('wikipedia', 'featured article', 'promoted'):
            wants_search = 'search' in lowered or 'find' in lowered
            return "wikipedia_search" if wants_search else "wikipedia_direct"

        if mentions('youtube', 'video', 'watch?v=', 'youtu.be'):
            return "youtube_analysis"

        if re.search(r'https?://[^\s]+', question):
            return "url_extraction"

        if mentions('news', 'recent', 'latest', 'current', 'today', '2024', '2025'):
            return "web_search"

        # Long or multi-part questions benefit from more than one source.
        if len(question.split()) > 20 or question.count('?') > 1:
            return "multi_source"

        return "wikipedia_search"

    # ------------------------------------------------------------------
    # Research strategies
    # ------------------------------------------------------------------

    def _research_wikipedia_direct(self, state: GAIAAgentState) -> AgentResult:
        """
        Research using a direct Wikipedia lookup of the extracted topic.

        Falls back to ``_research_web_fallback`` when the tool call fails or
        reports nothing found.
        """
        topic = self._extract_wikipedia_topic(state.question)
        logger.info(f"Wikipedia direct research for: {topic}")

        wiki_result = self.wikipedia_tool.execute(topic)
        if not (wiki_result.success and wiki_result.result.get('found')):
            return self._research_web_fallback(state, f"Wikipedia not found for '{topic}'")

        wiki_data = wiki_result.result['result']
        analysis_prompt = "\n".join([
            "",
            f"Based on this Wikipedia information about {topic}, please answer the following question:",
            "",
            f"Question: {state.question}",
            "",
            f"Wikipedia Summary: {wiki_data.get('summary', '')}",
            "",
            f"Wikipedia URL: {wiki_data.get('url', '')}",
            "",
            "Please provide a direct, accurate answer based on the Wikipedia information.",
            "",
        ])

        model_tier = ModelTier.MAIN if state.complexity_assessment == "complex" else ModelTier.ROUTER
        tool_record = ToolResult(
            tool_name="wikipedia",
            success=True,
            result=wiki_data,
            execution_time=wiki_result.execution_time
        )
        return self._answer_with_llm(
            analysis_prompt,
            model_tier,
            400,
            tools_used=[tool_record],
            tool_time=wiki_result.execution_time,
            # An exact title match is treated as stronger evidence.
            confidence=0.85 if wiki_data.get('title') == topic else 0.75,
            reasoning=f"Found Wikipedia article for '{topic}' and analyzed content",
            fallback_text=wiki_data.get('summary', 'Wikipedia information found but analysis failed'),
            fallback_confidence=0.60,
            fallback_reasoning="Wikipedia found but LLM analysis failed",
        )

    def _research_wikipedia_search(self, state: GAIAAgentState) -> AgentResult:
        """
        Research using the Wikipedia tool's summary action on extracted search terms.

        Falls back to ``_research_web_fallback`` when nothing is found.
        """
        search_terms = self._extract_search_terms(state.question)
        logger.info(f"Wikipedia search for: {search_terms}")

        wiki_result = self.wikipedia_tool.execute({"query": search_terms, "action": "summary"})
        if wiki_result.success and wiki_result.result.get('found'):
            return self._analyze_wikipedia_result(state, wiki_result)

        return self._research_web_fallback(state, f"Wikipedia search failed for '{search_terms}'")

    def _research_youtube(self, state: GAIAAgentState) -> AgentResult:
        """
        Research YouTube video information.

        A YouTube URL in the question is sent to the web tool's "extract"
        action; otherwise a ``site:youtube.com`` search is issued.
        """
        youtube_query = self._extract_youtube_info(state.question)
        logger.info(f"YouTube research for: {youtube_query}")

        if youtube_query.startswith('http'):
            web_result = self.web_search_tool.execute({
                "query": youtube_query,
                "action": "extract"
            })
        else:
            web_result = self.web_search_tool.execute(f"site:youtube.com {youtube_query}")

        if web_result.success and web_result.result.get('found'):
            return self._analyze_youtube_result(state, web_result)
        return self._create_failure_result("YouTube research failed")

    def _research_web_general(self, state: GAIAAgentState) -> AgentResult:
        """General web search research (up to five results requested)."""
        search_terms = self._extract_search_terms(state.question)
        logger.info(f"Web search for: {search_terms}")

        web_result = self.web_search_tool.execute({
            "query": search_terms,
            "action": "search",
            "limit": 5
        })

        if web_result.success and web_result.result.get('found'):
            return self._analyze_web_search_result(state, web_result)
        return self._create_failure_result("Web search failed")

    def _research_url_content(self, state: GAIAAgentState) -> AgentResult:
        """
        Extract and analyze content from the first URL in the question.

        Punctuation that merely ends the surrounding sentence (for example the
        "?" in "What is on https://example.com/page?") is trimmed from the URL
        before it is fetched.
        """
        urls = self._find_urls(state.question)
        if not urls:
            return self._create_failure_result("No URLs found in question")

        url = urls[0]
        logger.info(f"Extracting content from: {url}")

        web_result = self.web_search_tool.execute({
            "query": url,
            "action": "extract"
        })

        if web_result.success and web_result.result.get('found'):
            return self._analyze_url_content_result(state, web_result)
        return self._create_failure_result(f"Failed to extract content from {url}")

    def _research_multi_source(self, state: GAIAAgentState) -> AgentResult:
        """
        Multi-source research combining Wikipedia and web search.

        Collects the Wikipedia hit (if any) plus at most two web results and
        passes them to ``_analyze_multi_source_result``.
        """
        search_terms = self._extract_search_terms(state.question)
        logger.info(f"Multi-source research for: {search_terms}")

        sources = []

        wiki_result = self.wikipedia_tool.execute(search_terms)
        if wiki_result.success and wiki_result.result.get('found'):
            sources.append(("Wikipedia", wiki_result.result['result']))

        web_result = self.web_search_tool.execute({
            "query": search_terms,
            "action": "search",
            "limit": 3
        })
        if web_result.success and web_result.result.get('found'):
            sources.extend(("Web", hit) for hit in web_result.result['results'][:2])

        if sources:
            return self._analyze_multi_source_result(state, sources)
        return self._create_failure_result("All research sources failed")

    def _research_web_fallback(self, state: GAIAAgentState, reason: str) -> AgentResult:
        """
        Fall back to a web search when another method found nothing.

        On success the analyzed result is returned with its reasoning replaced
        and its confidence lowered by 0.2 (never below 0.3).
        """
        logger.info(f"Web search fallback: {reason}")

        search_terms = self._extract_search_terms(state.question)
        web_result = self.web_search_tool.execute(search_terms)

        if not (web_result.success and web_result.result.get('found')):
            return self._create_failure_result(f"Fallback failed: {reason}")

        result = self._analyze_web_search_result(state, web_result)
        result.reasoning = f"{reason}. Used web search fallback."
        result.confidence = max(0.3, result.confidence - 0.2)
        return result

    def _research_fallback_strategy(self, state: GAIAAgentState, original_error: str) -> AgentResult:
        """
        Enhanced fallback strategy when the primary research strategy raised.

        Runs a plain web search and answers with the first result's snippet,
        without calling the LLM. Any exception here is logged and turned into
        a basic failure response.
        """
        logger.info("Executing fallback research strategy")

        try:
            search_terms = self._extract_search_terms(state.question)
            web_result = self.web_search_tool.execute(search_terms)

            if web_result.success and web_result.result.get('found'):
                search_results = web_result.result.get('results', [])
                if search_results:
                    snippet = search_results[0].get('snippet', 'Limited information available')
                    return AgentResult(
                        agent_role=AgentRole.WEB_RESEARCHER,
                        success=True,
                        result=f"Based on web search: {snippet}",
                        confidence=0.4,
                        reasoning=f"Fallback web search after: {original_error}",
                        tools_used=[ToolResult(
                            tool_name="web_search_fallback",
                            success=True,
                            result={"summary": "Fallback search completed"},
                            execution_time=web_result.execution_time
                        )],
                        model_used="fallback",
                        processing_time=web_result.execution_time,
                        cost_estimate=0.0
                    )
        except Exception as fallback_error:
            logger.warning(f"Web search fallback failed: {fallback_error}")

        return self._create_basic_response(state, f"Fallback failed: {original_error}")

    def _create_basic_response(self, state: GAIAAgentState, error_context: str) -> AgentResult:
        """
        Create a basic failure response when all research methods fail.

        ``state`` is accepted for interface compatibility; only
        ``error_context`` is used to build the result, so this last-resort
        path cannot fail on a malformed question.
        """
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=False,
            result=f"Processing encountered difficulties: {error_context}",
            confidence=0.1,
            reasoning=f"All research sources failed: {error_context}",
            tools_used=[],
            model_used="none",
            processing_time=0.0,
            cost_estimate=0.0
        )

    # ------------------------------------------------------------------
    # Text extraction helpers
    # ------------------------------------------------------------------

    @classmethod
    def _find_urls(cls, text: str, pattern: str = r'https?://[^\s]+') -> List[str]:
        """
        Return the URLs matching ``pattern`` in ``text``, in order of appearance.

        Trailing sentence punctuation is trimmed from each match. A closing
        parenthesis is trimmed only when the URL contains no opening one, so
        links such as ``.../wiki/Mercury_(planet)`` are left intact.
        """
        urls = []
        for raw in re.findall(pattern, text):
            url = raw.rstrip(cls._URL_TRAILING_PUNCTUATION)
            if url.endswith(')') and '(' not in url:
                url = url[:-1].rstrip(cls._URL_TRAILING_PUNCTUATION)
            urls.append(url)
        return urls

    def _extract_wikipedia_topic(self, question: str) -> str:
        """
        Extract the Wikipedia topic from the question.

        Order of preference: the first double-quoted phrase; the text after
        "wikipedia article(s) about/on/for"; the text after "featured
        article(s) about/on/for"; a capitalised phrase after the word "about"
        or "on"; finally up to three capitalised or long (> 6 characters)
        words. Returns "topic" when nothing qualifies.
        """
        quoted = re.search(r'"([^"]+)"', question)
        if quoted:
            return quoted.group(1)

        patterns = [
            (r'wikipedia article[s]?\s+(?:about|on|for)\s+([^?.,]+)', re.IGNORECASE),
            (r'featured article[s]?\s+(?:about|on|for)\s+([^?.,]+)', re.IGNORECASE),
            # "about"/"on" must be whole words (not the tail of "population"),
            # and only the preposition is case-insensitive so that [A-Z]
            # really requires a capitalised topic.
            (r'\b(?i:about|on)\s+([A-Z][^?.,]+)', 0),
        ]
        for pattern, flags in patterns:
            match = re.search(pattern, question, flags)
            if match:
                return match.group(1).strip()

        topic_words = [
            word for word in question.split()
            if word[0].isupper() or len(word) > 6
        ]
        return ' '.join(topic_words[:3]) if topic_words else "topic"

    def _extract_search_terms(self, question: str) -> str:
        """
        Extract focused search terms from the question to avoid length limits.

        If the question contains a double-quoted phrase, that phrase (plus the
        first four-digit 19xx/20xx year, if any) is returned as is. Otherwise
        the query is assembled from up to two proper-noun phrases, the first
        year, and up to five remaining keywords, without repeating words and
        without exceeding ``_MAX_QUERY_LENGTH`` characters.
        """
        max_len = self._MAX_QUERY_LENGTH

        # Non-capturing group: findall must return the whole year, not "19"/"20".
        years = re.findall(r'\b(?:19|20)\d{2}\b', question)

        quoted = re.search(r'"([^"]+)"', question)
        if quoted:
            main_term = quoted.group(1)
            return f"{main_term} {years[0]}" if years else main_term

        # Capitalised phrases, minus stop words such as a leading "How"/"What".
        proper_nouns = []
        for phrase in re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', question):
            kept = ' '.join(self._NOISE_RE.sub(' ', phrase).split())
            if kept and kept not in proper_nouns:
                proper_nouns.append(kept)

        keywords = [
            word for word in self._NOISE_RE.sub(' ', question).split()
            if len(word) > 2 and word.isalpha()
        ]

        search_terms = []
        seen = set()

        def add(term: str) -> bool:
            """Append term if the query stays within max_len; report success."""
            if len(' '.join(search_terms + [term])) > max_len:
                return False
            search_terms.append(term)
            seen.update(part.lower() for part in term.split())
            return True

        for noun in proper_nouns[:2]:
            add(noun)

        for year in years[:1]:
            add(year)

        added = 0
        for word in keywords:
            if added == 5:
                break
            if word.lower() in seen:
                # Already covered by a proper noun or an earlier keyword.
                continue
            if not add(word):
                break
            added += 1

        if not search_terms:
            # Nothing survived filtering: use the plain leading words instead.
            search_terms = [w for w in question.split()[:5] if w.isalpha() and len(w) > 2]

        result = ' '.join(search_terms)
        if len(result) > max_len:
            result = result[:max_len].rsplit(' ', 1)[0]

        logger.info(f"📝 Extracted search terms: '{result}' from question: '{question[:50]}...'")
        return result

    def _extract_youtube_info(self, question: str) -> str:
        """
        Extract a YouTube URL, or search terms when the question has none.

        ``youtube.com`` links (with optional ``www.``/``m.`` prefix) take
        precedence over ``youtu.be`` links. Trailing sentence punctuation is
        trimmed from the returned URL.
        """
        url_patterns = (
            r'https?://(?:www\.|m\.)?youtube\.com/[^\s]+',
            r'https?://youtu\.be/[^\s]+',
        )
        for pattern in url_patterns:
            youtube_urls = self._find_urls(question, pattern)
            if youtube_urls:
                return youtube_urls[0]

        return self._extract_search_terms(question)

    # ------------------------------------------------------------------
    # Result analysis
    # ------------------------------------------------------------------

    def _answer_with_llm(
        self,
        prompt: str,
        tier: ModelTier,
        max_tokens: int,
        *,
        tools_used: List[ToolResult],
        tool_time: float,
        confidence: float,
        reasoning: str,
        fallback_text: str,
        fallback_confidence: float,
        fallback_reasoning: str,
    ) -> AgentResult:
        """
        Ask the LLM to answer ``prompt`` and wrap the outcome in an AgentResult.

        When generation succeeds, the LLM response is returned with
        ``confidence``/``reasoning`` and the tool time plus LLM response time.
        When it fails, ``fallback_text`` is returned with the fallback
        confidence/reasoning, model "fallback" and zero cost. Both outcomes
        are reported with ``success=True``.
        """
        llm_result = self.llm_client.generate(prompt, tier=tier, max_tokens=max_tokens)

        if llm_result.success:
            return AgentResult(
                agent_role=AgentRole.WEB_RESEARCHER,
                success=True,
                result=llm_result.response,
                confidence=confidence,
                reasoning=reasoning,
                tools_used=tools_used,
                model_used=llm_result.model_used,
                processing_time=tool_time + llm_result.response_time,
                cost_estimate=llm_result.cost_estimate
            )

        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=fallback_text,
            confidence=fallback_confidence,
            reasoning=fallback_reasoning,
            tools_used=tools_used,
            model_used="fallback",
            processing_time=tool_time,
            cost_estimate=0.0
        )

    def _analyze_wikipedia_result(self, state: GAIAAgentState, wiki_result: ToolResult) -> AgentResult:
        """
        Analyze a Wikipedia result and generate an answer.

        Questions longer than 100 characters use ``ModelTier.MAIN``; shorter
        ones use ``ModelTier.ROUTER``. If the LLM fails, the article summary
        is returned instead.
        """
        wiki_data = wiki_result.result['result']

        analysis_prompt = "\n".join([
            "",
            "Based on this Wikipedia information, please answer the following question:",
            "",
            f"Question: {state.question}",
            "",
            "Wikipedia Information:",
            f"Title: {wiki_data.get('title', '')}",
            f"Summary: {wiki_data.get('summary', '')}",
            f"URL: {wiki_data.get('url', '')}",
            "",
            "Please provide a direct, accurate answer.",
            "",
        ])

        model_tier = ModelTier.MAIN if len(state.question) > 100 else ModelTier.ROUTER
        return self._answer_with_llm(
            analysis_prompt,
            model_tier,
            300,
            tools_used=[wiki_result],
            tool_time=wiki_result.execution_time,
            confidence=0.80,
            reasoning="Analyzed Wikipedia information to answer question",
            fallback_text=wiki_data.get('summary', 'Information found'),
            fallback_confidence=0.60,
            fallback_reasoning="Wikipedia found but analysis failed",
        )

    def _analyze_youtube_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
        """
        Analyze a YouTube research result.

        NOTE(review): this is a placeholder. It returns a fixed message and
        does not inspect ``state`` or the content of ``web_result``; only the
        tool's execution time is carried over.
        """
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result="YouTube analysis completed",
            confidence=0.70,
            reasoning="Analyzed YouTube content",
            tools_used=[web_result],
            model_used="basic",
            processing_time=web_result.execution_time,
            cost_estimate=0.0
        )

    def _analyze_web_search_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
        """
        Analyze web search results.

        The first three results (title, URL, snippet) are given to the LLM.
        If the LLM fails, the first result's snippet is returned instead.
        """
        search_results = web_result.result['results']

        combined_content = []
        for index, hit in enumerate(search_results[:3], 1):
            combined_content += [
                f"Result {index}: {hit['title']}",
                f"URL: {hit['url']}",
                f"Description: {hit['snippet']}",
                "",
            ]

        analysis_prompt = "\n".join([
            "",
            "Based on these web search results, please answer the following question:",
            "",
            f"Question: {state.question}",
            "",
            "Search Results:",
            "\n".join(combined_content),
            "",
            "Please provide a direct answer based on the most relevant information.",
            "",
        ])

        first_result = search_results[0] if search_results else {}
        return self._answer_with_llm(
            analysis_prompt,
            ModelTier.MAIN,
            400,
            tools_used=[web_result],
            tool_time=web_result.execution_time,
            confidence=0.75,
            reasoning=f"Analyzed {len(search_results)} web search results",
            fallback_text=first_result.get('snippet', 'Web search completed'),
            fallback_confidence=0.50,
            fallback_reasoning="Web search completed but analysis failed",
        )

    def _analyze_url_content_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
        """
        Analyze extracted URL content.

        The first 1000 characters of the page content are given to the LLM.
        If the LLM fails, the first 200 characters of the content are
        returned instead.
        """
        content_data = web_result.result

        analysis_prompt = "\n".join([
            "",
            "Based on this web page content, please answer the following question:",
            "",
            f"Question: {state.question}",
            "",
            f"Page Title: {content_data.get('title', '')}",
            f"Page URL: {content_data.get('url', '')}",
            f"Content: {content_data.get('content', '')[:1000]}...",
            "",
            "Please provide a direct answer based on the page content.",
            "",
        ])

        return self._answer_with_llm(
            analysis_prompt,
            ModelTier.MAIN,
            400,
            tools_used=[web_result],
            tool_time=web_result.execution_time,
            confidence=0.85,
            reasoning="Analyzed content from specific URL",
            fallback_text=content_data.get('content', 'Content extracted')[:200],
            fallback_confidence=0.60,
            fallback_reasoning="URL content extracted but analysis failed",
        )

    def _analyze_multi_source_result(self, state: GAIAAgentState, sources: List) -> AgentResult:
        """
        Analyze results from multiple sources.

        Args:
            state: Shared agent state; only ``question`` is read.
            sources: ``(source_type, data)`` pairs where ``source_type`` is
                "Wikipedia" (``data`` has a 'summary') or anything else
                (``data`` has a 'snippet').

        Returns:
            The LLM synthesis, or the first source's summary/snippet if the
            LLM fails. No tool results or tool time are recorded because only
            the extracted source data is passed in.
        """
        source_summaries = [
            f"Wikipedia: {source_data.get('summary', '')[:200]}"
            if source_type == "Wikipedia"
            else f"Web: {source_data.get('snippet', '')[:200]}"
            for source_type, source_data in sources
        ]

        analysis_prompt = "\n".join([
            "",
            "Based on these multiple sources, please answer the following question:",
            "",
            f"Question: {state.question}",
            "",
            "Sources:",
            "\n".join(source_summaries),
            "",
            "Please synthesize the information and provide a comprehensive answer.",
            "",
        ])

        first_source = sources[0][1] if sources else {}
        fallback_text = (
            first_source.get('summary')
            or first_source.get('snippet', 'Multi-source research completed')
        )
        return self._answer_with_llm(
            analysis_prompt,
            ModelTier.COMPLEX,
            500,
            tools_used=[],
            tool_time=0.0,
            confidence=0.85,
            reasoning=f"Synthesized information from {len(sources)} sources",
            fallback_text=fallback_text,
            fallback_confidence=0.60,
            fallback_reasoning="Multi-source research completed but synthesis failed",
        )

    def _create_failure_result(self, error_message: str) -> AgentResult:
        """
        Create a failure result carrying ``error_message``.

        ``tools_used`` is passed explicitly, as in every other AgentResult
        built by this class.
        """
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=False,
            result=error_message,
            confidence=0.0,
            reasoning=error_message,
            tools_used=[],
            model_used="error",
            processing_time=0.0,
            cost_estimate=0.0
        )