Spaces:
Runtime error
Runtime error
| """LangGraph tools for the agent.""" | |
| import os | |
| from pathlib import Path | |
| from typing import Optional | |
| import time | |
| from langchain.tools import BaseTool | |
| from langchain_community.tools import DuckDuckGoSearchRun, WikipediaQueryRun | |
| from langchain_community.utilities import WikipediaAPIWrapper | |
| from langchain_experimental.tools import PythonAstREPLTool | |
| from langchain_tavily import TavilySearch | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import re as regex | |
| import requests | |
| from urllib.parse import urlparse | |
| import google.generativeai as genai | |
def find_file(filename: str) -> str:
    """Locate ``filename`` in the working directory or in ``downloads/``.

    The name is tried as given (relative to the current directory, or as an
    absolute path) and then inside ``downloads/``. Only regular files count
    as a hit, so a directory that happens to share the name does not shadow
    a real file stored in ``downloads/``.

    Args:
        filename: File name or path to look for.

    Returns:
        The first candidate that is an existing regular file, as a string.
        When nothing matches, the ``downloads/`` candidate is returned even
        though it does not exist, so callers must still check for existence.
    """
    downloads_candidate = Path("downloads") / filename
    # Path(filename) and Path(".") / filename normalise to the same path,
    # so a single current-directory candidate is enough.
    for candidate in (Path(filename), downloads_candidate):
        if candidate.is_file():
            return str(candidate)
    # Nothing found: fall back to the downloads location as the default.
    return str(downloads_candidate)
class ListFilesTool(BaseTool):
    """Tool to list files in the current directory or a specified directory."""

    name: str = "list_files"
    description: str = "Lists all files in the current directory or a specified directory. Input should be a directory path (optional, defaults to current directory)."

    def _run(self, directory: str = ".") -> str:
        """List the regular files in ``directory``.

        When the current directory is requested, ``downloads/`` is listed as
        well. Each entry is reported as ``<directory>/<name> (<size> bytes)``.

        Args:
            directory: Directory to list. Empty or missing input is treated
                as the current directory.

        Returns:
            A newline-separated listing, a "no files" message, or an error
            message if the directory could not be read.
        """
        try:
            # Agents frequently pass an empty string for "no argument".
            directory = (directory or "").strip() or "."
            root = Path(directory)

            search_dirs = [root]
            if root == Path("."):
                search_dirs.append(Path("downloads"))

            listing = []
            for folder in search_dirs:
                if not folder.exists():
                    continue
                # Label entries with the directory they actually live in
                # ("./" and "downloads/" for the default locations).
                prefix = folder.as_posix().rstrip("/") + "/"
                # Sort so the listing is stable between calls.
                for entry in sorted(folder.iterdir()):
                    if entry.is_file():
                        listing.append(
                            f"{prefix}{entry.name} ({entry.stat().st_size} bytes)"
                        )

            if not listing:
                return f"No files found in '{directory}'."
            return "Files found:\n" + "\n".join(listing)
        except Exception as e:
            return f"Error listing files: {str(e)}"

    async def _arun(self, directory: str = ".") -> str:
        """Async version; delegates to the synchronous implementation."""
        return self._run(directory)
class ReadFileTool(BaseTool):
    """Tool that returns the full text of a file."""

    name: str = "read_file"
    description: str = "Reads the contents of a text file. Input should be the file path."

    def _run(self, file_path: str) -> str:
        """Return the UTF-8 decoded contents of ``file_path``.

        The path is resolved through ``find_file``. A missing file or any
        read failure is reported as a message string, not an exception.
        """
        try:
            resolved = Path(find_file(file_path))
            if resolved.exists():
                with open(resolved, 'r', encoding='utf-8') as handle:
                    return handle.read()
            return f"File '{file_path}' does not exist in current directory or downloads/."
        except Exception as err:
            return f"Error reading file: {str(err)}"

    async def _arun(self, file_path: str) -> str:
        """Async entry point; runs the synchronous reader."""
        return self._run(file_path)
class ExcelReaderTool(BaseTool):
    """Tool for reading and analyzing Excel files."""

    name: str = "read_excel"
    description: str = "Reads an Excel file and returns its contents as a pandas DataFrame. Input should be the file path to the Excel file (.xlsx or .xls)."

    def _run(self, file_path: str) -> str:
        """Read an Excel file and return a textual summary of it.

        The summary contains the shape, column names, first rows, dtypes and
        ``describe()`` statistics of the sheet loaded by ``pd.read_excel``.

        Args:
            file_path: Path of the workbook; resolved through ``find_file``.

        Returns:
            The summary text, or an error message if the file is missing or
            cannot be read.
        """
        try:
            import pandas as pd

            path = Path(find_file(file_path))
            if not path.exists():
                return f"File '{file_path}' does not exist in current directory or downloads/."

            df = pd.read_excel(path)

            # Column labels are not guaranteed to be strings (numeric headers
            # are common), so convert before joining.
            column_names = ', '.join(str(column) for column in df.columns)

            # describe() raises ValueError on a frame without columns.
            if df.shape[1] == 0:
                statistics = "(no columns to summarize)"
            else:
                statistics = df.describe().to_string()

            sections = [
                "Excel file loaded successfully.\n",
                f"Shape: {df.shape[0]} rows, {df.shape[1]} columns\n",
                f"Columns: {column_names}\n\n",
                f"First few rows:\n{df.head().to_string()}\n\n",
                f"Data types:\n{df.dtypes.to_string()}\n\n",
                f"Summary statistics:\n{statistics}",
            ]
            return "".join(sections)
        except Exception as e:
            return f"Error reading Excel file: {str(e)}"

    async def _arun(self, file_path: str) -> str:
        """Async version; delegates to the synchronous implementation."""
        return self._run(file_path)
class DownloadFileTool(BaseTool):
    """Tool for downloading files from URLs."""

    name: str = "download_file"
    description: str = "Downloads a file from a URL and saves it to the current directory. Input should be the URL of the file to download. Returns the local file path."

    def _run(self, url: str) -> str:
        """Download ``url`` and save it in the current directory.

        The file name is taken from the URL path. If that has no name with an
        extension, a HEAD request is made and the name comes from the
        ``Content-Disposition`` header or, failing that, is generated from
        the ``Content-Type``.

        Args:
            url: Address of the file to download.

        Returns:
            A success message with the saved file name and size, or an error
            message if any step fails.
        """
        try:
            parsed = urlparse(url)
            filename = os.path.basename(parsed.path)

            if not filename or '.' not in filename:
                response = requests.head(url, allow_redirects=True, timeout=10)
                content_disp = response.headers.get('Content-Disposition', '')
                if 'filename=' in content_disp:
                    raw_name = content_disp.split('filename=')[1]
                    # Drop any parameters that follow the file name.
                    raw_name = raw_name.split(';')[0].strip().strip('"\'')
                    # The header is server-controlled: keep only the final
                    # path component so the file cannot be written outside
                    # the current directory.
                    filename = os.path.basename(raw_name.replace('\\', '/'))
                    if filename in ('.', '..'):
                        filename = ''

                # Still no usable name: derive one from the content type.
                if not filename or '.' not in filename:
                    content_type = response.headers.get('Content-Type', '')
                    ext = '.bin'
                    if 'image' in content_type:
                        ext = '.png' if 'png' in content_type else '.jpg'
                    elif 'excel' in content_type or 'spreadsheet' in content_type:
                        ext = '.xlsx'
                    elif 'pdf' in content_type:
                        ext = '.pdf'
                    filename = f"downloaded_file{ext}"

            print(f"📥 Downloading: {url}")
            response = requests.get(url, timeout=30)
            response.raise_for_status()

            # Save to the current directory.
            filepath = Path(filename)
            with open(filepath, 'wb') as f:
                f.write(response.content)

            file_size = len(response.content)
            print(f"✅ Downloaded: {filename} ({file_size} bytes)")
            return f"File downloaded successfully: {filename} ({file_size} bytes)"
        except Exception as e:
            return f"Error downloading file: {str(e)}"

    async def _arun(self, url: str) -> str:
        """Async version; delegates to the synchronous implementation."""
        return self._run(url)
class YouTubeTranscriptTool(BaseTool):
    """Tool for getting transcripts from YouTube videos."""

    name: str = "youtube_transcript"
    description: str = "Gets the transcript/captions from a YouTube video. Input should be either a YouTube URL or video ID."

    def _run(self, video_input: str) -> str:
        """Fetch and format the transcript of a YouTube video.

        An English transcript is requested first; if that fails, the first
        transcript in the video's transcript list is used instead.

        Args:
            video_input: A YouTube URL or a bare video ID.

        Returns:
            The transcript with one ``[<start>s] <text>`` line per entry, or
            an error message if no transcript could be retrieved.
        """
        try:
            video_input = video_input.strip()
            video_id = video_input
            if 'youtube.com' in video_input or 'youtu.be' in video_input:
                # Extract the ID from the supported URL shapes.
                patterns = [
                    r'(?:v=|\/)([0-9A-Za-z_-]{11}).*',
                    r'(?:embed\/)([0-9A-Za-z_-]{11})',
                    r'(?:watch\?v=)([0-9A-Za-z_-]{11})'
                ]
                for pattern in patterns:
                    match = regex.search(pattern, video_input)
                    if match:
                        video_id = match.group(1)
                        break

            try:
                transcript_data = YouTubeTranscriptApi.get_transcript(video_id, languages=['en'])
            except Exception:
                # No English transcript: fall back to any available language.
                # `except Exception` lets KeyboardInterrupt/SystemExit through.
                try:
                    transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
                    transcript = next(iter(transcript_list))
                    transcript_data = transcript.fetch()
                except Exception:
                    return f"Error: No transcript available for video {video_id}"

            lines = []
            for item in transcript_data:
                # Entries may be dicts or objects exposing .start/.text,
                # depending on the library version; accept both.
                if isinstance(item, dict):
                    start, text = item['start'], item['text']
                else:
                    start, text = item.start, item.text
                lines.append(f"[{start:.1f}s] {text}")
            full_transcript = "\n".join(lines)
            return f"YouTube Transcript for video {video_id}:\n\n{full_transcript}"
        except Exception as e:
            return f"Error getting YouTube transcript: {str(e)}"

    async def _arun(self, video_input: str) -> str:
        """Async version; delegates to the synchronous implementation."""
        return self._run(video_input)
class CalculatorTool(BaseTool):
    """Tool that evaluates arithmetic expressions supplied as text."""

    name: str = "calculator"
    description: str = "Useful for mathematical calculations. Input should be a mathematical expression as a string (e.g., '2 + 2', '(5 * 3) / 2')."

    def _run(self, expression: str) -> str:
        """Evaluate ``expression`` and return the result as a string.

        Expressions containing a blocked token are rejected; any evaluation
        failure is returned as an error message.
        """
        blocked_tokens = ('import', 'exec', 'eval', '__')
        try:
            lowered = expression.lower()
            for token in blocked_tokens:
                if token in lowered:
                    return "Error: Expression contains forbidden operations."
            # NOTE(security): this still calls eval() on agent-supplied text.
            # The token blocklist and empty builtins reduce the attack
            # surface but are not a sandbox (e.g. huge exponentiations can
            # still exhaust CPU/memory). Consider an AST-based evaluator.
            value = eval(expression, {"__builtins__": {}}, {})
            return str(value)
        except Exception as err:
            return f"Error calculating: {str(err)}"

    async def _arun(self, expression: str) -> str:
        """Async entry point; runs the synchronous evaluator."""
        return self._run(expression)
class GeminiVideoTool(BaseTool):
    """Tool for understanding YouTube videos using Google Gemini."""

    name: str = "understand_video"
    description: str = (
        "Analyzes YouTube videos using Google Gemini's native video understanding.\n"
        "Input format: 'URL: <youtube_url> | QUESTION: <specific_question>'\n"
        "Example: 'URL: https://www.youtube.com/watch?v=abc123 | QUESTION: How many birds are visible?'\n"
        "Can answer questions about video content without transcripts."
    )

    def _run(self, youtube_url: str) -> str:
        """Ask Gemini about a YouTube video.

        Args:
            youtube_url: Either a bare URL or text in the form
                ``URL: <url> | QUESTION: <question>``. The segments may
                appear in either order and the question is optional.

        Returns:
            Gemini's answer prefixed with ``Video Analysis:``, or an error
            message if the API key is missing or the request fails.
        """
        try:
            api_key = os.getenv("GEMINI_API_KEY")
            if not api_key:
                return "Error: GEMINI_API_KEY not set. Cannot analyze video."

            # Parse the segments even when no '|' is present, so that an
            # input of just "URL: <url>" has its prefix stripped too.
            url = youtube_url.strip()
            question = None
            for segment in youtube_url.split('|'):
                segment = segment.strip()
                # Slice off only the leading prefix; str.replace would also
                # remove the same text appearing later in the segment.
                if segment.startswith('URL:'):
                    url = segment[len('URL:'):].strip()
                elif segment.startswith('QUESTION:'):
                    question = segment[len('QUESTION:'):].strip()

            genai.configure(api_key=api_key)
            model = genai.GenerativeModel('gemini-2.0-flash-exp')

            if question:
                prompt = f"Watch this YouTube video carefully and answer the following specific question: {question}\n\nProvide a direct, concise answer based only on what you observe in the video."
            else:
                prompt = "Analyze this YouTube video and describe what you see in detail. Pay attention to all visual details, objects, people, actions, and events."

            # NOTE(review): the URL is sent as a plain text part; confirm the
            # model actually fetches the video for this SDK version.
            response = model.generate_content([prompt, url])
            return f"Video Analysis:\n{response.text}"
        except Exception as e:
            return f"Error analyzing video: {str(e)}"

    async def _arun(self, youtube_url: str) -> str:
        """Async version; delegates to the synchronous implementation."""
        return self._run(youtube_url)
class GeminiAudioTool(BaseTool):
    """Tool for understanding audio files (MP3) using Google Gemini."""

    name: str = "understand_audio"
    description: str = "Analyzes audio files (MP3) using Google Gemini's audio understanding. Input should be the file path to the audio file. Can transcribe and answer questions about audio content."

    def _run(self, file_path: str) -> str:
        """Upload an audio file to Gemini and return its transcription.

        Args:
            file_path: Path of the audio file; resolved through ``find_file``.

        Returns:
            The model output prefixed with ``Audio Transcription:``, or an
            error message if the key or file is missing, processing fails or
            times out, or the request raises.
        """
        try:
            api_key = os.getenv("GEMINI_API_KEY")
            if not api_key:
                return "Error: GEMINI_API_KEY not set. Cannot analyze audio."

            actual_path = find_file(file_path)
            if not os.path.exists(actual_path):
                return f"Error: Audio file '{file_path}' not found in current directory or downloads/."
            file_path = actual_path  # Use the found path

            genai.configure(api_key=api_key)

            print(f"Uploading audio file to Gemini: {file_path}")
            audio_file = genai.upload_file(path=file_path)

            # Poll until processing ends, but never wait indefinitely: a file
            # stuck in PROCESSING would otherwise hang the agent.
            deadline = time.monotonic() + 300
            while audio_file.state.name == "PROCESSING":
                if time.monotonic() > deadline:
                    return "Error: Gemini timed out while processing audio file"
                time.sleep(2)
                audio_file = genai.get_file(audio_file.name)

            if audio_file.state.name == "FAILED":
                return "Error: Gemini failed to process audio file"

            model = genai.GenerativeModel('gemini-2.0-flash-exp')
            prompt = "Please transcribe this audio file and provide the complete content. Pay attention to all details, numbers, names, and instructions mentioned."
            response = model.generate_content([audio_file, prompt])
            return f"Audio Transcription:\n{response.text}"
        except Exception as e:
            return f"Error analyzing audio: {str(e)}"

    async def _arun(self, file_path: str) -> str:
        """Async version; delegates to the synchronous implementation."""
        return self._run(file_path)
class ImageAnalysisTool(BaseTool):
    """Tool that sends an image file to Google Gemini for description."""

    name: str = "analyze_image"
    description: str = "Analyzes images using Google Gemini's vision capabilities. Input should be the file path to the image. Can describe images, read text, analyze chess positions, etc."

    def _run(self, file_path: str) -> str:
        """Upload the image at ``file_path`` and return Gemini's analysis.

        A missing API key, a missing file or any request failure is reported
        as an error message string.
        """
        try:
            gemini_key = os.getenv("GEMINI_API_KEY")
            if not gemini_key:
                return "Error: GEMINI_API_KEY not set. Cannot analyze image."

            # Resolve the path against the current directory and downloads/.
            resolved = find_file(file_path)
            if not os.path.exists(resolved):
                return f"Error: Image file '{file_path}' not found in current directory or downloads/."
            file_path = resolved

            genai.configure(api_key=gemini_key)

            print(f"Uploading image to Gemini: {file_path}")
            uploaded = genai.upload_file(path=file_path)

            instructions = "Please analyze this image in detail. Describe everything you see including: objects, text, positions, colors, patterns, and any relevant information. If this is a chess board, provide the position in detail. If there's text, transcribe it."
            vision_model = genai.GenerativeModel('gemini-2.0-flash-exp')
            reply = vision_model.generate_content([uploaded, instructions])
            return f"Image Analysis:\n{reply.text}"
        except Exception as err:
            return f"Error analyzing image: {str(err)}"

    async def _arun(self, file_path: str) -> str:
        """Async entry point; runs the synchronous analysis."""
        return self._run(file_path)
class ExecutePythonFileTool(BaseTool):
    """Tool that runs a Python script in a subprocess and reports its output."""

    name: str = "execute_python_file"
    description: str = "Executes a Python file and returns its output. Input should be the file path to the .py file. Captures stdout and returns the final output."

    def _run(self, file_path: str) -> str:
        """Run the script at ``file_path`` with the current interpreter.

        The report contains stdout, stderr and a non-zero exit code when
        present. The run is limited to 30 seconds; a timeout or any other
        failure is returned as an error message.
        """
        import subprocess
        import sys

        try:
            script = find_file(file_path)
            if not os.path.exists(script):
                return f"Error: Python file '{file_path}' not found in current directory or downloads/."

            completed = subprocess.run(
                [sys.executable, script],
                capture_output=True,
                text=True,
                timeout=30,  # seconds
            )

            sections = []
            if completed.stdout:
                sections.append(f"Output:\n{completed.stdout}\n")
            if completed.stderr:
                sections.append(f"Errors:\n{completed.stderr}\n")
            if completed.returncode != 0:
                sections.append(f"Exit code: {completed.returncode}\n")

            report = "".join(sections)
            return report or "Script executed successfully with no output."
        except subprocess.TimeoutExpired:
            return "Error: Script execution timed out (30 seconds limit)."
        except Exception as err:
            return f"Error executing Python file: {str(err)}"

    async def _arun(self, file_path: str) -> str:
        """Async entry point; runs the synchronous executor."""
        return self._run(file_path)
# Factory for the Wikipedia lookup tool
def create_wikipedia_tool():
    """Build a Wikipedia query tool limited to 2 results of up to 4000 characters."""
    return WikipediaQueryRun(
        api_wrapper=WikipediaAPIWrapper(top_k_results=2, doc_content_chars_max=4000)
    )
# Tool classes plus one factory function, in registration order.
tool_classes = [
    # Web search and general-purpose code execution
    DuckDuckGoSearchRun,
    TavilySearch,
    PythonAstREPLTool,
    # Local file handling
    ListFilesTool,
    ReadFileTool,
    ExcelReaderTool,
    DownloadFileTool,
    ExecutePythonFileTool,  # runs .py files
    # Media understanding
    YouTubeTranscriptTool,
    GeminiVideoTool,  # Gemini video understanding
    GeminiAudioTool,  # Gemini audio transcription (MP3)
    ImageAnalysisTool,  # Gemini image analysis (chess, diagrams, etc.)
    # Math and reference lookups
    CalculatorTool,
    create_wikipedia_tool,  # factory function: returns an instance, not a class
]