| """Tools for GAIA Agent |
| |
| This module provides tools for: |
| - Web search using DuckDuckGo |
| - Python code execution |
| - File reading (txt, py, json, xlsx, mp3, png) |
| - YouTube transcript extraction |
| - Image understanding via Kimi multimodal |
| - Unified content reading |
| """ |
|
|
| import os |
| import io |
| import sys |
| import json |
| import subprocess |
| from typing import Any |
| from pathlib import Path |
|
|
| from smolagents import tool |
|
|
|
|
@tool
def web_search(query: str) -> str:
    """Search the web using DuckDuckGo.

    Args:
        query: The search query string.

    Returns:
        A string containing search results.
    """
    try:
        from duckduckgo_search import DDGS

        # Materialize up to 10 hits while the DDGS session is open.
        with DDGS() as ddgs:
            hits = list(ddgs.text(query, max_results=10))

        if not hits:
            return "No search results found."

        # One numbered "title / snippet / URL" entry per hit.
        entries = [
            f"{idx}. {hit.get('title', 'No title')}\n"
            f"{hit.get('body', 'No description')}\n"
            f"URL: {hit.get('href', '')}\n"
            for idx, hit in enumerate(hits, 1)
        ]
        return "\n".join(entries)

    except Exception as e:
        # Any failure (missing package, network, rate limit) is reported
        # back to the agent as text rather than raised.
        return f"Search error: {str(e)}"
|
|
|
|
@tool
def python_execute(code: str) -> str:
    """Execute Python code and return the result.

    This tool runs Python code in a subprocess and captures stdout/stderr.
    Supports common libraries like pandas, numpy, json, requests.

    Args:
        code: Python code to execute.

    Returns:
        The output of the code execution (stdout + stderr).
    """
    import tempfile

    try:
        # Indent the user code so it nests inside the try block of the
        # wrapper script.  Empty/whitespace-only input falls back to
        # "pass" so the generated script is still valid Python.
        body = code if code.strip() else "pass"
        indented = "\n".join("    " + line for line in body.split("\n"))

        wrapped_code = f'''
import sys
import io
import json
import math
import re
import os

# Capture stdout
old_stdout = sys.stdout
sys.stdout = buffer = io.StringIO()

try:
{indented}
except Exception as e:
    print(f"Error: {{e}}")
    import traceback
    traceback.print_exc()

# Get output
output = buffer.getvalue()
sys.stdout = old_stdout
print(output, end='')
'''

        # Unique temp file instead of a fixed shared /tmp path: portable
        # across platforms and safe under concurrent invocations.
        with tempfile.NamedTemporaryFile(
            "w", suffix=".py", encoding="utf-8", delete=False
        ) as f:
            script_path = f.name
            f.write(wrapped_code)

        try:
            result = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=30
            )
        finally:
            # Best-effort cleanup of the temp script (runs even on timeout).
            try:
                os.unlink(script_path)
            except OSError:
                pass

        output = result.stdout
        if result.stderr:
            output += f"\n[STDERR]: {result.stderr}"

        if result.returncode != 0:
            output += f"\n[Exit code: {result.returncode}]"

        return output if output else "(No output)"

    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out (30s limit)"
    except Exception as e:
        return f"Execution error: {str(e)}"
|
|
|
|
@tool
def file_read(filepath: str) -> str:
    """Read file content (txt, py, json, xlsx, mp3, png, etc.).

    Supports multiple file types:
    - Text files (.txt, .py, .md): Returns content directly
    - JSON files (.json): Returns formatted JSON
    - Excel files (.xlsx, .xls): Returns sheet names and preview
    - Audio files (.mp3, .wav): Returns file info and transcription if possible
    - Image files (.png, .jpg): Returns file info (needs VLM for content analysis)

    Args:
        filepath: Path to the file to read.

    Returns:
        File content or description.
    """
    try:
        # Path resolution: if the path doesn't exist as given, probe a
        # couple of likely locations (cwd-relative and /tmp, where
        # downloaded task attachments typically land).
        if not os.path.exists(filepath):

            possible_paths = [
                filepath,
                os.path.join(".", filepath),
                os.path.join("/tmp", filepath),
            ]

            found = False
            for p in possible_paths:
                if os.path.exists(p):
                    filepath = p
                    found = True
                    break

            if not found:
                return f"File not found: {filepath}"

        # Dispatch on the (lower-cased) file extension.
        ext = Path(filepath).suffix.lower()

        # Plain-text formats: return content verbatim.  errors='ignore'
        # keeps mixed/unknown encodings from raising.
        if ext in ['.txt', '.py', '.md', '.csv', '.log', '.yaml', '.yml', '.html', '.css', '.js']:
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            return f"=== File: {filepath} ===\n{content}"

        # JSON: parse and pretty-print (non-ASCII preserved).
        elif ext == '.json':
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return f"=== JSON File: {filepath} ===\n{json.dumps(data, indent=2, ensure_ascii=False)}"

        # Excel: needs pandas (plus an engine such as openpyxl); only the
        # first sheet is read, previewing the first 20 rows.
        elif ext in ['.xlsx', '.xls']:
            try:
                import pandas as pd
                df = pd.read_excel(filepath)
                preview = df.head(20).to_string()
                return f"=== Excel File: {filepath} ===\nShape: {df.shape}\nColumns: {list(df.columns)}\n\nPreview (first 20 rows):\n{preview}"
            except ImportError:
                return f"Excel file found but pandas not available for reading: {filepath}"
            except Exception as e:
                return f"Error reading Excel file {filepath}: {e}"

        # Images: only metadata here; content analysis is a separate
        # vision-model tool.  A missing PIL falls through to the outer
        # except and is reported as a read error.
        elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp']:
            from PIL import Image
            with Image.open(filepath) as img:
                return f"=== Image File: {filepath} ===\nFormat: {img.format}\nSize: {img.size}\nMode: {img.mode}\n\n(Use a vision model to analyze image content)"

        # Audio: report basic info, then best-effort transcription via
        # openai-whisper ("base" model) when that package is installed.
        elif ext in ['.mp3', '.wav', '.ogg', '.flac', '.m4a']:

            info = f"=== Audio File: {filepath} ===\n"
            info += f"Extension: {ext}\n"
            info += f"Size: {os.path.getsize(filepath)} bytes\n"

            # Transcription is optional — failures are reported inline
            # rather than raised, so the caller still gets the file info.
            try:
                import whisper
                model = whisper.load_model("base")
                result = model.transcribe(filepath)
                info += f"\n=== Transcription ===\n{result['text']}"
            except ImportError:
                info += "\n(Whisper not available for transcription)"
            except Exception as e:
                info += f"\n(Transcription failed: {e})"

            return info

        # Anything else: treat as opaque binary and report size only.
        else:
            size = os.path.getsize(filepath)
            return f"=== Binary File: {filepath} ===\nSize: {size} bytes\nExtension: {ext}\n\n(File type not supported for direct reading)"

    except Exception as e:
        return f"Error reading file {filepath}: {str(e)}"
|
|
|
|
@tool
def youtube_transcript(url: str) -> str:
    """Extract transcript/captions from YouTube videos.

    Uses youtube-transcript-api to fetch captions directly without downloading video.
    Works with auto-generated or manual subtitles.

    Args:
        url: YouTube video URL (e.g., https://www.youtube.com/watch?v=...)

    Returns:
        Transcript text from the video, or error message if unavailable.
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        # Pull the video ID out of the common URL shapes.
        video_id = None
        if "youtube.com/watch?v=" in url:
            video_id = url.split("youtube.com/watch?v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            video_id = url.split("youtu.be/")[1].split("?")[0]
        elif "youtube.com/shorts/" in url:
            video_id = url.split("youtube.com/shorts/")[1].split("?")[0]

        if not video_id:
            return f"Could not extract video ID from URL: {url}"

        # youtube-transcript-api >= 1.0 exposes fetch() as an *instance*
        # method; calling it on the class would bind video_id as `self`
        # and fail.  Instantiate once and reuse for both attempts.
        api = YouTubeTranscriptApi()

        try:
            # Prefer English transcripts when available.
            transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])
        except Exception:
            # Fall back to whatever language the video offers.
            try:
                transcript_data = api.fetch(video_id)
            except Exception:
                return "No transcript available for this video"

        # Concatenate the timed snippets into one block of text.
        text_parts = [snippet.text for snippet in transcript_data]
        full_text = " ".join(text_parts)

        return f"=== YouTube Transcript (Video ID: {video_id}) ===\n{full_text}"

    except ImportError:
        return "Error: youtube-transcript-api not installed. Run: pip install youtube-transcript-api"
    except Exception as e:
        return f"Error extracting transcript: {str(e)}"
|
|
|
|
@tool
def read_image(image_path: str, question: str = "") -> str:
    """Analyze image content using Kimi multimodal capabilities.

    Uses the Kimi vision model to understand and describe image content.
    Supports chess boards, charts, diagrams, screenshots, and general images.

    Args:
        image_path: Path to the image file (.png, .jpg, .jpeg)
        question: Specific question about the image (e.g., "What chess move is shown?")

    Returns:
        Analysis/description of the image content from Kimi vision model.
    """
    try:
        import base64
        from openai import OpenAI

        # Resolve the path, probing cwd-relative and /tmp fallbacks.
        if not os.path.exists(image_path):
            for candidate in (image_path, os.path.join(".", image_path), os.path.join("/tmp", image_path)):
                if os.path.exists(candidate):
                    image_path = candidate
                    break
            else:
                return f"Image file not found: {image_path}"

        # Read and base64-encode the raw image bytes in one step.
        with open(image_path, "rb") as f:
            image_base64 = base64.b64encode(f.read()).decode('utf-8')

        # Map the extension to a MIME type, defaulting to PNG.
        mime_map = {
            '.png': 'image/png',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.gif': 'image/gif',
            '.webp': 'image/webp'
        }
        mime_type = mime_map.get(Path(image_path).suffix.lower(), 'image/png')

        # Credentials and model selection come from the environment.
        api_key = os.getenv("OPENAI_API_KEY") or os.getenv("API_KEY")
        base_url = os.getenv("BASE_URL", "https://api.moonshot.cn/v1")
        model = os.getenv("MULTIMODAL_MODEL") or os.getenv("MODEL_NAME", "kimi-k2.5")

        if not api_key:
            return "Error: API key not set. Set OPENAI_API_KEY or API_KEY in environment"

        client = OpenAI(api_key=api_key, base_url=base_url)

        # Default prompt when no specific question was given.
        prompt = question if question else "Describe this image in detail."

        # Single user message carrying the question plus the inline image.
        user_content = [
            {"type": "text", "text": prompt},
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:{mime_type};base64,{image_base64}"
                }
            }
        ]

        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": user_content}],
            max_tokens=2000
        )

        analysis = response.choices[0].message.content
        return f"=== Image Analysis: {image_path} ===\n{analysis}"

    except ImportError:
        return "Error: openai package not installed"
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
|
|
|
|
@tool
def read_content(source: str, question: str = "") -> str:
    """Unified content reader - automatically detects and reads various content types.

    Supports:
    - YouTube URLs: Extracts video transcript
    - Image files (.png, .jpg, .jpeg): Analyzes using Kimi multimodal
    - Web pages (http/https): Fetches and extracts text content
    - Local files: Delegates to file_read tool

    Args:
        source: Content source (URL or file path)
        question: Optional question for context (especially useful for images)

    Returns:
        Content text or analysis result.
    """
    try:
        # YouTube links go straight to the transcript tool.
        youtube_markers = ("youtube.com/watch", "youtu.be/", "youtube.com/shorts/")
        if any(marker in source for marker in youtube_markers):
            return youtube_transcript(source)

        # Generic web pages: fetch, strip markup, and return plain text.
        if source.startswith(("http://", "https://")):
            import requests
            from bs4 import BeautifulSoup

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0'
            }
            response = requests.get(source, headers=headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Drop non-content tags before extracting text.
            for tag in soup(["script", "style"]):
                tag.decompose()

            raw_text = soup.get_text(separator='\n', strip=True)

            # Collapse blank lines and surrounding whitespace.
            stripped_lines = (line.strip() for line in raw_text.split('\n'))
            cleaned_text = '\n'.join(line for line in stripped_lines if line)

            # Cap the payload so it fits comfortably in the agent context.
            if len(cleaned_text) > 8000:
                cleaned_text = cleaned_text[:8000] + "\n... [content truncated]"

            return f"=== Web Content: {source} ===\n{cleaned_text}"

        # Image files are routed to the vision tool.
        if source.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            return read_image(source, question)

        # Everything else is treated as a local file.
        return file_read(source)

    except Exception as e:
        return f"Error reading content from {source}: {str(e)}"
|
|