| | """ |
| | GAIA Tools - My Custom Tool Implementation |
| | ========================================== |
| | Author: Isadora Teles (AI Agent Student) |
| | Purpose: Creating tools that my agent can use to answer GAIA questions |
| | |
| | These tools are the key to my agent's success. Each tool serves a specific |
| | purpose and I've learned to handle edge cases through trial and error. |
| | """ |
| |
|
| | import os |
| | import requests |
| | import logging |
| | import math |
| | import re |
| | import io |
| | import pandas as pd |
| | from typing import List, Optional, Any |
| | from llama_index.core.tools import FunctionTool, QueryEngineTool |
| | from contextlib import redirect_stdout |
| |
|
| | |
# Module-wide logger for the tools in this file; INFO and above are reported.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Raise the threshold of the HTTP client loggers so only warnings get through.
for _noisy_name in ("httpx", "httpcore"):
    logging.getLogger(_noisy_name).setLevel(logging.WARNING)
del _noisy_name
| |
|
| |
|
| | |
| | |
| | |
| |
|
def search_web(query: str) -> str:
    """
    Main web search tool - tries Google first, then DuckDuckGo as fallback.

    The provider helpers report problems as plain strings: Google failures
    start with "Google search", DuckDuckGo failures start with "DuckDuckGo",
    and an empty Google result set is reported as "No search results found".

    Args:
        query: Free-text search query.

    Returns:
        Formatted search results from the first provider that produced any,
        a "no results" message when a provider worked but found nothing, or
        a fixed "unavailable" message when both providers failed.
    """
    logger.info("Web search for: %s", query)

    google_empty_message = "No search results found"
    google_result = _search_google(query)
    google_has_results = bool(
        google_result
        and not google_result.startswith("Google search")
        # An empty Google result set is not an answer: give DuckDuckGo a
        # chance instead of returning immediately.
        and google_result != google_empty_message
    )
    if google_has_results:
        return google_result

    ddg_result = _search_duckduckgo(query)
    if ddg_result and not ddg_result.startswith("DuckDuckGo"):
        return ddg_result

    # DuckDuckGo failed outright. If Google at least answered (with nothing),
    # report that rather than claiming search is unavailable.
    if google_result == google_empty_message:
        return google_result

    return "Web search unavailable. Please use your knowledge to answer."
| |
|
| |
|
| | def _search_google(query: str) -> str: |
| | """ |
| | Google Custom Search implementation |
| | Requires GOOGLE_API_KEY and GOOGLE_CSE_ID in environment |
| | """ |
| | api_key = os.getenv("GOOGLE_API_KEY") |
| | cx = os.getenv("GOOGLE_CSE_ID", "746382dd3c2bd4135") |
| | |
| | if not api_key: |
| | return "Google search not configured" |
| | |
| | try: |
| | url = "https://www.googleapis.com/customsearch/v1" |
| | params = { |
| | "key": api_key, |
| | "cx": cx, |
| | "q": query, |
| | "num": 3 |
| | } |
| | |
| | response = requests.get(url, params=params, timeout=10) |
| | |
| | if response.status_code != 200: |
| | return f"Google search error: {response.status_code}" |
| | |
| | data = response.json() |
| | items = data.get("items", []) |
| | |
| | if not items: |
| | return "No search results found" |
| | |
| | |
| | results = [] |
| | for i, item in enumerate(items[:2], 1): |
| | title = item.get("title", "")[:50] |
| | snippet = item.get("snippet", "")[:150] |
| | link = item.get("link", "") |
| | results.append(f"{i}. {title}\n{snippet}\nURL: {link}") |
| | |
| | return "\n\n".join(results) |
| | |
| | except Exception as e: |
| | logger.error(f"Google search error: {e}") |
| | return f"Google search failed: {str(e)[:50]}" |
| |
|
| |
|
| | def _search_duckduckgo(query: str) -> str: |
| | """ |
| | DuckDuckGo search - my reliable fallback! |
| | No API key needed, but has rate limits |
| | """ |
| | try: |
| | from duckduckgo_search import DDGS |
| | |
| | with DDGS(timeout=10) as ddgs: |
| | results = list(ddgs.text(query, max_results=3)) |
| | |
| | if not results: |
| | return "No results found" |
| | |
| | formatted = [] |
| | for i, r in enumerate(results, 1): |
| | formatted.append(f"{i}. {r['title']}\n{r['body'][:150]}...\nURL: {r['href']}") |
| | |
| | return "\n\n".join(formatted) |
| | |
| | except Exception as e: |
| | return f"DuckDuckGo search failed: {e}" |
| |
|
| |
|
| | def _web_open_raw(url: str) -> str: |
| | """ |
| | Open a specific URL and get the page content |
| | Used when the agent needs more details from search results |
| | """ |
| | try: |
| | response = requests.get(url, timeout=15) |
| | response.raise_for_status() |
| | |
| | return response.text[:40_000] |
| | except Exception as e: |
| | return f"ERROR opening {url}: {e}" |
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _format_number(value) -> str:
    """Render a result as text: whole floats lose ".0", other floats get 6 decimals."""
    if isinstance(value, float):
        return str(int(value) if value.is_integer() else round(value, 6))
    return str(value)


def _run_python_snippet(code: str) -> Optional[str]:
    """
    Execute a Python snippet with restricted builtins and extract a numeric answer.

    The answer is, in order of preference: the last number printed by the
    snippet, a numeric variable named result/output/answer/total/sum, or the
    first numeric variable the snippet defined. Returns None when none of
    these exist. Exceptions raised by the snippet propagate to the caller.

    SECURITY NOTE: restricting `__builtins__` is NOT a sandbox. The snippet
    can still run forever or reach other objects through attribute access.
    Do not feed this untrusted input without real isolation.
    """
    namespace = {
        '__builtins__': {
            'range': range, 'len': len, 'int': int, 'float': float,
            'str': str, 'print': print, 'abs': abs, 'round': round,
            'min': min, 'max': max, 'sum': sum, 'pow': pow,
        },
        'math': math,
    }
    preset_names = frozenset(namespace)

    captured = io.StringIO()
    with redirect_stdout(captured):
        # A single namespace is used on purpose: with separate globals and
        # locals the code runs like a class body, so functions defined by the
        # snippet cannot see each other (or themselves, for recursion).
        exec(code, namespace)

    printed = captured.getvalue().strip()
    if printed:
        numbers = re.findall(r'-?\d+\.?\d*', printed)
        if numbers:
            return numbers[-1]

    user_values = {
        name: value for name, value in namespace.items() if name not in preset_names
    }

    def as_text(value) -> str:
        return str(int(value) if isinstance(value, float) and value.is_integer() else value)

    # Conventional "answer" variable names win over whatever was defined first.
    for preferred in ('result', 'output', 'answer', 'total', 'sum'):
        value = user_values.get(preferred)
        if isinstance(value, (int, float)):
            return as_text(value)

    for value in user_values.values():
        if isinstance(value, (int, float)):
            return as_text(value)

    return None


def _normalize_math_text(text: str, known_names: dict) -> str:
    """
    Turn a loosely worded math request into an expression `eval` can parse.

    Steps, in this order (the order matters):
    1. "square root of N" becomes "sqrt(N)" - before words are removed.
    2. Thousands separators are dropped ("1,000" -> "1000"); commas that
       separate call arguments such as "pow(2, 3)" are left alone.
    3. Words are removed unless they are a known function followed by "("
       or a known constant not followed by "(".
    4. Prose punctuation (?, !, :, ;, quotes, $) and a trailing "." or "="
       are removed.
    """
    text = re.sub(
        r'\bsquare\s+root\s+of\s*(\d+(?:\.\d+)?)', r'sqrt(\1)', text, flags=re.I
    )
    text = re.sub(r'(?<=\d),(?=\d{3}(?!\d))', '', text)

    def keep_known(match):
        name = match.group(0)
        is_call = match.string[match.end():].lstrip().startswith('(')
        if name in known_names and callable(known_names[name]) == is_call:
            return name
        return ''

    # The trailing \b stops the engine from matching a prefix of a name
    # (e.g. "sqr" out of "sqrt(") and mangling function calls.
    text = re.sub(r'\b[A-Za-z_]\w*\b', keep_known, text)
    text = re.sub(r'[?!:;"\'$]', '', text)
    return text.strip().rstrip('=.').strip()


def calculate(expression: str) -> str:
    """
    Calculator tool - handles math expressions AND small Python snippets.

    Strategies are tried in this order:
    1. Input that looks like Python code (contains def/print(/import/for/
       while or "=") is executed with restricted builtins.
    2. "X% of Y" phrases.
    3. "factorial(N)".
    4. Plain arithmetic made only of digits, whitespace and + - * / ( ) .
    5. Anything else is cleaned up and evaluated against a small set of
       math functions and constants (sqrt, sin, log, pi, e, ...).

    Args:
        expression: A math expression, a short natural-language math
            question, or a Python snippet.

    Returns:
        The result as a string. If every strategy fails, the last number
        found in the (cleaned) input is returned, or "0" when there is none.
        This best-effort fallback never raises for string input.

    SECURITY NOTE: this uses `exec`/`eval` on the caller's text. Emptying
    `__builtins__` reduces accidents but is not a security boundary, and
    expressions such as huge exponent towers can still hang the process.
    """
    logger.info("Calculating: %s...", expression[:100])
    expr = expression.strip()

    try:
        code_markers = ('def ', 'print(', 'import ', 'for ', 'while ', '=')
        if any(marker in expr for marker in code_markers):
            try:
                snippet_answer = _run_python_snippet(expr)
            except Exception as e:
                # Not valid/allowed Python after all: fall through to the
                # math strategies below.
                logger.error("Python execution error: %s", e)
            else:
                if snippet_answer is not None:
                    return snippet_answer

        if '%' in expr and 'of' in expr:
            match = re.search(
                r'(\d+(?:\.\d+)?)\s*%\s*of\s*(\d+(?:,\d+)*(?:\.\d+)?)',
                expr,
                re.IGNORECASE,
            )
            if match:
                percentage = float(match.group(1))
                number = float(match.group(2).replace(',', ''))
                return _format_number((percentage / 100) * number)

        if 'factorial' in expr:
            match = re.search(r'factorial\((\d+)\)', expr)
            if match:
                return str(math.factorial(int(match.group(1))))

        if re.match(r'^[\d\s+\-*/().]+$', expr):
            return _format_number(eval(expr, {"__builtins__": {}}, {}))

        safe_dict = {
            'sqrt': math.sqrt, 'pow': pow, 'abs': abs, 'round': round,
            'sin': math.sin, 'cos': math.cos, 'tan': math.tan,
            'log': math.log, 'log10': math.log10, 'exp': math.exp,
            'ceil': math.ceil, 'floor': math.floor,
            'factorial': math.factorial, 'gcd': math.gcd,
            'pi': math.pi, 'e': math.e,
        }

        # Rebinding `expr` lets the fallback below search the cleaned text
        # (e.g. with thousands separators already removed).
        expr = _normalize_math_text(expr, safe_dict)
        return _format_number(eval(expr, {"__builtins__": {}}, safe_dict))

    except Exception as e:
        logger.error("Calculation error: %s", e)
        numbers = re.findall(r'-?\d+\.?\d*', expr)
        return numbers[-1] if numbers else "0"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def analyze_file(content: str, file_type: str = "text") -> str:
    """
    Analyzes file contents - CSV, Python, text files.

    The agent sometimes passes the question text instead of actual file
    content, so the input is first screened for tell-tale question phrases
    and for spreadsheet "content" that is too short to be real.

    The kind of analysis is chosen as follows: an explicit file_type of
    py/python or csv always wins; otherwise the content is sniffed ("def "
    or "import " means Python, a comma in the first line means CSV);
    otherwise an Excel file_type yields a pointer to the table_sum tool;
    everything else is treated as plain text.

    Args:
        content: The text of the file.
        file_type: Hint about the file kind (e.g. "csv", "py", "xlsx", "text").

    Returns:
        A human-readable summary, or a string starting with "ERROR" when no
        usable file content was supplied or the analysis failed.
    """
    logger.info("Analyzing %s file", file_type)

    kind = file_type.lower()
    lowered = content.lower()
    question_phrases = (
        "attached excel file",
        "attached csv file",
        "attached python",
        "the attached file",
        "what were the total sales",
        "contains the sales",
    )
    if any(phrase in lowered for phrase in question_phrases):
        logger.warning("File analyzer received question text instead of file content")
        return "ERROR: No file content provided. If a file was mentioned in the question but not provided, answer 'No file provided'"

    # Non-blank lines; splitlines() also copes with Windows line endings.
    lines = [line for line in content.splitlines() if line.strip()]
    first_line = lines[0] if lines else ""

    if kind in ("excel", "csv", "xlsx", "xls") and len(content) < 50:
        # Short input is only rejected when it does not look like a table:
        # a small but genuine CSV (header row plus data) must still be analyzed.
        looks_tabular = len(lines) > 1 and "," in first_line
        if not looks_tabular:
            logger.warning("Content too short for %s file: %d chars", file_type, len(content))
            return "ERROR: No actual file provided. Answer should be 'No file provided'"

    try:
        looks_like_python = "def " in content or "import " in content

        if kind in ("py", "python"):
            return f"Python code file:\n{content}"

        # An explicit "csv" type is honoured even if a cell happens to
        # contain text such as "import ".
        if kind == "csv" or (not looks_like_python and "," in first_line):
            if not lines:
                return "Empty CSV file"

            headers = [col.strip() for col in first_line.split(',')]
            sample_rows = lines[1:4]

            parts = [
                "CSV File Analysis:\n",
                f"Columns: {len(headers)} - {', '.join(headers)}\n",
                f"Data rows: {len(lines) - 1}\n",
            ]
            if sample_rows:
                parts.append("Sample data:\n")
                parts.extend(f" {row}\n" for row in sample_rows)
            return "".join(parts)

        if looks_like_python:
            return f"Python code file:\n{content}"

        if kind in ("xlsx", "xls", "excel"):
            return "Excel file detected. Use table_sum tool to analyze numeric data."

        text_lines = content.split('\n')
        words = content.split()
        return f"Text File Analysis:\nLines: {len(text_lines)}\nWords: {len(words)}\nCharacters: {len(content)}"

    except Exception as e:
        logger.error("File analysis error: %s", e)
        return f"Error analyzing file: {str(e)[:100]}"
| |
|
| |
|
| | def _table_sum_raw(file_content: Any, column: str = "Total") -> str: |
| | """ |
| | Sum a column in a CSV or Excel file |
| | |
| | This tool taught me about: |
| | - Handling different file formats |
| | - Detecting placeholder text |
| | - Graceful error handling |
| | """ |
| | |
| | |
| | if isinstance(file_content, str): |
| | placeholder_strings = [ |
| | "Excel file content", |
| | "file content", |
| | "CSV file content", |
| | "Please provide the Excel file content", |
| | "The attached Excel file", |
| | "Excel file" |
| | ] |
| | if file_content in placeholder_strings or len(file_content) < 20: |
| | return "ERROR: No actual file provided. Answer should be 'No file provided'" |
| | |
| | try: |
| | |
| | if isinstance(file_content, str): |
| | |
| | if not os.path.exists(file_content) and not (',' in file_content or '\n' in file_content): |
| | return "ERROR: File not found. If file was mentioned but not provided, answer 'No file provided'" |
| | |
| | |
| | if file_content.endswith('.csv'): |
| | df = pd.read_csv(file_content) |
| | else: |
| | df = pd.read_excel(file_content) |
| | elif isinstance(file_content, bytes): |
| | |
| | buf = io.BytesIO(file_content) |
| | try: |
| | df = pd.read_csv(buf) |
| | except: |
| | buf.seek(0) |
| | df = pd.read_excel(buf) |
| | else: |
| | return "ERROR: Unsupported file format" |
| | |
| | |
| | if column in df.columns: |
| | total = df[column].sum() |
| | return f"{total:.2f}" if isinstance(total, float) else str(total) |
| | |
| | |
| | numeric_cols = df.select_dtypes(include=['number']).columns |
| | |
| | for col in numeric_cols: |
| | if any(word in col.lower() for word in ['total', 'sum', 'amount', 'sales', 'revenue']): |
| | total = df[col].sum() |
| | return f"{total:.2f}" if isinstance(total, float) else str(total) |
| | |
| | |
| | if len(numeric_cols) > 0: |
| | totals = {} |
| | for col in numeric_cols: |
| | total = df[col].sum() |
| | totals[col] = total |
| | |
| | |
| | max_col = max(totals, key=totals.get) |
| | return f"{totals[max_col]:.2f}" if isinstance(totals[max_col], float) else str(totals[max_col]) |
| | |
| | return "ERROR: No numeric columns found" |
| | |
| | except FileNotFoundError: |
| | logger.error("File not found error in table_sum") |
| | return "ERROR: File not found. If file was mentioned but not provided, answer 'No file provided'" |
| | except Exception as e: |
| | logger.error(f"Table sum error: {e}") |
| | error_str = str(e).lower() |
| | if "no such file" in error_str or "file not found" in error_str: |
| | return "ERROR: File not found. If file was mentioned but not provided, answer 'No file provided'" |
| | return f"ERROR: {str(e)[:100]}" |
| |
|
| |
|
def get_weather(location: str) -> str:
    """
    Weather tool - returns DEMO data, not a real forecast.

    The temperature (10-30 °C) and condition are pseudo-random values derived
    from the location name, so the same location always yields the same
    answer. A real implementation would call a weather API instead.

    Args:
        location: Place name to report on.

    Returns:
        A string of the form "Weather in <location>: <temp>°C, <condition>".
    """
    logger.info("Getting weather for: %s", location)

    import random

    # A private generator seeded with the location text:
    # - the module-level random state used by other code is left untouched;
    # - string seeds do not depend on Python's per-process hash
    #   randomization, unlike hash(location), so results are repeatable
    #   across runs.
    rng = random.Random(str(location))
    temp = rng.randint(10, 30)
    condition = rng.choice(["Sunny", "Cloudy", "Rainy", "Clear"])

    return f"Weather in {location}: {temp}°C, {condition}"
| |
|
| |
|
| | |
| | |
| | |
| |
|
def get_gaia_tools(llm=None):
    """
    Build the list of tools handed to the GAIA agent.

    Every tool function is wrapped with FunctionTool.from_defaults and given
    a name plus a description; the description is what tells the agent when
    to pick the tool. The `llm` argument is accepted but not used here.

    Returns:
        A list with the six wrapped tools, in this order: web_search,
        calculator, file_analyzer, weather, web_open, table_sum.
    """
    logger.info("Creating GAIA tools...")

    # (function, tool name, description shown to the agent)
    tool_specs = (
        (
            search_web,
            "web_search",
            "Search the web for current information. Use ONLY for recent events or facts you don't know.",
        ),
        (
            calculate,
            "calculator",
            "Perform mathematical calculations. Use for arithmetic, percentages, or evaluating expressions. NOT for counting items.",
        ),
        (
            analyze_file,
            "file_analyzer",
            "Analyze file structure and contents. Returns info about the file.",
        ),
        (
            get_weather,
            "weather",
            "Get current weather for a location.",
        ),
        (
            _web_open_raw,
            "web_open",
            "Open a specific URL from web_search results to read the full page content.",
        ),
        (
            _table_sum_raw,
            "table_sum",
            "Sum numeric columns in a CSV or Excel file. Use when asked for totals from data files. Returns the sum as a number.",
        ),
    )

    tools = [
        FunctionTool.from_defaults(fn=func, name=tool_name, description=summary)
        for func, tool_name, summary in tool_specs
    ]

    logger.info(f"Created {len(tools)} tools for GAIA")
    return tools
| |
|
| |
|
| | |
# Manual smoke run: prints the output of a few tools when executed as a script.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    print("Testing My GAIA Tools\n")

    # Calculator: a plain question, a percentage and a worded square root.
    print("Calculator Tests:")
    for calc in ("What is 25 * 17?", "15% of 1000", "square root of 144"):
        print(f" {calc} = {calculate(calc)}")

    # File analyzer: a tiny three-column CSV.
    print("\nFile Analyzer Test:")
    print(analyze_file("name,age,score\nAlice,25,85\nBob,30,92", "csv"))

    # Weather: demo data for one city.
    print("\nWeather Test:")
    print(get_weather("Paris"))

    print("\n✅ All tools tested successfully!")