|
|
|
|
|
""" |
|
|
Web Search Tool for GAIA Agent System |
|
|
Handles web searches using DuckDuckGo and content extraction from URLs |
|
|
""" |
|
|
|
|
|
import re |
|
|
import logging |
|
|
import time |
|
|
from typing import Dict, List, Optional, Any |
|
|
from urllib.parse import urlparse, urljoin |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
from duckduckgo_search import DDGS |
|
|
|
|
|
from tools import BaseTool |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class WebSearchResult:
    """A single web search hit: title, URL, snippet and optional page content."""

    # Longest content string emitted by to_dict() before it is cut and
    # suffixed with an ellipsis.
    _MAX_CONTENT_CHARS = 1500

    def __init__(self, title: str, url: str, snippet: str, content: str = ""):
        self.title, self.url = title, url
        self.snippet, self.content = snippet, content

    def to_dict(self) -> Dict[str, str]:
        """Return the result as a plain dict, truncating over-long content."""
        body = self.content
        if len(body) > self._MAX_CONTENT_CHARS:
            body = body[:self._MAX_CONTENT_CHARS] + "..."

        payload = dict(title=self.title, url=self.url, snippet=self.snippet)
        payload["content"] = body
        return payload
|
|
|
|
|
class WebSearchTool(BaseTool):
    """
    Web search tool using DuckDuckGo.

    Runs text searches through DuckDuckGo (with a Wikipedia fallback when all
    attempts fail), fetches and extracts readable text from URLs, and offers
    a YouTube-specific search helper.
    """

    # Seconds allowed for a single HTTP request. requests.Session has no
    # session-wide timeout setting, so this value is passed on every request.
    REQUEST_TIMEOUT = 10

    # Number of DuckDuckGo attempts made before the fallback search is used.
    MAX_SEARCH_ATTEMPTS = 3

    def __init__(self):
        super().__init__("web_search")

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
        # Retained only so existing readers of this attribute keep working;
        # requests ignores it, the effective timeout is passed to get().
        self.session.timeout = self.REQUEST_TIMEOUT

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute web search operations based on input type.

        Args:
            input_data: Can be:
                - str: Search query, or a URL (starting with http:// or
                  https://) to extract content from
                - dict: {"query": str, "action": str, "limit": int, "extract_content": bool}
                  where action is "search" (default) or "extract"; for
                  "extract" the "query" value is the URL to fetch.

        Returns:
            The result dict of the search or the extraction.

        Raises:
            ValueError: If the action is unknown or the input type is unsupported.
        """
        if isinstance(input_data, str):
            if self._is_url(input_data):
                return self._extract_content_from_url(input_data)
            return self._search_web(input_data)

        if isinstance(input_data, dict):
            query = input_data.get("query", "")
            action = input_data.get("action", "search")
            limit = input_data.get("limit", 5)
            extract_content = input_data.get("extract_content", False)

            if action == "search":
                return self._search_web(query, limit, extract_content)
            if action == "extract":
                return self._extract_content_from_url(query)
            raise ValueError(f"Unknown action: {action}")

        raise ValueError(f"Unsupported input type: {type(input_data)}")

    def _is_url(self, text: str) -> bool:
        """Check if text is a URL (starts with http:// or https://)."""
        return bool(re.match(r'https?://', text))

    def _ddgs_text(self, keywords: str, max_results: int) -> List[Dict[str, Any]]:
        """Run one DuckDuckGo text search and return the raw result list."""
        with DDGS() as ddgs:
            return list(ddgs.text(
                keywords=keywords,
                max_results=max_results,
                region='us-en',
                safesearch='moderate'
            ))

    def _build_results(self, search_results: List[Dict[str, Any]], extract_content: bool) -> List[Dict[str, str]]:
        """Convert raw DuckDuckGo hits to result dicts, optionally adding page content."""
        results = []
        for result in search_results:
            try:
                web_result = WebSearchResult(
                    title=result.get('title', 'No title'),
                    url=result.get('href', ''),
                    snippet=result.get('body', 'No description')
                )

                if extract_content and web_result.url:
                    try:
                        content_result = self._extract_content_from_url(web_result.url)
                        if content_result.get('found'):
                            web_result.content = content_result['content'][:1000]
                    except Exception as e:
                        logger.warning(f"Failed to extract content from {web_result.url}: {e}")

                results.append(web_result.to_dict())
            except Exception as result_error:
                # One malformed hit must not discard the remaining results.
                logger.warning(f"Error processing search result: {result_error}")
        return results

    def _search_web(self, query: str, limit: int = 5, extract_content: bool = False) -> Dict[str, Any]:
        """
        Search the web using DuckDuckGo with retry and backoff.

        Up to MAX_SEARCH_ATTEMPTS attempts are made, sleeping 5s and then 10s
        before the retries. If the final attempt raises, the fallback search
        is used; if it merely returns nothing, a not-found result is returned.

        Args:
            query: Search terms.
            limit: Maximum number of results requested.
            extract_content: When true, fetch each result URL and attach up
                to 1000 characters of its extracted text.

        Returns:
            Dict with "query", "found", "results" and "message" keys
            ("total_results" is added on success).
        """
        # An empty query can never succeed; return now instead of sleeping
        # through the whole retry schedule.
        if not query or (isinstance(query, str) and not query.strip()):
            return {
                "query": query,
                "found": False,
                "message": "Empty search query",
                "results": [],
                "error_type": "invalid_query"
            }

        final_attempt = self.MAX_SEARCH_ATTEMPTS - 1
        for attempt in range(self.MAX_SEARCH_ATTEMPTS):
            logger.info(f"Searching web for: {query} (attempt {attempt + 1}/{self.MAX_SEARCH_ATTEMPTS})")

            if attempt > 0:
                # Exponential backoff: 5s before the 2nd attempt, 10s before the 3rd.
                delay = 5 * (2 ** (attempt - 1))
                logger.info(f"Waiting {delay}s before retry due to rate limiting...")
                time.sleep(delay)

            try:
                search_results = self._ddgs_text(query, limit)

                if not search_results:
                    if attempt < final_attempt:
                        logger.warning(f"No results on attempt {attempt + 1}, retrying...")
                        continue
                    return {
                        "query": query,
                        "found": False,
                        "message": "No web search results found after retries",
                        "results": []
                    }

                results = self._build_results(search_results, extract_content)
                return {
                    "query": query,
                    "found": len(results) > 0,
                    "results": results,
                    "total_results": len(results),
                    "message": f"Found {len(results)} web search results"
                }

            except Exception as e:
                error_msg = str(e)
                lowered = error_msg.lower()
                rate_limited = (
                    "ratelimit" in lowered
                    or "rate limit" in lowered
                    or any(code in error_msg for code in ("403", "202", "429"))
                )
                if rate_limited:
                    logger.warning(f"Web search attempt {attempt + 1} failed: {error_msg}")
                else:
                    logger.error(f"Web search attempt {attempt + 1} failed with non-rate-limit error: {error_msg}")
                # Either way, fall through to the next attempt (or out of the loop).

        logger.warning("All DuckDuckGo attempts failed, trying fallback search strategy...")
        return self._fallback_search(query)

    def _fallback_search(self, query: str) -> Dict[str, Any]:
        """
        Fallback search strategy when DuckDuckGo is completely unavailable.

        Looks the query up on Wikipedia (when the optional `wikipedia` package
        is installed) and returns up to two page summaries. Any failure yields
        a not-found result rather than an exception.
        """
        failure = {
            "query": query,
            "found": False,
            "message": "❌ Web search failed due to rate limiting. Please try again later or provide the information directly.",
            "results": [],
            "error_type": "search_failure"
        }

        try:
            import wikipedia
        except ImportError:
            logger.warning("wikipedia package not installed; no fallback search available")
            return failure

        try:
            wikipedia.set_lang("en")
            # Strip the "site:" search operator text before querying Wikipedia.
            search_terms = query.replace("site:", "").strip()
            wiki_results = wikipedia.search(search_terms, results=3)
        except Exception as e:
            logger.warning(f"Wikipedia fallback search failed: {e}")
            return failure

        fallback_results = []
        for page_title in wiki_results[:2]:
            try:
                page = wikipedia.page(page_title)
                summary = page.summary[:200] + "..." if len(page.summary) > 200 else page.summary
                web_result = WebSearchResult(
                    title=f"{page_title} (Wikipedia)",
                    url=page.url,
                    snippet=summary
                )
                fallback_results.append(web_result.to_dict())
            except Exception as e:
                # A page that fails to load is skipped, not fatal.
                logger.warning(f"Skipping Wikipedia page {page_title}: {e}")

        if not fallback_results:
            return failure

        return {
            "query": query,
            "found": True,
            "results": fallback_results,
            "total_results": len(fallback_results),
            "message": f"Using Wikipedia fallback search. Found {len(fallback_results)} results"
        }

    def _extract_content_from_url(self, url: str) -> Dict[str, Any]:
        """
        Extract readable content from a web page.

        Args:
            url: Address of the page to fetch.

        Returns:
            On success, a dict with "found": True plus title, content,
            meta_description, links and content_length. On failure, a dict
            with "found": False and an "error_type" of "network_error"
            (request failed) or "parsing_error" (anything else).
        """
        try:
            logger.info(f"Extracting content from: {url}")

            # The timeout must be given here; a value set on the session is ignored.
            response = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Drop non-content elements before any text is read.
            for tag in soup(["script", "style", "nav", "header", "footer", "aside"]):
                tag.decompose()

            title = soup.find('title')
            title_text = title.get_text().strip() if title else "No title"

            content = self._extract_main_content(soup)

            meta_description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                meta_description = meta_desc.get('content', '')

            # Only the first 10 anchors are considered; short link texts are skipped.
            links = []
            for link in soup.find_all('a', href=True)[:10]:
                link_url = urljoin(url, link['href'])
                link_text = link.get_text().strip()
                if link_text and len(link_text) > 5:
                    links.append({"text": link_text, "url": link_url})

            return {
                "url": url,
                "found": True,
                "title": title_text,
                "content": content,
                "meta_description": meta_description,
                "links": links,
                "content_length": len(content),
                "message": "Successfully extracted content from URL"
            }

        except requests.exceptions.RequestException as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to fetch URL: {str(e)}",
                "error_type": "network_error"
            }
        except Exception as e:
            return {
                "url": url,
                "found": False,
                "message": f"Failed to extract content: {str(e)}",
                "error_type": "parsing_error"
            }

    def _extract_main_content(self, soup: BeautifulSoup) -> str:
        """
        Extract main content from HTML using various strategies.

        Text is collected from the first <article>/<main> element and from
        every element matching the common content selectors; only when none
        of those match are long paragraphs used instead. The combined text is
        whitespace-normalised and capped at 5000 characters.

        NOTE: text of every match is concatenated, so nested matches can
        contribute the same text more than once.
        """
        content_parts = []

        main_content = soup.find(['article', 'main'])
        if main_content:
            content_parts.append(main_content.get_text())

        content_selectors = [
            'div.content',
            'div.article-content',
            'div.post-content',
            'div.entry-content',
            'div.main-content',
            'div#content',
            'div.text'
        ]

        for selector in content_selectors:
            for element in soup.select(selector):
                content_parts.append(element.get_text())

        if not content_parts:
            # Last resort: the first 20 paragraphs, ignoring very short ones.
            for p in soup.find_all('p')[:20]:
                text = p.get_text().strip()
                if len(text) > 50:
                    content_parts.append(text)

        combined_content = '\n\n'.join(content_parts)

        # Collapse blank-line runs and repeated spaces.
        combined_content = re.sub(r'\n\s*\n', '\n\n', combined_content)
        combined_content = re.sub(r' +', ' ', combined_content)

        return combined_content.strip()[:5000]

    def search_youtube_metadata(self, query: str) -> Dict[str, Any]:
        """
        Specialized search for YouTube video information.

        Searches DuckDuckGo restricted to youtube.com and keeps only hits
        whose URL is a watch page.

        Returns:
            Dict with "query", "found", "results" (title, url, description,
            video_id per video) and "message".

        Raises:
            Exception: If the search fails; the original error is chained.
        """
        try:
            youtube_query = f"site:youtube.com {query}"
            search_results = self._ddgs_text(youtube_query, 3)

            youtube_results = []
            for result in search_results:
                href = result.get('href', '')
                if 'youtube.com/watch' not in href:
                    continue
                youtube_results.append({
                    "title": result.get('title', 'No title'),
                    "url": href,
                    "description": result.get('body', 'No description'),
                    "video_id": self._extract_youtube_id(href)
                })

            return {
                "query": query,
                "found": len(youtube_results) > 0,
                "results": youtube_results,
                "message": f"Found {len(youtube_results)} YouTube videos"
            }

        except Exception as e:
            raise Exception(f"YouTube search failed: {str(e)}") from e

    def _extract_youtube_id(self, url: str) -> str:
        """
        Extract YouTube video ID from URL.

        Returns the 11-character ID, or "" when none is found.
        """
        # Specific URL shapes are tried first. The generic pattern matches any
        # 11-character path segment, so it is only a last resort.
        patterns = [
            r'[?&]v=([0-9A-Za-z_-]{11})',
            r'(?:embed\/)([0-9A-Za-z_-]{11})',
            r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})',
            r'(?:v=|\/)([0-9A-Za-z_-]{11}).*'
        ]

        for pattern in patterns:
            match = re.search(pattern, url)
            if match:
                return match.group(1)
        return ""
|
|
|
|
|
def test_web_search_tool():
    """Run the web search tool on a few sample inputs and print each outcome."""
    searcher = WebSearchTool()

    # Covers both input forms: bare strings (query / URL) and dicts with an
    # explicit action.
    test_cases = [
        "Python programming tutorial",
        "https://en.wikipedia.org/wiki/Machine_learning",
        {"query": "artificial intelligence news", "action": "search", "limit": 3},
        {"query": "https://www.python.org", "action": "extract"},
        {"query": "OpenAI ChatGPT", "action": "search", "limit": 2, "extract_content": True}
    ]

    print("🧪 Testing Web Search Tool...")

    for number, case in enumerate(test_cases, start=1):
        print(f"\n--- Test {number}: {case} ---")
        try:
            outcome = searcher.execute(case)

            if not outcome.success:
                print(f"❌ Error: {outcome.error}")
            else:
                print(f"✅ Success: {outcome.result.get('message', 'No message')}")
                if not outcome.result.get('found'):
                    print(f" Not found: {outcome.result.get('message', 'Unknown error')}")
                elif 'results' in outcome.result:
                    # Search-style payload: report the count and the top hit.
                    print(f" Found {len(outcome.result['results'])} results")
                    if outcome.result['results']:
                        top_hit = outcome.result['results'][0]
                        print(f" First result: {top_hit.get('title', 'No title')}")
                        print(f" URL: {top_hit.get('url', 'No URL')}")
                elif 'content' in outcome.result:
                    # Extraction-style payload: report size and page title.
                    print(f" Extracted {len(outcome.result['content'])} characters")
                    print(f" Title: {outcome.result.get('title', 'No title')}")

            print(f" Execution time: {outcome.execution_time:.2f}s")

        except Exception as exc:
            print(f"❌ Exception: {str(exc)}")
|
|
|
|
|
if __name__ == "__main__":
    # Running this module directly executes the manual smoke test.
    test_web_search_tool()