| | |
| | """ |
| | Web Search Tool for GAIA Agent System |
| | Handles web searches using DuckDuckGo (primary), Tavily API (secondary), and Wikipedia (fallback) |
| | """ |
| |
|
| | import re |
| | import logging |
| | import time |
| | import os |
| | from typing import Dict, List, Optional, Any |
| | from urllib.parse import urlparse, urljoin |
| | import requests |
| | from bs4 import BeautifulSoup |
| |
|
| | from tools import BaseTool |
| |
|
# Module-level logger named after this module; every status, warning and
# error message emitted by the search tool below goes through it.
logger = logging.getLogger(__name__)
| |
|
class WebSearchResult:
    """Plain value holder for a single hit returned by a search engine."""

    def __init__(self, title: str, url: str, snippet: str, content: str = "", source: str = ""):
        # Store the fields exactly as supplied; no validation is performed.
        self.title, self.url, self.snippet = title, url, snippet
        self.content, self.source = content, source

    def to_dict(self) -> Dict[str, str]:
        """Return the result as a dict, capping ``content`` at 1500 characters.

        Content longer than 1500 characters is cut at that length and
        suffixed with ``"..."``; shorter content is returned unchanged.
        """
        body = self.content
        if len(body) > 1500:
            body = body[:1500] + "..."

        return dict(
            title=self.title,
            url=self.url,
            snippet=self.snippet,
            content=body,
            source=self.source,
        )
| |
|
class WebSearchTool(BaseTool):
    """
    Web search tool using DuckDuckGo (primary), Tavily API (secondary), and Wikipedia (fallback)
    Provides multiple search engine options for reliability

    Engines are enabled at construction time: DuckDuckGo and Wikipedia when
    their packages can be imported, Tavily when ``TAVILY_API_KEY`` is set.
    """

    # Timeouts (seconds) passed explicitly on each HTTP call. requests does
    # not apply a ``timeout`` attribute stored on a Session, so relying on
    # ``session.timeout`` leaves requests without any timeout at all.
    _PAGE_TIMEOUT = 10
    _TAVILY_TIMEOUT = 15

    # Interrogatives/auxiliaries stripped from questions before keyword extraction.
    _QUESTION_WORDS = (
        'what', 'who', 'when', 'where', 'why', 'how', 'is', 'are', 'was', 'were',
        'did', 'do', 'does', 'can', 'could', 'should', 'would',
    )
    _QUESTION_WORD_RE = re.compile(r'\b(' + '|'.join(_QUESTION_WORDS) + r')\b')

    _STOP_WORDS = frozenset({
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of',
        'with', 'by', 'from', 'up', 'about', 'into', 'through', 'during', 'before',
        'after', 'above', 'below', 'between', 'among', 'this', 'that', 'these',
        'those', 'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you',
        'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself',
        'she', 'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them',
        'their', 'theirs', 'themselves',
    })

    def __init__(self):
        """Create the HTTP session and detect which search engines are usable."""
        super().__init__("web_search")

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # Kept only so existing readers of this attribute keep working;
        # requests itself ignores it, hence the explicit timeout= arguments
        # on the calls below.
        self.session.timeout = self._PAGE_TIMEOUT

        # Tavily is enabled purely by the presence of the API key.
        self.tavily_api_key = os.getenv("TAVILY_API_KEY")
        self.use_tavily = self.tavily_api_key is not None

        # DuckDuckGo is optional: only used when its package is installed.
        try:
            from duckduckgo_search import DDGS
            self.ddgs = DDGS()
            self.use_duckduckgo = True
            logger.info("✅ DuckDuckGo search initialized")
        except ImportError:
            logger.warning("⚠️ DuckDuckGo search not available - install duckduckgo-search package")
            self.use_duckduckgo = False

        # Wikipedia is optional as well; the imported module is kept on self.
        try:
            import wikipedia
            self.wikipedia = wikipedia
            self.use_wikipedia = True
            logger.info("✅ Wikipedia search initialized")
        except ImportError:
            logger.warning("⚠️ Wikipedia search not available - install wikipedia package")
            self.use_wikipedia = False

        if self.use_tavily:
            logger.info("✅ Tavily API key found - using as secondary search")

        search_engines = []
        if self.use_duckduckgo:
            search_engines.append("DuckDuckGo")
        if self.use_tavily:
            search_engines.append("Tavily")
        if self.use_wikipedia:
            search_engines.append("Wikipedia")

        logger.info(f"🔍 Available search engines: {', '.join(search_engines)}")

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute web search operations based on input type

        Args:
            input_data: Can be:
                - str: Search query or URL to extract content from
                - dict: {"query": str, "action": str, "limit": int, "extract_content": bool}

        Returns:
            The dict produced by ``_search_web`` or ``_extract_content_from_url``.

        Raises:
            ValueError: for an unknown ``action`` or an unsupported input type.
        """
        if isinstance(input_data, str):
            # A string starting with http(s):// is fetched; anything else is searched.
            if self._is_url(input_data):
                return self._extract_content_from_url(input_data)
            return self._search_web(input_data)

        if isinstance(input_data, dict):
            # ``or ""`` also covers an explicit {"query": None}.
            query = input_data.get("query") or ""
            action = input_data.get("action", "search")
            limit = input_data.get("limit", 5)
            extract_content = input_data.get("extract_content", False)

            if action == "search":
                return self._search_web(query, limit, extract_content)
            if action == "extract":
                return self._extract_content_from_url(query)
            raise ValueError(f"Unknown action: {action}")

        raise ValueError(f"Unsupported input type: {type(input_data)}")

    def _is_url(self, text: str) -> bool:
        """Check if text is a URL (i.e. starts with ``http://`` or ``https://``)"""
        return bool(re.match(r'https?://', text))

    def _extract_search_terms(self, question: str, max_length: int = 200) -> str:
        """
        Extract focused search terms from a question
        Prioritizes key entities, dates, and specific information

        Entities (quoted phrases, capitalised phrases, years, other numbers)
        are used when any are found; otherwise the question's non-stop-word
        keywords are used. If both come up empty the whitespace-normalised
        question itself is used so the engines never receive a query emptied
        by filtering.

        Args:
            question: Natural-language question or keyword string.
            max_length: Maximum length of the returned string; longer output
                is cut back to the last space inside the limit.

        Returns:
            The search string, stripped of surrounding whitespace.
        """
        question_clean = self._QUESTION_WORD_RE.sub('', question.lower())

        entities = []

        # Quoted phrases are taken verbatim.
        entities.extend(re.findall(r'"([^"]+)"', question))

        # Capitalised phrases. A lone capitalised question/stop word ("What",
        # "The", "In") is only sentence capitalisation, not an entity; keeping
        # it would turn "What is machine learning?" into the query "What".
        noise_words = self._STOP_WORDS.union(self._QUESTION_WORDS)
        proper_nouns = [
            phrase
            for phrase in re.findall(r'\b[A-Z][a-zA-Z]*(?:\s+[A-Z][a-zA-Z]*)*\b', question)
            if phrase.lower() not in noise_words
        ]
        entities.extend(proper_nouns[:3])

        # Non-capturing group: with a capturing group re.findall returns only
        # the group ("19"/"20") instead of the whole four-digit year.
        years = re.findall(r'\b(?:19|20)\d{2}\b', question)
        entities.extend(years)

        # Years are already included, so they must not use up the two number slots.
        other_numbers = [n for n in re.findall(r'\b\d+\b', question) if n not in years]
        entities.extend(other_numbers[:2])

        # Drop repeats while keeping first-seen order.
        entities = list(dict.fromkeys(entities))

        if entities:
            search_terms = ' '.join(entities[:6])
        else:
            filtered_words = [
                w for w in question_clean.split()
                if w.lower() not in self._STOP_WORDS and len(w) > 2
            ]
            search_terms = ' '.join(filtered_words[:8])

        if not search_terms:
            # Everything was filtered away; fall back to the raw question.
            search_terms = ' '.join(question.split())

        if len(search_terms) > max_length:
            search_terms = search_terms[:max_length].rsplit(' ', 1)[0]

        logger.info(f"📝 Extracted search terms: '{search_terms}' from question: '{question[:100]}...'")

        return search_terms.strip()

    @staticmethod
    def _engine_success(source: str, query: str, results: List[WebSearchResult]) -> Dict[str, Any]:
        """Build the success payload shared by the per-engine search methods."""
        return {
            'success': True,
            'results': results,
            'source': source,
            'query': query,
            'count': len(results)
        }

    @staticmethod
    def _engine_failure(source: str, query: str, note: str) -> Dict[str, Any]:
        """Build the empty failure payload shared by the per-engine search methods."""
        return {
            'success': False,
            'results': [],
            'source': source,
            'query': query,
            'count': 0,
            'note': note
        }

    def _search_web(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]:
        """
        Search the web using available search engines in priority order with improved search terms

        Args:
            query: Question or keywords; reduced via ``_extract_search_terms``.
            limit: Maximum number of results requested from each engine.
            extract_content: Forwarded to the engines that accept it.

        Returns:
            Dict with ``success``, ``found``, ``results`` (list of dicts),
            ``query`` (the original query), ``source`` and ``total_found``;
            failures additionally carry ``message``.
        """
        search_query = self._extract_search_terms(query, max_length=200)

        if not search_query:
            # Nothing to search for; do not spend network calls on an empty query.
            return {
                "query": query,
                "found": False,
                "success": False,
                "message": "❌ No search query provided.",
                "results": [],
                "source": "none",
                "total_found": 0
            }

        # (engine name, log prefix used when the call raises, call)
        attempts = []
        if self.use_duckduckgo:
            attempts.append((
                'DuckDuckGo',
                "DuckDuckGo search failed, trying Tavily",
                lambda: self._search_with_duckduckgo(search_query, limit, extract_content),
            ))
        if self.use_tavily:
            attempts.append((
                'Tavily',
                "Tavily search failed, trying Wikipedia",
                lambda: self._search_with_tavily(search_query, limit, extract_content),
            ))
        if self.use_wikipedia:
            attempts.append((
                'Wikipedia',
                "Wikipedia search failed",
                lambda: self._search_with_wikipedia(search_query, limit),
            ))

        for engine_name, failure_prefix, run_search in attempts:
            try:
                outcome = run_search()
            except Exception as e:
                logger.warning(f"{failure_prefix}: {e}")
                continue

            if outcome.get('success') and outcome.get('count', 0) > 0:
                return {
                    'success': True,
                    'found': True,
                    'results': [r.to_dict() if hasattr(r, 'to_dict') else r for r in outcome['results']],
                    'query': query,
                    # The engine methods fall back to other engines internally,
                    # so report the engine that actually produced the results.
                    'source': outcome.get('source') or engine_name,
                    'total_found': outcome['count']
                }

        logger.warning("All search engines failed, returning empty results")
        return {
            "query": query,
            "found": False,
            "success": False,
            "message": "❌ All search engines failed or returned no results.",
            "results": [],
            "source": "none",
            "total_found": 0
        }

    def _search_with_duckduckgo(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]:
        """
        Search using DuckDuckGo - primary search engine with improved error handling and rate limiting

        Falls back to ``_search_with_fallback`` when DuckDuckGo returns
        nothing or raises. ``extract_content`` is accepted for signature
        parity with the other engines but is not used here.

        Returns:
            Engine payload (``success``, ``results``, ``source``, ``query``,
            ``count``); ``source`` names the engine that produced the results.
        """
        try:
            logger.info(f"🦆 DuckDuckGo search for: {query}")

            # Small fixed pause before each search to soften rate limiting.
            time.sleep(0.5)

            max_retries = 2
            for attempt in range(max_retries):
                try:
                    ddg_results = list(self.ddgs.text(query, max_results=min(limit, 10)))
                    break
                except Exception as retry_error:
                    if attempt >= max_retries - 1:
                        # Out of retries: let the outer handler start the fallback.
                        raise
                    logger.warning(f"DuckDuckGo attempt {attempt + 1} failed, retrying in {2 ** attempt}s: {retry_error}")
                    time.sleep(2 ** attempt)

            if not ddg_results:
                logger.warning("DuckDuckGo returned no results")
                return self._search_with_fallback(query, limit)

            results = [
                WebSearchResult(
                    title=result.get('title', 'No title'),
                    url=result.get('href', ''),
                    snippet=result.get('body', 'No description'),
                    source='DuckDuckGo'
                )
                for result in ddg_results
            ]

            logger.info(f"✅ DuckDuckGo found {len(results)} results")
            return self._engine_success('DuckDuckGo', query, results)

        except Exception as e:
            logger.warning(f"DuckDuckGo search failed: {str(e)}")

            if "ratelimit" in str(e).lower() or "429" in str(e) or "202" in str(e):
                logger.warning("Rate limiting detected, adding delay before fallback")
                time.sleep(2.0)
            return self._search_with_fallback(query, limit)

    def _search_with_fallback(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """Enhanced fallback search when DuckDuckGo fails

        Tries Tavily (when an API key is configured) and then Wikipedia (when
        the package is available); returns a failure payload with
        ``source='none'`` when neither yields results.
        """
        logger.info(f"🔄 Using fallback search engines for: {query}")

        if self.use_tavily:
            try:
                logger.info("📡 Trying Tavily API search")
                results = self._query_tavily(query, limit)
                if results:
                    logger.info(f"✅ Tavily found {len(results)} results")
                    return self._engine_success('Tavily', query, results)
            except Exception as e:
                logger.warning(f"Tavily search failed: {str(e)}")

        if self.use_wikipedia:
            logger.info("📚 Wikipedia search for: " + query)
            try:
                wiki_results = self._search_with_wikipedia(query, limit)
                if wiki_results and wiki_results.get('success'):
                    logger.info(f"✅ Wikipedia found {wiki_results.get('count', 0)} results")
                    return wiki_results
            except Exception as e:
                logger.warning(f"Wikipedia fallback failed: {str(e)}")

        logger.warning("All search engines failed, returning empty results")
        return self._engine_failure('none', query, 'All search engines failed')

    def _query_tavily(self, query: str, limit: int = 5, extract_content: bool = False) -> List[WebSearchResult]:
        """Call the Tavily search endpoint once and convert its hits.

        Performs no fallback and no error handling: request and HTTP-status
        errors propagate to the caller.

        Returns:
            One ``WebSearchResult`` per entry in the response's ``results``
            list; an empty list when the response carries none.
        """
        payload = {
            "api_key": self.tavily_api_key,
            "query": query,
            "search_depth": "basic",
            "include_answer": False,
            "include_images": False,
            "include_raw_content": extract_content,
            "max_results": min(limit, 10)
        }

        response = self.session.post(
            "https://api.tavily.com/search",
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=self._TAVILY_TIMEOUT
        )
        response.raise_for_status()
        tavily_data = response.json()

        return [
            WebSearchResult(
                title=result.get('title', 'No title'),
                url=result.get('url', ''),
                snippet=result.get('content', 'No description'),
                # ``or ''`` guards against an explicit null raw_content, which
                # would otherwise break len() in WebSearchResult.to_dict().
                content=(result.get('raw_content') or '') if extract_content else '',
                source='Tavily'
            )
            for result in (tavily_data.get('results') or [])
        ]

    def _search_with_tavily(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]:
        """
        Search using Tavily Search API - secondary search engine

        Falls back to Wikipedia (when available) if Tavily returns nothing or
        the request fails; otherwise returns a failure payload.
        """
        try:
            logger.info(f"🔍 Tavily search for: {query}")
            results = self._query_tavily(query, limit, extract_content)

            if results:
                logger.info(f"✅ Tavily found {len(results)} results")
                return self._engine_success('Tavily', query, results)

            logger.warning("Tavily returned no results")

        except requests.exceptions.RequestException as e:
            logger.error(f"Tavily API request failed: {e}")
        except Exception as e:
            logger.error(f"Tavily search error: {e}")

        # Reached on empty results as well as on errors.
        if self.use_wikipedia:
            return self._search_with_wikipedia(query, limit)

        return self._engine_failure('Tavily', query, 'Tavily search failed and no fallback available')

    def _build_wikipedia_result(self, title: str, page: Any) -> WebSearchResult:
        """Turn a wikipedia page object into a result (300-char snippet, 1000-char content)."""
        summary = page.summary
        snippet = summary[:300] + "..." if len(summary) > 300 else summary
        content = summary[:1000] + "..." if len(summary) > 1000 else summary
        return WebSearchResult(
            title=f"{title} (Wikipedia)",
            url=page.url,
            snippet=snippet,
            content=content,
            source='Wikipedia'
        )

    def _load_wikipedia_result(self, page_title: str) -> Optional[WebSearchResult]:
        """Load one Wikipedia page as a result, or return None if it cannot be used.

        On a disambiguation error the first listed option is tried instead;
        missing pages and any other page-level errors yield None.
        """
        try:
            return self._build_wikipedia_result(page_title, self.wikipedia.page(page_title))
        except self.wikipedia.exceptions.DisambiguationError as e:
            if not e.options:
                return None
            first_option = e.options[0]
            try:
                return self._build_wikipedia_result(first_option, self.wikipedia.page(first_option))
            except Exception as option_error:
                # Best effort only: skip this title and let the caller move on.
                logger.debug(f"Wikipedia disambiguation option '{first_option}' failed: {option_error}")
                return None
        except self.wikipedia.exceptions.PageError:
            return None
        except Exception as e:
            logger.warning(f"Wikipedia page error for '{page_title}': {e}")
            return None

    def _search_with_wikipedia(self, query: str, limit: int = 5) -> Dict[str, Any]:
        """
        Search using Wikipedia - fallback search engine for factual information

        Args:
            query: Search string; shortened again to at most 100 characters
                via ``_extract_search_terms`` before being sent.
            limit: Maximum number of pages to return.

        Returns:
            Engine payload; failures carry a ``note`` and never raise.
        """
        try:
            logger.info(f"📚 Wikipedia search for: {query}")

            self.wikipedia.set_lang("en")

            search_terms = self._extract_search_terms(query, max_length=100)

            # Ask for more titles than needed because some pages fail to load.
            wiki_results = self.wikipedia.search(search_terms, results=min(limit * 2, 10))

            if not wiki_results:
                return self._engine_failure('Wikipedia', query, 'No Wikipedia articles found for this query')

            results = []
            for page_title in wiki_results:
                if len(results) >= limit:
                    break
                web_result = self._load_wikipedia_result(page_title)
                if web_result is not None:
                    results.append(web_result)

            if results:
                logger.info(f"✅ Wikipedia found {len(results)} results")
                return self._engine_success('Wikipedia', query, results)

            return self._engine_failure(
                'Wikipedia', query, 'No accessible Wikipedia articles found for this query'
            )

        except Exception as e:
            logger.error(f"Wikipedia search failed: {e}")
            return self._engine_failure('Wikipedia', query, f"Wikipedia search failed: {str(e)}")

    def _extract_content_from_url(self, url: str) -> Dict[str, Any]:
        """
        Extract readable content from a web page

        Returns:
            On success a dict with ``url``, ``found=True``, ``title``,
            ``content``, ``meta_description``, ``links``, ``content_length``
            and ``message``. On failure a dict with ``found=False``,
            ``message`` and ``error_type`` (``network_error`` for request
            errors, ``parsing_error`` for anything else).
        """
        try:
            logger.info(f"Extracting content from: {url}")

            # Explicit timeout: without it a stalled server blocks the call indefinitely.
            response = self.session.get(url, timeout=self._PAGE_TIMEOUT)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove non-content elements before any text is read.
            for script in soup(["script", "style", "nav", "header", "footer", "aside"]):
                script.decompose()

            title = soup.find('title')
            title_text = title.get_text().strip() if title else "No title"

            content = self._extract_main_content(soup)

            meta_description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                meta_description = meta_desc.get('content', '')

            # Only the first 10 anchors are examined; very short link texts are dropped.
            links = []
            for link in soup.find_all('a', href=True)[:10]:
                link_url = urljoin(url, link['href'])
                link_text = link.get_text().strip()
                if link_text and len(link_text) > 5:
                    links.append({"text": link_text, "url": link_url})

            return {
                "url": url,
                "found": True,
                "title": title_text,
                "content": content,
                "meta_description": meta_description,
                "links": links,
                "content_length": len(content),
                "message": "Successfully extracted content from URL"
            }

        except requests.exceptions.RequestException as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to fetch URL: {str(e)}",
                "error_type": "network_error"
            }
        except Exception as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to extract content: {str(e)}",
                "error_type": "parsing_error"
            }

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """
        Extract main content from HTML using various strategies

        Order: the first ``<article>``/``<main>`` element, then common content
        ``div`` selectors, and only if neither matched, up to the first 20
        ``<p>`` elements whose text is longer than 50 characters. The text is
        whitespace-normalised and capped at 5000 characters.
        """
        content_parts = []
        captured = []  # elements whose text is already in content_parts

        def add_element(element) -> None:
            # An element nested inside one that was already captured would
            # repeat the same text and use up the 5000-character budget.
            for seen in captured:
                if element is seen or any(parent is seen for parent in element.parents):
                    return
            captured.append(element)
            content_parts.append(element.get_text())

        main_content = soup.find(['article', 'main'])
        if main_content:
            add_element(main_content)

        content_selectors = [
            'div.content',
            'div.article-content',
            'div.post-content',
            'div.entry-content',
            'div.main-content',
            'div#content',
            'div.text'
        ]

        for selector in content_selectors:
            for element in soup.select(selector):
                add_element(element)

        if not content_parts:
            for p in soup.find_all('p')[:20]:
                text = p.get_text().strip()
                if len(text) > 50:
                    content_parts.append(text)

        combined_content = '\n\n'.join(content_parts)

        # Collapse blank-line runs and repeated spaces.
        combined_content = re.sub(r'\n\s*\n', '\n\n', combined_content)
        combined_content = re.sub(r' +', ' ', combined_content)

        return combined_content.strip()[:5000]
| |
|
def test_web_search_tool():
    """Manual smoke test: run a few queries and one URL through the tool and print the outcome."""
    tool = WebSearchTool()

    # Four search queries followed by one URL (exercises content extraction).
    test_cases = (
        "Python programming tutorial",
        "Mercedes Sosa studio albums 2000 2009",
        "artificial intelligence recent developments",
        "climate change latest research",
        "https://en.wikipedia.org/wiki/Machine_learning",
    )

    print("🧪 Testing Web Search Tool...")

    for index, case in enumerate(test_cases, start=1):
        print(f"\n--- Test {index}: {case} ---")
        try:
            outcome = tool.execute(case)

            if not outcome.success:
                print(f"❌ Error: {outcome.error}")
            else:
                print(f"✅ Success: {outcome.result.get('message', 'No message')}")
                search_engine = outcome.result.get('source', 'unknown')
                print(f" Search engine: {search_engine}")

                if not outcome.result.get('found'):
                    print(f" Not found: {outcome.result.get('message', 'Unknown error')}")
                elif 'results' in outcome.result:
                    # Search response: report the hit count and the first hit.
                    hits = outcome.result['results']
                    print(f" Found {len(hits)} results")
                    if hits:
                        top_hit = hits[0]
                        print(f" First result: {top_hit.get('title', 'No title')}")
                        print(f" URL: {top_hit.get('url', 'No URL')}")
                elif 'content' in outcome.result:
                    # Extraction response: report size and page title.
                    print(f" Extracted {len(outcome.result['content'])} characters")
                    print(f" Title: {outcome.result.get('title', 'No title')}")

            print(f" Execution time: {outcome.execution_time:.2f}s")

        except Exception as exc:
            # Keep going with the remaining cases even if one blows up.
            print(f"❌ Exception: {str(exc)}")
| |
|
# Run the manual smoke test only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    test_web_search_tool()