| |
| """ |
| Web Research Agent for GAIA Agent System |
| Handles Wikipedia and web search questions with intelligent search strategies |
| """ |
|
|
| import re |
| import logging |
| from typing import Dict, List, Optional, Any |
| from urllib.parse import urlparse |
|
|
| from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult |
| from models.qwen_client import QwenClient, ModelTier |
| from tools.wikipedia_tool import WikipediaTool |
| from tools.web_search_tool import WebSearchTool |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class WebResearchAgent: |
| """ |
| Specialized agent for web research tasks |
| Uses Wikipedia and web search tools with intelligent routing |
| """ |
| |
def __init__(self, llm_client: QwenClient) -> None:
    """
    Attach the language-model client and create the two research tools

    Args:
        llm_client: client used for every LLM synthesis/analysis call.
    """
    # One Wikipedia tool and one web-search tool per agent instance.
    self.llm_client = llm_client
    self.wikipedia_tool = WikipediaTool()
    self.web_search_tool = WebSearchTool()
| |
def process(self, state: GAIAAgentState) -> GAIAAgentState:
    """
    Run multi-step web research for ``state.question`` and record the outcome on ``state``

    A research plan is built from ``state.router_analysis`` when that attribute is
    present and truthy, otherwise from the question text alone. The plan is executed
    once; when the outcome is missing or its confidence is below 0.4 a refined plan
    is executed as a second attempt and the more confident of the two outcomes is kept.

    Args:
        state: shared agent state. Its question is read, and processing steps,
            errors and the final agent result are appended to it.

    Returns:
        The same ``state`` object. It always carries one new agent result; when an
        exception is raised that result is a failure result with confidence 0.1.
    """
    logger.info(f"Web researcher processing: {state.question[:100]}...")
    state.add_processing_step("Web Researcher: Starting enhanced multi-step research")

    try:
        # Prefer the router's structural analysis when the state carries one.
        router_analysis = getattr(state, 'router_analysis', None)
        if router_analysis:
            state.add_processing_step("Web Researcher: Using router analysis")
            research_plan = self._build_research_plan_from_router(state.question, router_analysis)
        else:
            state.add_processing_step("Web Researcher: Creating independent research plan")
            research_plan = self._create_independent_research_plan(state.question)

        results = self._execute_research_plan(state, research_plan)

        if not results or results.confidence < 0.4:
            logger.info("Initial research insufficient, attempting refinement")
            state.add_processing_step("Web Researcher: Refining research approach")
            refined_plan = self._refine_research_plan(state.question, research_plan, results)
            refined_results = self._execute_research_plan(state, refined_plan)

            # A refinement that scores lower must not discard a better first attempt.
            if not results or (refined_results and refined_results.confidence >= results.confidence):
                results = refined_results

        # Guarantee that something well-formed is recorded on the state.
        if not results or not isinstance(results, AgentResult):
            results = self._create_basic_response(state, "Multi-step research completed with limited results")

        state.add_agent_result(results)
        state.add_processing_step(f"Web Researcher: Completed with confidence {results.confidence:.2f}")

        return state

    except Exception as e:
        error_msg = f"Enhanced web research failed: {str(e)}"
        state.add_error(error_msg)
        # logger.exception keeps the traceback, which logger.error would drop.
        logger.exception(error_msg)

        failure_result = AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=False,
            result=f"Research encountered difficulties: {str(e)}",
            confidence=0.1,
            reasoning=f"Exception during enhanced web research: {str(e)}",
            tools_used=[],
            model_used="error",
            processing_time=0.0,
            cost_estimate=0.0
        )
        state.add_agent_result(failure_result)
        return state
| |
| def _build_research_plan_from_router(self, question: str, router_analysis: Dict[str, Any]) -> Dict[str, Any]: |
| """Build research plan using router's structural analysis""" |
| |
| structural = router_analysis.get('structural', {}) |
| requirements = router_analysis.get('requirements', {}) |
| strategy = router_analysis.get('strategy', {}) |
| |
| plan = { |
| 'question_type': structural.get('type', 'unknown'), |
| 'primary_need': requirements.get('primary_need', 'factual_lookup'), |
| 'data_sources': structural.get('data_sources', []), |
| 'approach': strategy.get('approach', 'sequential'), |
| 'steps': [], |
| 'fallback_strategies': [] |
| } |
| |
| |
| if plan['question_type'] == 'quantitative': |
| plan['steps'] = [ |
| {'action': 'identify_entity', 'details': 'Extract the main subject/entity'}, |
| {'action': 'gather_data', 'details': 'Find relevant numerical data'}, |
| {'action': 'verify_timeframe', 'details': 'Ensure data matches time constraints'}, |
| {'action': 'extract_count', 'details': 'Extract specific count/quantity'} |
| ] |
| elif plan['question_type'] == 'identification': |
| plan['steps'] = [ |
| {'action': 'parse_subject', 'details': 'Identify what/who to find'}, |
| {'action': 'context_search', 'details': 'Search for relevant context'}, |
| {'action': 'verify_identity', 'details': 'Confirm identity from sources'} |
| ] |
| else: |
| plan['steps'] = [ |
| {'action': 'decompose_query', 'details': 'Break down complex question'}, |
| {'action': 'research_components', 'details': 'Research each component'}, |
| {'action': 'synthesize_findings', 'details': 'Combine results'} |
| ] |
| |
| |
| plan['fallback_strategies'] = [ |
| 'broaden_search_terms', |
| 'try_alternative_sources', |
| 'use_partial_information' |
| ] |
| |
| return plan |
| |
| def _create_independent_research_plan(self, question: str) -> Dict[str, Any]: |
| """Create research plan when router analysis isn't available""" |
| |
| |
| plan = { |
| 'question_type': 'general_research', |
| 'primary_need': 'factual_lookup', |
| 'data_sources': [], |
| 'approach': 'sequential', |
| 'steps': [], |
| 'fallback_strategies': [] |
| } |
| |
| question_lower = question.lower() |
| |
| |
| if any(term in question_lower for term in ['how many', 'count', 'number']): |
| plan['question_type'] = 'quantitative' |
| plan['steps'] = [ |
| {'action': 'extract_entity', 'details': 'Find the main subject'}, |
| {'action': 'search_entity_data', 'details': 'Search for subject information'}, |
| {'action': 'extract_quantities', 'details': 'Find numerical data'}, |
| {'action': 'apply_constraints', 'details': 'Apply time/condition filters'} |
| ] |
| elif any(term in question_lower for term in ['who', 'name', 'identity']): |
| plan['question_type'] = 'identification' |
| plan['steps'] = [ |
| {'action': 'parse_context', 'details': 'Understand context clues'}, |
| {'action': 'search_individuals', 'details': 'Search for people/entities'}, |
| {'action': 'verify_match', 'details': 'Confirm identity match'} |
| ] |
| elif any(term in question_lower for term in ['wikipedia', 'article']): |
| plan['question_type'] = 'wikipedia_specific' |
| plan['data_sources'] = ['wikipedia'] |
| plan['steps'] = [ |
| {'action': 'extract_topic', 'details': 'Identify Wikipedia topic'}, |
| {'action': 'search_wikipedia', 'details': 'Search Wikipedia directly'}, |
| {'action': 'extract_metadata', 'details': 'Get article details'} |
| ] |
| else: |
| plan['steps'] = [ |
| {'action': 'analyze_question', 'details': 'Break down question components'}, |
| {'action': 'multi_source_search', 'details': 'Search multiple sources'}, |
| {'action': 'consolidate_results', 'details': 'Combine findings'} |
| ] |
| |
| |
| plan['fallback_strategies'] = [ |
| 'simplify_search_terms', |
| 'try_broader_keywords', |
| 'search_related_topics' |
| ] |
| |
| return plan |
| |
def _execute_research_plan(self, state: GAIAAgentState, plan: Dict[str, Any]) -> AgentResult:
    """
    Run every step of ``plan`` in order and fold the outcomes into one result

    A step that raises is logged, noted on ``state`` and skipped; the remaining
    steps still run. Truthy step outcomes are accumulated and later steps
    receive the list collected so far.

    Args:
        state: shared agent state; a processing step is recorded per plan step.
        plan: plan dict providing 'question_type' and 'steps'.

    Returns:
        The synthesis of all collected outcomes, or a failure result when no
        step produced a truthy outcome.
    """
    plan_type = plan['question_type']
    steps = plan['steps']
    logger.info(f"Executing research plan: {plan_type} with {len(steps)} steps")

    collected = []
    elapsed = 0.0
    spent = 0.0

    for position, step in enumerate(steps, start=1):
        logger.info(f"Step {position}/{len(plan['steps'])}: {step['action']} - {step['details']}")
        state.add_processing_step(f"Web Research Step {position}: {step['action']}")

        try:
            outcome = self._execute_research_step(state, step, plan, collected)
            if outcome:
                collected.append(outcome)
                # Outcomes without timing/cost attributes (e.g. plain dicts) count as zero.
                elapsed += getattr(outcome, 'execution_time', 0.0)
                spent += getattr(outcome, 'cost_estimate', 0.0)
        except Exception as e:
            logger.warning(f"Step {position} failed: {e}, continuing with next step")
            state.add_processing_step(f"Web Research Step {position}: Failed - {str(e)}")

    if not collected:
        return self._create_failure_result("All research steps failed")

    return self._synthesize_research_results(state, collected, plan, elapsed, spent)
| |
| def _execute_research_step(self, state: GAIAAgentState, step: Dict[str, Any], |
| plan: Dict[str, Any], previous_results: List) -> Any: |
| """Execute a single research step""" |
| |
| action = step['action'] |
| |
| if action == 'extract_entity' or action == 'identify_entity': |
| return self._extract_main_entity(state.question) |
| |
| elif action == 'search_entity_data' or action == 'gather_data': |
| entity = self._get_entity_from_results(previous_results) |
| return self._search_entity_information(entity, state.question) |
| |
| elif action == 'extract_quantities' or action == 'extract_count': |
| return self._extract_numerical_data(previous_results, state.question) |
| |
| elif action == 'search_wikipedia': |
| topic = self._extract_wikipedia_topic(state.question) |
| return self.wikipedia_tool.execute(topic) |
| |
| elif action == 'multi_source_search': |
| search_terms = self._extract_search_terms(state.question) |
| return self._research_multi_source_enhanced(state, search_terms) |
| |
| else: |
| |
| search_terms = self._extract_search_terms(state.question) |
| return self.web_search_tool.execute(search_terms) |
| |
| def _extract_main_entity(self, question: str) -> Dict[str, Any]: |
| """Extract the main entity/subject from the question""" |
| |
| |
| import re |
| |
| |
| quoted = re.findall(r'"([^"]+)"', question) |
| if quoted: |
| return {'type': 'quoted_entity', 'entity': quoted[0], 'confidence': 0.9} |
| |
| |
| words = question.split() |
| proper_nouns = [] |
| for word in words: |
| clean_word = re.sub(r'[^\w]', '', word) |
| if clean_word and clean_word[0].isupper() and len(clean_word) > 1: |
| proper_nouns.append(clean_word) |
| |
| if proper_nouns: |
| entity = ' '.join(proper_nouns[:3]) |
| return {'type': 'proper_noun', 'entity': entity, 'confidence': 0.7} |
| |
| |
| keywords = self._extract_search_terms(question, max_length=50) |
| return {'type': 'keywords', 'entity': keywords, 'confidence': 0.5} |
| |
| def _search_entity_information(self, entity_data: Dict[str, Any], question: str) -> Any: |
| """Search for information about the extracted entity""" |
| |
| if not entity_data or 'entity' not in entity_data: |
| return None |
| |
| entity = entity_data['entity'] |
| |
| |
| wiki_result = self.wikipedia_tool.execute(entity) |
| if wiki_result.success and wiki_result.result.get('found'): |
| return wiki_result |
| |
| |
| search_query = f"{entity} {self._extract_search_terms(question, max_length=30)}" |
| return self.web_search_tool.execute(search_query) |
| |
| def _extract_numerical_data(self, previous_results: List, question: str) -> Dict[str, Any]: |
| """Extract numerical data from previous search results""" |
| |
| numerical_data = { |
| 'numbers_found': [], |
| 'context': [], |
| 'confidence': 0.0 |
| } |
| |
| for result in previous_results: |
| if hasattr(result, 'result') and result.result: |
| text = str(result.result) |
| |
| |
| import re |
| number_patterns = [ |
| r'\b(\d+)\s*(albums?|songs?|tracks?|releases?)\b', |
| r'\b(\d+)\s*(studio|live|compilation)\s*(albums?)\b', |
| r'\bbetween\s*(\d{4})\s*and\s*(\d{4})\b', |
| r'\b(\d+)\b' |
| ] |
| |
| for pattern in number_patterns: |
| matches = re.findall(pattern, text, re.IGNORECASE) |
| for match in matches: |
| if isinstance(match, tuple): |
| numerical_data['numbers_found'].extend(match) |
| else: |
| numerical_data['numbers_found'].append(match) |
| |
| if numerical_data['numbers_found']: |
| numerical_data['confidence'] = 0.8 |
| |
| return numerical_data |
| |
| def _get_entity_from_results(self, results: List) -> str: |
| """Extract entity name from previous results""" |
| |
| for result in results: |
| if isinstance(result, dict) and 'entity' in result: |
| return result['entity'] |
| |
| return "" |
| |
| def _research_multi_source_enhanced(self, state: GAIAAgentState, search_terms: str) -> Any: |
| """Enhanced multi-source research with systematic approach""" |
| |
| sources_tried = [] |
| |
| |
| wiki_result = self.wikipedia_tool.execute(search_terms) |
| if wiki_result.success and wiki_result.result.get('found'): |
| sources_tried.append(('Wikipedia', wiki_result)) |
| |
| |
| web_result = self.web_search_tool.execute({ |
| "query": search_terms, |
| "action": "search", |
| "limit": 3 |
| }) |
| if web_result.success and web_result.result.get('found'): |
| sources_tried.append(('Web', web_result)) |
| |
| return {'sources': sources_tried, 'primary_terms': search_terms} |
| |
def _synthesize_research_results(
    self,
    state: GAIAAgentState,
    results: List,
    plan: Dict[str, Any],
    total_time: float,
    total_cost: float,
) -> AgentResult:
    """
    Combine the outcomes of all research steps into a single answer

    Each outcome is flattened to text, the texts are numbered and handed to the
    LLM together with the question, and the LLM's reply becomes the answer.
    When the LLM call fails, the first flattened outcome is returned instead.

    Args:
        state: shared agent state; only ``state.question`` is read.
        results: step outcomes (objects with a ``result`` attribute, or dicts).
        plan: the executed plan; its 'question_type' is reported.
        total_time: processing time accumulated over the steps.
        total_cost: cost accumulated over the steps.

    Returns:
        AgentResult for the web-researcher role.
    """
    findings = []
    confidences = []

    for outcome in results:
        if hasattr(outcome, 'result'):
            findings.append(str(outcome.result))
            if hasattr(outcome, 'confidence'):
                confidences.append(outcome.confidence)
        elif isinstance(outcome, dict):
            # Plain dict outcomes carry no score of their own; use a neutral 0.5.
            findings.append(str(outcome))
            confidences.append(0.5)

    numbered_findings = chr(10).join(
        f"Step {number}: {text}" for number, text in enumerate(findings, start=1)
    )

    synthesis_prompt = f"""
Based on multi-step research for this question, provide a direct answer:

Question: {state.question}

Research Plan Type: {plan['question_type']}

Research Findings:
{numbered_findings}

Please provide a direct, precise answer based on the research findings.
"""

    # More than two step outcomes are routed to the stronger model tier.
    if len(results) > 2:
        model_tier = ModelTier.COMPLEX
    else:
        model_tier = ModelTier.MAIN
    llm_result = self.llm_client.generate(synthesis_prompt, tier=model_tier, max_tokens=300)

    if confidences:
        avg_confidence = sum(confidences) / len(confidences)
    else:
        avg_confidence = 0.5

    if not llm_result.success:
        best_info = findings[0] if findings else "Multi-step research completed"
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=best_info,
            confidence=avg_confidence,
            reasoning="Multi-step research completed, synthesis failed",
            tools_used=[],
            model_used="fallback",
            processing_time=total_time,
            cost_estimate=total_cost
        )

    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=llm_result.response,
        confidence=min(0.85, avg_confidence + 0.1),
        reasoning=f"Multi-step research completed with {len(results)} steps: {plan['question_type']}",
        tools_used=[],
        model_used=llm_result.model_used,
        processing_time=total_time + llm_result.response_time,
        cost_estimate=total_cost + llm_result.cost_estimate
    )
| |
| def _refine_research_plan(self, question: str, original_plan: Dict[str, Any], |
| previous_result: AgentResult) -> Dict[str, Any]: |
| """Refine research plan when initial attempt yields poor results""" |
| |
| refined_plan = original_plan.copy() |
| |
| |
| if previous_result and previous_result.confidence < 0.3: |
| |
| refined_plan['steps'] = [ |
| {'action': 'broaden_search', 'details': 'Use broader search terms'}, |
| {'action': 'alternative_sources', 'details': 'Try different information sources'}, |
| {'action': 'relaxed_matching', 'details': 'Accept partial matches'} |
| ] |
| elif not previous_result or not previous_result.success: |
| |
| refined_plan['steps'] = [ |
| {'action': 'simple_search', 'details': 'Basic web search with key terms'}, |
| {'action': 'extract_any_info', 'details': 'Extract any relevant information'} |
| ] |
| |
| refined_plan['refinement_attempt'] = True |
| return refined_plan |
| |
| def _determine_research_strategy(self, question: str, file_name: Optional[str] = None) -> str: |
| """Determine the best research strategy for the question""" |
| |
| question_lower = question.lower() |
| |
| |
| if any(term in question_lower for term in ['wikipedia', 'featured article', 'promoted']): |
| if 'search' in question_lower or 'find' in question_lower: |
| return "wikipedia_search" |
| else: |
| return "wikipedia_direct" |
| |
| |
| if any(term in question_lower for term in ['youtube', 'video', 'watch?v=', 'youtu.be']): |
| return "youtube_analysis" |
| |
| |
| urls = re.findall(r'https?://[^\s]+', question) |
| if urls: |
| return "url_extraction" |
| |
| |
| if any(term in question_lower for term in ['news', 'recent', 'latest', 'current', 'today', '2024', '2025']): |
| return "web_search" |
| |
| |
| if len(question.split()) > 20 or '?' in question and question.count('?') > 1: |
| return "multi_source" |
| |
| |
| return "wikipedia_search" |
| |
def _research_wikipedia_direct(self, state: GAIAAgentState) -> AgentResult:
    """
    Answer the question from a direct Wikipedia lookup of its topic

    The topic is extracted from the question and looked up; when an article is
    found the LLM answers the question from its summary. If the LLM call fails
    the raw summary is returned with lower confidence, and if no article is
    found the web-search fallback is used.

    Args:
        state: shared agent state; ``question`` and ``complexity_assessment``
            are read.

    Returns:
        AgentResult for the web-researcher role.
    """
    topic = self._extract_wikipedia_topic(state.question)
    logger.info(f"Wikipedia direct research for: {topic}")

    wiki_result = self.wikipedia_tool.execute(topic)

    article_found = wiki_result.success and wiki_result.result.get('found')
    if not article_found:
        return self._research_web_fallback(state, f"Wikipedia not found for '{topic}'")

    wiki_data = wiki_result.result['result']

    analysis_prompt = f"""
Based on this Wikipedia information about {topic}, please answer the following question:

Question: {state.question}

Wikipedia Summary: {wiki_data.get('summary', '')}

Wikipedia URL: {wiki_data.get('url', '')}

Please provide a direct, accurate answer based on the Wikipedia information.
"""

    if state.complexity_assessment == "complex":
        model_tier = ModelTier.MAIN
    else:
        model_tier = ModelTier.ROUTER
    llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)

    if not llm_result.success:
        # The article exists but could not be analysed: hand back its summary.
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=wiki_data.get('summary', 'Wikipedia information found but analysis failed'),
            confidence=0.60,
            reasoning="Wikipedia found but LLM analysis failed",
            tools_used=[ToolResult(
                tool_name="wikipedia",
                success=True,
                result=wiki_data,
                execution_time=wiki_result.execution_time
            )],
            model_used="fallback",
            processing_time=wiki_result.execution_time,
            cost_estimate=0.0
        )

    # An exact title match is trusted slightly more than a near match.
    exact_title = wiki_data.get('title') == topic
    confidence = 0.85 if exact_title else 0.75

    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=llm_result.response,
        confidence=confidence,
        reasoning=f"Found Wikipedia article for '{topic}' and analyzed content",
        tools_used=[ToolResult(
            tool_name="wikipedia",
            success=True,
            result=wiki_data,
            execution_time=wiki_result.execution_time
        )],
        model_used=llm_result.model_used,
        processing_time=wiki_result.execution_time + llm_result.response_time,
        cost_estimate=llm_result.cost_estimate
    )
| |
def _research_wikipedia_search(self, state: GAIAAgentState) -> AgentResult:
    """
    Answer the question via a Wikipedia summary query on its search terms

    Args:
        state: shared agent state; only ``state.question`` is read here.

    Returns:
        The analysed Wikipedia result when one is found, otherwise the
        outcome of the web-search fallback.
    """
    search_terms = self._extract_search_terms(state.question)
    logger.info(f"Wikipedia search for: {search_terms}")

    wiki_request = {"query": search_terms, "action": "summary"}
    wiki_result = self.wikipedia_tool.execute(wiki_request)

    found = wiki_result.success and wiki_result.result.get('found')
    if not found:
        return self._research_web_fallback(state, f"Wikipedia search failed for '{search_terms}'")

    return self._analyze_wikipedia_result(state, wiki_result)
| |
def _research_youtube(self, state: GAIAAgentState) -> AgentResult:
    """
    Research a YouTube video referenced by the question

    A URL found in the question is sent to the web tool's "extract" action;
    otherwise the question's search terms are searched restricted to youtube.com.

    Args:
        state: shared agent state; only ``state.question`` is read here.

    Returns:
        The analysed YouTube result, or a failure result when nothing is found.
    """
    youtube_query = self._extract_youtube_info(state.question)
    logger.info(f"YouTube research for: {youtube_query}")

    if youtube_query.startswith('http'):
        request = {
            "query": youtube_query,
            "action": "extract"
        }
    else:
        request = f"site:youtube.com {youtube_query}"

    web_result = self.web_search_tool.execute(request)

    found = web_result.success and web_result.result.get('found')
    if not found:
        return self._create_failure_result("YouTube research failed")

    return self._analyze_youtube_result(state, web_result)
| |
def _research_web_general(self, state: GAIAAgentState) -> AgentResult:
    """
    Answer the question from a general web search (up to five results requested)

    Args:
        state: shared agent state; only ``state.question`` is read here.

    Returns:
        The analysed search result, or a failure result when nothing is found.
    """
    search_terms = self._extract_search_terms(state.question)
    logger.info(f"Web search for: {search_terms}")

    request = {
        "query": search_terms,
        "action": "search",
        "limit": 5
    }
    web_result = self.web_search_tool.execute(request)

    found = web_result.success and web_result.result.get('found')
    if not found:
        return self._create_failure_result("Web search failed")

    return self._analyze_web_search_result(state, web_result)
| |
def _research_url_content(self, state: GAIAAgentState) -> AgentResult:
    """
    Extract and analyze content from the first URL mentioned in the question

    Sentence punctuation that follows the URL in running text (for example the
    "?" ending the question, or a closing bracket around the link) is removed
    before the URL is fetched, because the whitespace-delimited match would
    otherwise include it. Balanced brackets inside the URL are kept.

    Args:
        state: shared agent state; only ``state.question`` is read here.

    Returns:
        The analysed page content, or a failure result when the question has no
        URL or the extraction finds nothing.
    """
    urls = re.findall(r'https?://[^\s]+', state.question)
    if not urls:
        return self._create_failure_result("No URLs found in question")

    trailing_punctuation = '.,;:!?\'"'
    url = urls[0].rstrip(trailing_punctuation)
    # Drop an unmatched ")" but keep e.g. ".../wiki/Mercury_(planet)".
    while url.endswith(')') and url.count(')') > url.count('('):
        url = url[:-1].rstrip(trailing_punctuation)
    if url.endswith('://'):
        # Nothing but the scheme survived the clean-up; keep the raw match.
        url = urls[0]

    logger.info(f"Extracting content from: {url}")

    web_result = self.web_search_tool.execute({
        "query": url,
        "action": "extract"
    })

    if web_result.success and web_result.result.get('found'):
        return self._analyze_url_content_result(state, web_result)
    else:
        return self._create_failure_result(f"Failed to extract content from {url}")
| |
def _research_multi_source(self, state: GAIAAgentState) -> AgentResult:
    """
    Combine Wikipedia and web search findings for the question

    Args:
        state: shared agent state; only ``state.question`` is read here.

    Returns:
        The analysis over the collected sources (the Wikipedia article plus at
        most two web hits), or a failure result when no source returned anything.
    """
    search_terms = self._extract_search_terms(state.question)
    logger.info(f"Multi-source research for: {search_terms}")

    collected = []

    wiki_result = self.wikipedia_tool.execute(search_terms)
    if wiki_result.success and wiki_result.result.get('found'):
        collected.append(("Wikipedia", wiki_result.result['result']))

    web_request = {
        "query": search_terms,
        "action": "search",
        "limit": 3
    }
    web_result = self.web_search_tool.execute(web_request)
    if web_result.success and web_result.result.get('found'):
        # Three hits are requested but only the first two are used.
        collected.extend(("Web", hit) for hit in web_result.result['results'][:2])

    if not collected:
        return self._create_failure_result("All research sources failed")

    return self._analyze_multi_source_result(state, collected)
| |
def _research_web_fallback(self, state: GAIAAgentState, reason: str) -> AgentResult:
    """
    Fall back to a web search after another research method failed

    Args:
        state: shared agent state; only ``state.question`` is read here.
        reason: why the primary method failed; copied into the reasoning text.

    Returns:
        The analysed web result with its reasoning replaced and its confidence
        lowered by 0.2 (never below 0.3), or a failure result when the search
        finds nothing.
    """
    logger.info(f"Web search fallback: {reason}")

    fallback_terms = self._extract_search_terms(state.question)
    web_result = self.web_search_tool.execute(fallback_terms)

    found = web_result.success and web_result.result.get('found')
    if not found:
        return self._create_failure_result(f"Fallback failed: {reason}")

    analysis = self._analyze_web_search_result(state, web_result)
    analysis.reasoning = f"{reason}. Used web search fallback."
    analysis.confidence = max(0.3, analysis.confidence - 0.2)
    return analysis
| |
def _research_fallback_strategy(self, state: GAIAAgentState, original_error: str) -> AgentResult:
    """
    Last-resort research: quote the first web-search snippet for the question

    Any exception raised during the search is logged as a warning and treated
    like an empty search.

    Args:
        state: shared agent state; only ``state.question`` is read here.
        original_error: description of the failure that triggered the fallback.

    Returns:
        A confidence-0.4 result built from the first hit's snippet, or the basic
        failure response when the search yields nothing usable.
    """
    logger.info("Executing fallback research strategy")

    try:
        fallback_terms = self._extract_search_terms(state.question)
        web_result = self.web_search_tool.execute(fallback_terms)

        hits = []
        if web_result.success and web_result.result.get('found'):
            hits = web_result.result.get('results', [])

        if hits:
            top_hit = hits[0]
            snippet = top_hit.get('snippet', 'Limited information available')

            return AgentResult(
                agent_role=AgentRole.WEB_RESEARCHER,
                success=True,
                result=f"Based on web search: {snippet}",
                confidence=0.4,
                reasoning=f"Fallback web search after: {original_error}",
                tools_used=[ToolResult(
                    tool_name="web_search_fallback",
                    success=True,
                    result={"summary": "Fallback search completed"},
                    execution_time=web_result.execution_time
                )],
                model_used="fallback",
                processing_time=web_result.execution_time,
                cost_estimate=0.0
            )

    except Exception as fallback_error:
        logger.warning(f"Web search fallback failed: {fallback_error}")

    return self._create_basic_response(state, f"Fallback failed: {original_error}")
| |
def _create_basic_response(self, state: GAIAAgentState, error_context: str) -> AgentResult:
    """
    Create a basic failure response when all research methods fail

    Args:
        state: shared agent state (not consulted; kept for interface
            compatibility with existing callers).
        error_context: short description of what went wrong; embedded in both
            the result text and the reasoning.

    Returns:
        Unsuccessful AgentResult with confidence 0.1, no tools and zero
        time/cost.
    """
    # The previously computed, never-used question summary has been removed.
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=False,
        result=f"Processing encountered difficulties: {error_context}",
        confidence=0.1,
        reasoning=f"All research sources failed: {error_context}",
        tools_used=[],
        model_used="none",
        processing_time=0.0,
        cost_estimate=0.0
    )
| |
| def _extract_wikipedia_topic(self, question: str) -> str: |
| """Extract Wikipedia topic from question""" |
| |
| |
| quoted = re.findall(r'"([^"]+)"', question) |
| if quoted: |
| return quoted[0] |
| |
| |
| patterns = [ |
| r'wikipedia article[s]?\s+(?:about|on|for)\s+([^?.,]+)', |
| r'featured article[s]?\s+(?:about|on|for)\s+([^?.,]+)', |
| r'(?:about|on)\s+([A-Z][^?.,]+)', |
| ] |
| |
| for pattern in patterns: |
| match = re.search(pattern, question, re.IGNORECASE) |
| if match: |
| return match.group(1).strip() |
| |
| |
| words = question.split() |
| topic_words = [] |
| for word in words: |
| if word[0].isupper() or len(word) > 6: |
| topic_words.append(word) |
| |
| return ' '.join(topic_words[:3]) if topic_words else "topic" |
| |
def _extract_search_terms(self, question: str, max_length: int = 100) -> str:
    """
    Extract optimized search terms from question
    Prioritizes important terms while staying under length limits

    Terms are collected in priority order, skipping case-insensitive duplicates
    as they are added:
      1. double-quoted phrases;
      2. the first five capitalised words;
      3. question words (who/what/when/where/why/how/which);
      4. the first two four-digit years (19xx/20xx);
      5. remaining non-stop-words longer than two characters, until the query
         grows past ``max_length - 20`` characters;
      6. up to two numbers that appear next to a unit or label (e.g.
         "12 albums", "season 7"), if at least 10 characters of room remain.

    Because duplicates are rejected up front, a lowercase repeat of a proper
    noun no longer consumes the length budget of step 5.

    Args:
        question: the user question.
        max_length: maximum length of the returned query in characters.

    Returns:
        Space-separated search terms no longer than ``max_length``. When even
        the first term is too long it is cut to ``max_length`` instead of
        producing an empty query.
    """
    stop_words = {
        'the', 'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
        'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could',
        'should', 'may', 'might', 'must', 'shall', 'can', 'to', 'of', 'in',
        'on', 'at', 'by', 'for', 'with', 'from', 'as', 'but', 'or', 'and',
        'if', 'then', 'than', 'this', 'that', 'these', 'those', 'i', 'you',
        'he', 'she', 'it', 'we', 'they', 'me', 'him', 'her', 'us', 'them'
    }
    question_words = {'who', 'what', 'when', 'where', 'why', 'how', 'which'}

    question_lower = question.lower()
    words = re.sub(r'[^\w\s\-]', ' ', question_lower).split()

    search_terms = []
    seen = set()

    def add_term(term: str) -> bool:
        """Append ``term`` unless an equal term (ignoring case) is already present."""
        key = term.lower()
        if key in seen:
            return False
        seen.add(key)
        search_terms.append(term)
        return True

    def query_length() -> int:
        return len(' '.join(search_terms))

    # 1. Quoted phrases are the strongest signal.
    for phrase in re.findall(r'"([^"]*)"', question):
        phrase = phrase.strip()
        if phrase:
            add_term(phrase)

    years = re.findall(r'\b(?:19|20)\d{2}\b', question)

    # 2. Capitalised words, original casing preserved.
    proper_nouns = []
    for word in question.split():
        clean_word = re.sub(r'[^\w]', '', word)
        if (clean_word and
                clean_word[0].isupper() and
                len(clean_word) > 1 and
                not clean_word.isdigit()):
            proper_nouns.append(clean_word)
    for noun in proper_nouns[:5]:
        add_term(noun)

    # 3. Question words.
    for word in words:
        if word in question_words:
            add_term(word)

    # 4. Years.
    for year in years[:2]:
        add_term(year)

    # 5. Remaining meaningful words, within the length budget.
    for word in words:
        if word in stop_words or len(word) <= 2 or word.isdigit():
            continue
        if add_term(word) and query_length() > max_length - 20:
            break

    # 6. Numbers count only when a unit or label gives them meaning.
    ignored_numbers = {'1', '2', '3', '4', '5', '10', '20', '19', '21', '22', '23', '24', '25'}
    meaningful_numbers = []
    for num in re.findall(r'\b\d{1,4}\b', question):
        if num in ignored_numbers or num in years or len(num) <= 1:
            continue
        contexts = (
            f'{num} albums', f'{num} songs', f'{num} years', f'{num} people',
            f'{num} times', f'{num} days', f'{num} months', f'episode {num}',
            f'season {num}', f'volume {num}', f'part {num}'
        )
        if any(context in question_lower for context in contexts):
            meaningful_numbers.append(num)

    if query_length() < max_length - 10:
        for num in meaningful_numbers[:2]:
            add_term(num)

    final_query = ' '.join(search_terms)
    if len(final_query) > max_length:
        kept_terms = []
        used = 0
        for term in search_terms:
            # A separating space is needed only between terms, not before the first.
            needed = len(term) + (1 if kept_terms else 0)
            if used + needed > max_length:
                break
            kept_terms.append(term)
            used += needed

        if kept_terms:
            final_query = ' '.join(kept_terms)
        else:
            # Even the first term is too long: cut it rather than search for nothing.
            final_query = search_terms[0][:max(max_length, 0)].rstrip() if search_terms else ''

    logger.info(f"📝 Optimized search terms: '{final_query}' from question: '{question[:50]}...'")
    return final_query
| |
| def _extract_youtube_info(self, question: str) -> str: |
| """Extract YouTube URL or search terms""" |
| |
| |
| youtube_urls = re.findall(r'https?://(?:www\.)?youtube\.com/[^\s]+', question) |
| if youtube_urls: |
| return youtube_urls[0] |
| |
| youtube_urls = re.findall(r'https?://youtu\.be/[^\s]+', question) |
| if youtube_urls: |
| return youtube_urls[0] |
| |
| |
| return self._extract_search_terms(question) |
| |
def _analyze_wikipedia_result(self, state: GAIAAgentState, wiki_result: ToolResult) -> AgentResult:
    """
    Turn a found Wikipedia result into an answer to the question

    Args:
        state: shared agent state; only ``state.question`` is read.
        wiki_result: tool result whose ``result['result']`` holds the article
            data (title, summary and url are read from it).

    Returns:
        AgentResult carrying the LLM's answer (confidence 0.80), or the raw
        article summary (confidence 0.60) when the LLM call fails.
    """
    wiki_data = wiki_result.result['result']

    analysis_prompt = f"""
Based on this Wikipedia information, please answer the following question:

Question: {state.question}

Wikipedia Information:
Title: {wiki_data.get('title', '')}
Summary: {wiki_data.get('summary', '')}
URL: {wiki_data.get('url', '')}

Please provide a direct, accurate answer.
"""

    # Questions longer than 100 characters go to the main model tier.
    if len(state.question) > 100:
        model_tier = ModelTier.MAIN
    else:
        model_tier = ModelTier.ROUTER
    llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=300)

    if not llm_result.success:
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=wiki_data.get('summary', 'Information found'),
            confidence=0.60,
            reasoning="Wikipedia found but analysis failed",
            tools_used=[wiki_result],
            model_used="fallback",
            processing_time=wiki_result.execution_time,
            cost_estimate=0.0
        )

    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=llm_result.response,
        confidence=0.80,
        reasoning="Analyzed Wikipedia information to answer question",
        tools_used=[wiki_result],
        model_used=llm_result.model_used,
        processing_time=wiki_result.execution_time + llm_result.response_time,
        cost_estimate=llm_result.cost_estimate
    )
| |
def _analyze_youtube_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
    """
    Wrap a YouTube lookup in an AgentResult

    The fetched content is not inspected: the result text is the fixed string
    "YouTube analysis completed" and ``web_result`` is attached as the tool used.

    Args:
        state: shared agent state (not consulted here).
        web_result: tool result of the YouTube lookup; its execution time is
            reported as the processing time.

    Returns:
        Successful AgentResult with confidence 0.70 and zero cost.
    """
    youtube_summary = AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result="YouTube analysis completed",
        confidence=0.70,
        reasoning="Analyzed YouTube content",
        tools_used=[web_result],
        model_used="basic",
        processing_time=web_result.execution_time,
        cost_estimate=0.0
    )
    return youtube_summary
| |
def _analyze_web_search_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
    """Answer the question from web search results, using the LLM when possible.

    Args:
        state: Agent state; only ``state.question`` is read here.
        web_result: Tool result whose ``result`` payload is read as a mapping
            with ``success`` and ``results`` keys and optional ``query``,
            ``source``, ``count`` and ``note`` keys.

    Returns:
        An ``AgentResult`` holding:
        * the LLM's answer (confidence 0.80) when generation succeeds;
        * the first result's snippet/content (confidence 0.50) when the LLM
          call does not succeed;
        * a failed result (confidence 0.20) when the payload is unusable,
          the search did not report success, or it returned no results.
    """
    search_data = web_result.result
    # A missing or non-mapping payload is routed to the "no useful results"
    # branch below instead of raising on the first ``.get`` call.
    if not hasattr(search_data, 'get'):
        search_data = {}

    raw_results = search_data.get('results')
    if not (search_data.get('success') and raw_results):
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=False,
            result="Web search returned no useful results",
            confidence=0.20,
            reasoning=f"Search failed or empty: {search_data.get('note', 'Unknown reason')}",
            tools_used=[web_result],
            model_used="none",
            processing_time=web_result.execution_time,
            cost_estimate=0.0
        )

    # Normalise per item rather than trusting the first element: objects
    # exposing ``to_dict`` are converted, everything else passes through.
    search_results = [
        item.to_dict() if hasattr(item, 'to_dict') else item
        for item in raw_results
    ]

    # Only the top three results are shown to the model.
    combined_content = []
    for i, result in enumerate(search_results[:3], 1):
        # ``or`` chaining also covers keys that are present but None/empty;
        # ``dict.get(key, default)`` only applies the default to missing keys.
        description = result.get('snippet') or result.get('content') or 'No description'
        combined_content.append(f"Result {i}: {result.get('title', 'No title')}")
        combined_content.append(f"URL: {result.get('url', 'No URL')}")
        combined_content.append(f"Description: {str(description)[:200]}")
        combined_content.append(f"Source: {result.get('source', 'Unknown')}")
        combined_content.append("")

    # Prompt text is kept flush-left so code indentation does not leak into it.
    analysis_prompt = f"""
Based on these web search results, please answer the following question:

Question: {state.question}

Search Query: {search_data.get('query', 'N/A')}
Search Engine: {search_data.get('source', 'Unknown')}
Results Found: {search_data.get('count', len(search_results))}

Search Results:
{chr(10).join(combined_content)}

Please provide a direct answer based on the most relevant information.
"""

    model_tier = ModelTier.COMPLEX
    llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)

    if llm_result.success:
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=llm_result.response,
            confidence=0.80,
            reasoning=f"Analyzed {len(search_results)} web search results using {search_data.get('source', 'search engine')}",
            tools_used=[web_result],
            model_used=llm_result.model_used,
            processing_time=web_result.execution_time + llm_result.response_time,
            cost_estimate=llm_result.cost_estimate
        )

    # LLM analysis failed: fall back to the raw text of the top result.
    # ``search_results`` is known to be non-empty at this point.
    first_result = search_results[0]
    fallback_text = (
        first_result.get('snippet')
        or first_result.get('content')
        or 'Web search completed'
    )
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=fallback_text,
        confidence=0.50,
        reasoning="Web search completed but analysis failed",
        tools_used=[web_result],
        model_used="fallback",
        processing_time=web_result.execution_time,
        cost_estimate=0.0
    )
| |
def _analyze_url_content_result(self, state: GAIAAgentState, web_result: ToolResult) -> AgentResult:
    """Answer the question from the content extracted from a single URL.

    Args:
        state: Agent state; only ``state.question`` is read here.
        web_result: Tool result whose ``result`` payload is read as a mapping
            with optional ``title``, ``url`` and ``content`` keys.

    Returns:
        An ``AgentResult`` with the LLM's answer (confidence 0.85) when
        generation succeeds; otherwise the first 200 characters of the page
        content, or the placeholder ``'Content extracted'`` when there is no
        content (confidence 0.60).
    """
    content_data = web_result.result

    # ``dict.get(key, default)`` only applies the default to a missing key,
    # so a ``content`` value of None would break the slices below. Treat a
    # None/empty value exactly like a missing one.
    page_content = str(content_data.get('content') or '')

    # Prompt text is kept flush-left so code indentation does not leak into it.
    analysis_prompt = f"""
Based on this web page content, please answer the following question:

Question: {state.question}

Page Title: {content_data.get('title', '')}
Page URL: {content_data.get('url', '')}
Content: {page_content[:1000]}...

Please provide a direct answer based on the page content.
"""

    model_tier = ModelTier.MAIN
    llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400)

    if llm_result.success:
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=llm_result.response,
            confidence=0.85,
            reasoning="Analyzed content from specific URL",
            tools_used=[web_result],
            model_used=llm_result.model_used,
            processing_time=web_result.execution_time + llm_result.response_time,
            cost_estimate=llm_result.cost_estimate
        )

    # LLM analysis failed: return a short excerpt of the page instead. An
    # empty page yields the placeholder rather than an empty "answer".
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=(page_content or 'Content extracted')[:200],
        confidence=0.60,
        reasoning="URL content extracted but analysis failed",
        tools_used=[web_result],
        model_used="fallback",
        processing_time=web_result.execution_time,
        cost_estimate=0.0
    )
| |
def _analyze_multi_source_result(self, state: GAIAAgentState, sources: List) -> AgentResult:
    """Synthesize an answer from several ``(source_type, source_data)`` pairs.

    Args:
        state: Agent state; only ``state.question`` is read here.
        sources: Sequence of two-item pairs. A pair whose type equals
            ``"Wikipedia"`` contributes its ``summary``; any other pair is
            treated as a web source and contributes its ``snippet`` (or
            ``content`` when no snippet is available).

    Returns:
        An ``AgentResult`` with the LLM's synthesis (confidence 0.85) when
        generation succeeds; otherwise the text of the first source, or a
        placeholder message when none is available (confidence 0.60).
    """
    source_summaries = []
    for source_type, source_data in sources:
        # ``or`` chaining covers values that are present but None/empty;
        # ``dict.get(key, default)`` would hand those back unchanged and the
        # slice would then fail on None.
        if source_type == "Wikipedia":
            text = source_data.get('summary') or ''
            label = "Wikipedia"
        else:
            # Web results elsewhere in this class carry their text under
            # either ``snippet`` or ``content``; accept both here too.
            text = source_data.get('snippet') or source_data.get('content') or ''
            label = "Web"
        source_summaries.append(f"{label}: {str(text)[:200]}")

    # Prompt text is kept flush-left so code indentation does not leak into it.
    analysis_prompt = f"""
Based on these multiple sources, please answer the following question:

Question: {state.question}

Sources:
{chr(10).join(source_summaries)}

Please synthesize the information and provide a comprehensive answer.
"""

    model_tier = ModelTier.COMPLEX
    llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=500)

    if llm_result.success:
        return AgentResult(
            agent_role=AgentRole.WEB_RESEARCHER,
            success=True,
            result=llm_result.response,
            confidence=0.85,
            reasoning=f"Synthesized information from {len(sources)} sources",
            tools_used=[],
            model_used=llm_result.model_used,
            processing_time=llm_result.response_time,
            cost_estimate=llm_result.cost_estimate
        )

    # Synthesis failed: fall back to the raw text of the first source, never
    # returning None/empty as the result text.
    first_source = sources[0][1] if sources else {}
    content = (
        first_source.get('summary')
        or first_source.get('snippet')
        or first_source.get('content')
        or 'Multi-source research completed'
    )
    return AgentResult(
        agent_role=AgentRole.WEB_RESEARCHER,
        success=True,
        result=content,
        confidence=0.60,
        reasoning="Multi-source research completed but synthesis failed",
        tools_used=[],
        model_used="fallback",
        processing_time=0.0,
        cost_estimate=0.0
    )
| |
def _create_failure_result(self, error_message: str) -> AgentResult:
    """Build an unsuccessful ``AgentResult`` carrying ``error_message``.

    The message is used as both the result text and the reasoning.
    Confidence, processing time and cost estimate are all zero, and the
    model is reported as ``"error"``.

    Args:
        error_message: Human-readable description of what went wrong.

    Returns:
        An ``AgentResult`` for the web researcher role with ``success=False``.
    """
    failure_fields = {
        "agent_role": AgentRole.WEB_RESEARCHER,
        "success": False,
        "result": error_message,
        "confidence": 0.0,
        "reasoning": error_message,
        "model_used": "error",
        "processing_time": 0.0,
        "cost_estimate": 0.0,
    }
    return AgentResult(**failure_fields)