Spaces:
Paused
Paused
| """Safe web tools that don't require dangerous requests.""" | |
| import logging | |
| from typing import Dict, Any, Optional | |
| import time | |
| import asyncio | |
| # Use new tavily-python SDK | |
| try: | |
| from tavily import TavilyClient | |
| TAVILY_SDK_AVAILABLE = True | |
| except ImportError: | |
| TAVILY_SDK_AVAILABLE = False | |
| logging.getLogger(__name__).warning("Tavily SDK not available. Please install tavily-python package.") | |
| from langchain_community.tools import DuckDuckGoSearchResults | |
| from langchain_community.utilities import WikipediaAPIWrapper | |
| from langchain_community.document_loaders import YoutubeLoader | |
| from langchain_community.document_loaders.youtube import TranscriptFormat | |
| from langchain_community.document_loaders import ArxivLoader | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from src.utils.config import config | |
| import re | |
| import requests | |
| import json | |
| logger = logging.getLogger(__name__) | |
# Rate limiting
last_search_time = 0
min_search_interval = 3.0

def _rate_limit():
    """Sleep just long enough to keep consecutive searches at least
    ``min_search_interval`` seconds apart, then record the call time."""
    global last_search_time
    elapsed = time.time() - last_search_time
    remaining = min_search_interval - elapsed
    if remaining > 0:
        time.sleep(remaining)
    last_search_time = time.time()
class SafeWebSearchTool:
    """A tool for performing safe, rate-limited web searches.

    This tool is ideal for general-purpose web searches to answer questions,
    find information, or gather research. It is designed to be safe and
    efficient, with built-in rate limiting to prevent API abuse.
    Currently uses Google Search, but can be easily switched to other providers.
    """

    def __init__(self, search_provider="google"):
        self.name = "safe_web_search"
        self._initialized = False          # backend is imported lazily on first invoke()
        self.search_provider = search_provider
        self.searcher = None               # googlesearch.search callable once initialized

    def _ensure_initialized(self) -> Optional[str]:
        """Lazily import and set up the selected search backend.

        Returns:
            None on success, otherwise a human-readable error message that
            invoke() passes straight back to the caller.
        """
        if self._initialized:
            return None
        if self.search_provider == "google":
            try:
                from googlesearch import search
                self.searcher = search
                self._initialized = True
                logger.debug("Google search initialized successfully.")
            except ImportError:
                logger.error("Google search not available. Please install googlesearch-python package.")
                return "Google search not available. Please install googlesearch-python package."
            except Exception as e:
                logger.error(f"Failed to initialize Google search: {e}")
                return f"Failed to initialize Google search: {e}"
        else:  # Fallback to DuckDuckGo
            try:
                from langchain_community.tools import DuckDuckGoSearchRun
                self.ddg = DuckDuckGoSearchRun()
                self._initialized = True
                logger.debug("DuckDuckGoSearchTool initialized successfully.")
            except ImportError:
                logger.error("DuckDuckGo search not available. Please install duckduckgo-search package.")
                return "DuckDuckGo search not available. Please install duckduckgo-search package."
            except Exception as e:
                logger.error(f"Failed to initialize DuckDuckGo search: {e}")
                return f"Failed to initialize DuckDuckGo search: {e}"
        return None

    def _describe_url(self, idx: int, url: str) -> str:
        """Fetch a result page and build a titled summary entry for it.

        Falls back to just the URL when the page cannot be fetched or parsed.
        """
        from bs4 import BeautifulSoup
        import requests
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            # Short timeout: these fetches only enrich the result list.
            response = requests.get(url, headers=headers, timeout=2)
            if response.status_code != 200:
                return f"Web Search Result {idx+1}: {url}"
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.find('title')
            title_text = title.text.strip() if title else url
            # Prefer the meta description; otherwise fall back to the first paragraphs.
            description = ""
            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc and meta_desc.get('content'):
                description = meta_desc['content']
            else:
                paragraphs = soup.find_all('p', limit=4)
                if paragraphs:
                    description = ' '.join(p.text.strip() for p in paragraphs)
            # Always include title and URL; the previous version dropped them
            # (and sometimes appended an empty string) on successful fetches.
            entry = f"Web Search Result {idx+1}: {title_text}\nURL: {url}"
            if description:
                entry += f"\nDescription: {description}"
            return entry
        except Exception as e:
            logger.debug(f"Error processing result {idx+1}: {e}")
            return f"Web Search Result {idx+1}: {url}"

    def invoke(self, query: str) -> str:
        """Executes a web search for the given query.

        Args:
            query: The search query string.
        Returns:
            A string containing the search results, or an error message.
        """
        init_error = self._ensure_initialized()
        if init_error:
            return init_error
        try:
            if self.search_provider == "google":
                logger.info(f"Performing Google search for query: '{query}'")
                _rate_limit()  # throttle to avoid hammering the search API
                search_results = []
                try:
                    for idx, url in enumerate(self.searcher(query, num_results=5, lang='en')):
                        search_results.append(url)
                        if idx >= 4:  # Limit to 5 results
                            break
                except Exception as e:
                    logger.error(f"Error during Google search: {e}")
                    search_results = []
                logger.debug(f"Raw Google results: {search_results}")
                if not search_results:
                    logger.info(f"No Google search results found for query: '{query}'")
                    return "No search results found."
                formatted_results = [
                    self._describe_url(idx, url) for idx, url in enumerate(search_results)
                ]
                logger.info(f"Returning {len(formatted_results)} Google search results for query: '{query}'")
                return "\n\n---\n".join(formatted_results)
            else:  # DuckDuckGo fallback
                logger.info(f"Performing DuckDuckGo search for query: '{query}'")
                return self.ddg.invoke(query)
        except Exception as e:
            logger.error(f"{self.search_provider} search error for query '{query}': {e}")
            return f"{self.search_provider} search error: {e}"

    def cleanup(self):
        """Clean up any resources."""
        # Clean up DuckDuckGo if needed
        if hasattr(self, 'ddg') and self.ddg:
            try:
                if hasattr(self.ddg, 'close'):
                    self.ddg.close()
            except Exception as e:
                logger.debug(f"Error cleaning up DuckDuckGo: {e}")
        # Google search holds no resources; just drop the reference.
        self.searcher = None
class BaseWikipediaTool:
    """A tool for searching Wikipedia and loading article content.

    Search for a query on Wikipedia and retrieve the content of the most
    relevant articles. The number of articles to load is configurable, which
    makes the tool usable for both quick lookups and in-depth research.
    """

    def __init__(self):
        self.name = "base_wikipedia"
        self.query = ""
        self.load_max_docs = 5

    def invoke(self, query: str, load_max_docs: int = 5) -> str:
        """Search Wikipedia and load the content of the top matching articles.

        Args:
            query: The search query.
            load_max_docs: The maximum number of documents to load.
        Returns:
            A formatted string containing the loaded Wikipedia articles.
        """
        self.query = query
        self.load_max_docs = load_max_docs
        # doc_content_chars_max is raised above the 4000-char default so that
        # long trailing sections (e.g. a discography) are not cut off.
        loader = WikipediaLoader(
            query=self.query,
            load_max_docs=self.load_max_docs,
            doc_content_chars_max=15000,
        )
        documents = loader.load()
        entries = (
            f'<Document source="{document.metadata["source"]}" page="{document.metadata.get("page", "")}"/>\n'
            f'{document.page_content}\n</Document>'
            for document in documents
        )
        return "\n\n---\n\n".join(entries)

    def cleanup(self):
        """Clean up any resources (none are held)."""
        pass
class ArxivLoaderTool:
    """A tool for searching and loading papers from Arxiv.

    Use this tool to find and retrieve academic papers from the Arxiv
    repository. It is ideal for research, especially in scientific and
    technical fields. You can specify the number of papers to load.
    """

    def __init__(self):
        self.name = "arxiv_search"
        self.query = ""
        self.load_max_docs = 3

    def invoke(self, query: str, load_max_docs: int = 3) -> str:
        """Alias for load() so this tool exposes the same invoke() interface
        as the other tools in this module.

        Args:
            query: The search query (e.g., paper title, author, keywords).
            load_max_docs: The maximum number of papers to load.
        Returns:
            A formatted string containing the content of the loaded papers.
        """
        return self.load(query, load_max_docs)

    def load(self, query: str, load_max_docs: int = 3) -> str:
        """Search Arxiv and load the content of the most relevant papers.

        Args:
            query: The search query (e.g., paper title, author, keywords).
            load_max_docs: The maximum number of papers to load.
        Returns:
            A formatted string containing the content of the loaded papers.
        """
        self.query = query
        self.load_max_docs = load_max_docs
        search_docs = ArxivLoader(query=self.query, load_max_docs=self.load_max_docs).load()
        # Only the first 1000 chars of each paper are kept to bound output size.
        return "\n\n---\n\n".join(
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n'
            f'{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        )

    def cleanup(self):
        """Clean up any resources (none are held)."""
        pass
class TavilyWebSearchTool:
    """A powerful web search tool using the Tavily API.

    This tool provides a high-quality, AI-optimized search experience.
    It is best used for complex queries that require a deeper understanding
    of the topic. Requires a Tavily API key to be configured.
    """

    def __init__(self):
        self.name = "web_search"
        # Keep a direct SDK client only when both the SDK and an API key are
        # available; invoke() goes through LangChain's wrapper, which reads
        # the same key from the environment.
        if TAVILY_SDK_AVAILABLE and config.TAVILY_API_KEY:
            self.tavily_client = TavilyClient(api_key=config.TAVILY_API_KEY)
        else:
            self.tavily_client = None

    def invoke(self, query: str) -> str:
        """Executes a web search using the Tavily API.

        Args:
            query: The search query.
        Returns:
            A formatted string containing the search results, or an error
            message.
        """
        try:
            # Runnable.invoke takes the input positionally; the previous
            # `invoke(query=query)` keyword form raised a TypeError.
            results = TavilySearchResults(max_results=3).invoke(query)
        except Exception as e:
            logger.error(f"Tavily search error for query '{query}': {e}")
            return f"Tavily search error: {e}"
        # TavilySearchResults returns a list of dicts (with "url"/"content"
        # keys) or an error string - not Document objects, so the old
        # `doc.metadata` access raised AttributeError.
        if isinstance(results, str):
            return results
        formatted = [
            f'<Document source="{result.get("url", "")}"/>\n{result.get("content", "")}\n</Document>'
            for result in results
            if isinstance(result, dict)
        ]
        return "\n\n---\n\n".join(formatted)

    def cleanup(self):
        """Clean up any resources."""
        self.tavily_client = None
class SafeWikipediaSearchTool:
    """Enhanced Wikipedia search tool that can fetch specific sections when needed.

    This tool first tries to fetch a requested section directly from the
    MediaWiki API; otherwise it falls back to the regular Wikipedia search.
    """

    def __init__(self):
        self.name = "safe_wikipedia_search"
        self.base_tool = BaseWikipediaTool()

    def invoke(self, query: str, load_max_docs: int = 3, section_name: Optional[str] = None) -> str:
        """Search Wikipedia with optional section-specific fetching.

        Args:
            query: The search query (page name)
            load_max_docs: Maximum number of documents to load
            section_name: Optional section name to fetch specifically (e.g., "Studio albums")
        Returns:
            Wikipedia content, with section-specific content if requested
        """
        if section_name:
            # Try to get specific section content via the MediaWiki API.
            section_content = self._get_wikipedia_section(query, section_name)
            if section_content:
                return f"Wikipedia Section '{section_name}' for '{query}':\n\n{section_content}"
            # If that fetch failed, a retry with identical arguments would
            # fail the same way, so (unlike the previous version) we do NOT
            # hit the API a second time after the regular search below.
        # Fall back to regular Wikipedia search
        return self.base_tool.invoke(query, load_max_docs)

    def _has_empty_section(self, content: str, section_name: str) -> bool:
        """Return True if *content* contains the section header but the
        section body appears to be empty."""
        section_marker = f"=== {section_name} ==="
        if section_marker in content:
            # Find the section and check whether another section follows it
            # almost immediately.
            idx = content.find(section_marker)
            next_section_idx = content.find("===", idx + len(section_marker))
            if next_section_idx != -1:
                section_content = content[idx:next_section_idx].strip()
                # If the section is very short (just the header), it's likely empty.
                return len(section_content) < 50
        return False

    def _get_wikipedia_section(self, page_name: str, section_name: str) -> Optional[str]:
        """Fetch specific section content using the MediaWiki API.

        Args:
            page_name: The Wikipedia page name
            section_name: The section name to fetch
        Returns:
            Section content as formatted text, or None if not found
        """
        try:
            # First, list the page's sections to find the section index.
            resp = requests.get(
                'https://en.wikipedia.org/w/api.php',
                params={
                    'action': 'parse',
                    'page': page_name,
                    'prop': 'sections',
                    'format': 'json'
                },
                timeout=10
            )
            if resp.status_code != 200:
                return None
            data = resp.json()
            if 'parse' not in data or 'sections' not in data['parse']:
                return None
            sections = data['parse']['sections']
            target_section = None
            # Match the section by its display title ("line" field).
            for section in sections:
                if section.get('line') == section_name:
                    target_section = section
                    break
            if not target_section:
                return None
            section_id = target_section['index']
            # Now fetch just that section's wikitext.
            resp2 = requests.get(
                'https://en.wikipedia.org/w/api.php',
                params={
                    'action': 'parse',
                    'page': page_name,
                    'format': 'json',
                    'prop': 'wikitext',
                    'section': section_id
                },
                timeout=10
            )
            if resp2.status_code != 200:
                return None
            data2 = resp2.json()
            if 'parse' not in data2 or 'wikitext' not in data2['parse']:
                return None
            wikitext = data2['parse']['wikitext']['*']
            # Convert wikitext to a readable format.
            return self._format_wikitext(wikitext)
        except Exception as e:
            # Use the module logger instead of print so failures show up in
            # the application's logs.
            logger.warning(f"Error fetching Wikipedia section: {e}")
            return None

    def _format_wikitext(self, wikitext: str) -> str:
        """Convert wikitext to a more readable plain-text format.

        Handles the table markup that dominates discography-style sections:
        row separators are dropped, cells become lines, headers become
        `=== Header ===` lines.
        """
        lines = wikitext.split('\n')
        formatted_lines = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if line.startswith('|-'):
                # Table row separator - carries no content.
                continue
            elif line.startswith('|') and not line.startswith('|+'):
                # Table cell content
                cell_content = line[1:].strip()
                if cell_content and not cell_content.startswith('{'):
                    # Strip basic wiki markup (italics/bold quotes, links).
                    cell_content = cell_content.replace("''", "").replace("[[", "").replace("]]", "")
                    # Turn <small>...</small> annotations into parentheses.
                    if '<small>' in cell_content:
                        cell_content = cell_content.replace('<small>', '(').replace('</small>', ')')
                    formatted_lines.append(cell_content)
            elif line.startswith('!'):
                # Table header
                header = line[1:].strip()
                if header:
                    formatted_lines.append(f"=== {header} ===")
        return '\n'.join(formatted_lines)
class SafeYouTubeTranscriptTool:
    """A tool for extracting transcripts from YouTube videos.

    Provide a YouTube video URL and this tool returns the full transcript.
    Useful for analyzing video content, extracting quotes, or creating
    summaries.
    """

    def __init__(self):
        self.name = "safe_youtube_transcript"
        # No async resources are held, so the tool is ready immediately.
        self._initialized = True

    def invoke(self, query: str) -> str:
        """Extract the transcript from a YouTube video URL.

        Args:
            query: The URL of the YouTube video.
        Returns:
            A string containing the video's transcript.
        """
        documents = YoutubeLoader.from_youtube_url(query, add_video_info=False).load()
        return "\n\n".join(doc.page_content for doc in documents)

    def cleanup(self):
        """Clean up any resources (none are held)."""
        pass
| # Update the toolbelt to include the new tool | |
class WebScraperTool:
    """A general web scraper tool that can extract content from web pages.

    This tool fetches web pages and extracts text content, tables, links, or
    images using BeautifulSoup for HTML parsing.
    """

    def __init__(self):
        self.name = "web_scraper"

    def invoke(self, url: str, element_type: str = "text", selector: Optional[str] = None) -> str:
        """Scrape content from a web page.

        Args:
            url: The URL to scrape
            element_type: Type of content to extract ('text', 'table', 'links', 'images')
            selector: Optional CSS selector or element ID to target specific content
        Returns:
            Extracted content as formatted text, or an error message.
        """
        try:
            import requests
            from bs4 import BeautifulSoup
            # Set headers to mimic a real browser
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            if element_type == "text":
                if selector:
                    elements = soup.select(selector)
                    # separator=' ' keeps whitespace between adjacent text
                    # nodes; without it they are concatenated into one blob.
                    return '\n'.join(elem.get_text(separator=' ', strip=True) for elem in elements)
                else:
                    # Remove script and style elements so only visible text remains.
                    for script in soup(["script", "style"]):
                        script.decompose()
                    return soup.get_text(separator=' ', strip=True)
            elif element_type == "table":
                table = soup.select_one(selector) if selector else soup.find("table")
                if table:
                    rows = []
                    for row in table.find_all("tr"):
                        cells = [cell.get_text(strip=True) for cell in row.find_all(["td", "th"])]
                        if cells:
                            rows.append(" | ".join(cells))
                    return "\n".join(rows)
                else:
                    return "No table found"
            elif element_type == "links":
                links = soup.find_all("a", href=True)
                # Skip anchors with no visible link text.
                return "\n".join(
                    f"{link.get_text(strip=True)}: {link['href']}"
                    for link in links if link.get_text(strip=True)
                )
            elif element_type == "images":
                images = soup.find_all("img", src=True)
                return "\n".join(
                    f"{img.get('alt', 'No alt text')}: {img['src']}" for img in images
                )
            else:
                return "Unsupported element type. Use 'text', 'table', 'links', or 'images'"
        except Exception as e:
            return f"Error scraping {url}: {str(e)}"
class BaseballReferenceScraperTool:
    """A specialized tool for scraping tables from Baseball Reference websites.

    Baseball Reference wraps many of its tables inside HTML comments; this
    tool strips the comment markers first so the tables become parseable,
    then extracts the requested table as text.
    """

    def __init__(self):
        self.name = "baseball_reference_scraper"

    def invoke(self, url: str, table_id: Optional[str] = None) -> str:
        """Scrape a table from Baseball Reference.

        Args:
            url: The Baseball Reference URL to scrape
            table_id: Optional table ID to target a specific table
        Returns:
            Table data formatted as text, or an error message.
        """
        try:
            import requests
            import pandas as pd
            from bs4 import BeautifulSoup
            # Pretend to be a regular browser to avoid trivial blocking.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            # Baseball-Reference often wraps tables in HTML comments, so the
            # comment markers are removed before parsing.
            markup = response.text.replace("<!--", "").replace("-->", "")
            soup = BeautifulSoup(markup, "html.parser")
            table = soup.find("table", {"id": table_id}) if table_id else soup.find("table")
            if not table:
                return f"No table found with ID: {table_id}" if table_id else "No table found on the page"
            try:
                # Preferred path: let pandas parse the table.
                df = pd.read_html(str(table))[0]
                parts = [f"Table from {url}\n"]
                if table_id:
                    parts.append(f"Table ID: {table_id}\n")
                parts.append(f"Shape: {df.shape[0]} rows x {df.shape[1]} columns\n\n")
                parts.append("First 10 rows:\n")
                parts.append(df.head(10).to_string(index=False))
                if len(df) > 10:
                    parts.append(f"\n\n... and {len(df) - 10} more rows")
                return "".join(parts)
            except Exception:
                # Fallback: parse the rows by hand when pandas cannot.
                rows = []
                for tr in table.find_all("tr"):
                    cells = [cell.get_text(strip=True) for cell in tr.find_all(["td", "th"])]
                    if cells:
                        rows.append(" | ".join(cells))
                parts = [f"Table from {url}\n"]
                if table_id:
                    parts.append(f"Table ID: {table_id}\n")
                parts.append(f"Rows found: {len(rows)}\n\n")
                parts.append("\n".join(rows[:20]))  # cap the preview at 20 rows
                if len(rows) > 20:
                    parts.append(f"\n\n... and {len(rows) - 20} more rows")
                return "".join(parts)
        except Exception as e:
            return f"Error scraping Baseball Reference table from {url}: {str(e)}"
# Safe tools that don't require dangerous requests
SAFE_WEB_TOOLS = [SafeWebSearchTool(), SafeWikipediaSearchTool(), SafeYouTubeTranscriptTool()]

def cleanup_web_tools():
    """Clean up all web tools to prevent event loop errors."""
    for tool in SAFE_WEB_TOOLS:
        cleanup = getattr(tool, 'cleanup', None)
        if cleanup is None:
            continue
        try:
            cleanup()
        except Exception as e:
            # Best-effort cleanup: log at debug level and keep going.
            logger.debug(f"Error cleaning up tool {tool.name}: {e}")
| # python -c " | |
| # import requests | |
| # # First fetch the page to get section IDs | |
| # resp = requests.get( | |
| # 'https://en.wikipedia.org/w/api.php', | |
| # params={ | |
| # 'action': 'parse', | |
| # 'page': 'Mercedes Sosa', | |
| # 'prop': 'sections', | |
| # 'format': 'json' | |
| # } | |
| # ) | |
| # sections = resp.json()['parse']['sections'] | |
| # studio_section = next(s for s in sections if s['line'] == 'Studio albums') | |
| # secid = studio_section['index'] | |
| # # Then fetch just that section's wikitext | |
| # resp2 = requests.get( | |
| # 'https://en.wikipedia.org/w/api.php', | |
| # params={ | |
| # 'action': 'parse', | |
| # 'page': 'Mercedes Sosa', | |
| # 'format': 'json', | |
| # 'prop': 'wikitext', | |
| # 'section': secid | |
| # } | |
| # ) | |
| # print(resp2.json()['parse']['wikitext']) | |
| # " |