"""
Web Search Tool for GAIA Agent System
Handles web searches using DuckDuckGo and content extraction from URLs
"""
| |
|
| | import re |
| | import logging |
| | import time |
| | from typing import Dict, List, Optional, Any |
| | from urllib.parse import urlparse, urljoin |
| | import requests |
| | from bs4 import BeautifulSoup |
| | from duckduckgo_search import DDGS |
| |
|
| | from tools import BaseTool |
| |
|
# Module-level logger, namespaced to this module per logging convention.
logger = logging.getLogger(__name__)
| |
|
class WebSearchResult:
    """Container for a single web search hit."""

    def __init__(self, title: str, url: str, snippet: str, content: str = ""):
        # Fields supplied by the search backend.
        self.title = title
        self.url = url
        self.snippet = snippet
        # Full page text; populated only when content extraction was requested.
        self.content = content

    def to_dict(self) -> Dict[str, str]:
        """Serialize to a plain dict, truncating content beyond 1500 chars."""
        body = self.content
        if len(body) > 1500:
            body = body[:1500] + "..."
        return {
            "title": self.title,
            "url": self.url,
            "snippet": self.snippet,
            "content": body,
        }
| |
|
class WebSearchTool(BaseTool):
    """
    Web search tool using DuckDuckGo.

    Handles keyword searches, readable-content extraction from URLs, and
    result filtering; also offers a specialized YouTube metadata lookup.
    Input routing happens in _execute_impl().
    """

    def __init__(self):
        super().__init__("web_search")

        # Shared HTTP session for page fetches; a browser-like User-Agent
        # avoids trivial bot blocking on many sites.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # BUG FIX: requests.Session has no `timeout` attribute, so the old
        # `self.session.timeout = 10` was silently ignored and page fetches
        # could hang forever. The timeout must be passed per request; it is
        # stored here and applied in _extract_content_from_url().
        self.timeout = 10

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute web search operations based on input type.

        Args:
            input_data: Can be:
                - str: Search query, or a URL to extract content from
                - dict: {"query": str, "action": str, "limit": int,
                         "extract_content": bool}

        Returns:
            A result dict from _search_web() or _extract_content_from_url().

        Raises:
            ValueError: On an unknown "action" or unsupported input type.
        """
        if isinstance(input_data, str):
            # A bare string is either a URL (extract) or a query (search).
            if self._is_url(input_data):
                return self._extract_content_from_url(input_data)
            return self._search_web(input_data)

        if isinstance(input_data, dict):
            query = input_data.get("query", "")
            action = input_data.get("action", "search")
            limit = input_data.get("limit", 5)
            extract_content = input_data.get("extract_content", False)

            if action == "search":
                return self._search_web(query, limit, extract_content)
            if action == "extract":
                # For "extract", the "query" field carries the target URL.
                return self._extract_content_from_url(query)
            raise ValueError(f"Unknown action: {action}")

        raise ValueError(f"Unsupported input type: {type(input_data)}")

    def _is_url(self, text: str) -> bool:
        """Return True if text starts with an http:// or https:// scheme."""
        return bool(re.match(r'https?://', text))

    def _search_web(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]:
        """
        Search the web using DuckDuckGo.

        Args:
            query: Keyword query string.
            limit: Maximum number of results to request.
            extract_content: When True, also fetch each hit's page and attach
                up to 1000 chars of extracted text (best-effort; a failed
                fetch is logged and skipped, never fatal).

        Returns:
            Dict with "query", "found", "results" (serialized
            WebSearchResult dicts) and a human-readable "message".

        Raises:
            Exception: Wrapping any search failure, with the cause chained.
        """
        try:
            logger.info(f"Searching web for: {query}")

            with DDGS() as ddgs:
                search_results = list(ddgs.text(
                    keywords=query,
                    max_results=limit,
                    region='us-en',
                    safesearch='moderate'
                ))

            if not search_results:
                return {
                    "query": query,
                    "found": False,
                    "message": "No web search results found",
                    "results": []
                }

            results = []
            for result in search_results:
                web_result = WebSearchResult(
                    title=result.get('title', 'No title'),
                    url=result.get('href', ''),
                    snippet=result.get('body', 'No description')
                )

                # Optional enrichment with page content; one bad URL must not
                # abort the whole search.
                if extract_content and web_result.url:
                    try:
                        content_result = self._extract_content_from_url(web_result.url)
                        if content_result.get('found'):
                            web_result.content = content_result['content'][:1000]
                    except Exception as e:
                        logger.warning(f"Failed to extract content from {web_result.url}: {e}")

                results.append(web_result.to_dict())

            return {
                "query": query,
                "found": True,
                "results": results,
                "total_results": len(results),
                "message": f"Found {len(results)} web search results"
            }

        except Exception as e:
            # Chain the original exception so the root cause is preserved.
            raise Exception(f"Web search failed: {str(e)}") from e

    def _extract_content_from_url(self, url: str) -> Dict[str, Any]:
        """
        Extract readable content from a web page.

        Returns a dict with "found": True plus title/content/meta/links on
        success, or "found": False with "error_type" set to "network_error"
        or "parsing_error" on failure. Never raises.
        """
        try:
            logger.info(f"Extracting content from: {url}")

            # Pass the timeout explicitly -- see __init__ for why a
            # session-level timeout attribute does not work.
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Drop non-content elements before any text extraction.
            for script in soup(["script", "style", "nav", "header", "footer", "aside"]):
                script.decompose()

            title = soup.find('title')
            title_text = title.get_text().strip() if title else "No title"

            content = self._extract_main_content(soup)

            meta_description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                meta_description = meta_desc.get('content', '')

            # Collect up to 10 meaningful links, resolved against the page URL.
            links = []
            for link in soup.find_all('a', href=True)[:10]:
                link_url = urljoin(url, link['href'])
                link_text = link.get_text().strip()
                if link_text and len(link_text) > 5:
                    links.append({"text": link_text, "url": link_url})

            return {
                "url": url,
                "found": True,
                "title": title_text,
                "content": content,
                "meta_description": meta_description,
                "links": links,
                "content_length": len(content),
                "message": "Successfully extracted content from URL"
            }

        except requests.exceptions.RequestException as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to fetch URL: {str(e)}",
                "error_type": "network_error"
            }
        except Exception as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to extract content: {str(e)}",
                "error_type": "parsing_error"
            }

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """
        Extract main content from HTML using several fallback strategies.

        Tries semantic containers (<article>/<main>), then common content
        wrapper selectors, then falls back to the first 20 substantial <p>
        paragraphs. The combined text is whitespace-normalized and capped
        at 5000 characters.
        """
        content_parts = []

        # Strategy 1: semantic container.
        main_content = soup.find(['article', 'main'])
        if main_content:
            content_parts.append(main_content.get_text())

        # Strategy 2: common CMS/theme content wrappers.
        content_selectors = [
            'div.content',
            'div.article-content',
            'div.post-content',
            'div.entry-content',
            'div.main-content',
            'div#content',
            'div.text'
        ]

        for selector in content_selectors:
            elements = soup.select(selector)
            for element in elements:
                content_parts.append(element.get_text())

        # Strategy 3: fall back to long paragraphs only if nothing matched.
        if not content_parts:
            paragraphs = soup.find_all('p')
            for p in paragraphs[:20]:
                text = p.get_text().strip()
                if len(text) > 50:
                    content_parts.append(text)

        combined_content = '\n\n'.join(content_parts)

        # Collapse runs of blank lines and repeated spaces.
        combined_content = re.sub(r'\n\s*\n', '\n\n', combined_content)
        combined_content = re.sub(r' +', ' ', combined_content)

        return combined_content.strip()[:5000]

    def search_youtube_metadata(self, query: str) -> Dict[str, Any]:
        """
        Specialized search for YouTube video information.

        Restricts the DuckDuckGo search to youtube.com and keeps only
        watch-page hits, each with its parsed video_id.

        Raises:
            Exception: Wrapping any search failure, with the cause chained.
        """
        try:
            youtube_query = f"site:youtube.com {query}"

            with DDGS() as ddgs:
                search_results = list(ddgs.text(
                    keywords=youtube_query,
                    max_results=3,
                    region='us-en',
                    safesearch='moderate'
                ))

            youtube_results = []
            for result in search_results:
                # Keep only actual watch pages (skip channels/playlists).
                if 'youtube.com/watch' in result.get('href', ''):
                    video_id = self._extract_youtube_id(result['href'])

                    youtube_result = {
                        "title": result.get('title', 'No title'),
                        "url": result.get('href', ''),
                        "description": result.get('body', 'No description'),
                        "video_id": video_id
                    }
                    youtube_results.append(youtube_result)

            return {
                "query": query,
                "found": len(youtube_results) > 0,
                "results": youtube_results,
                "message": f"Found {len(youtube_results)} YouTube videos"
            }

        except Exception as e:
            # Chain the original exception so the root cause is preserved.
            raise Exception(f"YouTube search failed: {str(e)}") from e

    def _extract_youtube_id(self, url: str) -> str:
        """Return the 11-char YouTube video ID from a URL, or "" if absent."""
        patterns = [
            r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
            r'(?:embed\/)([0-9A-Za-z_-]{11})',
            r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})'
        ]

        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return ""
| |
|
def test_web_search_tool():
    """Smoke-test the web search tool across the supported input shapes."""
    tool = WebSearchTool()

    # String query, raw URL, and dict-style inputs covering both actions.
    cases = [
        "Python programming tutorial",
        "https://en.wikipedia.org/wiki/Machine_learning",
        {"query": "artificial intelligence news", "action": "search", "limit": 3},
        {"query": "https://www.python.org", "action": "extract"},
        {"query": "OpenAI ChatGPT", "action": "search", "limit": 2, "extract_content": True}
    ]

    print("🧪 Testing Web Search Tool...")

    for idx, case in enumerate(cases, 1):
        print(f"\n--- Test {idx}: {case} ---")
        try:
            outcome = tool.execute(case)

            if not outcome.success:
                print(f"❌ Error: {outcome.error}")
            else:
                payload = outcome.result
                print(f"✅ Success: {payload.get('message', 'No message')}")
                if not payload.get('found'):
                    print(f" Not found: {payload.get('message', 'Unknown error')}")
                elif 'results' in payload:
                    print(f" Found {len(payload['results'])} results")
                    if payload['results']:
                        top = payload['results'][0]
                        print(f" First result: {top.get('title', 'No title')}")
                        print(f" URL: {top.get('url', 'No URL')}")
                elif 'content' in payload:
                    print(f" Extracted {len(payload['content'])} characters")
                    print(f" Title: {payload.get('title', 'No title')}")

            print(f" Execution time: {outcome.execution_time:.2f}s")

        except Exception as e:
            print(f"❌ Exception: {str(e)}")
| |
|
if __name__ == "__main__":
    # Manual smoke test; performs live network requests against DuckDuckGo.
    test_web_search_tool()