File size: 18,548 Bytes
8a7b3d1
a4f05bc
 
 
 
 
 
 
8a7b3d1
ed696d0
e828c8e
394d24e
8a7b3d1
 
e828c8e
1334ae9
 
 
8a7b3d1
1334ae9
8a7b3d1
a4f05bc
8a7b3d1
394d24e
8a7b3d1
a4f05bc
1334ae9
 
 
a53eb61
e828c8e
a4f05bc
e828c8e
8a7b3d1
e01c471
8a7b3d1
a4f05bc
d70b450
a4f05bc
 
8a7b3d1
d70b450
394d24e
a4f05bc
394d24e
 
 
 
a4f05bc
394d24e
 
 
 
d70b450
4dea17b
a4f05bc
394d24e
a4f05bc
 
 
 
394d24e
a4f05bc
394d24e
 
d70b450
8a7b3d1
 
394d24e
 
 
 
 
a4f05bc
394d24e
 
 
 
 
d70b450
394d24e
 
 
 
 
d70b450
394d24e
a4f05bc
4dea17b
d70b450
4dea17b
d70b450
394d24e
d70b450
394d24e
d70b450
394d24e
 
4dea17b
 
d70b450
a4f05bc
394d24e
a4f05bc
 
 
 
394d24e
 
 
d70b450
 
394d24e
d70b450
 
8a7b3d1
d70b450
 
 
394d24e
d70b450
8a7b3d1
 
d70b450
394d24e
a4f05bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
394d24e
a4f05bc
394d24e
8a7b3d1
e828c8e
8a7b3d1
a4f05bc
 
 
 
 
 
8a7b3d1
1334ae9
8a7b3d1
 
e828c8e
 
a4f05bc
1334ae9
 
a4f05bc
1334ae9
 
 
 
 
 
a4f05bc
1334ae9
 
 
a4f05bc
1334ae9
 
 
 
 
 
 
a4f05bc
1334ae9
 
 
 
a4f05bc
1334ae9
 
 
 
 
 
a4f05bc
1334ae9
 
 
 
 
 
 
a4f05bc
e828c8e
 
 
 
 
 
 
 
1334ae9
 
 
 
 
 
 
 
a4f05bc
1334ae9
 
 
 
 
e828c8e
a4f05bc
d70b450
 
 
e828c8e
a4f05bc
e828c8e
1334ae9
e828c8e
1334ae9
 
 
e828c8e
 
 
 
 
 
d70b450
 
 
8a7b3d1
e828c8e
a4f05bc
1334ae9
 
 
e828c8e
d70b450
a4f05bc
 
 
 
 
e01c471
8a7b3d1
a4f05bc
 
 
 
8a7b3d1
e01c471
8a7b3d1
a4f05bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8a7b3d1
a4f05bc
1334ae9
 
 
a4f05bc
1334ae9
e01c471
8a7b3d1
e828c8e
 
d70b450
 
e828c8e
a4f05bc
1334ae9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a4f05bc
1334ae9
 
 
a4f05bc
e828c8e
e01c471
 
1334ae9
 
8a7b3d1
 
1334ae9
 
8a7b3d1
a4f05bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8a7b3d1
a4f05bc
 
 
 
 
 
e828c8e
e01c471
a4f05bc
d70b450
 
 
 
 
8a7b3d1
d70b450
8a7b3d1
a4f05bc
e828c8e
a4f05bc
e828c8e
 
 
a4f05bc
 
 
 
 
 
e828c8e
8a7b3d1
d70b450
e828c8e
 
 
d70b450
e828c8e
 
 
 
1334ae9
e828c8e
 
 
 
1334ae9
e828c8e
 
 
 
d70b450
 
 
 
 
1334ae9
d70b450
 
1334ae9
d70b450
1334ae9
e828c8e
 
e01c471
e828c8e
1334ae9
 
a4f05bc
 
1334ae9
 
 
a4f05bc
1334ae9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a4f05bc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
"""
GAIA Tools - My Custom Tool Implementation
==========================================
Author: Isadora Teles (AI Agent Student)
Purpose: Creating tools that my agent can use to answer GAIA questions

These tools are the key to my agent's success. Each tool serves a specific
purpose and I've learned to handle edge cases through trial and error.
"""

import os
import requests
import logging
import math
import re
import io
import pandas as pd
from typing import List, Optional, Any
from llama_index.core.tools import FunctionTool, QueryEngineTool
from contextlib import redirect_stdout

# Setting up logging for debugging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Reduce noise from HTTP requests (they can be verbose!)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)


# ==========================================
# Web Search Functions - For current info
# ==========================================

def search_web(query: str) -> str:
    """
    My main web search tool - uses Google first, then DuckDuckGo as fallback
    
    Learning note: I discovered that having multiple search providers is crucial
    because APIs have rate limits and can fail unexpectedly!
    """
    logger.info(f"Web search for: {query}")
    
    # Try Google Custom Search first (better results)
    google_result = _search_google(query)
    if google_result and not google_result.startswith("Google search"):
        return google_result
    
    # Fallback to DuckDuckGo (no API key needed!)
    ddg_result = _search_duckduckgo(query)
    if ddg_result and not ddg_result.startswith("DuckDuckGo"):
        return ddg_result
    
    return "Web search unavailable. Please use your knowledge to answer."


def _search_google(query: str) -> str:
    """
    Google Custom Search implementation
    Requires GOOGLE_API_KEY and GOOGLE_CSE_ID in environment
    """
    api_key = os.getenv("GOOGLE_API_KEY")
    cx = os.getenv("GOOGLE_CSE_ID", "746382dd3c2bd4135")  # Default CSE ID
    
    if not api_key:
        return "Google search not configured"
    
    try:
        url = "https://www.googleapis.com/customsearch/v1"
        params = {
            "key": api_key,
            "cx": cx,
            "q": query,
            "num": 3  # Get top 3 results
        }
        
        response = requests.get(url, params=params, timeout=10)
        
        if response.status_code != 200:
            return f"Google search error: {response.status_code}"
        
        data = response.json()
        items = data.get("items", [])
        
        if not items:
            return "No search results found"
        
        # Format results nicely for the agent
        results = []
        for i, item in enumerate(items[:2], 1):
            title = item.get("title", "")[:50]
            snippet = item.get("snippet", "")[:150]
            link = item.get("link", "")
            results.append(f"{i}. {title}\n{snippet}\nURL: {link}")
        
        return "\n\n".join(results)
        
    except Exception as e:
        logger.error(f"Google search error: {e}")
        return f"Google search failed: {str(e)[:50]}"


def _search_duckduckgo(query: str) -> str:
    """
    DuckDuckGo search - my reliable fallback!
    No API key needed, but has rate limits
    """
    try:
        from duckduckgo_search import DDGS
        
        with DDGS(timeout=10) as ddgs:
            results = list(ddgs.text(query, max_results=3))
            
            if not results:
                return "No results found"
            
            formatted = []
            for i, r in enumerate(results, 1):
                formatted.append(f"{i}. {r['title']}\n{r['body'][:150]}...\nURL: {r['href']}")
            
            return "\n\n".join(formatted)
            
    except Exception as e:
        return f"DuckDuckGo search failed: {e}"


def _web_open_raw(url: str) -> str:
    """
    Open a specific URL and get the page content
    Used when the agent needs more details from search results
    """
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        # Limit content to prevent token overflow
        return response.text[:40_000]
    except Exception as e:
        return f"ERROR opening {url}: {e}"


# ==========================================
# Calculator Tool - Math and Python execution
# ==========================================

def calculate(expression: str) -> str:
    """
    My calculator tool - handles math expressions AND Python code!
    
    This was tricky to implement safely. I learned about:
    - Using restricted globals for security
    - Capturing print output
    - Handling different expression formats
    """
    logger.info(f"Calculating: {expression[:100]}...")
    
    try:
        expr = expression.strip()
        
        # Check if it's Python code (not just math)
        if any(keyword in expr for keyword in ['def ', 'print(', 'import ', 'for ', 'while ', '=']):
            try:
                # Create a safe execution environment
                safe_globals = {
                    '__builtins__': {
                        'range': range, 'len': len, 'int': int, 'float': float,
                        'str': str, 'print': print, 'abs': abs, 'round': round,
                        'min': min, 'max': max, 'sum': sum, 'pow': pow
                    },
                    'math': math  # Allow math functions
                }
                safe_locals = {}
                
                # Capture any print output
                output_buffer = io.StringIO()
                with redirect_stdout(output_buffer):
                    exec(expr, safe_globals, safe_locals)
                
                # Get printed output
                printed = output_buffer.getvalue().strip()
                if printed:
                    # Extract numbers from print output
                    numbers = re.findall(r'-?\d+\.?\d*', printed)
                    if numbers:
                        return numbers[-1]
                
                # Check for result variables
                for var in ['result', 'output', 'answer', 'total', 'sum']:
                    if var in safe_locals:
                        value = safe_locals[var]
                        if isinstance(value, (int, float)):
                            return str(int(value) if isinstance(value, float) and value.is_integer() else value)
                
                # Return any numeric variable found
                for var, value in safe_locals.items():
                    if isinstance(value, (int, float)):
                        return str(int(value) if isinstance(value, float) and value.is_integer() else value)
                
            except Exception as e:
                logger.error(f"Python execution error: {e}")
        
        # Handle percentage calculations (common in GAIA)
        if '%' in expr and 'of' in expr:
            match = re.search(r'(\d+(?:\.\d+)?)\s*%\s*of\s*(\d+(?:,\d+)*(?:\.\d+)?)', expr, re.IGNORECASE)
            if match:
                percentage = float(match.group(1))
                number = float(match.group(2).replace(',', ''))
                result = (percentage / 100) * number
                return str(int(result) if result.is_integer() else round(result, 6))
        
        # Handle factorial
        if 'factorial' in expr:
            match = re.search(r'factorial\((\d+)\)', expr)
            if match:
                n = int(match.group(1))
                result = math.factorial(n)
                return str(result)
        
        # Simple math expression
        if re.match(r'^[\d\s+\-*/().]+$', expr):
            result = eval(expr, {"__builtins__": {}}, {})
            if isinstance(result, float):
                return str(int(result) if result.is_integer() else round(result, 6))
            return str(result)
        
        # Clean up expression and try again
        expr = re.sub(r'[a-zA-Z_]\w*(?!\s*\()', '', expr)
        expr = expr.replace(',', '')
        expr = re.sub(r'\bsquare root of\s*(\d+)', r'sqrt(\1)', expr, flags=re.I)
        
        # Safe math evaluation
        safe_dict = {
            'sqrt': math.sqrt, 'pow': pow, 'abs': abs, 'round': round,
            'sin': math.sin, 'cos': math.cos, 'tan': math.tan,
            'log': math.log, 'log10': math.log10, 'exp': math.exp,
            'ceil': math.ceil, 'floor': math.floor,
            'factorial': math.factorial, 'gcd': math.gcd,
            'pi': math.pi, 'e': math.e
        }
        
        result = eval(expr, {"__builtins__": {}}, safe_dict)
        
        if isinstance(result, float):
            return str(int(result) if result.is_integer() else round(result, 6))
        return str(result)
        
    except Exception as e:
        logger.error(f"Calculation error: {e}")
        # Last resort: try to find any number in the expression
        numbers = re.findall(r'-?\d+\.?\d*', expr)
        if numbers:
            return numbers[-1]
        return "0"


# ==========================================
# File Analysis Tools
# ==========================================

def analyze_file(content: str, file_type: str = "text") -> str:
    """
    Analyzes file contents - CSV, Python, text files
    
    Key learning: I had to handle cases where the agent passes
    the question text instead of actual file content!
    """
    logger.info(f"Analyzing {file_type} file")
    
    # Check if this is just the question text (common mistake!)
    if any(phrase in content.lower() for phrase in [
        "attached excel file",
        "attached csv file", 
        "attached python",
        "the attached file",
        "what were the total sales",
        "contains the sales"
    ]):
        logger.warning("File analyzer received question text instead of file content")
        return "ERROR: No file content provided. If a file was mentioned in the question but not provided, answer 'No file provided'"
    
    # Check for suspiciously short "files"
    if file_type.lower() in ["excel", "csv", "xlsx", "xls"] and len(content) < 50:
        logger.warning(f"Content too short for {file_type} file: {len(content)} chars")
        return "ERROR: No actual file provided. Answer should be 'No file provided'"
    
    try:
        # Python file detection
        if file_type.lower() in ["py", "python"] or "def " in content or "import " in content:
            return f"Python code file:\n{content}"
        
        # CSV file analysis
        elif file_type.lower() == "csv" or "," in content.split('\n')[0]:
            lines = content.strip().split('\n')
            if not lines:
                return "Empty CSV file"
            
            headers = [col.strip() for col in lines[0].split(',')]
            data_rows = len(lines) - 1
            
            # Show sample data
            sample_rows = []
            for i in range(min(3, len(lines)-1)):
                sample_rows.append(lines[i+1])
            
            analysis = f"CSV File Analysis:\n"
            analysis += f"Columns: {len(headers)} - {', '.join(headers)}\n"
            analysis += f"Data rows: {data_rows}\n"
            
            if sample_rows:
                analysis += f"Sample data:\n"
                for row in sample_rows:
                    analysis += f"  {row}\n"
            
            return analysis
        
        # Excel file indicator
        elif file_type.lower() in ["xlsx", "xls", "excel"]:
            return f"Excel file detected. Use table_sum tool to analyze numeric data."
        
        # Default text file analysis
        else:
            lines = content.split('\n')
            words = content.split()
            
            return f"Text File Analysis:\nLines: {len(lines)}\nWords: {len(words)}\nCharacters: {len(content)}"
            
    except Exception as e:
        logger.error(f"File analysis error: {e}")
        return f"Error analyzing file: {str(e)[:100]}"


def _table_sum_raw(file_content: Any, column: str = "Total") -> str:
    """
    Sum a column in a CSV or Excel file
    
    This tool taught me about:
    - Handling different file formats
    - Detecting placeholder text
    - Graceful error handling
    """
    
    # Check for placeholder strings (agent trying to pass fake content)
    if isinstance(file_content, str):
        placeholder_strings = [
            "Excel file content",
            "file content",
            "CSV file content",
            "Please provide the Excel file content",
            "The attached Excel file",
            "Excel file"
        ]
        if file_content in placeholder_strings or len(file_content) < 20:
            return "ERROR: No actual file provided. Answer should be 'No file provided'"
    
    try:
        # Handle file paths vs content
        if isinstance(file_content, str):
            # Check if it's a non-existent file path
            if not os.path.exists(file_content) and not (',' in file_content or '\n' in file_content):
                return "ERROR: File not found. If file was mentioned but not provided, answer 'No file provided'"
                
            # Try to read as file
            if file_content.endswith('.csv'):
                df = pd.read_csv(file_content)
            else:
                df = pd.read_excel(file_content)
        elif isinstance(file_content, bytes):
            # Handle raw bytes
            buf = io.BytesIO(file_content)
            try:
                df = pd.read_csv(buf)
            except:
                buf.seek(0)
                df = pd.read_excel(buf)
        else:
            return "ERROR: Unsupported file format"
        
        # Try to find and sum the appropriate column
        if column in df.columns:
            total = df[column].sum()
            return f"{total:.2f}" if isinstance(total, float) else str(total)
        
        # Look for numeric columns with keywords
        numeric_cols = df.select_dtypes(include=['number']).columns
        
        for col in numeric_cols:
            if any(word in col.lower() for word in ['total', 'sum', 'amount', 'sales', 'revenue']):
                total = df[col].sum()
                return f"{total:.2f}" if isinstance(total, float) else str(total)
        
        # Sum all numeric columns as last resort
        if len(numeric_cols) > 0:
            totals = {}
            for col in numeric_cols:
                total = df[col].sum()
                totals[col] = total
            
            # Return the largest sum (likely the total)
            max_col = max(totals, key=totals.get)
            return f"{totals[max_col]:.2f}" if isinstance(totals[max_col], float) else str(totals[max_col])
        
        return "ERROR: No numeric columns found"
        
    except FileNotFoundError:
        logger.error("File not found error in table_sum")
        return "ERROR: File not found. If file was mentioned but not provided, answer 'No file provided'"
    except Exception as e:
        logger.error(f"Table sum error: {e}")
        error_str = str(e).lower()
        if "no such file" in error_str or "file not found" in error_str:
            return "ERROR: File not found. If file was mentioned but not provided, answer 'No file provided'"
        return f"ERROR: {str(e)[:100]}"


def get_weather(location: str) -> str:
    """
    Weather tool - returns demo data for now
    
    In a real implementation, I'd use OpenWeather API,
    but for GAIA this simple version works!
    """
    logger.info(f"Getting weather for: {location}")
    
    # Demo weather data (deterministic based on location)
    import random
    random.seed(hash(location))
    temp = random.randint(10, 30)
    conditions = ["Sunny", "Cloudy", "Rainy", "Clear"]
    condition = random.choice(conditions)
    
    return f"Weather in {location}: {temp}°C, {condition}"


# ==========================================
# Tool Creation Function
# ==========================================

def get_gaia_tools(llm=None):
    """
    Create and return all tools for the GAIA agent
    
    Each tool is wrapped as a FunctionTool for LlamaIndex
    I've learned to write clear descriptions - they guide the agent!
    """
    logger.info("Creating GAIA tools...")
    
    tools = [
        FunctionTool.from_defaults(
            fn=search_web,
            name="web_search",
            description="Search the web for current information. Use ONLY for recent events or facts you don't know."
        ),
        FunctionTool.from_defaults(
            fn=calculate,
            name="calculator",
            description="Perform mathematical calculations. Use for arithmetic, percentages, or evaluating expressions. NOT for counting items."
        ),
        FunctionTool.from_defaults(
            fn=analyze_file,
            name="file_analyzer",
            description="Analyze file structure and contents. Returns info about the file."
        ),
        FunctionTool.from_defaults(
            fn=get_weather,
            name="weather",
            description="Get current weather for a location."
        ),
        FunctionTool.from_defaults(
            fn=_web_open_raw,
            name="web_open",
            description="Open a specific URL from web_search results to read the full page content."
        ),
        FunctionTool.from_defaults(
            fn=_table_sum_raw,
            name="table_sum",
            description="Sum numeric columns in a CSV or Excel file. Use when asked for totals from data files. Returns the sum as a number."
        )
    ]
    
    logger.info(f"Created {len(tools)} tools for GAIA")
    return tools


# Testing section - helps me debug tools individually
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    print("Testing My GAIA Tools\n")
    
    # Test calculator
    print("Calculator Tests:")
    test_calcs = [
        "What is 25 * 17?",
        "15% of 1000",
        "square root of 144"
    ]
    for calc in test_calcs:
        result = calculate(calc)
        print(f"  {calc} = {result}")
    
    # Test file analyzer
    print("\nFile Analyzer Test:")
    sample_csv = "name,age,score\nAlice,25,85\nBob,30,92"
    result = analyze_file(sample_csv, "csv")
    print(result)
    
    # Test weather
    print("\nWeather Test:")
    result = get_weather("Paris")
    print(result)
    
    print("\n✅ All tools tested successfully!")