"""Safe web tools that don't require dangerous requests.""" import logging from typing import Dict, Any, Optional import time import asyncio # Use new tavily-python SDK try: from tavily import TavilyClient TAVILY_SDK_AVAILABLE = True except ImportError: TAVILY_SDK_AVAILABLE = False logging.getLogger(__name__).warning("Tavily SDK not available. Please install tavily-python package.") from langchain_community.tools import DuckDuckGoSearchResults from langchain_community.utilities import WikipediaAPIWrapper from langchain_community.document_loaders import YoutubeLoader from langchain_community.document_loaders.youtube import TranscriptFormat from langchain_community.document_loaders import ArxivLoader from langchain_community.document_loaders import WikipediaLoader from langchain_community.tools.tavily_search import TavilySearchResults from src.utils.config import config import re import requests import json logger = logging.getLogger(__name__) # Rate limiting last_search_time = 0 min_search_interval = 3.0 def _rate_limit(): """Apply rate limiting to prevent API abuse.""" global last_search_time current_time = time.time() time_since_last = current_time - last_search_time if time_since_last < min_search_interval: wait_time = min_search_interval - time_since_last time.sleep(wait_time) last_search_time = time.time() class SafeWebSearchTool: """A tool for performing safe, rate-limited web searches. This tool is ideal for general-purpose web searches to answer questions, find information, or gather research. It is designed to be safe and efficient, with built-in rate limiting to prevent API abuse. Currently uses Google Search, but can be easily switched to other providers. """ def __init__(self, search_provider="google"): self.name = "safe_web_search" self._initialized = False self.search_provider = search_provider self.searcher = None def invoke(self, query: str) -> str: """Executes a web search for the given query. Args: query: The search query string. Returns: A string containing the search results. """ if not self._initialized: if self.search_provider == "google": try: from googlesearch import search self.searcher = search self._initialized = True logger.debug("Google search initialized successfully.") except ImportError: logger.error("Google search not available. Please install googlesearch-python package.") return "Google search not available. Please install googlesearch-python package." except Exception as e: logger.error(f"Failed to initialize Google search: {e}") return f"Failed to initialize Google search: {e}" else: # Fallback to DuckDuckGo try: from langchain_community.tools import DuckDuckGoSearchRun self.ddg = DuckDuckGoSearchRun() self._initialized = True logger.debug("DuckDuckGoSearchTool initialized successfully.") except ImportError: logger.error("DuckDuckGo search not available. Please install duckduckgo-search package.") return "DuckDuckGo search not available. Please install duckduckgo-search package." except Exception as e: logger.error(f"Failed to initialize DuckDuckGo search: {e}") return f"Failed to initialize DuckDuckGo search: {e}" try: if self.search_provider == "google": logger.info(f"Performing Google search for query: '{query}'") # Apply rate limiting _rate_limit() # Get search results from Google # Import BeautifulSoup for fetching page info from bs4 import BeautifulSoup import requests formatted_results = [] search_results = [] # Perform the search try: for idx, url in enumerate(self.searcher(query, num_results=5, lang='en')): search_results.append(url) if idx >= 4: # Limit to 5 results break except Exception as e: logger.error(f"Error during Google search: {e}") search_results = [] logger.debug(f"Raw Google results: {search_results}") if search_results: for idx, url in enumerate(search_results): try: # Try to fetch page title and snippet headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } # Quick fetch with timeout response = requests.get(url, headers=headers, timeout=2) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') # Parse only first 5KB # Get title title = soup.find('title') title_text = title.text.strip() if title else url # Try to get description or first paragraph description = "" meta_desc = soup.find('meta', attrs={'name': 'description'}) if meta_desc and meta_desc.get('content'): description = meta_desc['content'] else: # Get first paragraph or text paragraphs = soup.find_all('p', limit=4) if paragraphs: description = ' '.join([p.text.strip() for p in paragraphs]) formatted_results.append( f"Description: {description}...\n" if description else "" ) else: # Fallback if we can't fetch the page formatted_results.append(f"Web Search Result {idx+1}: {url}") logger.debug(f"Result {idx+1}: URL='{url}'") except Exception as e: logger.debug(f"Error processing result {idx+1}: {e}") # Fallback to just URL formatted_results.append(f"Web Search Result {idx+1}: {url}") logger.info(f"Returning {len(formatted_results)} Google search results for query: '{query}'") return "\n\n---\n".join(formatted_results) else: logger.info(f"No Google search results found for query: '{query}'") return "No search results found." else: # DuckDuckGo fallback logger.info(f"Performing DuckDuckGo search for query: '{query}'") return self.ddg.invoke(query) # logger.debug(f"Raw DuckDuckGo results: {results}") # # Format results as a clean string instead of list representation # if results: # formatted_results = [] # for idx, result in enumerate(results): # title = result.get('title', 'No title') # body = result.get('body', 'No description') # href = result.get('href', 'No URL') # logger.debug(f"Result {idx+1}: Title='{title}', URL='{href}'") # formatted_results.append(f"Web Search Result {idx+1}: {body} \n") # logger.info(f"Returning {len(formatted_results)} DuckDuckGo search results for query: '{query}'") # return "\n---\n".join(formatted_results) # else: # logger.info(f"No DuckDuckGo search results found for query: '{query}'") # return "No search results found." except Exception as e: logger.error(f"{self.search_provider} search error for query '{query}': {e}") return f"{self.search_provider} search error: {e}" def cleanup(self): """Clean up any resources.""" # Clean up DuckDuckGo if needed if hasattr(self, 'ddg') and self.ddg: try: if hasattr(self.ddg, 'close'): self.ddg.close() except Exception as e: logger.debug(f"Error cleaning up DuckDuckGo: {e}") # Google search doesn't require cleanup self.searcher = None class BaseWikipediaTool: """A tool for searching Wikipedia and loading article content. This tool allows you to search for a specific query on Wikipedia and retrieve the content of the most relevant articles. You can control the number of articles to load, making it useful for both quick lookups and in-depth research. """ def __init__(self): self.name = "base_wikipedia" self.query = "" self.load_max_docs = 5 def invoke(self, query: str, load_max_docs: int = 5) -> str: """Searches Wikipedia and loads the content of the top matching articles. Args: query: The search query. load_max_docs: The maximum number of documents to load. Returns: A formatted string containing the content of the loaded Wikipedia articles. """ self.query = query self.load_max_docs = load_max_docs # Use WikipediaLoader with increased content length to get full articles including discography search_docs = WikipediaLoader( query=self.query, load_max_docs=self.load_max_docs, doc_content_chars_max=15000 # Increased from default 4000 to get full content including discography ).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ] ) return formatted_search_docs def cleanup(self): """Clean up any resources.""" pass class ArxivLoaderTool: """A tool for searching and loading papers from Arxiv. Use this tool to find and retrieve academic papers from the Arxiv repository. It is ideal for research, especially in scientific and technical fields. You can specify the number of papers to load. """ def __init__(self): self.name = "arxiv_search" self.query = "" self.load_max_docs = 3 def load(self, query: str, load_max_docs: int = 3) -> str: """Searches Arxiv and loads the content of the most relevant papers. Args: query: The search query (e.g., paper title, author, keywords). load_max_docs: The maximum number of papers to load. Returns: A formatted string containing the content of the loaded Arxiv papers. """ self.query = query self.load_max_docs = load_max_docs search_docs = ArxivLoader(query=self.query, load_max_docs=self.load_max_docs).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ] ) return formatted_search_docs def cleanup(self): """Clean up any resources.""" pass class TavilyWebSearchTool: """A powerful web search tool using the Tavily API. This tool provides a high-quality, AI-optimized search experience. It is best used for complex queries that require a deeper understanding of the topic. Requires a Tavily API key to be configured. """ def __init__(self): self.name = "web_search" if TAVILY_SDK_AVAILABLE and config.TAVILY_API_KEY: self.tavily_client = TavilyClient(api_key=config.TAVILY_API_KEY) else: self.tavily_client = None def invoke(self, query: str) -> str: """Executes a web search using the Tavily API. Args: query: The search query. Returns: A formatted string containing the search results. """ search_docs = TavilySearchResults(max_results=3).invoke(query=query) formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ] ) return formatted_search_docs def cleanup(self): """Clean up any resources.""" self.tavily_client = None pass class SafeWikipediaSearchTool: """Enhanced Wikipedia search tool that can fetch specific sections when needed. This tool first tries the regular Wikipedia search, and if it finds empty sections, it can fetch specific section content using the Wikipedia API. """ def __init__(self): self.name = "safe_wikipedia_search" self.base_tool = BaseWikipediaTool() def invoke(self, query: str, load_max_docs: int = 3, section_name: Optional[str] = None) -> str: """Search Wikipedia with optional section-specific fetching. Args: query: The search query (page name) load_max_docs: Maximum number of documents to load section_name: Optional section name to fetch specifically (e.g., "Studio albums") Returns: Wikipedia content, with section-specific content if requested """ if section_name: # Try to get specific section content section_content = self._get_wikipedia_section(query, section_name) if section_content: return f"Wikipedia Section '{section_name}' for '{query}':\n\n{section_content}" # Fall back to regular Wikipedia search regular_result = self.base_tool.invoke(query, load_max_docs) # Check if we found empty sections that might need API fetching if section_name and self._has_empty_section(regular_result, section_name): section_content = self._get_wikipedia_section(query, section_name) if section_content: return f"{regular_result}\n\n--- Enhanced Section Content ---\n\nSection '{section_name}':\n{section_content}" return regular_result def _has_empty_section(self, content: str, section_name: str) -> bool: """Check if a section exists but appears to be empty.""" section_marker = f"=== {section_name} ===" if section_marker in content: # Find the section and check if it's followed by another section quickly idx = content.find(section_marker) next_section_idx = content.find("===", idx + len(section_marker)) if next_section_idx != -1: section_content = content[idx:next_section_idx].strip() # If the section is very short (just the header), it's likely empty return len(section_content) < 50 return False def _get_wikipedia_section(self, page_name: str, section_name: str) -> Optional[str]: """Fetch specific section content using Wikipedia API. Args: page_name: The Wikipedia page name section_name: The section name to fetch Returns: Section content as formatted text, or None if not found """ try: # First, get all sections to find the section ID resp = requests.get( 'https://en.wikipedia.org/w/api.php', params={ 'action': 'parse', 'page': page_name, 'prop': 'sections', 'format': 'json' }, timeout=10 ) if resp.status_code != 200: return None data = resp.json() if 'parse' not in data or 'sections' not in data['parse']: return None sections = data['parse']['sections'] studio_section = None # Find the section by name for section in sections: if section.get('line') == section_name: studio_section = section break if not studio_section: return None section_id = studio_section['index'] # Now fetch the section content resp2 = requests.get( 'https://en.wikipedia.org/w/api.php', params={ 'action': 'parse', 'page': page_name, 'format': 'json', 'prop': 'wikitext', 'section': section_id }, timeout=10 ) if resp2.status_code != 200: return None data2 = resp2.json() if 'parse' not in data2 or 'wikitext' not in data2['parse']: return None wikitext = data2['parse']['wikitext']['*'] # Convert wikitext to readable format return self._format_wikitext(wikitext) except Exception as e: print(f"Error fetching Wikipedia section: {e}") return None def _format_wikitext(self, wikitext: str) -> str: """Convert wikitext to a more readable format.""" lines = wikitext.split('\n') formatted_lines = [] for line in lines: line = line.strip() if not line: continue # Handle table rows if line.startswith('|-'): continue elif line.startswith('|') and not line.startswith('|+'): # Table cell content cell_content = line[1:].strip() if cell_content and not cell_content.startswith('{'): # Clean up wiki markup cell_content = cell_content.replace("''", "").replace("[[", "").replace("]]", "") # Remove small tags and other markup if '' in cell_content: cell_content = cell_content.replace('', '(').replace('', ')') formatted_lines.append(cell_content) elif line.startswith('!'): # Table header header = line[1:].strip() if header: formatted_lines.append(f"=== {header} ===") return '\n'.join(formatted_lines) class SafeYouTubeTranscriptTool: """A tool for extracting transcripts from YouTube videos. Provide a YouTube video URL, and this tool will return the full transcript. It is useful for analyzing video content, extracting quotes, or creating summaries. """ def __init__(self): self.name = "safe_youtube_transcript" self._initialized = True # No async resources to initialize def invoke(self, query: str) -> str: """Extracts the transcript from a YouTube video URL. Args: query: The URL of the YouTube video. Returns: A string containing the video's transcript. """ loader = YoutubeLoader.from_youtube_url( query, add_video_info=False ) documents = loader.load() result = "\n\n".join([doc.page_content for doc in documents]) return result def cleanup(self): """Clean up any resources.""" # No cleanup needed for transcript tool pass # Update the toolbelt to include the new tool class WebScraperTool: """A general web scraper tool that can extract content from web pages. This tool fetches web pages and extracts text content, tables, or specific elements using BeautifulSoup for HTML parsing. """ def __init__(self): self.name = "web_scraper" def invoke(self, url: str, element_type: str = "text", selector: Optional[str] = None) -> str: """Scrape content from a web page. Args: url: The URL to scrape element_type: Type of content to extract ('text', 'table', 'links', 'images') selector: Optional CSS selector or element ID to target specific content Returns: Extracted content as formatted text """ try: import requests from bs4 import BeautifulSoup # Set headers to mimic a real browser headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') if element_type == "text": if selector: elements = soup.select(selector) return '\n'.join([elem.get_text(strip=True) for elem in elements]) else: # Remove script and style elements for script in soup(["script", "style"]): script.decompose() return soup.get_text(strip=True) elif element_type == "table": if selector: table = soup.select_one(selector) else: table = soup.find("table") if table: rows = [] for row in table.find_all("tr"): cells = [cell.get_text(strip=True) for cell in row.find_all(["td", "th"])] if cells: rows.append(" | ".join(cells)) return "\n".join(rows) else: return "No table found" elif element_type == "links": links = soup.find_all("a", href=True) return "\n".join([f"{link.get_text(strip=True)}: {link['href']}" for link in links if link.get_text(strip=True)]) elif element_type == "images": images = soup.find_all("img", src=True) return "\n".join([f"{img.get('alt', 'No alt text')}: {img['src']}" for img in images]) else: return "Unsupported element type. Use 'text', 'table', 'links', or 'images'" except Exception as e: return f"Error scraping {url}: {str(e)}" class BaseballReferenceScraperTool: """A specialized tool for scraping tables from Baseball Reference websites. This tool handles the specific formatting and HTML comment structure used by Baseball Reference sites to extract tabular data. """ def __init__(self): self.name = "baseball_reference_scraper" def invoke(self, url: str, table_id: Optional[str] = None) -> str: """Scrape a table from Baseball Reference. Args: url: The Baseball Reference URL to scrape table_id: Optional table ID to target a specific table Returns: Table data formatted as text """ try: import requests import pandas as pd from bs4 import BeautifulSoup # Set headers to mimic a real browser headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() # Baseball-Reference often wraps tables in HTML comments text = response.text.replace("", "") soup = BeautifulSoup(text, "html.parser") if table_id: table = soup.find("table", {"id": table_id}) else: table = soup.find("table") if not table: return f"No table found with ID: {table_id}" if table_id else "No table found on the page" # Try to use pandas to parse the table try: df = pd.read_html(str(table))[0] # Format the dataframe as a readable string result = f"Table from {url}\n" if table_id: result += f"Table ID: {table_id}\n" result += f"Shape: {df.shape[0]} rows x {df.shape[1]} columns\n\n" # Show first few rows result += "First 10 rows:\n" result += df.head(10).to_string(index=False) if len(df) > 10: result += f"\n\n... and {len(df) - 10} more rows" return result except Exception as pd_error: # Fallback to manual parsing if pandas fails rows = [] for row in table.find_all("tr"): cells = [cell.get_text(strip=True) for cell in row.find_all(["td", "th"])] if cells: rows.append(" | ".join(cells)) result = f"Table from {url}\n" if table_id: result += f"Table ID: {table_id}\n" result += f"Rows found: {len(rows)}\n\n" result += "\n".join(rows[:20]) # Show first 20 rows if len(rows) > 20: result += f"\n\n... and {len(rows) - 20} more rows" return result except Exception as e: return f"Error scraping Baseball Reference table from {url}: {str(e)}" # Safe tools that don't require dangerous requests SAFE_WEB_TOOLS = [SafeWebSearchTool(), SafeWikipediaSearchTool(), SafeYouTubeTranscriptTool()] def cleanup_web_tools(): """Clean up all web tools to prevent event loop errors.""" for tool in SAFE_WEB_TOOLS: try: if hasattr(tool, 'cleanup'): tool.cleanup() except Exception as e: logger.debug(f"Error cleaning up tool {tool.name}: {e}") # python -c " # import requests # # First fetch the page to get section IDs # resp = requests.get( # 'https://en.wikipedia.org/w/api.php', # params={ # 'action': 'parse', # 'page': 'Mercedes Sosa', # 'prop': 'sections', # 'format': 'json' # } # ) # sections = resp.json()['parse']['sections'] # studio_section = next(s for s in sections if s['line'] == 'Studio albums') # secid = studio_section['index'] # # Then fetch just that section's wikitext # resp2 = requests.get( # 'https://en.wikipedia.org/w/api.php', # params={ # 'action': 'parse', # 'page': 'Mercedes Sosa', # 'format': 'json', # 'prop': 'wikitext', # 'section': secid # } # ) # print(resp2.json()['parse']['wikitext']) # "