# Grux3 - src/tools/safe_web_tools.py
# feat: working local agent with test cases passing (commit d61265e)
"""Safe web tools that don't require dangerous requests."""
import logging
from typing import Dict, Any, Optional
import time
import asyncio
# Use new tavily-python SDK
try:
from tavily import TavilyClient
TAVILY_SDK_AVAILABLE = True
except ImportError:
TAVILY_SDK_AVAILABLE = False
logging.getLogger(__name__).warning("Tavily SDK not available. Please install tavily-python package.")
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.document_loaders import YoutubeLoader
from langchain_community.document_loaders.youtube import TranscriptFormat
from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from src.utils.config import config
import re
import requests
import json
# Module-level logger shared by all tools in this file.
logger = logging.getLogger(__name__)
# Rate limiting
last_search_time: float = 0  # epoch seconds of the most recent search (0 = never searched)
min_search_interval: float = 3.0  # minimum seconds enforced between consecutive searches
def _rate_limit():
    """Apply rate limiting to prevent API abuse.

    Sleeps just long enough to keep at least ``min_search_interval``
    seconds between consecutive searches, then records the call time
    in the module-level ``last_search_time``.
    """
    global last_search_time
    now = time.time()
    elapsed = now - last_search_time
    if elapsed < min_search_interval:
        time.sleep(min_search_interval - elapsed)
    last_search_time = time.time()
class SafeWebSearchTool:
    """A tool for performing safe, rate-limited web searches.

    This tool is ideal for general-purpose web searches to answer questions,
    find information, or gather research. It is designed to be safe and
    efficient, with built-in rate limiting to prevent API abuse.
    Currently uses Google Search, but can be easily switched to other providers.
    """

    def __init__(self, search_provider: str = "google"):
        self.name = "safe_web_search"
        self._initialized = False
        self.search_provider = search_provider
        self.searcher = None  # googlesearch.search callable once initialized

    def _initialize(self) -> Optional[str]:
        """Lazily import and set up the configured search backend.

        Returns:
            None on success, or a human-readable error message on failure.
        """
        if self.search_provider == "google":
            try:
                from googlesearch import search
                self.searcher = search
                self._initialized = True
                logger.debug("Google search initialized successfully.")
            except ImportError:
                logger.error("Google search not available. Please install googlesearch-python package.")
                return "Google search not available. Please install googlesearch-python package."
            except Exception as e:
                logger.error(f"Failed to initialize Google search: {e}")
                return f"Failed to initialize Google search: {e}"
        else:  # Fallback to DuckDuckGo
            try:
                from langchain_community.tools import DuckDuckGoSearchRun
                self.ddg = DuckDuckGoSearchRun()
                self._initialized = True
                logger.debug("DuckDuckGoSearchTool initialized successfully.")
            except ImportError:
                logger.error("DuckDuckGo search not available. Please install duckduckgo-search package.")
                return "DuckDuckGo search not available. Please install duckduckgo-search package."
            except Exception as e:
                logger.error(f"Failed to initialize DuckDuckGo search: {e}")
                return f"Failed to initialize DuckDuckGo search: {e}"
        return None

    def invoke(self, query: str) -> str:
        """Executes a web search for the given query.

        Args:
            query: The search query string.

        Returns:
            A string containing the search results, or an error message.
        """
        if not self._initialized:
            error = self._initialize()
            if error:
                return error
        try:
            if self.search_provider == "google":
                return self._google_search(query)
            logger.info(f"Performing DuckDuckGo search for query: '{query}'")
            return self.ddg.invoke(query)
        except Exception as e:
            logger.error(f"{self.search_provider} search error for query '{query}': {e}")
            return f"{self.search_provider} search error: {e}"

    def _google_search(self, query: str) -> str:
        """Run a rate-limited Google search and enrich each hit with page info."""
        logger.info(f"Performing Google search for query: '{query}'")
        _rate_limit()
        # Imported here so the module loads even without these optional deps.
        from bs4 import BeautifulSoup
        import requests
        search_results = []
        try:
            for idx, url in enumerate(self.searcher(query, num_results=5, lang='en')):
                search_results.append(url)
                if idx >= 4:  # Limit to 5 results
                    break
        except Exception as e:
            logger.error(f"Error during Google search: {e}")
            search_results = []
        logger.debug(f"Raw Google results: {search_results}")
        if not search_results:
            logger.info(f"No Google search results found for query: '{query}'")
            return "No search results found."
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        formatted_results = []
        for idx, url in enumerate(search_results):
            try:
                # Quick fetch with timeout to grab the page title and snippet.
                response = requests.get(url, headers=headers, timeout=2)
                if response.status_code == 200:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    title = soup.find('title')
                    title_text = title.text.strip() if title else url
                    description = ""
                    meta_desc = soup.find('meta', attrs={'name': 'description'})
                    if meta_desc and meta_desc.get('content'):
                        description = meta_desc['content']
                    else:
                        # Fall back to the first few paragraphs of body text.
                        paragraphs = soup.find_all('p', limit=4)
                        if paragraphs:
                            description = ' '.join(p.text.strip() for p in paragraphs)
                    # BUGFIX: previously only the description (possibly an empty
                    # string) was appended; the fetched title and the URL were
                    # discarded, producing unusable results. Include all three.
                    entry = f"Web Search Result {idx + 1}: {title_text}\nURL: {url}"
                    if description:
                        entry += f"\nDescription: {description}"
                    formatted_results.append(entry)
                else:
                    # Fallback if we can't fetch the page
                    formatted_results.append(f"Web Search Result {idx + 1}: {url}")
                logger.debug(f"Result {idx + 1}: URL='{url}'")
            except Exception as e:
                logger.debug(f"Error processing result {idx + 1}: {e}")
                # Fallback to just URL
                formatted_results.append(f"Web Search Result {idx + 1}: {url}")
        logger.info(f"Returning {len(formatted_results)} Google search results for query: '{query}'")
        return "\n\n---\n".join(formatted_results)

    def cleanup(self):
        """Clean up any resources."""
        # Clean up DuckDuckGo if needed
        if hasattr(self, 'ddg') and self.ddg:
            try:
                if hasattr(self.ddg, 'close'):
                    self.ddg.close()
            except Exception as e:
                logger.debug(f"Error cleaning up DuckDuckGo: {e}")
        # Google search doesn't require cleanup
        self.searcher = None
class BaseWikipediaTool:
    """A tool for searching Wikipedia and loading article content.

    Searches Wikipedia for a query and retrieves the content of the most
    relevant articles. The number of articles to load is configurable,
    making it useful for both quick lookups and in-depth research.
    """

    def __init__(self):
        self.name = "base_wikipedia"
        self.query = ""
        self.load_max_docs = 5

    def invoke(self, query: str, load_max_docs: int = 5) -> str:
        """Searches Wikipedia and loads the content of the top matching articles.

        Args:
            query: The search query.
            load_max_docs: The maximum number of documents to load.

        Returns:
            A formatted string containing the content of the loaded articles.
        """
        self.query = query
        self.load_max_docs = load_max_docs
        loader = WikipediaLoader(
            query=self.query,
            load_max_docs=self.load_max_docs,
            # Raised from the default 4000 so long article bodies
            # (e.g. a full discography) are not truncated.
            doc_content_chars_max=15000,
        )
        rendered = [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in loader.load()
        ]
        return "\n\n---\n\n".join(rendered)

    def cleanup(self):
        """No resources to release."""
        pass
class ArxivLoaderTool:
    """A tool for searching and loading papers from Arxiv.

    Use this tool to find and retrieve academic papers from the Arxiv
    repository. It is ideal for research, especially in scientific and
    technical fields. You can specify the number of papers to load.
    """

    def __init__(self):
        self.name = "arxiv_search"
        self.query = ""
        self.load_max_docs = 3

    def load(self, query: str, load_max_docs: int = 3) -> str:
        """Searches Arxiv and loads the content of the most relevant papers.

        Args:
            query: The search query (e.g., paper title, author, keywords).
            load_max_docs: The maximum number of papers to load.

        Returns:
            A formatted string containing the content of the loaded papers
            (each truncated to the first 1000 characters).
        """
        self.query = query
        self.load_max_docs = load_max_docs
        search_docs = ArxivLoader(query=self.query, load_max_docs=self.load_max_docs).load()
        return "\n\n---\n\n".join(
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        )

    def invoke(self, query: str, load_max_docs: int = 3) -> str:
        """Alias for load() so this tool exposes the same invoke() interface
        as the other tools in this module (load() is kept for callers)."""
        return self.load(query, load_max_docs)

    def cleanup(self):
        """Clean up any resources."""
        pass
class TavilyWebSearchTool:
    """A powerful web search tool using the Tavily API.

    Provides a high-quality, AI-optimized search experience. Best used for
    complex queries that require a deeper understanding of the topic.
    Requires a Tavily API key to be configured.
    """

    def __init__(self):
        self.name = "web_search"
        # Keep a direct SDK client when the SDK and an API key are available;
        # otherwise invoke() falls back to the LangChain wrapper.
        if TAVILY_SDK_AVAILABLE and config.TAVILY_API_KEY:
            self.tavily_client = TavilyClient(api_key=config.TAVILY_API_KEY)
        else:
            self.tavily_client = None

    def invoke(self, query: str) -> str:
        """Executes a web search using the Tavily API.

        Args:
            query: The search query.

        Returns:
            A formatted string containing the search results.
        """
        if self.tavily_client is not None:
            # BUGFIX: the SDK client was created but never used before.
            response = self.tavily_client.search(query=query, max_results=3)
            results = response.get('results', [])
        else:
            # BUGFIX: TavilySearchResults returns a list of dicts (with 'url'
            # and 'content' keys), not Document objects — the previous code
            # accessed doc.metadata/doc.page_content and crashed at runtime.
            # The tool input is also positional, not a 'query' keyword.
            results = TavilySearchResults(max_results=3).invoke(query)
        formatted = [
            f'<Document source="{item.get("url", "")}"/>\n{item.get("content", "")}\n</Document>'
            for item in results
        ]
        return "\n\n---\n\n".join(formatted)

    def cleanup(self):
        """Clean up any resources."""
        self.tavily_client = None
class SafeWikipediaSearchTool:
    """Enhanced Wikipedia search tool that can fetch specific sections when needed.

    First tries the regular Wikipedia search; if the requested section comes
    back (almost) empty, it fetches that section's wikitext directly via the
    Wikipedia API and renders it to readable text.
    """

    def __init__(self):
        self.name = "safe_wikipedia_search"
        self.base_tool = BaseWikipediaTool()

    def invoke(self, query: str, load_max_docs: int = 3, section_name: Optional[str] = None) -> str:
        """Search Wikipedia with optional section-specific fetching.

        Args:
            query: The search query (page name)
            load_max_docs: Maximum number of documents to load
            section_name: Optional section name to fetch specifically (e.g., "Studio albums")

        Returns:
            Wikipedia content, with section-specific content if requested
        """
        if section_name:
            # Prefer the targeted API fetch when a section was requested.
            section_content = self._get_wikipedia_section(query, section_name)
            if section_content:
                return f"Wikipedia Section '{section_name}' for '{query}':\n\n{section_content}"
        # Fall back to regular Wikipedia search
        regular_result = self.base_tool.invoke(query, load_max_docs)
        # If the requested section exists in the result but rendered empty,
        # enhance the output with content fetched directly from the API.
        if section_name and self._has_empty_section(regular_result, section_name):
            section_content = self._get_wikipedia_section(query, section_name)
            if section_content:
                return f"{regular_result}\n\n--- Enhanced Section Content ---\n\nSection '{section_name}':\n{section_content}"
        return regular_result

    def _has_empty_section(self, content: str, section_name: str) -> bool:
        """Check if a section header exists but carries (almost) no body text."""
        section_marker = f"=== {section_name} ==="
        if section_marker in content:
            idx = content.find(section_marker)
            next_section_idx = content.find("===", idx + len(section_marker))
            if next_section_idx != -1:
                section_content = content[idx:next_section_idx].strip()
                # Little more than the header itself -> treat as empty.
                return len(section_content) < 50
        return False

    def _get_wikipedia_section(self, page_name: str, section_name: str) -> Optional[str]:
        """Fetch specific section content using the Wikipedia API.

        Args:
            page_name: The Wikipedia page name
            section_name: The section name to fetch

        Returns:
            Section content as formatted text, or None if not found
        """
        try:
            # First, list the page's sections to resolve the section index.
            resp = requests.get(
                'https://en.wikipedia.org/w/api.php',
                params={
                    'action': 'parse',
                    'page': page_name,
                    'prop': 'sections',
                    'format': 'json'
                },
                timeout=10
            )
            if resp.status_code != 200:
                return None
            data = resp.json()
            if 'parse' not in data or 'sections' not in data['parse']:
                return None
            # Locate the requested section by its display title ('line').
            target_section = next(
                (s for s in data['parse']['sections'] if s.get('line') == section_name),
                None
            )
            if not target_section:
                return None
            # Then fetch just that section's wikitext.
            resp2 = requests.get(
                'https://en.wikipedia.org/w/api.php',
                params={
                    'action': 'parse',
                    'page': page_name,
                    'format': 'json',
                    'prop': 'wikitext',
                    'section': target_section['index']
                },
                timeout=10
            )
            if resp2.status_code != 200:
                return None
            data2 = resp2.json()
            if 'parse' not in data2 or 'wikitext' not in data2['parse']:
                return None
            # Convert raw wikitext to readable format
            return self._format_wikitext(data2['parse']['wikitext']['*'])
        except Exception as e:
            # BUGFIX: was print(); use the module logger like the rest of the file.
            logger.warning(f"Error fetching Wikipedia section: {e}")
            return None

    def _format_wikitext(self, wikitext: str) -> str:
        """Convert wikitext (mostly table markup) to a more readable format."""
        formatted_lines = []
        for line in wikitext.split('\n'):
            line = line.strip()
            if not line:
                continue
            if line.startswith('|-'):
                # Table row separator — carries no content.
                continue
            elif line.startswith('|') and not line.startswith('|+'):
                # Table cell content (skip captions '|+' and table openers '{').
                cell_content = line[1:].strip()
                if cell_content and not cell_content.startswith('{'):
                    # Strip basic wiki markup: italics and internal links.
                    cell_content = cell_content.replace("''", "").replace("[[", "").replace("]]", "")
                    # Render <small> annotations as parenthesized text.
                    if '<small>' in cell_content:
                        cell_content = cell_content.replace('<small>', '(').replace('</small>', ')')
                    formatted_lines.append(cell_content)
            elif line.startswith('!'):
                # Table header row -> render as a section-style heading.
                header = line[1:].strip()
                if header:
                    formatted_lines.append(f"=== {header} ===")
        return '\n'.join(formatted_lines)
class SafeYouTubeTranscriptTool:
    """A tool for extracting transcripts from YouTube videos.

    Given a YouTube video URL, returns the full transcript. Useful for
    analyzing video content, extracting quotes, or creating summaries.
    """

    def __init__(self):
        self.name = "safe_youtube_transcript"
        self._initialized = True  # No async resources to initialize

    def invoke(self, query: str) -> str:
        """Extracts the transcript from a YouTube video URL.

        Args:
            query: The URL of the YouTube video.

        Returns:
            A string containing the video's transcript.
        """
        loader = YoutubeLoader.from_youtube_url(query, add_video_info=False)
        docs = loader.load()
        return "\n\n".join(doc.page_content for doc in docs)

    def cleanup(self):
        """No resources to release for the transcript tool."""
        pass
# --- Generic scraping tools (not part of the default SAFE_WEB_TOOLS toolbelt below) ---
class WebScraperTool:
    """A general web scraper tool that can extract content from web pages.

    Fetches a page and extracts plain text, table rows, links, or images,
    using BeautifulSoup for HTML parsing.
    """

    def __init__(self):
        self.name = "web_scraper"

    def invoke(self, url: str, element_type: str = "text", selector: Optional[str] = None) -> str:
        """Scrape content from a web page.

        Args:
            url: The URL to scrape.
            element_type: What to extract: 'text', 'table', 'links' or 'images'.
            selector: Optional CSS selector or element ID to target specific content.

        Returns:
            The extracted content as formatted text, or an error message.
        """
        try:
            import requests
            from bs4 import BeautifulSoup

            # Browser-like User-Agent so sites that block default clients respond.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            page = requests.get(url, headers=headers, timeout=10)
            page.raise_for_status()
            soup = BeautifulSoup(page.content, 'html.parser')

            if element_type == "text":
                if selector:
                    matched = soup.select(selector)
                    return '\n'.join(node.get_text(strip=True) for node in matched)
                # Drop script/style so only visible text remains.
                for node in soup(["script", "style"]):
                    node.decompose()
                return soup.get_text(strip=True)

            if element_type == "table":
                table = soup.select_one(selector) if selector else soup.find("table")
                if not table:
                    return "No table found"
                lines = []
                for tr in table.find_all("tr"):
                    cols = [cell.get_text(strip=True) for cell in tr.find_all(["td", "th"])]
                    if cols:
                        lines.append(" | ".join(cols))
                return "\n".join(lines)

            if element_type == "links":
                anchors = soup.find_all("a", href=True)
                return "\n".join(
                    f"{a.get_text(strip=True)}: {a['href']}"
                    for a in anchors if a.get_text(strip=True)
                )

            if element_type == "images":
                pictures = soup.find_all("img", src=True)
                return "\n".join(
                    f"{img.get('alt', 'No alt text')}: {img['src']}" for img in pictures
                )

            return "Unsupported element type. Use 'text', 'table', 'links', or 'images'"
        except Exception as e:
            return f"Error scraping {url}: {str(e)}"
class BaseballReferenceScraperTool:
    """A specialized tool for scraping tables from Baseball Reference websites.

    Baseball Reference wraps many tables inside HTML comments; this tool
    strips the comment markers before parsing so those tables are reachable.
    """

    def __init__(self):
        self.name = "baseball_reference_scraper"

    def invoke(self, url: str, table_id: Optional[str] = None) -> str:
        """Scrape a table from Baseball Reference.

        Args:
            url: The Baseball Reference URL to scrape.
            table_id: Optional table ID to target a specific table.

        Returns:
            Table data formatted as text, or an error message on failure.
        """
        try:
            import requests
            import pandas as pd
            from bs4 import BeautifulSoup

            # Browser-like User-Agent so the site serves the full page.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            # Uncomment the markup so comment-wrapped tables become parseable.
            html = response.text.replace("<!--", "").replace("-->", "")
            soup = BeautifulSoup(html, "html.parser")
            table = soup.find("table", {"id": table_id}) if table_id else soup.find("table")
            if not table:
                return f"No table found with ID: {table_id}" if table_id else "No table found on the page"
            header = f"Table from {url}\n"
            if table_id:
                header += f"Table ID: {table_id}\n"
            try:
                # Preferred path: let pandas parse and pretty-print the table.
                df = pd.read_html(str(table))[0]
                summary = header
                summary += f"Shape: {df.shape[0]} rows x {df.shape[1]} columns\n\n"
                summary += "First 10 rows:\n"
                summary += df.head(10).to_string(index=False)
                if len(df) > 10:
                    summary += f"\n\n... and {len(df) - 10} more rows"
                return summary
            except Exception:
                # Fallback: pandas could not parse the table; walk rows manually.
                parsed_rows = []
                for tr in table.find_all("tr"):
                    cells = [cell.get_text(strip=True) for cell in tr.find_all(["td", "th"])]
                    if cells:
                        parsed_rows.append(" | ".join(cells))
                summary = header
                summary += f"Rows found: {len(parsed_rows)}\n\n"
                summary += "\n".join(parsed_rows[:20])  # Show first 20 rows
                if len(parsed_rows) > 20:
                    summary += f"\n\n... and {len(parsed_rows) - 20} more rows"
                return summary
        except Exception as e:
            return f"Error scraping Baseball Reference table from {url}: {str(e)}"
# Safe tools that don't require dangerous requests
# Default toolbelt exported to the agent; each entry implements invoke() and cleanup().
SAFE_WEB_TOOLS = [SafeWebSearchTool(), SafeWikipediaSearchTool(), SafeYouTubeTranscriptTool()]
def cleanup_web_tools():
    """Clean up all web tools to prevent event loop errors.

    Best-effort: a failing cleanup is logged at debug level and does not
    stop the remaining tools from being cleaned up.
    """
    for tool in SAFE_WEB_TOOLS:
        cleanup = getattr(tool, 'cleanup', None)
        if cleanup is None:
            continue
        try:
            cleanup()
        except Exception as exc:
            logger.debug(f"Error cleaning up tool {tool.name}: {exc}")
# Reference: manual Wikipedia-API session used to prototype the section
# fetching in SafeWikipediaSearchTool._get_wikipedia_section (kept as documentation).
# python -c "
# import requests
# # First fetch the page to get section IDs
# resp = requests.get(
# 'https://en.wikipedia.org/w/api.php',
# params={
# 'action': 'parse',
# 'page': 'Mercedes Sosa',
# 'prop': 'sections',
# 'format': 'json'
# }
# )
# sections = resp.json()['parse']['sections']
# studio_section = next(s for s in sections if s['line'] == 'Studio albums')
# secid = studio_section['index']
# # Then fetch just that section's wikitext
# resp2 = requests.get(
# 'https://en.wikipedia.org/w/api.php',
# params={
# 'action': 'parse',
# 'page': 'Mercedes Sosa',
# 'format': 'json',
# 'prop': 'wikitext',
# 'section': secid
# }
# )
# print(resp2.json()['parse']['wikitext'])
# "