#!/usr/bin/env python3 """ Web Search Tool for GAIA Agent System Handles web searches using Tavily API (primary) and Wikipedia (fallback) """ import re import logging import time import os from typing import Dict, List, Optional, Any from urllib.parse import urlparse, urljoin import requests from bs4 import BeautifulSoup from tools import BaseTool logger = logging.getLogger(__name__) class WebSearchResult: """Container for web search results""" def __init__(self, title: str, url: str, snippet: str, content: str = ""): self.title = title self.url = url self.snippet = snippet self.content = content def to_dict(self) -> Dict[str, str]: return { "title": self.title, "url": self.url, "snippet": self.snippet, "content": self.content[:1500] + "..." if len(self.content) > 1500 else self.content } class WebSearchTool(BaseTool): """ Web search tool using Tavily API (primary) and Wikipedia (fallback) Much more reliable than DuckDuckGo with no rate limiting issues """ def __init__(self): super().__init__("web_search") # Configure requests session for web scraping self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) self.session.timeout = 10 # Initialize Tavily client if API key is available self.tavily_api_key = os.getenv("TAVILY_API_KEY") self.use_tavily = self.tavily_api_key is not None if self.use_tavily: logger.info("โœ… Tavily API key found - using Tavily for web search") else: logger.info("โ„น๏ธ No Tavily API key found - will use Wikipedia fallback only") def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]: """ Execute web search operations based on input type Args: input_data: Can be: - str: Search query or URL to extract content from - dict: {"query": str, "action": str, "limit": int, "extract_content": bool} """ if isinstance(input_data, str): # Handle both search queries and URLs if self._is_url(input_data): return self._extract_content_from_url(input_data) else: return self._search_web(input_data) elif isinstance(input_data, dict): query = input_data.get("query", "") action = input_data.get("action", "search") limit = input_data.get("limit", 5) extract_content = input_data.get("extract_content", False) if action == "search": return self._search_web(query, limit, extract_content) elif action == "extract": return self._extract_content_from_url(query) else: raise ValueError(f"Unknown action: {action}") else: raise ValueError(f"Unsupported input type: {type(input_data)}") def _is_url(self, text: str) -> bool: """Check if text is a URL""" return bool(re.match(r'https?://', text)) def _search_web(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]: """ Search the web using Tavily API (primary) or Wikipedia (fallback) """ # Try Tavily first if API key is available if self.use_tavily: try: return self._search_with_tavily(query, limit, extract_content) except Exception as e: logger.warning(f"Tavily search failed, falling back to Wikipedia: {e}") # Fallback to Wikipedia search return self._search_with_wikipedia(query, limit) def _search_with_tavily(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]: """ Search using Tavily Search API - much more reliable than DuckDuckGo """ try: logger.info(f"๐Ÿ” Tavily search for: {query}") # Prepare Tavily API request headers = { "Content-Type": "application/json" } payload = { "api_key": self.tavily_api_key, "query": query, "search_depth": "basic", "include_answer": False, "include_images": False, "include_raw_content": extract_content, "max_results": min(limit, 10) # Tavily supports up to 10 results } # Make API request response = self.session.post( "https://api.tavily.com/search", json=payload, headers=headers, timeout=15 ) response.raise_for_status() tavily_data = response.json() # Process Tavily results results = [] tavily_results = tavily_data.get('results', []) for result in tavily_results: web_result = WebSearchResult( title=result.get('title', 'No title'), url=result.get('url', ''), snippet=result.get('content', 'No description'), content=result.get('raw_content', '') if extract_content else '' ) results.append(web_result.to_dict()) if results: logger.info(f"โœ… Tavily found {len(results)} results") return { "query": query, "found": True, "results": results, "total_results": len(results), "message": f"Found {len(results)} results via Tavily Search API", "search_engine": "tavily" } else: logger.warning("Tavily returned no results, trying Wikipedia fallback") return self._search_with_wikipedia(query, limit) except requests.exceptions.RequestException as e: logger.error(f"Tavily API request failed: {e}") # Fall back to Wikipedia return self._search_with_wikipedia(query, limit) except Exception as e: logger.error(f"Tavily search error: {e}") # Fall back to Wikipedia return self._search_with_wikipedia(query, limit) def _search_with_wikipedia(self, query: str, limit: int = 5) -> Dict[str, Any]: """ Search using Wikipedia as fallback - very reliable and no rate limits """ try: logger.info(f"๐Ÿ“š Wikipedia search for: {query}") # Try to import wikipedia library try: import wikipedia except ImportError: return { "query": query, "found": False, "message": "โŒ No search engines available. Install 'wikipedia' package or configure Tavily API key.", "results": [] } wikipedia.set_lang("en") # Clean up query for Wikipedia search search_terms = query.replace("site:", "").strip() # Search Wikipedia pages wiki_results = wikipedia.search(search_terms, results=min(limit * 2, 10)) if not wiki_results: return { "query": query, "found": False, "message": "No Wikipedia articles found for this query", "results": [], "search_engine": "wikipedia" } results = [] processed = 0 for page_title in wiki_results: if processed >= limit: break try: page = wikipedia.page(page_title) summary = page.summary[:300] + "..." if len(page.summary) > 300 else page.summary web_result = WebSearchResult( title=f"{page_title} (Wikipedia)", url=page.url, snippet=summary, content=page.summary[:1000] + "..." if len(page.summary) > 1000 else page.summary ) results.append(web_result.to_dict()) processed += 1 except wikipedia.exceptions.DisambiguationError as e: # Try the first suggestion from disambiguation try: if e.options: page = wikipedia.page(e.options[0]) summary = page.summary[:300] + "..." if len(page.summary) > 300 else page.summary web_result = WebSearchResult( title=f"{e.options[0]} (Wikipedia)", url=page.url, snippet=summary, content=page.summary[:1000] + "..." if len(page.summary) > 1000 else page.summary ) results.append(web_result.to_dict()) processed += 1 except: continue except wikipedia.exceptions.PageError: # Page doesn't exist, skip continue except Exception as e: # Other Wikipedia errors, skip this page logger.warning(f"Wikipedia page error for '{page_title}': {e}") continue if results: logger.info(f"โœ… Wikipedia found {len(results)} results") return { "query": query, "found": True, "results": results, "total_results": len(results), "message": f"Found {len(results)} Wikipedia articles", "search_engine": "wikipedia" } else: return { "query": query, "found": False, "message": "No accessible Wikipedia articles found for this query", "results": [], "search_engine": "wikipedia" } except Exception as e: logger.error(f"Wikipedia search failed: {e}") return { "query": query, "found": False, "message": f"Search failed: {str(e)}", "results": [], "error_type": "search_failure" } def _extract_content_from_url(self, url: str) -> Dict[str, Any]: """ Extract readable content from a web page """ try: logger.info(f"Extracting content from: {url}") # Get page content response = self.session.get(url) response.raise_for_status() # Parse with BeautifulSoup soup = BeautifulSoup(response.content, 'html.parser') # Remove script and style elements for script in soup(["script", "style", "nav", "header", "footer", "aside"]): script.decompose() # Extract title title = soup.find('title') title_text = title.get_text().strip() if title else "No title" # Extract main content content = self._extract_main_content(soup) # Extract metadata meta_description = "" meta_desc = soup.find('meta', attrs={'name': 'description'}) if meta_desc: meta_description = meta_desc.get('content', '') # Extract links links = [] for link in soup.find_all('a', href=True)[:10]: # First 10 links link_url = urljoin(url, link['href']) link_text = link.get_text().strip() if link_text and len(link_text) > 5: # Filter out short/empty links links.append({"text": link_text, "url": link_url}) return { "url": url, "found": True, "title": title_text, "content": content, "meta_description": meta_description, "links": links, "content_length": len(content), "message": "Successfully extracted content from URL" } except requests.exceptions.RequestException as e: return { "url": url, "found": False, "message": f"Failed to fetch URL: {str(e)}", "error_type": "network_error" } except Exception as e: return { "url": url, "found": False, "message": f"Failed to extract content: {str(e)}", "error_type": "parsing_error" } def _extract_main_content(self, soup: BeautifulSoup) -> str: """ Extract main content from HTML using various strategies """ content_parts = [] # Strategy 1: Look for article/main tags main_content = soup.find(['article', 'main']) if main_content: content_parts.append(main_content.get_text()) # Strategy 2: Look for content in common div classes content_selectors = [ 'div.content', 'div.article-content', 'div.post-content', 'div.entry-content', 'div.main-content', 'div#content', 'div.text' ] for selector in content_selectors: elements = soup.select(selector) for element in elements: content_parts.append(element.get_text()) # Strategy 3: Look for paragraphs in body if not content_parts: paragraphs = soup.find_all('p') for p in paragraphs[:20]: # First 20 paragraphs text = p.get_text().strip() if len(text) > 50: # Filter out short paragraphs content_parts.append(text) # Clean and combine content combined_content = '\n\n'.join(content_parts) # Clean up whitespace and formatting combined_content = re.sub(r'\n\s*\n', '\n\n', combined_content) # Multiple newlines combined_content = re.sub(r' +', ' ', combined_content) # Multiple spaces return combined_content.strip()[:5000] # Limit to 5000 characters def search_youtube_metadata(self, query: str) -> Dict[str, Any]: """ Specialized search for YouTube video information """ try: # Search specifically for YouTube videos youtube_query = f"site:youtube.com {query}" # Use the same search logic but filter for YouTube results search_result = self._search_web(youtube_query, limit=3) if not search_result.get('found'): return search_result youtube_results = [] for result in search_result.get('results', []): if 'youtube.com/watch' in result.get('url', ''): video_id = self._extract_youtube_id(result['url']) youtube_result = { "title": result.get('title', 'No title'), "url": result.get('url', ''), "description": result.get('snippet', 'No description'), "video_id": video_id } youtube_results.append(youtube_result) return { "query": query, "found": len(youtube_results) > 0, "results": youtube_results, "message": f"Found {len(youtube_results)} YouTube videos" } except Exception as e: raise Exception(f"YouTube search failed: {str(e)}") def _extract_youtube_id(self, url: str) -> str: """Extract YouTube video ID from URL""" patterns = [ r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', r'(?:embed\/)([0-9A-Za-z_-]{11})', r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) return "" def test_web_search_tool(): """Test the web search tool with various queries""" tool = WebSearchTool() # Test cases test_cases = [ "Python programming tutorial", "https://en.wikipedia.org/wiki/Machine_learning", {"query": "artificial intelligence news", "action": "search", "limit": 3}, {"query": "https://www.python.org", "action": "extract"}, {"query": "OpenAI ChatGPT", "action": "search", "limit": 2, "extract_content": True} ] print("๐Ÿงช Testing Web Search Tool...") for i, test_case in enumerate(test_cases, 1): print(f"\n--- Test {i}: {test_case} ---") try: result = tool.execute(test_case) if result.success: print(f"โœ… Success: {result.result.get('message', 'No message')}") search_engine = result.result.get('search_engine', 'unknown') print(f" Search engine: {search_engine}") if result.result.get('found'): if 'results' in result.result: print(f" Found {len(result.result['results'])} results") # Show first result details if result.result['results']: first_result = result.result['results'][0] print(f" First result: {first_result.get('title', 'No title')}") print(f" URL: {first_result.get('url', 'No URL')}") elif 'content' in result.result: print(f" Extracted {len(result.result['content'])} characters") print(f" Title: {result.result.get('title', 'No title')}") else: print(f" Not found: {result.result.get('message', 'Unknown error')}") else: print(f"โŒ Error: {result.error}") print(f" Execution time: {result.execution_time:.2f}s") except Exception as e: print(f"โŒ Exception: {str(e)}") if __name__ == "__main__": # Test when run directly test_web_search_tool()