Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Web Research Agent for GAIA Agent System | |
| Handles Wikipedia and web search questions with intelligent search strategies | |
| """ | |
| import re | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| from urllib.parse import urlparse | |
| from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult | |
| from models.qwen_client import QwenClient, ModelTier | |
| from tools.wikipedia_tool import WikipediaTool | |
| from tools.web_search_tool import WebSearchTool | |
| logger = logging.getLogger(__name__) | |
class WebResearchAgent:
    """
    Agent specialized in answering questions through online research.

    Routes each question to Wikipedia lookup, general web search, YouTube
    handling, URL extraction, or a multi-source combination, then uses an
    LLM to turn raw tool output into a direct answer.
    """

    def __init__(self, llm_client: QwenClient):
        # LLM used to analyze tool output and phrase the final answer.
        self.llm_client = llm_client
        # Research tools are created once and reused across questions.
        self.wikipedia_tool = WikipediaTool()
        self.web_search_tool = WebSearchTool()
| def process(self, state: GAIAAgentState) -> GAIAAgentState: | |
| """ | |
| Process web research questions using Wikipedia and web search | |
| """ | |
| logger.info(f"Web researcher processing: {state.question[:100]}...") | |
| state.add_processing_step("Web Researcher: Starting research") | |
| try: | |
| # Determine research strategy | |
| strategy = self._determine_research_strategy(state.question, state.file_name) | |
| state.add_processing_step(f"Web Researcher: Strategy = {strategy}") | |
| # Execute research based on strategy | |
| if strategy == "wikipedia_direct": | |
| result = self._research_wikipedia_direct(state) | |
| elif strategy == "wikipedia_search": | |
| result = self._research_wikipedia_search(state) | |
| elif strategy == "youtube_analysis": | |
| result = self._research_youtube(state) | |
| elif strategy == "web_search": | |
| result = self._research_web_general(state) | |
| elif strategy == "url_extraction": | |
| result = self._research_url_content(state) | |
| else: | |
| result = self._research_multi_source(state) | |
| # Add result to state | |
| state.add_agent_result(result) | |
| state.add_processing_step(f"Web Researcher: Completed with confidence {result.confidence:.2f}") | |
| return state | |
| except Exception as e: | |
| error_msg = f"Web research failed: {str(e)}" | |
| state.add_error(error_msg) | |
| logger.error(error_msg) | |
| # Create failure result | |
| failure_result = AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=False, | |
| result=f"Research failed: {str(e)}", | |
| confidence=0.0, | |
| reasoning=f"Exception during web research: {str(e)}", | |
| model_used="error", | |
| processing_time=0.0, | |
| cost_estimate=0.0 | |
| ) | |
| state.add_agent_result(failure_result) | |
| return state | |
| def _determine_research_strategy(self, question: str, file_name: Optional[str] = None) -> str: | |
| """Determine the best research strategy for the question""" | |
| question_lower = question.lower() | |
| # Direct Wikipedia references | |
| if any(term in question_lower for term in ['wikipedia', 'featured article', 'promoted']): | |
| if 'search' in question_lower or 'find' in question_lower: | |
| return "wikipedia_search" | |
| else: | |
| return "wikipedia_direct" | |
| # YouTube video analysis | |
| if any(term in question_lower for term in ['youtube', 'video', 'watch?v=', 'youtu.be']): | |
| return "youtube_analysis" | |
| # URL content extraction | |
| urls = re.findall(r'https?://[^\s]+', question) | |
| if urls: | |
| return "url_extraction" | |
| # General web search for current events, news, recent information | |
| if any(term in question_lower for term in ['news', 'recent', 'latest', 'current', 'today', '2024', '2025']): | |
| return "web_search" | |
| # Multi-source research for complex questions | |
| if len(question.split()) > 20 or '?' in question and question.count('?') > 1: | |
| return "multi_source" | |
| # Default to Wikipedia search for informational questions | |
| return "wikipedia_search" | |
| def _research_wikipedia_direct(self, state: GAIAAgentState) -> AgentResult: | |
| """Research using direct Wikipedia lookup""" | |
| # Extract topic from question | |
| topic = self._extract_wikipedia_topic(state.question) | |
| logger.info(f"Wikipedia direct research for: {topic}") | |
| # Search Wikipedia | |
| wiki_result = self.wikipedia_tool.execute(topic) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| wiki_data = wiki_result.result['result'] | |
| # Use LLM to analyze and answer the question | |
| analysis_prompt = f""" | |
| Based on this Wikipedia information about {topic}, please answer the following question: | |
| Question: {state.question} | |
| Wikipedia Summary: {wiki_data.get('summary', '')} | |
| Wikipedia URL: {wiki_data.get('url', '')} | |
| Please provide a direct, accurate answer based on the Wikipedia information. | |
| """ | |
| # Use appropriate model tier | |
| model_tier = ModelTier.MAIN if state.complexity_assessment == "complex" else ModelTier.ROUTER | |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400) | |
| if llm_result.success: | |
| confidence = 0.85 if wiki_data.get('title') == topic else 0.75 | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=llm_result.response, | |
| confidence=confidence, | |
| reasoning=f"Found Wikipedia article for '{topic}' and analyzed content", | |
| tools_used=[ToolResult( | |
| tool_name="wikipedia", | |
| success=True, | |
| result=wiki_data, | |
| execution_time=wiki_result.execution_time | |
| )], | |
| model_used=llm_result.model_used, | |
| processing_time=wiki_result.execution_time + llm_result.response_time, | |
| cost_estimate=llm_result.cost_estimate | |
| ) | |
| else: | |
| # Return Wikipedia summary as fallback | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=wiki_data.get('summary', 'Wikipedia information found but analysis failed'), | |
| confidence=0.60, | |
| reasoning="Wikipedia found but LLM analysis failed", | |
| tools_used=[ToolResult( | |
| tool_name="wikipedia", | |
| success=True, | |
| result=wiki_data, | |
| execution_time=wiki_result.execution_time | |
| )], | |
| model_used="fallback", | |
| processing_time=wiki_result.execution_time, | |
| cost_estimate=0.0 | |
| ) | |
| else: | |
| # Wikipedia not found, try web search as fallback | |
| return self._research_web_fallback(state, f"Wikipedia not found for '{topic}'") | |
| def _research_wikipedia_search(self, state: GAIAAgentState) -> AgentResult: | |
| """Research using Wikipedia search functionality""" | |
| # Extract search terms | |
| search_terms = self._extract_search_terms(state.question) | |
| logger.info(f"Wikipedia search for: {search_terms}") | |
| # Search Wikipedia | |
| search_query = {"query": search_terms, "action": "summary"} | |
| wiki_result = self.wikipedia_tool.execute(search_query) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| return self._analyze_wikipedia_result(state, wiki_result) | |
| else: | |
| # Try web search as fallback | |
| return self._research_web_fallback(state, f"Wikipedia search failed for '{search_terms}'") | |
| def _research_youtube(self, state: GAIAAgentState) -> AgentResult: | |
| """Research YouTube video information""" | |
| # Extract YouTube URL or search terms | |
| youtube_query = self._extract_youtube_info(state.question) | |
| logger.info(f"YouTube research for: {youtube_query}") | |
| # Use web search tool's YouTube functionality | |
| if youtube_query.startswith('http'): | |
| # Direct YouTube URL | |
| web_result = self.web_search_tool.execute({ | |
| "query": youtube_query, | |
| "action": "extract" | |
| }) | |
| else: | |
| # Search for YouTube videos | |
| web_result = self.web_search_tool.execute(f"site:youtube.com {youtube_query}") | |
| if web_result.success and web_result.result.get('found'): | |
| return self._analyze_youtube_result(state, web_result) | |
| else: | |
| return self._create_failure_result("YouTube research failed") | |
| def _research_web_general(self, state: GAIAAgentState) -> AgentResult: | |
| """General web search research""" | |
| search_terms = self._extract_search_terms(state.question) | |
| logger.info(f"Web search for: {search_terms}") | |
| # Perform web search | |
| web_result = self.web_search_tool.execute({ | |
| "query": search_terms, | |
| "action": "search", | |
| "limit": 5 | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| return self._analyze_web_search_result(state, web_result) | |
| else: | |
| return self._create_failure_result("Web search failed") | |
| def _research_url_content(self, state: GAIAAgentState) -> AgentResult: | |
| """Extract and analyze content from specific URLs""" | |
| urls = re.findall(r'https?://[^\s]+', state.question) | |
| if not urls: | |
| return self._create_failure_result("No URLs found in question") | |
| url = urls[0] # Use first URL | |
| logger.info(f"Extracting content from: {url}") | |
| # Extract content from URL | |
| web_result = self.web_search_tool.execute({ | |
| "query": url, | |
| "action": "extract" | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| return self._analyze_url_content_result(state, web_result) | |
| else: | |
| return self._create_failure_result(f"Failed to extract content from {url}") | |
| def _research_multi_source(self, state: GAIAAgentState) -> AgentResult: | |
| """Multi-source research combining Wikipedia and web search""" | |
| search_terms = self._extract_search_terms(state.question) | |
| logger.info(f"Multi-source research for: {search_terms}") | |
| sources = [] | |
| # Try Wikipedia first | |
| wiki_result = self.wikipedia_tool.execute(search_terms) | |
| if wiki_result.success and wiki_result.result.get('found'): | |
| sources.append(("Wikipedia", wiki_result.result['result'])) | |
| # Add web search results | |
| web_result = self.web_search_tool.execute({ | |
| "query": search_terms, | |
| "action": "search", | |
| "limit": 3 | |
| }) | |
| if web_result.success and web_result.result.get('found'): | |
| for result in web_result.result['results'][:2]: # Use top 2 web results | |
| sources.append(("Web", result)) | |
| if sources: | |
| return self._analyze_multi_source_result(state, sources) | |
| else: | |
| return self._create_failure_result("All research sources failed") | |
| def _research_web_fallback(self, state: GAIAAgentState, reason: str) -> AgentResult: | |
| """Fallback to web search when other methods fail""" | |
| logger.info(f"Web search fallback: {reason}") | |
| search_terms = self._extract_search_terms(state.question) | |
| web_result = self.web_search_tool.execute(search_terms) | |
| if web_result.success and web_result.result.get('found'): | |
| result = self._analyze_web_search_result(state, web_result) | |
| result.reasoning = f"{reason}. Used web search fallback." | |
| result.confidence = max(0.3, result.confidence - 0.2) # Lower confidence for fallback | |
| return result | |
| else: | |
| return self._create_failure_result(f"Fallback failed: {reason}") | |
| def _extract_wikipedia_topic(self, question: str) -> str: | |
| """Extract Wikipedia topic from question""" | |
| # Look for quoted terms | |
| quoted = re.findall(r'"([^"]+)"', question) | |
| if quoted: | |
| return quoted[0] | |
| # Look for specific patterns | |
| patterns = [ | |
| r'wikipedia article[s]?\s+(?:about|on|for)\s+([^?.,]+)', | |
| r'featured article[s]?\s+(?:about|on|for)\s+([^?.,]+)', | |
| r'(?:about|on)\s+([A-Z][^?.,]+)', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, question, re.IGNORECASE) | |
| if match: | |
| return match.group(1).strip() | |
| # Extract main nouns/entities | |
| words = question.split() | |
| topic_words = [] | |
| for word in words: | |
| if word[0].isupper() or len(word) > 6: # Likely important words | |
| topic_words.append(word) | |
| return ' '.join(topic_words[:3]) if topic_words else "topic" | |
| def _extract_search_terms(self, question: str) -> str: | |
| """Extract search terms from question""" | |
| # Remove question words and common phrases | |
| stop_phrases = [ | |
| 'what is', 'what are', 'who is', 'who are', 'when is', 'when was', | |
| 'where is', 'where are', 'how is', 'how are', 'why is', 'why are', | |
| 'tell me about', 'find information about', 'search for' | |
| ] | |
| clean_question = question.lower() | |
| for phrase in stop_phrases: | |
| clean_question = clean_question.replace(phrase, '') | |
| # Remove punctuation and extra spaces | |
| clean_question = re.sub(r'[?.,!]', '', clean_question) | |
| clean_question = re.sub(r'\s+', ' ', clean_question).strip() | |
| return clean_question | |
| def _extract_youtube_info(self, question: str) -> str: | |
| """Extract YouTube URL or search terms""" | |
| # Look for YouTube URLs | |
| youtube_urls = re.findall(r'https?://(?:www\.)?youtube\.com/[^\s]+', question) | |
| if youtube_urls: | |
| return youtube_urls[0] | |
| youtube_urls = re.findall(r'https?://youtu\.be/[^\s]+', question) | |
| if youtube_urls: | |
| return youtube_urls[0] | |
| # Extract search terms for YouTube | |
| return self._extract_search_terms(question) | |
| def _analyze_wikipedia_result(self, state: GAIAAgentState, wiki_result: ToolResult) -> AgentResult: | |
| """Analyze Wikipedia result and generate answer""" | |
| wiki_data = wiki_result.result['result'] | |
| analysis_prompt = f""" | |
| Based on this Wikipedia information, please answer the following question: | |
| Question: {state.question} | |
| Wikipedia Information: | |
| Title: {wiki_data.get('title', '')} | |
| Summary: {wiki_data.get('summary', '')} | |
| URL: {wiki_data.get('url', '')} | |
| Please provide a direct, accurate answer. | |
| """ | |
| model_tier = ModelTier.MAIN if len(state.question) > 100 else ModelTier.ROUTER | |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=300) | |
| if llm_result.success: | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=llm_result.response, | |
| confidence=0.80, | |
| reasoning="Analyzed Wikipedia information to answer question", | |
| tools_used=[wiki_result], | |
| model_used=llm_result.model_used, | |
| processing_time=wiki_result.execution_time + llm_result.response_time, | |
| cost_estimate=llm_result.cost_estimate | |
| ) | |
| else: | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=wiki_data.get('summary', 'Information found'), | |
| confidence=0.60, | |
| reasoning="Wikipedia found but analysis failed", | |
| tools_used=[wiki_result], | |
| model_used="fallback", | |
| processing_time=wiki_result.execution_time, | |
| cost_estimate=0.0 | |
| ) | |
    def _analyze_youtube_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
        """Analyze YouTube research result.

        NOTE(review): this is a stub — it returns a canned success result
        with fixed confidence 0.70 and does not inspect ``web_result``'s
        content or ``state.question``; the real analysis is still TODO.
        """
        # Implementation for YouTube analysis
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result="YouTube analysis completed",
            confidence=0.70,
            reasoning="Analyzed YouTube content",
            tools_used=[web_result],
            model_used="basic",
            processing_time=web_result.execution_time,
            cost_estimate=0.0
        )
| def _analyze_web_search_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult: | |
| """Analyze web search results""" | |
| search_results = web_result.result['results'] | |
| # Combine top results for analysis | |
| combined_content = [] | |
| for i, result in enumerate(search_results[:3], 1): | |
| combined_content.append(f"Result {i}: {result['title']}") | |
| combined_content.append(f"URL: {result['url']}") | |
| combined_content.append(f"Description: {result['snippet']}") | |
| combined_content.append("") | |
| analysis_prompt = f""" | |
| Based on these web search results, please answer the following question: | |
| Question: {state.question} | |
| Search Results: | |
| {chr(10).join(combined_content)} | |
| Please provide a direct answer based on the most relevant information. | |
| """ | |
| model_tier = ModelTier.MAIN | |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400) | |
| if llm_result.success: | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=llm_result.response, | |
| confidence=0.75, | |
| reasoning=f"Analyzed {len(search_results)} web search results", | |
| tools_used=[web_result], | |
| model_used=llm_result.model_used, | |
| processing_time=web_result.execution_time + llm_result.response_time, | |
| cost_estimate=llm_result.cost_estimate | |
| ) | |
| else: | |
| # Fallback to first result description | |
| first_result = search_results[0] if search_results else {} | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=first_result.get('snippet', 'Web search completed'), | |
| confidence=0.50, | |
| reasoning="Web search completed but analysis failed", | |
| tools_used=[web_result], | |
| model_used="fallback", | |
| processing_time=web_result.execution_time, | |
| cost_estimate=0.0 | |
| ) | |
| def _analyze_url_content_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult: | |
| """Analyze extracted URL content""" | |
| content_data = web_result.result | |
| analysis_prompt = f""" | |
| Based on this web page content, please answer the following question: | |
| Question: {state.question} | |
| Page Title: {content_data.get('title', '')} | |
| Page URL: {content_data.get('url', '')} | |
| Content: {content_data.get('content', '')[:1000]}... | |
| Please provide a direct answer based on the page content. | |
| """ | |
| model_tier = ModelTier.MAIN | |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400) | |
| if llm_result.success: | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=llm_result.response, | |
| confidence=0.85, | |
| reasoning="Analyzed content from specific URL", | |
| tools_used=[web_result], | |
| model_used=llm_result.model_used, | |
| processing_time=web_result.execution_time + llm_result.response_time, | |
| cost_estimate=llm_result.cost_estimate | |
| ) | |
| else: | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=content_data.get('content', 'Content extracted')[:200], | |
| confidence=0.60, | |
| reasoning="URL content extracted but analysis failed", | |
| tools_used=[web_result], | |
| model_used="fallback", | |
| processing_time=web_result.execution_time, | |
| cost_estimate=0.0 | |
| ) | |
| def _analyze_multi_source_result(self, state: GAIAAgentState, sources: List) -> AgentResult: | |
| """Analyze results from multiple sources""" | |
| source_summaries = [] | |
| for source_type, source_data in sources: | |
| if source_type == "Wikipedia": | |
| source_summaries.append(f"Wikipedia: {source_data.get('summary', '')[:200]}") | |
| else: # Web result | |
| source_summaries.append(f"Web: {source_data.get('snippet', '')[:200]}") | |
| analysis_prompt = f""" | |
| Based on these multiple sources, please answer the following question: | |
| Question: {state.question} | |
| Sources: | |
| {chr(10).join(source_summaries)} | |
| Please synthesize the information and provide a comprehensive answer. | |
| """ | |
| model_tier = ModelTier.COMPLEX # Use best model for multi-source analysis | |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=500) | |
| if llm_result.success: | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=llm_result.response, | |
| confidence=0.85, | |
| reasoning=f"Synthesized information from {len(sources)} sources", | |
| tools_used=[], | |
| model_used=llm_result.model_used, | |
| processing_time=llm_result.response_time, | |
| cost_estimate=llm_result.cost_estimate | |
| ) | |
| else: | |
| # Fallback to first source | |
| first_source = sources[0][1] if sources else {} | |
| content = first_source.get('summary') or first_source.get('snippet', 'Multi-source research completed') | |
| return AgentResult( | |
| agent_role=AgentRole.WEB_RESEARCHER, | |
| success=True, | |
| result=content, | |
| confidence=0.60, | |
| reasoning="Multi-source research completed but synthesis failed", | |
| tools_used=[], | |
| model_used="fallback", | |
| processing_time=0.0, | |
| cost_estimate=0.0 | |
| ) | |
    def _create_failure_result(self, error_message: str) -> AgentResult:
        """Build a zero-confidence, unsuccessful AgentResult.

        ``error_message`` is used as both the result payload and the
        reasoning so downstream consumers see the cause either way.
        """
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=False,
            result=error_message,
            confidence=0.0,
            reasoning=error_message,
            model_used="error",
            processing_time=0.0,
            cost_estimate=0.0
        )