Spaces:
Sleeping
Sleeping
| """ | |
| Specific tool implementations for GAIA benchmark agent. | |
| Each tool has a focused, single responsibility. | |
| """ | |
| import os | |
| import re | |
| import chess | |
| import chess.pgn | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| import whisper | |
| import wikipedia | |
| from bs4 import BeautifulSoup | |
| from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun | |
| from langchain_community.tools.wikidata.tool import WikidataAPIWrapper, WikidataQueryRun | |
| from langchain_community.utilities import ArxivAPIWrapper, WikipediaAPIWrapper | |
| from openpyxl import load_workbook | |
| from smolagents import tool | |
| # === SEARCH AND WEB TOOLS === | |
| additional_tools = [ | |
| WikidataQueryRun(api_wrapper=WikidataAPIWrapper()), | |
| WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()), | |
| ] | |
| def wikipedia_search(query: str, language: str = "en", sentences: int = 3) -> str: | |
| """ | |
| Search Wikipedia for information on a specific topic. | |
| Args: | |
| query: The search term or topic | |
| language: Wikipedia language code (default: "en") | |
| sentences: Number of sentences to return from summary (default: 3) | |
| Returns: | |
| Wikipedia article information including title, summary, and URL | |
| """ | |
| try: | |
| wikipedia.set_lang(language) | |
| # Try direct page access first | |
| try: | |
| page = wikipedia.page(query) | |
| summary = wikipedia.summary(query, sentences=sentences) | |
| return f"Title: {page.title}\nSummary: {summary}\nURL: {page.url}" | |
| except wikipedia.DisambiguationError as e: | |
| # Handle disambiguation by trying first option | |
| page = wikipedia.page(e.options[0]) | |
| summary = wikipedia.summary(e.options[0], sentences=sentences) | |
| return f"Title: {page.title}\nSummary: {summary}\nURL: {page.url}\nNote: Disambiguation resolved to first option from: {', '.join(e.options[:3])}" | |
| except wikipedia.PageError: | |
| # Search for similar pages | |
| search_results = wikipedia.search(query, results=5) | |
| if search_results: | |
| page = wikipedia.page(search_results[0]) | |
| summary = wikipedia.summary(search_results[0], sentences=sentences) | |
| return f"Title: {page.title}\nSummary: {summary}\nURL: {page.url}\nNote: Found via search, other results: {', '.join(search_results[1:3])}" | |
| else: | |
| return f"No Wikipedia results found for: {query}" | |
| except Exception as e: | |
| return f"Error searching Wikipedia: {str(e)}" | |
| def web_search_duckduckgo(query: str, max_results: int = 5) -> str: | |
| """ | |
| Search the web using DuckDuckGo search engine. | |
| Args: | |
| query: Search query string | |
| max_results: Maximum number of results to return (default: 5) | |
| Returns: | |
| Formatted search results with titles, URLs, and snippets | |
| """ | |
| try: | |
| from duckduckgo_search import DDGS | |
| results = [] | |
| with DDGS() as ddgs: | |
| search_results = ddgs.text(query, max_results=max_results) | |
| for result in search_results: | |
| results.append(result) | |
| if not results: | |
| return f"No search results found for: {query}" | |
| formatted_results = [] | |
| for i, result in enumerate(results, 1): | |
| formatted_results.append( | |
| f"{i}. {result.get('title', 'No title')}\n" | |
| f" URL: {result.get('href', 'No URL')}\n" | |
| f" Snippet: {result.get('body', 'No description')[:300]}...\n" | |
| ) | |
| return "\n".join(formatted_results) | |
| except Exception as e: | |
| return f"Error in web search: {str(e)}" | |
| def fetch_webpage_content(url: str, max_length: int = 3000) -> str: | |
| """ | |
| Fetch and extract text content from a webpage. | |
| Args: | |
| url: The URL to fetch | |
| max_length: Maximum length of content to return (default: 3000) | |
| Returns: | |
| Cleaned text content from the webpage | |
| """ | |
| try: | |
| headers = { | |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" | |
| } | |
| response = requests.get(url, headers=headers, timeout=10) | |
| response.raise_for_status() | |
| soup = BeautifulSoup(response.content, "html.parser") | |
| # Remove unwanted elements | |
| for element in soup(["script", "style", "nav", "header", "footer", "aside"]): | |
| element.decompose() | |
| # Extract text | |
| text = soup.get_text(separator=" ", strip=True) | |
| # Clean up whitespace | |
| text = re.sub(r"\s+", " ", text) | |
| if len(text) > max_length: | |
| text = text[:max_length] + "..." | |
| return f"Content from {url}:\n{text}" | |
| except Exception as e: | |
| return f"Error fetching webpage {url}: {str(e)}" | |
| def arxiv_search(query: str) -> str: | |
| """ | |
| Search arXiv papers. | |
| Args: | |
| query: Search query or paper ID (e.g., "1605.08386") | |
| Returns: | |
| str: arXiv paper information | |
| """ | |
| try: | |
| arxiv = ArxivAPIWrapper() | |
| docs = arxiv.run(query) | |
| return str(docs) | |
| except Exception as e: | |
| return f"arXiv search error: {str(e)}" | |
| def wikipedia_search_tool(query: str) -> str: | |
| """ | |
| Search Wikipedia using LangChain's WikipediaQueryRun. | |
| Args: | |
| query: Search query | |
| Returns: | |
| str: Wikipedia search results | |
| """ | |
| try: | |
| wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()) | |
| result = wikipedia.run(query) | |
| return result | |
| except Exception as e: | |
| return f"Wikipedia search error: {str(e)}" | |
| def duckduckgo_search(query: str) -> str: | |
| """ | |
| Search using DuckDuckGo. | |
| Args: | |
| query: Search query | |
| Returns: | |
| str: DuckDuckGo search results | |
| """ | |
| try: | |
| search = DuckDuckGoSearchRun() | |
| result = search.invoke(query) | |
| return result | |
| except Exception as e: | |
| return f"DuckDuckGo search error: {str(e)}" | |
| def load_csv_file( | |
| filepath: str, | |
| max_rows: int = 100, | |
| max_columns: int = 20, | |
| get_all_rows: bool = False, | |
| ) -> str: | |
| """ | |
| Load and analyze a CSV file. | |
| Args: | |
| filepath: Path to the CSV file | |
| max_rows: Maximum number of rows to display (default: 100) | |
| max_columns: Maximum number of columns to display (default: 20) | |
| get_all_rows: If True, return all rows regardless of max_rows (default: False) | |
| Returns: | |
| CSV file content and basic statistics | |
| """ | |
| try: | |
| if not os.path.exists(filepath): | |
| return f"Error: File not found at {filepath}" | |
| # Read CSV | |
| df = pd.read_csv(filepath) | |
| # Basic info | |
| total_rows, total_cols = df.shape | |
| info = f"CSV File: {filepath}\n" | |
| info += f"Shape: {total_rows} rows × {total_cols} columns\n" | |
| info += f"Columns: {list(df.columns)}\n\n" | |
| # Limit display if needed | |
| display_rows = total_rows if get_all_rows else min(max_rows, total_rows) | |
| display_cols = min(max_columns, total_cols) | |
| if display_cols < total_cols: | |
| df_display = df.iloc[:display_rows, :display_cols] | |
| info += f"Showing first {display_rows} rows and {display_cols} columns:\n" | |
| else: | |
| df_display = df.iloc[:display_rows] | |
| info += f"Showing first {display_rows} rows:\n" | |
| info += df_display.to_string(index=True) | |
| # Basic statistics for numeric columns | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns | |
| if len(numeric_cols) > 0: | |
| info += "\n\nNumeric column statistics:\n" | |
| info += df[numeric_cols].describe().to_string() | |
| return info | |
| except Exception as e: | |
| return f"Error loading CSV file: {str(e)}" | |
| def load_excel_file( | |
| filepath: str, | |
| sheet_name: str | None = None, | |
| max_rows: int = 100, | |
| max_columns: int = 20, | |
| get_all_rows: bool = False, | |
| ) -> str: | |
| """ | |
| Load and analyze an Excel file. | |
| Args: | |
| filepath: Path to the Excel file | |
| sheet_name: Specific sheet to load (default: None for first sheet) | |
| max_rows: Maximum number of rows to display (default: 100) | |
| max_columns: Maximum number of columns to display (default: 20) | |
| get_all_rows: If True, return all rows regardless of max_rows (default: False) | |
| Returns: | |
| Excel file content and basic statistics | |
| """ | |
| try: | |
| if not os.path.exists(filepath): | |
| return f"Error: File not found at {filepath}" | |
| # Load workbook to get sheet names | |
| wb = load_workbook(filepath, read_only=True) | |
| sheet_names = wb.sheetnames | |
| # Determine which sheet to read | |
| if sheet_name and sheet_name in sheet_names: | |
| target_sheet = sheet_name | |
| else: | |
| target_sheet = sheet_names[0] # First sheet | |
| # Read Excel | |
| df = pd.read_excel(filepath, sheet_name=target_sheet) | |
| # Basic info | |
| total_rows, total_cols = df.shape | |
| info = f"Excel File: {filepath}\n" | |
| info += f"Available sheets: {sheet_names}\n" | |
| info += f"Reading sheet: '{target_sheet}'\n" | |
| info += f"Shape: {total_rows} rows × {total_cols} columns\n" | |
| info += f"Columns: {list(df.columns)}\n\n" | |
| # Limit display if needed | |
| display_rows = total_rows if get_all_rows else min(max_rows, total_rows) | |
| display_cols = min(max_columns, total_cols) | |
| if display_cols < total_cols: | |
| df_display = df.iloc[:display_rows, :display_cols] | |
| info += f"Showing first {display_rows} rows and {display_cols} columns:\n" | |
| else: | |
| df_display = df.iloc[:display_rows] | |
| info += f"Showing first {display_rows} rows:\n" | |
| info += df_display.to_string(index=True) | |
| # Basic statistics for numeric columns | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns | |
| if len(numeric_cols) > 0: | |
| info += "\n\nNumeric column statistics:\n" | |
| info += df[numeric_cols].describe().to_string() | |
| return info | |
| except Exception as e: | |
| return f"Error loading Excel file: {str(e)}" | |
| def read_text_file(filepath: str, max_length: int = 2000, encoding: str = "utf-8") -> str: | |
| """ | |
| Read content from a text file. | |
| Args: | |
| filepath: Path to the text file | |
| max_length: Maximum length of content to return (default: 2000) | |
| encoding: File encoding (default: "utf-8") | |
| Returns: | |
| Text file content | |
| """ | |
| try: | |
| if not os.path.exists(filepath): | |
| return f"Error: File not found at {filepath}" | |
| with open(filepath, encoding=encoding) as f: | |
| content = f.read() | |
| file_info = f"Text File: {filepath}\n" | |
| file_info += f"File size: {len(content)} characters\n" | |
| file_info += f"Lines: {content.count(chr(10)) + 1}\n\n" | |
| if len(content) > max_length: | |
| file_info += f"Content (first {max_length} characters):\n" | |
| file_info += content[:max_length] + "..." | |
| else: | |
| file_info += "Content:\n" + content | |
| return file_info | |
| except Exception as e: | |
| return f"Error reading text file: {str(e)}" | |
| # === AUDIO PROCESSING === | |
| def transcribe_audio_file(filepath: str, model_size: str = "base") -> str: | |
| """ | |
| Transcribe audio file to text using Whisper. | |
| Args: | |
| filepath: Path to the audio file | |
| model_size: Whisper model size ("tiny", "base", "small", "medium", "large") | |
| Returns: | |
| Transcribed text from the audio file | |
| """ | |
| try: | |
| if not os.path.exists(filepath): | |
| return f"Error: Audio file not found at {filepath}" | |
| # Load Whisper model | |
| model = whisper.load_model(model_size) | |
| # Transcribe audio | |
| result = model.transcribe(filepath) | |
| transcription_info = f"Audio File: {filepath}\n" | |
| transcription_info += f"Model used: {model_size}\n" | |
| transcription_info += f"Language detected: {result.get('language', 'Unknown')}\n\n" | |
| transcription_info += f"Transcription:\n{result['text']}" | |
| return transcription_info | |
| except Exception as e: | |
| return f"Error transcribing audio: {str(e)}" | |
| # === CHESS ANALYSIS === | |
| def analyze_chess_position(fen_notation: str) -> str: | |
| """ | |
| Analyze a chess position given in FEN notation. | |
| Args: | |
| fen_notation: Chess position in FEN (Forsyth-Edwards Notation) | |
| Returns: | |
| Analysis of the chess position including legal moves | |
| """ | |
| try: | |
| # Create board from FEN | |
| board = chess.Board(fen_notation) | |
| analysis = "Chess Position Analysis\n" | |
| analysis += f"FEN: {fen_notation}\n" | |
| analysis += f"Turn: {'White' if board.turn else 'Black'}\n" | |
| analysis += f"Castling rights: {board.castling_rights}\n" | |
| # Check game state | |
| if board.is_checkmate(): | |
| analysis += "Status: CHECKMATE\n" | |
| elif board.is_stalemate(): | |
| analysis += "Status: STALEMATE\n" | |
| elif board.is_check(): | |
| analysis += "Status: IN CHECK\n" | |
| else: | |
| analysis += "Status: Normal play\n" | |
| # List legal moves | |
| legal_moves = list(board.legal_moves) | |
| analysis += f"\nNumber of legal moves: {len(legal_moves)}\n" | |
| if legal_moves: | |
| # Categorize moves | |
| captures = [move for move in legal_moves if board.is_capture(move)] | |
| checks = [] | |
| for move in legal_moves: | |
| board.push(move) | |
| if board.is_check(): | |
| checks.append(move) | |
| board.pop() | |
| analysis += f"Legal moves: {', '.join([str(move) for move in legal_moves[:10]])}" | |
| if len(legal_moves) > 10: | |
| analysis += f" ... and {len(legal_moves) - 10} more" | |
| if captures: | |
| analysis += f"\nCaptures available: {', '.join([str(move) for move in captures])}" | |
| if checks: | |
| analysis += f"\nChecking moves: {', '.join([str(move) for move in checks])}" | |
| return analysis | |
| except Exception as e: | |
| return f"Error analyzing chess position: {str(e)}" | |
| # === STRING MANIPULATION === | |
| def reverse_string(text: str) -> str: | |
| """ | |
| Reverse a string character by character. | |
| Args: | |
| text: The string to reverse | |
| Returns: | |
| Reversed string | |
| """ | |
| try: | |
| reversed_text = text[::-1] | |
| return f"Original: '{text}'\nReversed: '{reversed_text}'" | |
| except Exception as e: | |
| return f"Error reversing string: {str(e)}" | |
| def reverse_words_in_string(text: str) -> str: | |
| """ | |
| Reverse the order of words in a string. | |
| Args: | |
| text: The string with words to reverse | |
| Returns: | |
| String with words in reverse order | |
| """ | |
| try: | |
| words = text.split() | |
| reversed_words = " ".join(words[::-1]) | |
| return f"Original: '{text}'\nWords reversed: '{reversed_words}'" | |
| except Exception as e: | |
| return f"Error reversing words: {str(e)}" | |
| # === DATA ANALYSIS === | |
| def analyze_table_commutativity(table_data: str) -> str: | |
| """ | |
| Analyze a mathematical operation table for commutativity. | |
| Args: | |
| table_data: String representation of the operation table | |
| Returns: | |
| Analysis of commutativity and any counter-examples | |
| """ | |
| try: | |
| lines = table_data.strip().split("\n") | |
| # Parse table | |
| table = {} | |
| elements = [] | |
| for line in lines: | |
| if "|" in line and not line.strip().startswith("|---|"): | |
| parts = [p.strip() for p in line.split("|") if p.strip()] | |
| if len(parts) > 1: | |
| row_element = parts[0] | |
| if row_element != "*": # Skip header row | |
| elements.append(row_element) | |
| table[row_element] = parts[1:] | |
| # Check commutativity | |
| violations = set() | |
| for i, elem1 in enumerate(elements): | |
| for j, elem2 in enumerate(elements): | |
| if i < len(table[elem1]) and j < len(table[elem2]): | |
| # Get a*b and b*a | |
| val1 = table[elem1][j] if j < len(table[elem1]) else None | |
| val2 = table[elem2][i] if i < len(table[elem2]) else None | |
| if val1 and val2 and val1 != val2: | |
| violations.add(elem1) | |
| violations.add(elem2) | |
| result = "Commutativity analysis:\n" | |
| result += f"Elements: {elements}\n" | |
| if violations: | |
| sorted_violations = sorted(list(violations)) | |
| result += "Operation is NOT commutative\n" | |
| result += f"Elements involved in violations: {', '.join(sorted_violations)}" | |
| else: | |
| result += "Operation appears to be commutative" | |
| return result | |
| except Exception as e: | |
| return f"Error analyzing table commutativity: {str(e)}" | |
| def count_items_in_list(items_text: str, separator: str = ",") -> str: | |
| """ | |
| Count items in a delimited list. | |
| Args: | |
| items_text: Text containing delimited items | |
| separator: Delimiter to split on (default: ",") | |
| Returns: | |
| Count and list of items | |
| """ | |
| try: | |
| items = [item.strip() for item in items_text.split(separator) if item.strip()] | |
| count = len(items) | |
| result = f"Item count: {count}\n" | |
| result += f"Items: {items}" | |
| return result | |
| except Exception as e: | |
| return f"Error counting items: {str(e)}" | |
| def ocr_tool(image_path: str) -> str: | |
| """ | |
| Extract text from images using OCR. | |
| Args: | |
| image_path: Path to image file | |
| Returns: | |
| str: Extracted text | |
| """ | |
| try: | |
| import pytesseract | |
| from PIL import Image | |
| image = Image.open(image_path) | |
| text = pytesseract.image_to_string(image) | |
| return f"OCR text from {image_path}:\n{text.strip()}" | |
| except ImportError: | |
| return "Error: pytesseract not installed. Install with: pip install pytesseract" | |
| except Exception as e: | |
| return f"OCR error: {str(e)}" | |
| def image_captioning_tool(image_path: str) -> str: | |
| """ | |
| Generate basic image information (placeholder for actual captioning). | |
| Args: | |
| image_path: Path to image file | |
| Returns: | |
| str: Basic image information | |
| """ | |
| try: | |
| from PIL import Image | |
| image = Image.open(image_path) | |
| width, height = image.size | |
| mode = image.mode | |
| format_type = image.format | |
| caption = f"Image: {image_path}\n" | |
| caption += f"Dimensions: {width}x{height} pixels\n" | |
| caption += f"Mode: {mode}\n" | |
| caption += f"Format: {format_type}\n" | |
| caption += "Note: For detailed content description, integrate with a vision model." | |
| return caption | |
| except Exception as e: | |
| return f"Image info error: {str(e)}" | |
| def visual_qa_tool(image_path: str, question: str) -> str: | |
| """ | |
| Answer questions about images (placeholder for actual VQA). | |
| Args: | |
| image_path: Path to image file | |
| question: Question about the image | |
| Returns: | |
| str: Basic response about the image | |
| """ | |
| try: | |
| from PIL import Image | |
| image = Image.open(image_path) | |
| width, height = image.size | |
| mode = image.mode | |
| response = f"Question: {question}\n" | |
| response += f"Image: {image_path} ({width}x{height}, {mode})\n" | |
| response += "Note: For actual visual QA, integrate with a vision-language model." | |
| return response | |
| except Exception as e: | |
| return f"Visual QA error: {str(e)}" | |
| all_tools = [ | |
| wikipedia_search, | |
| web_search_duckduckgo, | |
| fetch_webpage_content, | |
| arxiv_search, | |
| wikipedia_search_tool, | |
| duckduckgo_search, | |
| load_csv_file, | |
| load_excel_file, | |
| read_text_file, | |
| transcribe_audio_file, | |
| analyze_chess_position, | |
| reverse_string, | |
| reverse_words_in_string, | |
| analyze_table_commutativity, | |
| count_items_in_list, | |
| ocr_tool, | |
| image_captioning_tool, | |
| visual_qa_tool, | |
| ] | |