# NOTE: the three lines below are Hugging Face file-viewer residue (uploader,
# commit message, commit hash) accidentally captured with the file; kept as a
# comment so the module remains valid Python.
# Niraya666's picture
# Upload 6 files (#1)
# 4c70715
"""Tools for GAIA Agent
This module provides tools for:
- Web search using DuckDuckGo
- Python code execution
- File reading (txt, py, json, xlsx, mp3, png)
- YouTube transcript extraction
- Image understanding via Kimi multimodal
- Unified content reading
"""
import os
import io
import sys
import json
import subprocess
from typing import Any
from pathlib import Path
from smolagents import tool
@tool
def web_search(query: str) -> str:
    """Search the web using DuckDuckGo.

    Args:
        query: The search query string.

    Returns:
        A string containing search results.
    """
    try:
        from duckduckgo_search import DDGS

        # Fetch up to 10 text results for the query.
        with DDGS() as ddgs:
            hits = list(ddgs.text(query, max_results=10))
        if not hits:
            return "No search results found."
        # Number each hit and show title, snippet, and URL.
        formatted = [
            f"{idx}. {hit.get('title', 'No title')}\n"
            f"{hit.get('body', 'No description')}\n"
            f"URL: {hit.get('href', '')}\n"
            for idx, hit in enumerate(hits, 1)
        ]
        return "\n".join(formatted)
    except Exception as e:
        # Includes ImportError when duckduckgo_search is not installed.
        return f"Search error: {str(e)}"
@tool
def python_execute(code: str) -> str:
    """Execute Python code and return the result.

    This tool writes the code to a uniquely-named temporary script, runs it
    in a subprocess with a 30-second timeout, and captures stdout/stderr.
    Supports common libraries like pandas, numpy, json, requests.

    Args:
        code: Python code to execute.

    Returns:
        The output of the code execution (stdout + stderr).
    """
    import tempfile
    import textwrap

    try:
        # Wrap the user code in a try/except so its exceptions are reported
        # in the captured output instead of only on the subprocess stderr.
        wrapped_code = f'''
import sys
import io
import json
import math
import re
import os
# Capture stdout
old_stdout = sys.stdout
sys.stdout = buffer = io.StringIO()
try:
{textwrap.indent(code, " ")}
except Exception as e:
    print(f"Error: {{e}}")
    import traceback
    traceback.print_exc()
# Get output
output = buffer.getvalue()
sys.stdout = old_stdout
print(output, end='')
'''
        # Use a unique temp file: the previous fixed "/tmp/gaia_script.py"
        # raced between concurrent calls and was not portable off POSIX.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", encoding="utf-8", delete=False
        ) as f:
            script_path = f.name
            f.write(wrapped_code)
        try:
            result = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=30,
            )
        finally:
            # Best-effort cleanup of the temporary script.
            try:
                os.unlink(script_path)
            except OSError:
                pass
        output = result.stdout
        if result.stderr:
            output += f"\n[STDERR]: {result.stderr}"
        if result.returncode != 0:
            output += f"\n[Exit code: {result.returncode}]"
        return output if output else "(No output)"
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out (30s limit)"
    except Exception as e:
        return f"Execution error: {str(e)}"
@tool
def file_read(filepath: str) -> str:
    """Read file content (txt, py, json, xlsx, mp3, png, etc.).

    Supports multiple file types:
    - Text files (.txt, .py, .md): Returns content directly
    - JSON files (.json): Returns formatted JSON
    - Excel files (.xlsx, .xls): Returns shape, columns, and a preview
    - Audio files (.mp3, .wav): Returns file info and transcription if possible
    - Image files (.png, .jpg): Returns file info (needs VLM for content analysis)

    Args:
        filepath: Path to the file to read.

    Returns:
        File content or description.
    """
    try:
        resolved = _resolve_file_path(filepath)
        if resolved is None:
            return f"File not found: {filepath}"
        filepath = resolved
        ext = Path(filepath).suffix.lower()

        # Dispatch on extension to a type-specific reader.
        if ext in ['.txt', '.py', '.md', '.csv', '.log', '.yaml', '.yml', '.html', '.css', '.js']:
            return _read_text_file(filepath)
        elif ext == '.json':
            return _read_json_file(filepath)
        elif ext in ['.xlsx', '.xls']:
            return _read_excel_file(filepath)
        elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp']:
            return _read_image_info(filepath)
        elif ext in ['.mp3', '.wav', '.ogg', '.flac', '.m4a']:
            return _read_audio_file(filepath, ext)
        else:
            # Unknown/binary type: report basic metadata only.
            size = os.path.getsize(filepath)
            return f"=== Binary File: {filepath} ===\nSize: {size} bytes\nExtension: {ext}\n\n(File type not supported for direct reading)"
    except Exception as e:
        return f"Error reading file {filepath}: {str(e)}"


def _resolve_file_path(filepath):
    """Return an existing path for filepath (as given, ./, or /tmp), else None."""
    if os.path.exists(filepath):
        return filepath
    for candidate in (os.path.join(".", filepath), os.path.join("/tmp", filepath)):
        if os.path.exists(candidate):
            return candidate
    return None


def _read_text_file(filepath):
    """Read a plain-text file, ignoring undecodable bytes."""
    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
        content = f.read()
    return f"=== File: {filepath} ===\n{content}"


def _read_json_file(filepath):
    """Read a JSON file and return it pretty-printed."""
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return f"=== JSON File: {filepath} ===\n{json.dumps(data, indent=2, ensure_ascii=False)}"


def _read_excel_file(filepath):
    """Preview an Excel file with pandas: shape, columns, first 20 rows."""
    try:
        import pandas as pd
        df = pd.read_excel(filepath)
        preview = df.head(20).to_string()
        return f"=== Excel File: {filepath} ===\nShape: {df.shape}\nColumns: {list(df.columns)}\n\nPreview (first 20 rows):\n{preview}"
    except ImportError:
        return f"Excel file found but pandas not available for reading: {filepath}"
    except Exception as e:
        return f"Error reading Excel file {filepath}: {e}"


def _read_image_info(filepath):
    """Report basic image metadata via PIL (content analysis needs a VLM)."""
    from PIL import Image
    with Image.open(filepath) as img:
        return f"=== Image File: {filepath} ===\nFormat: {img.format}\nSize: {img.size}\nMode: {img.mode}\n\n(Use a vision model to analyze image content)"


def _read_audio_file(filepath, ext):
    """Report audio file info; transcribe with Whisper when it is installed."""
    info = f"=== Audio File: {filepath} ===\n"
    info += f"Extension: {ext}\n"
    info += f"Size: {os.path.getsize(filepath)} bytes\n"
    try:
        import whisper
        model = whisper.load_model("base")
        result = model.transcribe(filepath)
        info += f"\n=== Transcription ===\n{result['text']}"
    except ImportError:
        info += "\n(Whisper not available for transcription)"
    except Exception as e:
        info += f"\n(Transcription failed: {e})"
    return info
@tool
def youtube_transcript(url: str) -> str:
    """Extract transcript/captions from YouTube videos.

    Uses youtube-transcript-api to fetch captions directly without downloading
    the video. Works with auto-generated or manual subtitles.

    Args:
        url: YouTube video URL (e.g., https://www.youtube.com/watch?v=...)

    Returns:
        Transcript text from the video, or error message if unavailable.
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        # Extract the video ID from the supported URL shapes.
        video_id = None
        if "youtube.com/watch?v=" in url:
            video_id = url.split("youtube.com/watch?v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            video_id = url.split("youtu.be/")[1].split("?")[0]
        elif "youtube.com/shorts/" in url:
            video_id = url.split("youtube.com/shorts/")[1].split("?")[0]
        if not video_id:
            return f"Could not extract video ID from URL: {url}"

        transcript_data = _fetch_transcript(YouTubeTranscriptApi, video_id)
        if transcript_data is None:
            return "No transcript available for this video"

        # API v1.x yields snippet objects with a .text attribute; the legacy
        # get_transcript() API yields plain dicts. Support both shapes.
        text_parts = [
            snippet.text if hasattr(snippet, "text") else snippet["text"]
            for snippet in transcript_data
        ]
        full_text = " ".join(text_parts)
        return f"=== YouTube Transcript (Video ID: {video_id}) ===\n{full_text}"
    except ImportError:
        return "Error: youtube-transcript-api not installed. Run: pip install youtube-transcript-api"
    except Exception as e:
        return f"Error extracting transcript: {str(e)}"


def _fetch_transcript(api_cls, video_id):
    """Fetch a transcript, preferring English; return None if unavailable.

    In youtube-transcript-api v1.x, fetch() is an *instance* method (the old
    static call passed video_id as self and always failed); older releases
    expose the static get_transcript() instead. Try English first, then any
    available language, with each API style.
    """
    for kwargs in ({"languages": ['en', 'en-US', 'en-GB']}, {}):
        try:
            return api_cls().fetch(video_id, **kwargs)
        except Exception:
            pass
        try:
            # Legacy (pre-1.0) static API.
            return api_cls.get_transcript(video_id, **kwargs)
        except Exception:
            pass
    return None
@tool
def read_image(image_path: str, question: str = "") -> str:
    """Analyze image content using Kimi multimodal capabilities.

    Uses the Kimi vision model to understand and describe image content.
    Supports chess boards, charts, diagrams, screenshots, and general images.

    Args:
        image_path: Path to the image file (.png, .jpg, .jpeg)
        question: Specific question about the image (e.g., "What chess move is shown?")

    Returns:
        Analysis/description of the image content from Kimi vision model.
    """
    try:
        import base64
        from openai import OpenAI

        # Resolve the image path, checking a few common locations.
        if not os.path.exists(image_path):
            candidates = (
                image_path,
                os.path.join(".", image_path),
                os.path.join("/tmp", image_path),
            )
            resolved = next((c for c in candidates if os.path.exists(c)), None)
            if resolved is None:
                return f"Image file not found: {image_path}"
            image_path = resolved

        # Read the raw bytes and base64-encode them for the data URL.
        with open(image_path, "rb") as fh:
            encoded = base64.b64encode(fh.read()).decode('utf-8')

        # Pick a MIME type from the extension; default to PNG.
        suffix = Path(image_path).suffix.lower()
        mime_type = {
            '.png': 'image/png',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.gif': 'image/gif',
            '.webp': 'image/webp'
        }.get(suffix, 'image/png')

        # API configuration: support both OPENAI_API_KEY (legacy) and API_KEY
        # (Kimi config), and both MULTIMODAL_MODEL and MODEL_NAME.
        api_key = os.getenv("OPENAI_API_KEY") or os.getenv("API_KEY")
        if not api_key:
            return "Error: API key not set. Set OPENAI_API_KEY or API_KEY in environment"
        base_url = os.getenv("BASE_URL", "https://api.moonshot.cn/v1")
        model = os.getenv("MULTIMODAL_MODEL") or os.getenv("MODEL_NAME", "kimi-k2.5")

        prompt = question if question else "Describe this image in detail."

        # Call the Kimi multimodal API with text + inline image content.
        client = OpenAI(api_key=api_key, base_url=base_url)
        response = client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{encoded}"
                            }
                        }
                    ]
                }
            ],
            max_tokens=2000
        )
        analysis = response.choices[0].message.content
        return f"=== Image Analysis: {image_path} ===\n{analysis}"
    except ImportError:
        return "Error: openai package not installed"
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
@tool
def read_content(source: str, question: str = "") -> str:
    """Unified content reader - automatically detects and reads various content types.

    Supports:
    - YouTube URLs: Extracts video transcript
    - Image files (.png, .jpg, .jpeg): Analyzes using Kimi multimodal
    - Web pages (http/https): Fetches and extracts text content
    - Local files: Delegates to file_read tool

    Args:
        source: Content source (URL or file path)
        question: Optional question for context (especially useful for images)

    Returns:
        Content text or analysis result.
    """
    try:
        # YouTube links get transcript extraction.
        yt_markers = ("youtube.com/watch", "youtu.be/", "youtube.com/shorts/")
        if any(marker in source for marker in yt_markers):
            return youtube_transcript(source)

        # Generic web pages: fetch, strip scripts/styles, return plain text.
        if source.startswith(("http://", "https://")):
            import requests
            from bs4 import BeautifulSoup

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0'
            }
            response = requests.get(source, headers=headers, timeout=30)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove non-content elements before extracting text.
            for tag in soup(["script", "style"]):
                tag.decompose()

            raw_text = soup.get_text(separator='\n', strip=True)
            # Collapse blank lines and trim each remaining line.
            cleaned_text = '\n'.join(
                stripped
                for stripped in (line.strip() for line in raw_text.split('\n'))
                if stripped
            )
            # Cap the payload so it stays usable as tool output.
            if len(cleaned_text) > 8000:
                cleaned_text = cleaned_text[:8000] + "\n... [content truncated]"
            return f"=== Web Content: {source} ===\n{cleaned_text}"

        # Local images go through the vision tool.
        if source.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')):
            return read_image(source, question)

        # Everything else is treated as a local file.
        return file_read(source)
    except Exception as e:
        return f"Error reading content from {source}: {str(e)}"