# Hugging Face Space status: Runtime error
| """ | |
| Tools for the FlexibleAgent | |
| All tool functions that the agent can use | |
| """ | |
| import os | |
| import re | |
| import requests | |
| import tempfile | |
| import mimetypes | |
| from pathlib import Path | |
| from langchain_core.tools import tool | |
| from langchain_community.retrievers import WikipediaRetriever | |
| from langchain_community.document_loaders import ( | |
| UnstructuredFileLoader, | |
| TextLoader, | |
| CSVLoader, | |
| PDFPlumberLoader, | |
| UnstructuredImageLoader, | |
| UnstructuredMarkdownLoader, | |
| UnstructuredWordDocumentLoader, | |
| UnstructuredPowerPointLoader, | |
| UnstructuredExcelLoader | |
| ) | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_core.tools import Tool | |
| from langchain_google_community import GoogleSearchAPIWrapper | |
| from langchain_community.tools import DuckDuckGoSearchResults | |
| from langchain_community.document_loaders import WebBaseLoader | |
| from simpleeval import simple_eval | |
def wikipedia_search(query: str) -> str:
    """Search Wikipedia for factual information and encyclopedic content.

    Use this tool when you need:
    - Historical facts, dates, or events
    - Biographical information about people
    - Definitions and explanations of concepts
    - General factual knowledge
    - Information about places, organizations, or scientific topics

    Args:
        query: The search query.

    Returns:
        The retrieved articles, each wrapped in a ``<Document>`` tag that
        carries its source, separated by ``---`` rules. A short message is
        returned instead when nothing is found or the lookup raises.
    """
    try:
        articles = WikipediaRetriever(load_max_docs=10).invoke(query)
        if not articles:
            return f"No Wikipedia articles found for '{query}'"

        # One tagged section per article; the tag records where the text came from.
        sections = []
        for article in articles:
            meta = article.metadata
            sections.append(
                f'<Document source="{meta["source"]}" page="{meta.get("page", "")}"/>\n{article.page_content}\n</Document>'
            )

        header = f"Wikipedia search results for '{query}':\n\n"
        return header + "\n\n---\n\n".join(sections)
    except Exception as e:
        # Tools report failures as text so the agent can react to them.
        return f"Wikipedia search failed: {str(e)}"
def youtube_search(query: str) -> str:
    """Search YouTube for videos and get video information, or extract information from a specific YouTube URL.

    Use this tool when:
    - The question explicitly mentions YouTube or videos
    - You need to find video content on a specific topic
    - You have a YouTube URL and need to get information about it
    - Looking for tutorials, demonstrations, or visual content
    - The user asks about video creators or channels

    When analyzing a YouTube URL, this tool provides:
    - Video title, channel, duration, views, upload date
    - Full description (contains key information about video content)
    - Tags (keywords related to the video)

    IMPORTANT: Use the title, description, and tags to answer questions about the video content.
    The description often contains detailed information about what happens in the video.

    Args:
        query: The YouTube search query, or a YouTube URL (the URL may be
            embedded in a longer sentence).

    Returns:
        A text report for the linked video, or the top three search results.
        On any error a string starting with ``YouTube search failed:`` is
        returned instead of raising.
    """
    try:
        import yt_dlp

        # Extract the actual link rather than testing for a substring: the
        # agent often passes a whole sentence that merely contains the URL,
        # and handing that sentence to extract_info() fails.
        url_match = re.search(
            r"(?:https?://)?(?:[\w-]+\.)*(?:youtube\.com|youtu\.be)/\S+", query
        )

        if url_match:
            # Drop punctuation that belonged to the surrounding sentence.
            video_url = url_match.group(0).rstrip(".,;:!?)\"'")
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extract_flat': False,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_url, download=False)

            # `or` fallbacks also cover keys that are present but set to None.
            parts = [
                "YouTube Video Information:\n",
                f"Title: {info.get('title') or 'N/A'}\n",
                f"Channel: {info.get('uploader') or 'N/A'}\n",
                f"Duration: {info.get('duration') or 0} seconds\n",
                f"Views: {info.get('view_count', 'N/A')}\n",
                f"Upload Date: {info.get('upload_date') or 'N/A'}\n\n",
            ]

            # The description usually holds the most detail about the content.
            description = info.get('description')
            if description and description != 'N/A':
                parts.append(f"Description:\n{description}\n\n")
            else:
                parts.append("Description: Not available\n\n")

            # Tags help identify the topic; cap them to keep the output short.
            tags = info.get('tags') or []
            if tags:
                parts.append(f"Tags: {', '.join(str(tag) for tag in tags[:10])}\n")
            return "".join(parts)

        # No link in the query: run a keyword search limited to three hits.
        ydl_opts = {
            'quiet': True,
            'no_warnings': True,
            'extract_flat': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            results = ydl.extract_info(f"ytsearch3:{query}", download=False)

        parts = [f"YouTube search results for '{query}':\n"]
        for entry in results.get('entries') or []:
            parts.append(f"- {entry.get('title') or 'N/A'} by {entry.get('uploader') or 'N/A'}\n")
            parts.append(f"  Duration: {entry.get('duration') or 0} seconds\n")
            parts.append(f"  URL: {entry.get('url') or 'N/A'}\n\n")
        return "".join(parts)
    except Exception as e:
        return f"YouTube search failed: {str(e)}"
def web_search(query: str) -> str:
    """Search the web for current information and load full webpage content.

    Use this tool when:
    - You need current/recent information not available in Wikipedia
    - Looking for news, updates, or time-sensitive content
    - Wikipedia doesn't have the specific information
    - Need detailed content from specific web pages
    - Looking for niche or specialized information

    This tool performs a web search and loads the full content of the top 3 results.
    If the question refers to an article, use this tool to query for the specific article mentioned in the question.

    Args:
        query: The search query.

    Returns:
        The metadata and page text of each loaded result, separated by dashed
        rules. Like the other tools in this module, failures are reported as
        text (``Web search failed: ...``) instead of being raised.
    """
    try:
        search = DuckDuckGoSearchResults(output_format="list")
        search_results = search.invoke(query) or []
        urls = [hit['link'] for hit in search_results[:3] if hit.get('link')]
        if not urls:
            return f"No web search results found for '{query}'"

        parts = ["Results from web search:\n\n"]
        for url in urls:
            # Load pages one at a time so a single unreachable site does not
            # discard the content of the others.
            try:
                for doc in WebBaseLoader(web_paths=[url]).lazy_load():
                    parts.append(f"{doc.metadata}\n\n")
                    parts.append(f"{doc.page_content}\n\n")
                    parts.append("--------------------------------\n\n")
            except Exception as page_error:
                parts.append(f"Could not load {url}: {str(page_error)}\n\n")
                parts.append("--------------------------------\n\n")
        return "".join(parts)
    except Exception as e:
        return f"Web search failed: {str(e)}"
def decode_text(text: str) -> str:
    """Decode or reverse text that might be encoded backwards or in other ways.

    Use this tool when:
    - Text appears to be reversed or encoded
    - Words are spelled backwards
    - The question mentions "decode", "reverse", or "backwards"
    - Text looks scrambled or encoded

    Args:
        text: The text to decode or reverse.

    Returns:
        A three-line report: the original text, the text with each
        whitespace-separated word reversed in place, and the text reversed
        as a whole. On error a ``Text decoding failed: ...`` message.
    """
    try:
        # Variant 1: keep word order, flip the letters inside each word.
        per_word = " ".join(token[::-1] for token in text.split())
        # Variant 2: flip the entire string, which also reverses word order.
        whole = text[::-1]
        return f"Original: {text}\nWord-by-word reversed: {per_word}\nFully reversed: {whole}"
    except Exception as e:
        return f"Text decoding failed: {str(e)}"
def evaluate_computation(expression: str) -> str:
    """Safely evaluate mathematical expressions and computations.

    Use this tool when:
    - You need to perform mathematical calculations
    - The question involves arithmetic operations (+, -, *, /, **, %)
    - You need to evaluate numeric expressions
    - Computing formulas or mathematical operations

    Supports:
    - Basic arithmetic: +, -, *, /, **, %
    - Mathematical functions: abs, max, min, round, sum
    - Comparison operators: <, <=, >, >=, ==, !=
    - Logical operators: and, or, not
    - Constants: True, False, None

    Args:
        expression: The mathematical expression to evaluate (e.g., "2 + 2", "3.14 * 5**2").

    Returns:
        ``Result of '<expression>': <value>`` on success, otherwise
        ``Computation failed for '<expression>': <reason>``.
    """
    try:
        from simpleeval import DEFAULT_FUNCTIONS, EvalWithCompoundTypes

        # simpleeval's defaults only provide rand/randint/int/float/str, so the
        # functions advertised above have to be registered explicitly.
        functions = {
            **DEFAULT_FUNCTIONS,
            "abs": abs,
            "max": max,
            "min": min,
            "round": round,
            "sum": sum,
        }
        # The compound-types evaluator accepts list/tuple literals, which
        # calls such as sum([1, 2, 3]) need; it is still simpleeval's
        # restricted evaluator, not Python's eval().
        evaluator = EvalWithCompoundTypes(functions=functions)
        result = evaluator.eval(expression)
        return f"Result of '{expression}': {result}"
    except Exception as e:
        return f"Computation failed for '{expression}': {str(e)}"
def download_and_process_file(task_id: str) -> str:
    """Download and process a file from the GAIA API using the task_id.

    Use this tool when:
    - The question explicitly mentions an "attached file" or "attachment"
    - The question says "see the attached", "I've attached", "attached as", etc.
    - A task_id has been provided for file access

    This tool downloads and processes various file types including:
    - PDF, Word, PowerPoint, Excel documents
    - Images (extracts text via OCR)
    - Audio files (transcribes speech to text)
    - CSV, text, and markdown files

    Args:
        task_id: The GAIA task ID used to download the file.

    Returns:
        ``FILE PROCESSED: <filename>`` followed by the extracted content, or a
        ``File download failed`` / ``File processing failed`` message.
    """
    api_url = "https://agents-course-unit4-scoring.hf.space"
    try:
        file_url = f"{api_url}/files/{task_id}"
        print(f"Downloading file from: {file_url}")
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        # Prefer the server-supplied name; fall back to the task id. The
        # quoted form is tried first so that an unquoted name does not
        # swallow trailing header parameters such as "; size=123".
        filename = task_id
        content_disposition = response.headers.get('Content-Disposition', '')
        name_match = re.search(r'filename="([^"]+)"|filename=([^;]+)', content_disposition)
        if name_match:
            candidate = (name_match.group(1) or name_match.group(2)).strip().strip('"')
            if candidate:
                filename = candidate

        # The name comes from a remote header and ends up in a local path, so
        # keep only the final path component.
        filename = re.split(r'[\\/]', filename)[-1] or "downloaded_file"

        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
                temp_path = tmp_file.name
                tmp_file.write(response.content)
            # The original filename (not the temp path) drives type detection.
            file_content = _process_downloaded_file(temp_path, filename)
        finally:
            # Remove the temp file even when processing raises.
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

        return f"FILE PROCESSED: {filename}\n\nContent:\n{file_content}"
    except requests.exceptions.RequestException as e:
        return f"File download failed: {str(e)}"
    except Exception as e:
        return f"File processing failed: {str(e)}"
def _process_downloaded_file(file_path: str, filename: str) -> str:
    """Extract text from a downloaded file, choosing a handler by its type.

    Args:
        file_path: Location of the file on disk.
        filename: Original file name; its extension and guessed MIME type
            decide which handler is used.

    Returns:
        The extracted text, or ``Error processing file <filename>: ...`` when
        the chosen handler raises.
    """
    try:
        guessed_mime, _ = mimetypes.guess_type(filename)
        mime = guessed_mime or ""
        extension = Path(filename).suffix.lower()

        # Audio and images have dedicated helpers and take precedence.
        if mime.startswith('audio') or extension in ('.mp3', '.wav', '.m4a', '.ogg'):
            return _process_audio_file(file_path)
        if mime.startswith('image') or extension in ('.jpg', '.jpeg', '.png', '.gif', '.bmp'):
            return _process_image_file(file_path)

        # Document formats map directly from extension to loader class.
        loaders_by_extension = {
            '.pdf': PDFPlumberLoader,
            '.docx': UnstructuredWordDocumentLoader,
            '.doc': UnstructuredWordDocumentLoader,
            '.pptx': UnstructuredPowerPointLoader,
            '.ppt': UnstructuredPowerPointLoader,
            '.xlsx': UnstructuredExcelLoader,
            '.xls': UnstructuredExcelLoader,
            '.csv': CSVLoader,
            '.md': UnstructuredMarkdownLoader,
            '.markdown': UnstructuredMarkdownLoader,
            '.txt': TextLoader,
        }
        loader_cls = loaders_by_extension.get(extension)
        if loader_cls is None:
            # Unknown extension: any other text/* type is read as plain text,
            # everything else goes to the generic unstructured loader.
            loader_cls = TextLoader if mime.startswith('text') else UnstructuredFileLoader

        documents = loader_cls(file_path).load()
        return "\n".join(document.page_content for document in documents)
    except Exception as e:
        return f"Error processing file {filename}: {str(e)}"
def _process_audio_file(file_path: str) -> str:
    """Transcribe an audio file to text.

    The file is first converted to WAV and sent to Google speech recognition
    (via ``speech_recognition`` and ``pydub``). If that path is unavailable or
    fails, a local Whisper model is tried as a fallback.

    Args:
        file_path: Location of the audio file on disk.

    Returns:
        The transcription prefixed with the engine that produced it, or a
        message describing why transcription was not possible.
    """
    missing_primary_deps = False
    primary_error = None
    wav_path = file_path + ".wav"

    try:
        import speech_recognition as sr
        from pydub import AudioSegment

        try:
            # Convert to WAV, the format the recognizer reads here.
            AudioSegment.from_file(file_path).export(wav_path, format="wav")

            recognizer = sr.Recognizer()
            with sr.AudioFile(wav_path) as source:
                audio_data = recognizer.record(source)
            text = recognizer.recognize_google(audio_data)
        finally:
            # Remove the converted copy even when recognition raises.
            if os.path.exists(wav_path):
                os.unlink(wav_path)
        return f"Audio transcription:\n{text}"
    except ImportError:
        missing_primary_deps = True
    except Exception as e:
        # Keep a reference: the `except` target is unbound once the block ends.
        primary_error = e

    # Fallback: Whisper works on the original file and is attempted whether
    # the primary path failed or its dependencies were simply missing.
    try:
        import whisper

        model = whisper.load_model("base")
        result = model.transcribe(file_path)
        return f"Audio transcription (Whisper):\n{result['text']}"
    except ImportError:
        if missing_primary_deps:
            return "Audio processing requires additional dependencies (speech_recognition, pydub)"
        return f"Audio processing failed: {str(primary_error)}. Consider installing speech_recognition, pydub, or openai-whisper."
    except Exception as e2:
        return f"Audio processing failed: {str(e2)}"
def _process_image_file(file_path: str) -> str:
    """Extract whatever text the unstructured image loader finds in an image.

    Args:
        file_path: Location of the image file on disk.

    Returns:
        The extracted text, a notice when the loader produced no text, or an
        ``Image processing failed: ...`` message when the loader raises.
    """
    try:
        documents = UnstructuredImageLoader(file_path).load()
        extracted = "\n".join(document.page_content for document in documents)

        # Whitespace-only output counts as "nothing found".
        if not extracted.strip():
            return "Image file detected but no text content could be extracted. Consider using OCR or image analysis tools."
        return f"Image content extracted:\n{extracted}"
    except Exception as e:
        return f"Image processing failed: {str(e)}"