Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Web Search Tool for GAIA Agent System | |
| Handles web searches using DuckDuckGo and content extraction from URLs | |
| """ | |
| import re | |
| import logging | |
| import time | |
| from typing import Dict, List, Optional, Any | |
| from urllib.parse import urlparse, urljoin | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from duckduckgo_search import DDGS | |
| from tools import BaseTool | |
| logger = logging.getLogger(__name__) | |
class WebSearchResult:
    """Plain holder for a single search hit.

    Attributes are stored exactly as given: ``title``, ``url``, ``snippet``
    and the optional extracted page text ``content`` (empty by default).
    """

    def __init__(self, title: str, url: str, snippet: str, content: str = ""):
        self.title, self.url = title, url
        self.snippet, self.content = snippet, content

    def to_dict(self) -> Dict[str, str]:
        """Return the hit as a dict with keys title, url, snippet, content.

        Content longer than 1500 characters is cut to its first 1500
        characters followed by ``"..."``; shorter content is returned as is.
        The stored ``content`` attribute itself is never modified.
        """
        preview = self.content
        if len(preview) > 1500:
            preview = preview[:1500] + "..."
        return dict(
            title=self.title,
            url=self.url,
            snippet=self.snippet,
            content=preview,
        )
class WebSearchTool(BaseTool):
    """
    Web search tool using DuckDuckGo

    Handles searches, URL content extraction, and result filtering. When every
    DuckDuckGo attempt fails, a Wikipedia lookup is tried as a fallback (only
    if the optional ``wikipedia`` package can be imported).
    """

    # Seconds allowed for each page fetch. requests has no session-wide
    # timeout setting, so this value is passed explicitly on every request.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self):
        """Register the tool as "web_search" and prepare the HTTP session."""
        super().__init__("web_search")
        # Configure requests session for web scraping
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # requests ignores this attribute; it is kept only so that any external
        # code reading ``session.timeout`` keeps working. The effective timeout
        # is the ``timeout=`` argument passed in _extract_content_from_url.
        self.session.timeout = self.REQUEST_TIMEOUT_SECONDS

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute web search operations based on input type

        Args:
            input_data: Can be:
                - str: Search query or URL to extract content from
                - dict: {"query": str, "action": str, "limit": int, "extract_content": bool}
                  ("action" is "search" (default) or "extract"; for "extract"
                  the "query" value is used as the URL)

        Returns:
            The result dict of the search or of the content extraction.

        Raises:
            ValueError: for an unknown action or an unsupported input type.
        """
        if isinstance(input_data, str):
            # Handle both search queries and URLs
            if self._is_url(input_data):
                return self._extract_content_from_url(input_data)
            return self._search_web(input_data)

        if isinstance(input_data, dict):
            query = input_data.get("query", "")
            action = input_data.get("action", "search")
            limit = input_data.get("limit", 5)
            extract_content = input_data.get("extract_content", False)
            if action == "search":
                return self._search_web(query, limit, extract_content)
            if action == "extract":
                return self._extract_content_from_url(query)
            raise ValueError(f"Unknown action: {action}")

        raise ValueError(f"Unsupported input type: {type(input_data)}")

    def _is_url(self, text: str) -> bool:
        """Check if text is a URL (starts with http:// or https://)"""
        return bool(re.match(r'https?://', text))

    def _search_web(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]:
        """
        Search the web using DuckDuckGo with enhanced rate limiting handling

        Up to three attempts are made, sleeping 5s and then 10s before the
        second and third. If all attempts raise, the Wikipedia fallback is used.

        Args:
            query: Search terms.
            limit: Maximum number of results requested from DuckDuckGo.
            extract_content: When true, each result URL is fetched and up to
                1000 characters of its extracted text are attached.

        Returns:
            Dict with "query", "found", "results" and "message" (plus
            "total_results" on success, "error_type" on some failures).
        """
        # An empty query cannot succeed; answer at once instead of spending
        # three attempts (and 15s of back-off) on it.
        if not query or not str(query).strip():
            return {
                "query": query,
                "found": False,
                "message": "Empty search query",
                "results": [],
                "error_type": "invalid_query"
            }

        max_attempts = 3
        for attempt in range(max_attempts):
            is_last_attempt = attempt == max_attempts - 1
            try:
                logger.info(f"Searching web for: {query} (attempt {attempt + 1}/3)")
                # Progressive delays to handle rate limiting
                if attempt > 0:
                    delay = 5 * (2 ** (attempt - 1))  # 5s, 10s delays
                    logger.info(f"Waiting {delay}s before retry due to rate limiting...")
                    time.sleep(delay)

                with DDGS() as ddgs:
                    # Use DuckDuckGo search with proper parameters
                    search_results = list(ddgs.text(
                        keywords=query,
                        max_results=limit,
                        region='us-en',
                        safesearch='moderate'
                    ))

                if not search_results:
                    if not is_last_attempt:
                        logger.warning(f"No results on attempt {attempt + 1}, retrying...")
                        continue
                    return {
                        "query": query,
                        "found": False,
                        "message": "No web search results found after retries",
                        "results": []
                    }

                results = self._build_results(search_results, extract_content)
                # Return successful results even if some individual results failed
                return {
                    "query": query,
                    "found": len(results) > 0,
                    "results": results,
                    "total_results": len(results),
                    "message": f"Found {len(results)} web search results"
                }
            except Exception as e:
                # Any failure is retried; the classification only picks the log level.
                error_msg = str(e)
                lowered = error_msg.lower()
                rate_limited = (
                    "ratelimit" in lowered
                    or "rate limit" in lowered
                    or any(code in error_msg for code in ("403", "202", "429"))
                )
                if rate_limited:
                    logger.warning(f"Web search attempt {attempt + 1} failed: {error_msg}")
                else:
                    logger.error(f"Web search attempt {attempt + 1} failed with non-rate-limit error: {error_msg}")

        # If all attempts failed, try fallback search strategy
        logger.warning("All DuckDuckGo attempts failed, trying fallback search strategy...")
        return self._fallback_search(query)

    def _build_results(self, search_results: List[Dict[str, Any]], extract_content: bool) -> List[Dict[str, str]]:
        """Convert raw DuckDuckGo hits into result dicts, skipping hits that fail to process."""
        results = []
        for result in search_results:
            try:
                web_result = WebSearchResult(
                    title=result.get('title', 'No title'),
                    url=result.get('href', ''),
                    snippet=result.get('body', 'No description')
                )
                # Optionally extract full content from each URL
                if extract_content and web_result.url:
                    try:
                        content_result = self._extract_content_from_url(web_result.url)
                        if content_result.get('found'):
                            web_result.content = content_result['content'][:1000]  # Limit content size
                    except Exception as e:
                        # Continue without content extraction rather than failing
                        logger.warning(f"Failed to extract content from {web_result.url}: {e}")
                results.append(web_result.to_dict())
            except Exception as result_error:
                # Continue with other results rather than failing entire search
                logger.warning(f"Error processing search result: {result_error}")
        return results

    def _fallback_search(self, query: str) -> Dict[str, Any]:
        """
        Fallback search strategy when DuckDuckGo is completely unavailable

        Looks the query up on Wikipedia (first two of up to three search hits)
        when the optional ``wikipedia`` package is installed. Never raises for
        lookup problems: if nothing can be found a "found": False dict with
        "error_type": "search_failure" is returned.
        """
        fallback_results = []
        try:
            # Try a simple Wikipedia search as fallback
            import wikipedia
        except ImportError:
            wikipedia = None
            logger.info("wikipedia package not available, skipping Wikipedia fallback")

        if wikipedia is not None:
            try:
                wikipedia.set_lang("en")
                # Extract key terms from query for Wikipedia search
                search_terms = query.replace("site:", "").strip()
                # Search Wikipedia pages
                wiki_results = wikipedia.search(search_terms, results=3)
                for page_title in wiki_results[:2]:
                    try:
                        page = wikipedia.page(page_title)
                        summary = page.summary[:200] + "..." if len(page.summary) > 200 else page.summary
                        web_result = WebSearchResult(
                            title=f"{page_title} (Wikipedia)",
                            url=page.url,
                            snippet=summary
                        )
                        fallback_results.append(web_result.to_dict())
                    except Exception as page_error:
                        # Best effort: e.g. disambiguation or missing pages are skipped.
                        logger.warning(f"Wikipedia fallback skipped page {page_title!r}: {page_error}")
            except Exception as search_error:
                # Best effort: a failing fallback must not mask the original failure.
                logger.warning(f"Wikipedia fallback search failed: {search_error}")

        if fallback_results:
            return {
                "query": query,
                "found": True,
                "results": fallback_results,
                "total_results": len(fallback_results),
                "message": f"Using Wikipedia fallback search. Found {len(fallback_results)} results"
            }

        # Last resort: return a helpful message
        return {
            "query": query,
            "found": False,
            "message": "❌ Web search failed due to rate limiting. Please try again later or provide the information directly.",
            "results": [],
            "error_type": "search_failure"
        }

    def _extract_content_from_url(self, url: str) -> Dict[str, Any]:
        """
        Extract readable content from a web page

        Returns:
            On success a dict with "found": True plus title, content,
            meta_description, links and content_length. On failure a dict
            with "found": False, a message and "error_type" set to
            "network_error" (request problems, including timeouts) or
            "parsing_error" (anything else). Does not raise.
        """
        try:
            logger.info(f"Extracting content from: {url}")
            # Get page content; the timeout must be given per request because
            # requests does not apply any session-level timeout.
            response = self.session.get(url, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()

            # Parse with BeautifulSoup
            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove script, style and page-chrome elements before reading text
            for unwanted in soup(["script", "style", "nav", "header", "footer", "aside"]):
                unwanted.decompose()

            # Extract title
            title = soup.find('title')
            title_text = title.get_text().strip() if title else "No title"

            # Extract main content
            content = self._extract_main_content(soup)

            # Extract metadata
            meta_description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                meta_description = meta_desc.get('content', '')

            # Extract links
            links = []
            for link in soup.find_all('a', href=True)[:10]:  # First 10 links
                link_url = urljoin(url, link['href'])
                link_text = link.get_text().strip()
                if link_text and len(link_text) > 5:  # Filter out short/empty links
                    links.append({"text": link_text, "url": link_url})

            return {
                "url": url,
                "found": True,
                "title": title_text,
                "content": content,
                "meta_description": meta_description,
                "links": links,
                "content_length": len(content),
                "message": "Successfully extracted content from URL"
            }
        except requests.exceptions.RequestException as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to fetch URL: {str(e)}",
                "error_type": "network_error"
            }
        except Exception as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to extract content: {str(e)}",
                "error_type": "parsing_error"
            }

    @staticmethod
    def _is_inside(inner: Any, outer: Any) -> bool:
        """Return True when element ``outer`` is an ancestor of element ``inner``."""
        # Identity comparison on purpose: bs4 tags compare equal by content.
        return any(parent is outer for parent in inner.parents)

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """
        Extract main content from HTML using various strategies

        Text is taken from the first article/main element and from common
        content containers; if none exist, from the first 20 paragraphs that
        are longer than 50 characters. An element that is the same as, or
        nested inside, another collected element is taken only once so its
        text is not repeated. The result is whitespace-normalised and limited
        to 5000 characters.
        """
        candidates = []
        # Strategy 1: Look for article/main tags
        main_content = soup.find(['article', 'main'])
        if main_content:
            candidates.append(main_content)

        # Strategy 2: Look for content in common div classes
        content_selectors = [
            'div.content',
            'div.article-content',
            'div.post-content',
            'div.entry-content',
            'div.main-content',
            'div#content',
            'div.text'
        ]
        for selector in content_selectors:
            candidates.extend(soup.select(selector))

        # Keep only outermost, distinct elements so nested or multiply-matched
        # containers do not contribute the same text more than once.
        selected = []
        for element in candidates:
            if any(element is kept or self._is_inside(element, kept) for kept in selected):
                continue
            selected = [kept for kept in selected if not self._is_inside(kept, element)]
            selected.append(element)
        content_parts = [element.get_text() for element in selected]

        # Strategy 3: Look for paragraphs in body
        if not content_parts:
            for p in soup.find_all('p')[:20]:  # First 20 paragraphs
                text = p.get_text().strip()
                if len(text) > 50:  # Filter out short paragraphs
                    content_parts.append(text)

        # Clean and combine content
        combined_content = '\n\n'.join(content_parts)
        # Clean up whitespace and formatting
        combined_content = re.sub(r'\n\s*\n', '\n\n', combined_content)  # Multiple newlines
        combined_content = re.sub(r' +', ' ', combined_content)  # Multiple spaces
        return combined_content.strip()[:5000]  # Limit to 5000 characters

    def search_youtube_metadata(self, query: str) -> Dict[str, Any]:
        """
        Specialized search for YouTube video information

        Runs a ``site:youtube.com`` DuckDuckGo search (max 3 hits) and keeps
        the hits whose URL contains "youtube.com/watch".

        Returns:
            Dict with "query", "found", "results" (title, url, description,
            video_id per video) and "message".

        Raises:
            Exception: wrapping any error raised while searching; the
                original error is attached as the cause.
        """
        try:
            # Search specifically for YouTube videos
            youtube_query = f"site:youtube.com {query}"
            with DDGS() as ddgs:
                search_results = list(ddgs.text(
                    keywords=youtube_query,
                    max_results=3,
                    region='us-en',
                    safesearch='moderate'
                ))

            youtube_results = []
            for result in search_results:
                href = result.get('href', '')
                if 'youtube.com/watch' in href:
                    youtube_results.append({
                        "title": result.get('title', 'No title'),
                        "url": href,
                        "description": result.get('body', 'No description'),
                        "video_id": self._extract_youtube_id(href)
                    })

            return {
                "query": query,
                "found": len(youtube_results) > 0,
                "results": youtube_results,
                "message": f"Found {len(youtube_results)} YouTube videos"
            }
        except Exception as e:
            raise Exception(f"YouTube search failed: {str(e)}") from e

    def _extract_youtube_id(self, url: str) -> str:
        """Extract YouTube video ID from URL; returns "" when none is found"""
        # Specific shapes first. The generic pattern stays last as a fallback:
        # tried first it can capture 11 characters of a host name (e.g.
        # "youtube-noc" from youtube-nocookie.com) before the real ID is seen.
        patterns = [
            r'[?&]v=([0-9A-Za-z_-]{11})',
            r'(?:embed\/)([0-9A-Za-z_-]{11})',
            r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})',
            r'(?:v=|\/)([0-9A-Za-z_-]{11}).*'
        ]
        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return ""
def test_web_search_tool():
    """Manual smoke test: run a handful of queries/URLs through the tool and print outcomes.

    Each case is passed to ``tool.execute``; the printed report shows the
    success message, a short summary of what was found (result count and first
    hit, or extracted character count and title), and the execution time.
    Exceptions raised by a case are printed and do not stop the remaining cases.
    """
    tool = WebSearchTool()

    # One plain query, one bare URL, and three dict-style requests
    scenarios = [
        "Python programming tutorial",
        "https://en.wikipedia.org/wiki/Machine_learning",
        {"query": "artificial intelligence news", "action": "search", "limit": 3},
        {"query": "https://www.python.org", "action": "extract"},
        {"query": "OpenAI ChatGPT", "action": "search", "limit": 2, "extract_content": True}
    ]

    print("🧪 Testing Web Search Tool...")
    for number, scenario in enumerate(scenarios, start=1):
        print(f"\n--- Test {number}: {scenario} ---")
        try:
            outcome = tool.execute(scenario)
            if not outcome.success:
                print(f"❌ Error: {outcome.error}")
            else:
                payload = outcome.result
                print(f"✅ Success: {payload.get('message', 'No message')}")
                if not payload.get('found'):
                    print(f" Not found: {payload.get('message', 'Unknown error')}")
                elif 'results' in payload:
                    hits = payload['results']
                    print(f" Found {len(hits)} results")
                    # Show first result details
                    if hits:
                        top_hit = hits[0]
                        print(f" First result: {top_hit.get('title', 'No title')}")
                        print(f" URL: {top_hit.get('url', 'No URL')}")
                elif 'content' in payload:
                    print(f" Extracted {len(payload['content'])} characters")
                    print(f" Title: {payload.get('title', 'No title')}")
            print(f" Execution time: {outcome.execution_time:.2f}s")
        except Exception as exc:
            print(f"❌ Exception: {str(exc)}")


if __name__ == "__main__":
    # Run the smoke test when this module is executed as a script
    test_web_search_tool()