File size: 14,753 Bytes
4c70715
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
"""Tools for GAIA Agent

This module provides tools for:
- Web search using DuckDuckGo
- Python code execution
- File reading (txt, py, json, xlsx, mp3, png)
- YouTube transcript extraction
- Image understanding via Kimi multimodal
- Unified content reading
"""

import os
import io
import sys
import json
import subprocess
from typing import Any
from pathlib import Path

from smolagents import tool


@tool
def web_search(query: str) -> str:
    """Run a DuckDuckGo text search and return the hits as numbered text.

    Args:
        query: The search query string.

    Returns:
        Up to ten results rendered as numbered title, snippet and URL entries, or a short message when nothing was found or the search failed.
    """
    try:
        # Imported lazily so the module still loads without the search package.
        from duckduckgo_search import DDGS

        with DDGS() as client:
            hits = list(client.text(query, max_results=10))

        if len(hits) == 0:
            return "No search results found."

        # One numbered entry per hit; missing fields fall back to placeholders.
        entries = [
            "{}. {}\n{}\nURL: {}\n".format(
                rank,
                hit.get('title', 'No title'),
                hit.get('body', 'No description'),
                hit.get('href', ''),
            )
            for rank, hit in enumerate(hits, 1)
        ]
        return "\n".join(entries)

    except Exception as err:
        # Tool boundary: report the failure as text instead of raising.
        return f"Search error: {str(err)}"


@tool
def python_execute(code: str) -> str:
    """Execute Python code and return the result.

    This tool runs Python code in a subprocess and captures stdout/stderr.
    Supports common libraries like pandas, numpy, json, requests. The modules
    sys, io, json, math, re and os are already imported for the executed code.
    The run is limited to 30 seconds.

    Args:
        code: Python code to execute.

    Returns:
        The output of the code execution (stdout + stderr), with the exit code appended when it is non-zero.
    """
    # Local import: tempfile is not among the module-level imports.
    import tempfile

    timeout_seconds = 30

    try:
        # The user code is embedded as a string literal (via repr) and run
        # through exec() rather than being re-indented into a try-block.
        # Re-indenting corrupts multi-line string literals and produces a
        # syntax error for empty or comment-only input.
        # Output is written straight to the real stdout, which subprocess.run
        # captures, so nothing is lost if the code calls sys.exit().
        wrapped_code = (
            "import sys\n"
            "import io\n"
            "import json\n"
            "import math\n"
            "import re\n"
            "import os\n"
            "\n"
            f"_gaia_source = {code!r}\n"
            "try:\n"
            "    exec(compile(_gaia_source, '<gaia_code>', 'exec'))\n"
            "except Exception as e:\n"
            "    print(f'Error: {e}')\n"
            "    import traceback\n"
            "    traceback.print_exc()\n"
        )

        # A unique temp file per call avoids the race (and stale leftovers)
        # of a single shared, fixed script path.
        fd, script_path = tempfile.mkstemp(prefix="gaia_script_", suffix=".py")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as script_file:
                script_file.write(wrapped_code)

            result = subprocess.run(
                [sys.executable, script_path],
                capture_output=True,
                text=True,
                timeout=timeout_seconds,
            )
        finally:
            # Best-effort cleanup; a failed delete must not hide the result.
            try:
                os.remove(script_path)
            except OSError:
                pass

        output = result.stdout
        if result.stderr:
            output += f"\n[STDERR]: {result.stderr}"

        if result.returncode != 0:
            output += f"\n[Exit code: {result.returncode}]"

        return output if output else "(No output)"

    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out (30s limit)"
    except Exception as e:
        return f"Execution error: {str(e)}"


@tool
def file_read(filepath: str) -> str:
    """Read a local file and return its content or a description of it.

    The handling depends on the file extension:
    - Text files (.txt, .py, .md, .csv, .log, .yaml, .yml, .html, .css, .js) are returned verbatim
    - JSON files (.json) are parsed and pretty-printed
    - Excel files (.xlsx, .xls) list every sheet with its shape, columns and a 20-row preview
    - Image files (.png, .jpg, .jpeg, .gif, .bmp, .webp) report format, size and mode only
    - Audio files (.mp3, .wav, .ogg, .flac, .m4a) report size plus a Whisper transcription when available
    - Anything else is reported as an unsupported binary file with its size

    Args:
        filepath: Path to the file to read. A path that does not exist is retried under /tmp.

    Returns:
        File content or description, or an error message if the file cannot be read.
    """
    try:
        if not os.path.exists(filepath):
            # The only useful fallback is /tmp: "./<path>" resolves to the
            # same location as "<path>" itself, which was just checked.
            fallback = os.path.join("/tmp", filepath)
            if not filepath or not os.path.exists(fallback):
                return f"File not found: {filepath}"
            filepath = fallback

        # Directories pass the existence check but cannot be read as files.
        if os.path.isdir(filepath):
            return f"Path is a directory, not a file: {filepath}"

        ext = Path(filepath).suffix.lower()

        # Text-based files
        if ext in ['.txt', '.py', '.md', '.csv', '.log', '.yaml', '.yml', '.html', '.css', '.js']:
            # Undecodable bytes are dropped rather than failing the read.
            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            return f"=== File: {filepath} ===\n{content}"

        # JSON files
        elif ext == '.json':
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return f"=== JSON File: {filepath} ===\n{json.dumps(data, indent=2, ensure_ascii=False)}"

        # Excel files
        elif ext in ['.xlsx', '.xls']:
            try:
                import pandas as pd

                # sheet_name=None loads every sheet as {name: DataFrame};
                # the default would silently ignore all but the first sheet.
                sheets = pd.read_excel(filepath, sheet_name=None)
                sections = [
                    f"=== Excel File: {filepath} ===",
                    f"Sheets: {list(sheets)}",
                ]
                for sheet_name, df in sheets.items():
                    preview = df.head(20).to_string()
                    sections.append(
                        f"\n--- Sheet: {sheet_name} ---\n"
                        f"Shape: {df.shape}\nColumns: {list(df.columns)}\n\n"
                        f"Preview (first 20 rows):\n{preview}"
                    )
                return "\n".join(sections)
            except ImportError as e:
                # Raised for pandas itself and for its Excel engines
                # (openpyxl / xlrd), so report which package is missing.
                return f"Excel file found but a required package is not available ({e}): {filepath}"
            except Exception as e:
                return f"Error reading Excel file {filepath}: {e}"

        # Image files
        elif ext in ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp']:
            from PIL import Image
            with Image.open(filepath) as img:
                return f"=== Image File: {filepath} ===\nFormat: {img.format}\nSize: {img.size}\nMode: {img.mode}\n\n(Use a vision model to analyze image content)"

        # Audio files
        elif ext in ['.mp3', '.wav', '.ogg', '.flac', '.m4a']:
            info = f"=== Audio File: {filepath} ===\n"
            info += f"Extension: {ext}\n"
            info += f"Size: {os.path.getsize(filepath)} bytes\n"

            # Transcription is optional: basic info is returned even when
            # Whisper is missing or fails.
            try:
                import whisper
                model = whisper.load_model("base")
                result = model.transcribe(filepath)
                info += f"\n=== Transcription ===\n{result['text']}"
            except ImportError:
                info += "\n(Whisper not available for transcription)"
            except Exception as e:
                info += f"\n(Transcription failed: {e})"

            return info

        # Binary files - return basic info
        else:
            size = os.path.getsize(filepath)
            return f"=== Binary File: {filepath} ===\nSize: {size} bytes\nExtension: {ext}\n\n(File type not supported for direct reading)"

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error reading file {filepath}: {str(e)}"


@tool
def youtube_transcript(url: str) -> str:
    """Extract transcript/captions from YouTube videos.

    Uses youtube-transcript-api (v1.x instance API) to fetch captions directly
    without downloading the video. English captions are preferred; if none can
    be fetched, the first transcript listed for the video is used instead.
    Works with auto-generated or manual subtitles.

    Args:
        url: YouTube video URL (watch, youtu.be, shorts, embed or live link).

    Returns:
        Transcript text from the video, or error message if unavailable.
    """
    try:
        from urllib.parse import parse_qs, urlparse

        from youtube_transcript_api import YouTubeTranscriptApi

        # Parse the URL properly instead of splitting on fixed substrings, so
        # that "watch?feature=share&v=ID", "#t=5" fragments, trailing slashes
        # and scheme-less links are all handled.
        cleaned = url.strip()
        parsed = urlparse(cleaned if "://" in cleaned else f"https://{cleaned}")
        host = (parsed.hostname or "").lower()
        segments = [part for part in parsed.path.split("/") if part]

        youtube_hosts = ("youtube.com", "youtube-nocookie.com")
        video_id = None
        if host == "youtu.be" or host.endswith(".youtu.be"):
            # Short links carry the ID as the first path segment.
            if segments:
                video_id = segments[0]
        elif any(host == h or host.endswith("." + h) for h in youtube_hosts):
            if segments and segments[0] == "watch":
                video_id = parse_qs(parsed.query).get("v", [None])[0]
            elif len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
                video_id = segments[1]

        if not video_id:
            return f"Could not extract video ID from URL: {url}"

        # In youtube-transcript-api v1.x, fetch() and list() are instance
        # methods. Calling fetch() on the class raises TypeError, which the
        # previous bare except turned into "No transcript available".
        api = YouTubeTranscriptApi()
        try:
            transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])
        except Exception:
            # fetch() without languages still defaults to English, so list
            # the available transcripts and take the first one instead.
            try:
                transcript_data = next(iter(api.list(video_id))).fetch()
            except Exception as fallback_error:
                return f"No transcript available for this video ({type(fallback_error).__name__})"

        # Each fetched snippet exposes its caption text as .text
        text_parts = [snippet.text for snippet in transcript_data]
        full_text = " ".join(text_parts)

        return f"=== YouTube Transcript (Video ID: {video_id}) ===\n{full_text}"

    except ImportError:
        return "Error: youtube-transcript-api not installed. Run: pip install youtube-transcript-api"
    except Exception as e:
        return f"Error extracting transcript: {str(e)}"


@tool
def read_image(image_path: str, question: str = "") -> str:
    """Ask the Kimi vision model about an image file.

    The image is sent inline (base64 data URL) to an OpenAI-compatible chat
    endpoint configured through environment variables.

    Args:
        image_path: Path to the image file (.png, .jpg, .jpeg)
        question: Specific question about the image; a generic description is requested when empty.

    Returns:
        Analysis/description of the image content from Kimi vision model.
    """
    try:
        import base64
        from openai import OpenAI

        # Resolve the path, retrying in the working directory and /tmp.
        if not os.path.exists(image_path):
            candidates = (
                image_path,
                os.path.join(".", image_path),
                os.path.join("/tmp", image_path),
            )
            resolved = next((c for c in candidates if os.path.exists(c)), None)
            if resolved is None:
                return f"Image file not found: {image_path}"
            image_path = resolved

        # Load the raw bytes and base64-encode them for the data URL.
        with open(image_path, "rb") as handle:
            encoded = base64.b64encode(handle.read()).decode('utf-8')

        # Map the extension to a MIME type; unknown extensions are sent as PNG.
        known_types = {
            '.png': 'image/png',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.gif': 'image/gif',
            '.webp': 'image/webp',
        }
        suffix = Path(image_path).suffix.lower()
        mime_type = known_types[suffix] if suffix in known_types else 'image/png'

        # Endpoint configuration: OPENAI_API_KEY wins over API_KEY, and
        # MULTIMODAL_MODEL wins over MODEL_NAME.
        api_key = os.getenv("OPENAI_API_KEY") or os.getenv("API_KEY")
        base_url = os.getenv("BASE_URL", "https://api.moonshot.cn/v1")
        model = os.getenv("MULTIMODAL_MODEL") or os.getenv("MODEL_NAME", "kimi-k2.5")

        if not api_key:
            return "Error: API key not set. Set OPENAI_API_KEY or API_KEY in environment"

        client = OpenAI(api_key=api_key, base_url=base_url)

        prompt = question if question else "Describe this image in detail."

        # Single user turn carrying the question and the inline image.
        user_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime_type};base64,{encoded}"},
                },
            ],
        }
        response = client.chat.completions.create(
            model=model,
            messages=[user_message],
            max_tokens=2000,
        )

        return f"=== Image Analysis: {image_path} ===\n{response.choices[0].message.content}"

    except ImportError:
        return "Error: openai package not installed"
    except Exception as err:
        return f"Error analyzing image: {str(err)}"


@tool
def read_content(source: str, question: str = "") -> str:
    """Read content from a URL or file, choosing the reader from the source.

    Dispatch order:
    - YouTube links are passed to the transcript tool
    - Other http/https URLs are downloaded and reduced to visible text
    - Paths ending in .png, .jpg, .jpeg, .gif or .webp go to the image tool
    - Everything else is handled by the file reading tool

    Args:
        source: Content source (URL or file path)
        question: Optional question for context (especially useful for images)

    Returns:
        Content text or analysis result.
    """
    try:
        # YouTube links are checked first because they are also http(s) URLs.
        youtube_markers = ("youtube.com/watch", "youtu.be/", "youtube.com/shorts/")
        if any(marker in source for marker in youtube_markers):
            return youtube_transcript(source)

        if source.startswith(("http://", "https://")):
            import requests
            from bs4 import BeautifulSoup

            page = requests.get(
                source,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0'
                },
                timeout=30,
            )
            page.raise_for_status()

            soup = BeautifulSoup(page.text, 'html.parser')

            # Scripts and stylesheets carry no readable text.
            for tag in soup(["script", "style"]):
                tag.decompose()

            raw_text = soup.get_text(separator='\n', strip=True)

            # Keep only non-blank lines, each stripped of surrounding whitespace.
            stripped_lines = (piece.strip() for piece in raw_text.split('\n'))
            cleaned_text = '\n'.join(entry for entry in stripped_lines if entry)

            # Cap the amount of page text returned to the agent.
            if len(cleaned_text) > 8000:
                cleaned_text = cleaned_text[:8000] + "\n... [content truncated]"

            return f"=== Web Content: {source} ===\n{cleaned_text}"

        image_suffixes = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
        if source.lower().endswith(image_suffixes):
            return read_image(source, question)

        # Anything left is treated as a local file.
        return file_read(source)

    except Exception as err:
        return f"Error reading content from {source}: {str(err)}"