"""Tools for GAIA Agent

This module provides tools for:
- Web search using DuckDuckGo
- Python code execution
- File reading (txt, py, json, xlsx, mp3, png)
- YouTube transcript extraction
- Image understanding via Kimi multimodal
- Unified content reading
"""

import os
import io
import re
import sys
import json
import subprocess
import tempfile
from typing import Any, Optional
from pathlib import Path

from smolagents import tool


# Wall-clock limit for code run through python_execute.
_EXEC_TIMEOUT_SECONDS = 30

# Extra directories searched when a path given to a tool is not a file as-is.
_FALLBACK_DIRS = (".", "/tmp")

_TEXT_EXTENSIONS = {
    '.txt', '.py', '.md', '.csv', '.log', '.yaml', '.yml', '.html', '.css', '.js',
}
_EXCEL_EXTENSIONS = {'.xlsx', '.xls'}
_IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp'}
_AUDIO_EXTENSIONS = {'.mp3', '.wav', '.ogg', '.flac', '.m4a'}

_IMAGE_MIME_TYPES = {
    '.png': 'image/png',
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.webp': 'image/webp',
}

# Substrings that mark a source as a YouTube video URL.
_YOUTUBE_MARKERS = (
    "youtube.com/watch",
    "youtu.be/",
    "youtube.com/shorts/",
    "youtube.com/embed/",
    "youtube.com/live/",
)

# Captures the video ID from watch URLs (with "v" anywhere in the query
# string), short links, and shorts/embed/live URLs.
_YOUTUBE_ID_RE = re.compile(
    r"(?:youtube\.com/watch\?(?:[^\s#]*?&)?v=|youtu\.be/|youtube\.com/(?:shorts|embed|live)/)"
    r"([\w-]+)"
)

_PREFERRED_TRANSCRIPT_LANGUAGES = ['en', 'en-US', 'en-GB']

# Script executed in the child interpreter by python_execute. It runs the
# user's file verbatim (no re-indentation, so multi-line strings survive) and
# leaves stdout alone (so output is not lost if the code calls sys.exit()).
_RUNNER_SOURCE = '''\
import sys
import io
import json
import math
import re
import os
import traceback

_gaia_path = sys.argv[1]
sys.argv = sys.argv[1:]
__file__ = _gaia_path
with open(_gaia_path, "r", encoding="utf-8") as _gaia_file:
    _gaia_source = _gaia_file.read()

try:
    exec(compile(_gaia_source, _gaia_path, "exec"), globals())
except Exception as e:
    print(f"Error: {e}")
    traceback.print_exc()
'''


def _resolve_path(path: str) -> Optional[str]:
    """Return the first existing regular file among *path* and its fallbacks.

    The path is tried as given, then relative to each directory in
    ``_FALLBACK_DIRS``. Directories (and the empty path) never match.
    """
    candidates = [path] + [os.path.join(base, path) for base in _FALLBACK_DIRS]
    return next((c for c in candidates if os.path.isfile(c)), None)


def _extract_youtube_id(url: str) -> Optional[str]:
    """Return the video ID found in *url*, or None when there is none."""
    match = _YOUTUBE_ID_RE.search(url)
    return match.group(1) if match else None


def _fetch_transcript(video_id: str) -> Any:
    """Fetch transcript snippets, preferring English, else any language.

    Supports the instance-based youtube-transcript-api 1.x interface
    (``fetch`` / ``list``) and the legacy static 0.x interface
    (``get_transcript`` / ``list_transcripts``). Raises whatever the library
    raises when no transcript can be retrieved.
    """
    from youtube_transcript_api import YouTubeTranscriptApi

    if hasattr(YouTubeTranscriptApi, "fetch"):
        api = YouTubeTranscriptApi()
        fetch, list_all = api.fetch, api.list
    else:
        fetch = YouTubeTranscriptApi.get_transcript
        list_all = YouTubeTranscriptApi.list_transcripts

    try:
        return fetch(video_id, languages=_PREFERRED_TRANSCRIPT_LANGUAGES)
    except Exception:
        # No English transcript: take the first one available in any language.
        for transcript in list_all(video_id):
            return transcript.fetch()
        raise


def _describe_excel(filepath: str) -> str:
    """Summarise every sheet of a workbook: names, shape, columns, preview."""
    try:
        import pandas as pd

        sheets = pd.read_excel(filepath, sheet_name=None)
        sections = [f"=== Excel File: {filepath} ===", f"Sheets: {list(sheets)}"]
        for sheet_name, df in sheets.items():
            preview = df.head(20).to_string()
            sections.append(
                f"\n--- Sheet: {sheet_name} ---\n"
                f"Shape: {df.shape}\nColumns: {list(df.columns)}\n\n"
                f"Preview (first 20 rows):\n{preview}"
            )
        return "\n".join(sections)
    except ImportError:
        return f"Excel file found but pandas not available for reading: {filepath}"
    except Exception as e:
        return f"Error reading Excel file {filepath}: {e}"


def _describe_audio(filepath: str, ext: str) -> str:
    """Report basic audio file info plus a Whisper transcription if possible."""
    info = f"=== Audio File: {filepath} ===\n"
    info += f"Extension: {ext}\n"
    info += f"Size: {os.path.getsize(filepath)} bytes\n"

    # Transcription is best-effort: whisper is an optional dependency.
    try:
        import whisper

        model = whisper.load_model("base")
        result = model.transcribe(filepath)
        info += f"\n=== Transcription ===\n{result['text']}"
    except ImportError:
        info += "\n(Whisper not available for transcription)"
    except Exception as e:
        info += f"\n(Transcription failed: {e})"
    return info


@tool
def web_search(query: str) -> str:
    """Search the web using DuckDuckGo.

    Args:
        query: The search query string.

    Returns:
        A string containing search results.
    """
    try:
        from duckduckgo_search import DDGS

        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=10))

        if not results:
            return "No search results found."

        formatted_results = []
        for i, r in enumerate(results, 1):
            title = r.get('title', 'No title')
            body = r.get('body', 'No description')
            href = r.get('href', '')
            formatted_results.append(f"{i}. {title}\n{body}\nURL: {href}\n")
        return "\n".join(formatted_results)
    except Exception as e:
        return f"Search error: {str(e)}"


@tool
def python_execute(code: str) -> str:
    """Execute Python code and return the result.

    This tool runs Python code in a subprocess and captures stdout/stderr.
    Supports common libraries like pandas, numpy, json, requests.
    The modules sys, io, json, math, re and os are already imported.

    Args:
        code: Python code to execute.

    Returns:
        The output of the code execution (stdout + stderr).
    """
    try:
        # A private temporary directory avoids collisions between concurrent
        # executions and is removed afterwards, even on timeout.
        with tempfile.TemporaryDirectory(prefix="gaia_exec_") as workdir:
            script_path = os.path.join(workdir, "gaia_script.py")
            runner_path = os.path.join(workdir, "gaia_runner.py")
            with open(script_path, 'w', encoding='utf-8') as f:
                f.write(code)
            with open(runner_path, 'w', encoding='utf-8') as f:
                f.write(_RUNNER_SOURCE)

            result = subprocess.run(
                [sys.executable, runner_path, script_path],
                capture_output=True,
                text=True,
                timeout=_EXEC_TIMEOUT_SECONDS,
            )

        output = result.stdout
        if result.stderr:
            output += f"\n[STDERR]: {result.stderr}"
        if result.returncode != 0:
            output += f"\n[Exit code: {result.returncode}]"
        return output if output else "(No output)"
    except subprocess.TimeoutExpired:
        return f"Error: Code execution timed out ({_EXEC_TIMEOUT_SECONDS}s limit)"
    except Exception as e:
        return f"Execution error: {str(e)}"


@tool
def file_read(filepath: str) -> str:
    """Read file content (txt, py, json, xlsx, mp3, png, etc.).

    Supports multiple file types:
    - Text files (.txt, .py, .md): Returns content directly
    - JSON files (.json): Returns formatted JSON
    - Excel files (.xlsx, .xls): Returns sheet names and a preview of each sheet
    - Audio files (.mp3, .wav): Returns file info and transcription if possible
    - Image files (.png, .jpg): Returns file info (needs VLM for content analysis)

    Args:
        filepath: Path to the file to read.

    Returns:
        File content or description.
    """
    try:
        # Also look in the current directory and /tmp before giving up.
        resolved = _resolve_path(filepath)
        if resolved is None:
            return f"File not found: {filepath}"
        filepath = resolved

        ext = Path(filepath).suffix.lower()

        if ext in _TEXT_EXTENSIONS:
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            return f"=== File: {filepath} ===\n{content}"

        if ext == '.json':
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return f"=== JSON File: {filepath} ===\n{json.dumps(data, indent=2, ensure_ascii=False)}"

        if ext in _EXCEL_EXTENSIONS:
            return _describe_excel(filepath)

        if ext in _IMAGE_EXTENSIONS:
            from PIL import Image

            with Image.open(filepath) as img:
                return f"=== Image File: {filepath} ===\nFormat: {img.format}\nSize: {img.size}\nMode: {img.mode}\n\n(Use a vision model to analyze image content)"

        if ext in _AUDIO_EXTENSIONS:
            return _describe_audio(filepath, ext)

        # Anything else is treated as opaque binary: report basic info only.
        size = os.path.getsize(filepath)
        return f"=== Binary File: {filepath} ===\nSize: {size} bytes\nExtension: {ext}\n\n(File type not supported for direct reading)"
    except Exception as e:
        return f"Error reading file {filepath}: {str(e)}"


@tool
def youtube_transcript(url: str) -> str:
    """Extract transcript/captions from YouTube videos.

    Uses youtube-transcript-api to fetch captions directly without downloading video.
    Works with auto-generated or manual subtitles. English is preferred; any
    other available language is used as a fallback.

    Args:
        url: YouTube video URL (e.g., https://www.youtube.com/watch?v=...)

    Returns:
        Transcript text from the video, or error message if unavailable.
    """
    try:
        video_id = _extract_youtube_id(url)
        if not video_id:
            return f"Could not extract video ID from URL: {url}"

        try:
            transcript_data = _fetch_transcript(video_id)
        except ImportError:
            return "Error: youtube-transcript-api not installed. Run: pip install youtube-transcript-api"
        except Exception as e:
            # Keep the failure kind visible (disabled captions, blocked IP, ...).
            return f"No transcript available for this video ({type(e).__name__})"

        # Snippets are objects with .text in API 1.x and plain dicts in 0.x.
        text_parts = [
            snippet["text"] if isinstance(snippet, dict) else snippet.text
            for snippet in transcript_data
        ]
        full_text = " ".join(text_parts)
        return f"=== YouTube Transcript (Video ID: {video_id}) ===\n{full_text}"
    except Exception as e:
        return f"Error extracting transcript: {str(e)}"


@tool
def read_image(image_path: str, question: str = "") -> str:
    """Analyze image content using Kimi multimodal capabilities.

    Uses the Kimi vision model to understand and describe image content.
    Supports chess boards, charts, diagrams, screenshots, and general images.

    Args:
        image_path: Path to the image file (.png, .jpg, .jpeg)
        question: Specific question about the image (e.g., "What chess move is shown?")

    Returns:
        Analysis/description of the image content from Kimi vision model.
    """
    try:
        import base64
        from openai import OpenAI

        # Also look in the current directory and /tmp before giving up.
        resolved = _resolve_path(image_path)
        if resolved is None:
            return f"Image file not found: {image_path}"
        image_path = resolved

        with open(image_path, "rb") as f:
            image_data = f.read()
        image_base64 = base64.b64encode(image_data).decode('utf-8')

        ext = Path(image_path).suffix.lower()
        mime_type = _IMAGE_MIME_TYPES.get(ext, 'image/png')

        # Support both OPENAI_API_KEY (legacy) and API_KEY (Kimi config).
        api_key = os.getenv("OPENAI_API_KEY") or os.getenv("API_KEY")
        base_url = os.getenv("BASE_URL", "https://api.moonshot.cn/v1")
        # Support both MULTIMODAL_MODEL and MODEL_NAME; empty values fall through.
        model = os.getenv("MULTIMODAL_MODEL") or os.getenv("MODEL_NAME") or "kimi-k2.5"

        if not api_key:
            return "Error: API key not set. Set OPENAI_API_KEY or API_KEY in environment"

        client = OpenAI(api_key=api_key, base_url=base_url)

        if not question:
            question = "Describe this image in detail."

        # The image travels inline as a base64 data URL.
        response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{image_base64}"
                            },
                        },
                    ],
                }
            ],
            max_tokens=2000,
        )

        analysis = response.choices[0].message.content
        return f"=== Image Analysis: {image_path} ===\n{analysis}"
    except ImportError:
        return "Error: openai package not installed"
    except Exception as e:
        return f"Error analyzing image: {str(e)}"


@tool
def read_content(source: str, question: str = "") -> str:
    """Unified content reader - automatically detects and reads various content types.

    Supports:
    - YouTube URLs: Extracts video transcript
    - Image files (.png, .jpg, .jpeg): Analyzes using Kimi multimodal
    - Web pages (http/https): Fetches and extracts text content
    - Local files: Delegates to file_read tool

    Args:
        source: Content source (URL or file path)
        question: Optional question for context (especially useful for images)

    Returns:
        Content text or analysis result.
    """
    try:
        # YouTube must be checked before generic URLs: its pages carry no
        # transcript text in their HTML.
        if any(marker in source for marker in _YOUTUBE_MARKERS):
            return youtube_transcript(source)

        if source.startswith(("http://", "https://")):
            import requests
            from bs4 import BeautifulSoup

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0'
            }
            response = requests.get(source, headers=headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Script and style bodies are not readable page content.
            for element in soup(["script", "style"]):
                element.decompose()

            text = soup.get_text(separator='\n', strip=True)
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            cleaned_text = '\n'.join(lines)

            # Keep the result small enough for the model's context.
            if len(cleaned_text) > 8000:
                cleaned_text = cleaned_text[:8000] + "\n... [content truncated]"

            return f"=== Web Content: {source} ===\n{cleaned_text}"

        if source.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            return read_image(source, question)

        # Otherwise, treat as local file.
        return file_read(source)
    except Exception as e:
        return f"Error reading content from {source}: {str(e)}"