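"""GAIA benchmark agent built on Google Gemini.

Defines the question container, three tools (calculator, DuckDuckGo web
search, file analyzer) and the agent class that combines them.
"""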
import asyncio
import base64
import json
import logging
import math
import mimetypes
import os
import re
import statistics
import tempfile
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Dict, List, Any, Optional, Union

import google.generativeai as genai
import numpy as np
import pandas as pd
import requests
from duckduckgo_search import DDGS
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from PIL import Image
# Configure logging: INFO level on the root logger, plus a logger named
# after this module that the classes below write to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class GAIAQuestion:
    """GAIA benchmark question structure.

    A plain container: every constructor argument is stored unchanged on the
    attribute of the same name, and nothing is validated.
    """

    def __init__(
        self,
        question_id: str,
        question: str,
        level: int,
        final_answer: Optional[str] = None,
        file_name: Optional[str] = None,
        file_path: Optional[str] = None,
        annotator_metadata: Optional[Dict] = None,
    ):
        self.question_id = question_id
        self.question = question
        self.level = level
        self.final_answer = final_answer
        self.file_name = file_name
        self.file_path = file_path
        self.annotator_metadata = annotator_metadata

    def __repr__(self) -> str:
        # Short form for logs and debugging; the question text is omitted on purpose.
        return (
            f"{type(self).__name__}(question_id={self.question_id!r}, "
            f"level={self.level!r}, file_path={self.file_path!r})"
        )
class GeminiTool:
    """Base class for Gemini agent tools.

    Subclasses supply a ``name`` and ``description`` and override
    :meth:`execute`, which takes one string and returns one string.
    """

    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    def execute(self, input_data: str) -> str:
        """Run the tool on ``input_data``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(f"{type(self).__name__} must implement execute()")
class CalculatorTool(GeminiTool):
    """Advanced calculator tool for mathematical operations.

    ``execute`` accepts either a Python-style expression built from the
    whitelisted functions (``sqrt(144)``, ``compound_interest(1000, 0.05, 3)``)
    or a free-text compound-interest prompt such as
    ``"$1,000 at 5% compound interest for 3 years"``.
    """

    def __init__(self):
        super().__init__(
            name="calculator",
            description="""
Performs mathematical calculations including:
- Basic arithmetic (+, -, *, /, %)
- Advanced math (sqrt, log, sin, cos, tan, exp, etc.)
- Financial calculations (compound interest, annuities, etc.)
- Statistical operations (mean, median, std, etc.)
Examples:
- "sqrt(144)" β 12
- "log(100)" β 2.0 (base 10)
- "sin(pi/2)" β 1.0
- "compound_interest(1000, 0.05, 3)" β compound interest calculation
"""
        )

    def execute(self, expression: str) -> str:
        """Evaluate ``expression`` and return a human-readable result string.

        Never raises: any failure is reported as a ``"Calculation error: ..."``
        string, matching the other tools.
        """
        try:
            lowered = expression.lower()
            # A real call such as compound_interest(1000, 0.05, 3) must be
            # evaluated; only free-text prompts go to the natural-language parser.
            is_function_call = re.search(r"compound_interest\s*\(", lowered) is not None
            if "compound" in lowered and not is_function_call:
                return self._handle_financial_calculation(expression)

            # SECURITY: the expression comes from model output and is passed to
            # eval(). Emptying __builtins__ is NOT a sandbox; attribute walks via
            # dunder names can still reach arbitrary objects, so reject them.
            # NOTE(review): consider replacing eval with an ast-based evaluator.
            if "__" in expression:
                raise ValueError("double-underscore names are not allowed")

            result = eval(expression, self._build_namespace())
            return f"Calculation result: {result}"
        except Exception as e:
            return f"Calculation error: {str(e)}. Please check your mathematical expression."

    def _build_namespace(self) -> Dict[str, Any]:
        """Return the names an evaluated expression is allowed to use."""
        return {
            "__builtins__": {},
            # Basic operations
            "abs": abs, "round": round, "min": min, "max": max,
            "sum": sum, "pow": pow, "divmod": divmod,
            # Math functions ("log" is base 10 as documented; "ln" is natural log)
            "sqrt": math.sqrt, "log": self._log, "log10": math.log10,
            "ln": math.log, "exp": math.exp,
            "sin": math.sin, "cos": math.cos, "tan": math.tan,
            "asin": math.asin, "acos": math.acos, "atan": math.atan,
            "sinh": math.sinh, "cosh": math.cosh, "tanh": math.tanh,
            "pi": math.pi, "e": math.e,
            "floor": math.floor, "ceil": math.ceil,
            "factorial": math.factorial, "gcd": math.gcd,
            # Statistical functions
            "mean": statistics.mean, "median": statistics.median,
            "mode": statistics.mode, "stdev": statistics.stdev,
            # Financial functions
            "compound_interest": self._compound_interest,
            "simple_interest": self._simple_interest,
            "present_value": self._present_value,
            "future_value": self._future_value,
        }

    @staticmethod
    def _log(x: float, base: float = 10) -> float:
        """Logarithm of ``x``: base 10 by default, or the given ``base``."""
        # math.log10 is exact for powers of ten where math.log(x, 10) is not.
        return math.log10(x) if base == 10 else math.log(x, base)

    def _compound_interest(self, principal: float, rate: float, time: float, n: int = 1) -> float:
        """Calculate compound interest: A = P(1 + r/n)^(nt)"""
        return principal * (1 + rate / n) ** (n * time)

    def _simple_interest(self, principal: float, rate: float, time: float) -> float:
        """Calculate simple interest: A = P(1 + rt)"""
        return principal * (1 + rate * time)

    def _present_value(self, future_value: float, rate: float, time: float) -> float:
        """Calculate present value: PV = FV / (1 + r)^t"""
        return future_value / (1 + rate) ** time

    def _future_value(self, present_value: float, rate: float, time: float) -> float:
        """Calculate future value: FV = PV * (1 + r)^t"""
        return present_value * (1 + rate) ** time

    def _handle_financial_calculation(self, expression: str) -> str:
        """Parse a free-text compound-interest prompt and report the result.

        Looks for an amount (optionally ``$``-prefixed, thousands separators
        allowed), a percentage and a number of years, e.g.
        "1000 at 5% for 3 years". Compounding is monthly when the text contains
        "monthly", otherwise annual. Returns an explanatory string if the text
        cannot be parsed; never raises.
        """
        try:
            if "compound" in expression.lower():
                # Simplified pattern matching, not real NLP. The first numeric
                # token in the text is taken as the principal.
                money_pattern = r'\$?\s*(\d{1,3}(?:,\d{3})+(?:\.\d+)?|\d+(?:\.\d+)?)'
                rate_pattern = r'(\d+(?:\.\d+)?)%'
                time_pattern = r'(\d+(?:\.\d+)?)\s*years?'
                money_match = re.search(money_pattern, expression)
                rate_match = re.search(rate_pattern, expression)
                time_match = re.search(time_pattern, expression)
                if money_match and rate_match and time_match:
                    principal = float(money_match.group(1).replace(",", ""))
                    # Keep the percentage as written so the report does not show
                    # float noise such as 7.000000000000001%.
                    rate_percent = float(rate_match.group(1))
                    rate = rate_percent / 100
                    years = float(time_match.group(1))
                    # Annual compounding unless the text asks for monthly.
                    n = 12 if "monthly" in expression.lower() else 1
                    result = self._compound_interest(principal, rate, years, n)
                    return f"""
Financial Calculation - Compound Interest:
- Principal: ${principal:,.2f}
- Interest Rate: {rate_percent}% per year
- Time Period: {years} years
- Compounding: {'Monthly' if n == 12 else 'Annually'}
- Final Amount: ${result:,.2f}
- Interest Earned: ${result - principal:,.2f}
"""
            return "Unable to parse financial calculation. Please use format like: compound_interest(1000, 0.05, 3)"
        except Exception as e:
            return f"Financial calculation error: {str(e)}"
class WebSearchTool(GeminiTool):
    """Web search tool using DuckDuckGo"""

    def __init__(self):
        super().__init__(
            name="web_search",
            description="""
Searches the web for current information using DuckDuckGo.
Returns relevant, up-to-date search results with summaries.
Best for:
- Current events and news
- Recent statistics and data
- Current prices, populations, etc.
- Latest information on any topic
Example: "current population of Tokyo 2024"
"""
        )
        self.ddgs = DDGS()

    def execute(self, query: str) -> str:
        """Search for ``query`` and return up to five formatted results.

        Each result shows its title, the first 200 characters of its body and
        its URL. Never raises: failures come back as a "Web search error"
        string.
        """
        try:
            results = list(self.ddgs.text(query, max_results=5))
            if not results:
                return f"No search results found for: {query}"

            # Collect the pieces and join once instead of repeated string +=.
            sections = [f"Web search results for '{query}':\n\n"]
            for i, result in enumerate(results, 1):
                title = result.get('title', 'No title')
                snippet = result.get('body', 'No description')
                url = result.get('href', 'No URL')
                sections.append(f"{i}. **{title}**\n")
                sections.append(f" {snippet[:200]}...\n")
                sections.append(f" Source: {url}\n\n")
            return "".join(sections)
        except Exception as e:
            return f"Web search error: {str(e)}. Unable to perform search at this time."
class FileAnalyzerTool(GeminiTool):
    """Tool for analyzing various file types.

    Dispatches on the file suffix to a text, JSON, CSV or image analyzer and
    returns a formatted summary string.
    """

    _TEXT_SUFFIXES = ('.txt', '.md', '.py', '.js', '.html', '.css')
    _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
    _PREVIEW_CHARS = 500

    def __init__(self):
        super().__init__(
            name="file_analyzer",
            description="""
Analyzes various file types including:
- Text files (.txt, .md, .json, .csv)
- Data files (CSV, Excel, JSON)
- Image files (PNG, JPG, GIF, etc.)
- Documents and structured data
Provides summaries, statistics, and insights from file contents.
"""
        )

    def execute(self, file_path: str) -> str:
        """Analyze the file at ``file_path`` and return a summary string.

        Never raises: a missing file, an unsupported suffix or any analyzer
        failure is reported in the returned string.
        """
        try:
            if not os.path.exists(file_path):
                return f"File not found: {file_path}"
            file_extension = Path(file_path).suffix.lower()
            if file_extension in self._TEXT_SUFFIXES:
                return self._analyze_text_file(file_path)
            if file_extension == '.json':
                return self._analyze_json_file(file_path)
            if file_extension == '.csv':
                return self._analyze_csv_file(file_path)
            if file_extension in self._IMAGE_SUFFIXES:
                return self._analyze_image_file(file_path)
            return f"Unsupported file type: {file_extension}"
        except Exception as e:
            return f"Error analyzing file: {str(e)}"

    def _analyze_text_file(self, file_path: str) -> str:
        """Return line/word/character statistics and a 500-character preview."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()
        # splitlines() does not invent an extra empty line after a trailing
        # newline, so the reported line count is not off by one.
        lines = content.splitlines()
        words = content.split()
        chars = len(content)
        # Basic text statistics
        avg_line_length = sum(len(line) for line in lines) / len(lines) if lines else 0
        avg_word_length = sum(len(word) for word in words) / len(words) if words else 0
        preview = content[:self._PREVIEW_CHARS] + ('...' if len(content) > self._PREVIEW_CHARS else '')
        return f"""
π Text File Analysis:
- File: {Path(file_path).name}
- Lines: {len(lines):,}
- Words: {len(words):,}
- Characters: {chars:,}
- Average line length: {avg_line_length:.1f} characters
- Average word length: {avg_word_length:.1f} characters
π Content Preview:
{preview}
"""

    def _analyze_json_file(self, file_path: str) -> str:
        """Return the top-level type, key/length info and a pretty-printed preview."""
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        data_type = type(data).__name__
        if isinstance(data, dict):
            keys_info = f"Keys ({len(data)}): {list(data.keys())[:10]}"
            if len(data) > 10:
                keys_info += "..."
        elif isinstance(data, list):
            keys_info = f"List with {len(data)} items"
        else:
            keys_info = f"Single {data_type} value"
        # Decide on the ellipsis from the text that is actually truncated.
        dumped = json.dumps(data, indent=2)
        preview = dumped[:self._PREVIEW_CHARS]
        if len(dumped) > self._PREVIEW_CHARS:
            preview += "..."
        return f"""
π§ JSON File Analysis:
- File: {Path(file_path).name}
- Data type: {data_type}
- {keys_info}
- File size: {os.path.getsize(file_path):,} bytes
π Content Preview:
{preview}
"""

    def _analyze_csv_file(self, file_path: str) -> str:
        """Load the CSV with pandas and summarize shape, dtypes and missing values."""
        try:
            df = pd.read_csv(file_path)
            # Basic statistics
            rows, cols = df.shape
            numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
            text_cols = df.select_dtypes(include=['object']).columns.tolist()
            missing_data = df.isnull().sum()
            # Summary statistics for at most the first five numeric columns
            numeric_summary = ""
            if numeric_cols:
                numeric_summary = "\nπ Numeric Columns Summary:\n"
                for col in numeric_cols[:5]:
                    col_data = df[col]
                    numeric_summary += f" {col}: mean={col_data.mean():.2f}, std={col_data.std():.2f}, min={col_data.min()}, max={col_data.max()}\n"
            preview = df.head(3).to_string(max_cols=6)
            return f"""
π CSV File Analysis:
- File: {Path(file_path).name}
- Dimensions: {rows:,} rows Γ {cols} columns
- Numeric columns: {len(numeric_cols)} ({numeric_cols[:5]})
- Text columns: {len(text_cols)} ({text_cols[:5]})
- Missing values: {missing_data.sum()} total
- File size: {os.path.getsize(file_path):,} bytes
{numeric_summary}
π Data Preview (first 3 rows):
{preview}
"""
        except Exception as e:
            return f"Error analyzing CSV file: {str(e)}"

    def _analyze_image_file(self, file_path: str) -> str:
        """Report format, dimensions, colour mode, orientation and file size."""
        try:
            with Image.open(file_path) as img:
                width, height = img.size
                mode = img.mode
                format_name = img.format
            file_size = os.path.getsize(file_path)
            # Calculate aspect ratio
            aspect_ratio = width / height
            # Within 0.1 of 1:1 counts as square; otherwise wider is landscape.
            orientation = "Square" if abs(aspect_ratio - 1) < 0.1 else ("Landscape" if aspect_ratio > 1 else "Portrait")
            return f"""
πΌοΈ Image File Analysis:
- File: {Path(file_path).name}
- Format: {format_name}
- Dimensions: {width} Γ {height} pixels
- Color mode: {mode}
- Aspect ratio: {aspect_ratio:.2f} ({orientation})
- File size: {file_size:,} bytes ({file_size/1024:.1f} KB)
Note: For detailed image content analysis, the image will be processed by Gemini's vision capabilities.
"""
        except Exception as e:
            return f"Error analyzing image: {str(e)}"
class GeminiGAIAAgent:
    """
    Advanced GAIA benchmark agent using Google Gemini
    Optimized for multimodal understanding and complex reasoning

    For each question the agent picks tools by keyword, runs them, sends the
    question plus tool output (and the image, if any) to Gemini, then executes
    any "USE_TOOL:" lines found in the reply.
    """

    # Keywords are matched at the start of a word, so inflections still match
    # ("calculated", "events") but embedded hits do not ("know" for "now",
    # "seventh" for "event", "profile" for "file").
    _MATH_KEYWORDS = ('calculate', 'compute', 'math', 'formula', 'equation',
                      'interest', 'percentage', 'average', 'sum', 'multiply',
                      'divide', 'square root', 'logarithm', 'statistics')
    _SEARCH_KEYWORDS = ('current', 'latest', 'recent', 'today', '2024', '2025',
                        'now', 'present', 'up-to-date', 'newest',
                        'population', 'price', 'news', 'event', 'happening')
    _FILE_KEYWORDS = ('file', 'document', 'image', 'data', 'csv', 'analyze', 'uploaded')
    _MATH_PATTERN = re.compile(r"\b(?:" + "|".join(map(re.escape, _MATH_KEYWORDS)) + ")")
    _SEARCH_PATTERN = re.compile(r"\b(?:" + "|".join(map(re.escape, _SEARCH_KEYWORDS)) + ")")
    _FILE_PATTERN = re.compile(r"\b(?:" + "|".join(map(re.escape, _FILE_KEYWORDS)) + ")")
    _IMAGE_SUFFIXES = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')

    def __init__(self,
                 model_name: str = "gemini-2.5-flash",
                 api_key: Optional[str] = None,
                 temperature: float = 0.1,
                 max_tokens: int = 2048,
                 verbose: bool = True):
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.verbose = verbose
        # Configure Gemini API
        self._configure_gemini(api_key)
        # Initialize model (None if initialization failed)
        self.model = self._initialize_model()
        # Initialize tools
        self.tools = self._initialize_tools()
        # One result dict per successfully solved question
        self.conversation_history = []
        logger.info(f"Gemini GAIA Agent initialized with model: {model_name}")

    def _configure_gemini(self, api_key: Optional[str]):
        """Configure Gemini API from ``api_key`` or the GOOGLE_API_KEY variable."""
        key = api_key or os.getenv("GOOGLE_API_KEY")
        if key:
            genai.configure(api_key=key)
        else:
            logger.warning("No Google API key provided. Please set GOOGLE_API_KEY environment variable or pass api_key parameter.")

    def _initialize_model(self):
        """Initialize the Gemini model; returns None (and logs) on failure."""
        try:
            # Configure safety settings for more permissive responses
            safety_settings = {
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
            }
            # Generation configuration
            generation_config = genai.types.GenerationConfig(
                temperature=self.temperature,
                max_output_tokens=self.max_tokens,
                top_p=0.8,
                top_k=40
            )
            return genai.GenerativeModel(
                model_name=self.model_name,
                generation_config=generation_config,
                safety_settings=safety_settings
            )
        except Exception as e:
            logger.error(f"Failed to initialize Gemini model: {str(e)}")
            return None

    def _initialize_tools(self) -> Dict[str, "GeminiTool"]:
        """Initialize all available tools, keyed by tool name."""
        return {
            "calculator": CalculatorTool(),
            "web_search": WebSearchTool(),
            "file_analyzer": FileAnalyzerTool(),
        }

    def _create_system_prompt(self) -> str:
        """Create the system prompt for the agent (includes the current UTC time)."""
        current_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        return f"""You are an advanced AI assistant designed to solve GAIA benchmark questions with exceptional accuracy and reasoning.
GAIA (General AI Assistants) benchmark tests your ability to:
1. π§ **Complex Reasoning**: Multi-step problem solving and logical inference
2. π§ **Tool Usage**: Effective use of calculators, web search, and file analysis
3. πΌοΈ **Multimodal Understanding**: Processing text, images, data files, and documents
4. π― **Accuracy**: Providing precise, well-researched answers
AVAILABLE TOOLS:
- **calculator**: Advanced mathematical operations, financial calculations, statistics
- **web_search**: Current information from the web using DuckDuckGo
- **file_analyzer**: Analysis of text files, CSV data, JSON, and images
INSTRUCTIONS:
1. **Think Step-by-Step**: Break down complex problems into logical steps
2. **Use Tools Strategically**: Choose the right tools for each task
3. **Verify Information**: Double-check calculations and search for current data when needed
4. **Be Precise**: Provide exact, accurate answers with proper reasoning
5. **Show Your Work**: Explain your thought process clearly
6. **Handle Files**: Analyze uploaded files as part of your solution process
RESPONSE FORMAT:
When using tools, clearly indicate:
- Which tool you're using and why
- The input you're providing to the tool
- How the tool's output contributes to your final answer
Current Date/Time (UTC): {current_time}
User: AdilzhanB
Remember: Your goal is to provide the most accurate and well-reasoned answer possible for each GAIA question."""

    def _identify_required_tools(self, question: str, file_path: Optional[str] = None) -> List[str]:
        """Identify which tools might be needed for a question.

        Returns tool names in the fixed order calculator, web_search,
        file_analyzer. ``file_analyzer`` is also selected whenever
        ``file_path`` is truthy.
        """
        question_lower = question.lower()
        required_tools = []
        # Mathematical operations
        if self._MATH_PATTERN.search(question_lower):
            required_tools.append('calculator')
        # Current/recent information
        if self._SEARCH_PATTERN.search(question_lower):
            required_tools.append('web_search')
        # File analysis
        if file_path or self._FILE_PATTERN.search(question_lower):
            required_tools.append('file_analyzer')
        return required_tools

    def _use_tool(self, tool_name: str, input_data: str) -> str:
        """Execute a specific tool with given input; errors are returned as text."""
        if tool_name not in self.tools:
            return f"Tool '{tool_name}' not available."
        try:
            return self.tools[tool_name].execute(input_data)
        except Exception as e:
            return f"Error using {tool_name}: {str(e)}"

    def _process_image_for_gemini(self, file_path: str) -> Optional[dict]:
        """Read an image into the ``{'mime_type', 'data'}`` dict sent to Gemini.

        Returns None (and logs) if the file cannot be read. The MIME type is
        guessed from the file name and falls back to ``image/jpeg``.
        """
        try:
            with open(file_path, 'rb') as f:
                image_data = f.read()
            mime_type, _ = mimetypes.guess_type(file_path)
            return {
                'mime_type': mime_type or 'image/jpeg',
                'data': image_data
            }
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            return None

    def solve_gaia_question(self, gaia_question: "GAIAQuestion") -> Dict[str, Any]:
        """
        Main method to solve a GAIA benchmark question

        Returns a result dict. Every return path includes ``question_id``,
        ``question``, ``level``, ``agent_response``, ``timestamp`` and
        ``model_used``; failures additionally carry a truthy ``error``.
        Never raises.
        """
        start_time = datetime.utcnow()
        logger.info(f"Solving GAIA Question {gaia_question.question_id} (Level {gaia_question.level})")
        if not self.model:
            message = "Model not initialized. Please check your Google API key."
            # Same core keys as the other result shapes, so callers can index
            # 'question' / 'agent_response' without a KeyError.
            return {
                "question_id": gaia_question.question_id,
                "question": gaia_question.question,
                "level": gaia_question.level,
                "agent_response": f"Error: {message}",
                "error": message,
                "timestamp": start_time.isoformat(),
                "model_used": self.model_name
            }
        try:
            # Step 1: Analyze question and identify required tools
            required_tools = self._identify_required_tools(gaia_question.question, gaia_question.file_path)

            # Step 2: Gather context from tools
            tool_results = {}
            reasoning_steps = []
            # File analysis first (if applicable)
            if gaia_question.file_path and os.path.exists(gaia_question.file_path):
                reasoning_steps.append(f"π Analyzing uploaded file: {gaia_question.file_name}")
                tool_results["file_analyzer"] = self._use_tool("file_analyzer", gaia_question.file_path)
                reasoning_steps.append("β File analysis completed")
            # Use other tools as needed
            for tool_name in required_tools:
                if tool_name == "file_analyzer":  # Already handled above
                    continue
                reasoning_steps.append(f"π§ Using {tool_name} tool")
                if tool_name == "calculator":
                    # Nothing to compute yet; Gemini decides what to calculate
                    # and asks for it through a USE_TOOL line.
                    tool_result = "Calculator tool available for mathematical operations"
                else:
                    # web_search (and any other tool) receives the whole question
                    tool_result = self._use_tool(tool_name, gaia_question.question)
                tool_results[tool_name] = tool_result
                reasoning_steps.append(f"β {tool_name} completed")

            # Step 3: Prepare content for Gemini
            # ensure_ascii=False keeps non-ASCII tool output readable in the
            # prompt instead of \uXXXX escapes.
            if tool_results:
                tool_results_text = json.dumps(tool_results, indent=2, ensure_ascii=False)
            else:
                tool_results_text = "No tools used yet."
            prompt = f"""{self._create_system_prompt()}
GAIA BENCHMARK QUESTION (Level {gaia_question.level}):
Question ID: {gaia_question.question_id}
Question: {gaia_question.question}
AVAILABLE TOOL RESULTS:
{tool_results_text}
TASK:
Solve this GAIA question step by step. You may request specific tool usage if needed by clearly stating:
"USE_TOOL: [tool_name] with input: [input_data]"
Provide your complete reasoning and final answer."""
            content_parts = [prompt]
            # Add image if it's an image file
            if (gaia_question.file_path and
                    Path(gaia_question.file_path).suffix.lower() in self._IMAGE_SUFFIXES):
                image_data = self._process_image_for_gemini(gaia_question.file_path)
                if image_data:
                    content_parts.append(image_data)
                    reasoning_steps.append("πΌοΈ Image included for visual analysis")

            # Step 4: Generate response with Gemini
            reasoning_steps.append("π€ Generating response with Gemini...")
            response = self.model.generate_content(content_parts)
            if not response or not response.text:
                raise Exception("Empty response from Gemini model")
            agent_response = response.text
            reasoning_steps.append("β Response generated successfully")

            # Step 5: Process any additional tool requests
            # NOTE(review): results are spliced into the text only; they are not
            # sent back to the model for a revised answer.
            if "USE_TOOL:" in agent_response:
                reasoning_steps.append("π§ Processing additional tool requests...")
                agent_response = self._process_tool_requests(agent_response, reasoning_steps)

            # Step 6: Calculate confidence and metrics
            confidence_score = self._calculate_confidence(agent_response, tool_results)
            end_time = datetime.utcnow()
            processing_time = (end_time - start_time).total_seconds()

            # Step 7: Prepare final result
            result = {
                "question_id": gaia_question.question_id,
                "question": gaia_question.question,
                "level": gaia_question.level,
                "agent_response": agent_response,
                "reasoning_steps": reasoning_steps,
                "tools_used": list(tool_results.keys()),
                "tool_results": tool_results,
                "confidence_score": confidence_score,
                "processing_time_seconds": processing_time,
                "timestamp": end_time.isoformat(),
                "model_used": self.model_name,
                "agent_version": "1.0-gemini"
            }
            # Add to conversation history
            self.conversation_history.append(result)
            logger.info(f"Question {gaia_question.question_id} solved successfully in {processing_time:.2f}s")
            return result
        except Exception as e:
            error_msg = f"Error solving question: {str(e)}"
            logger.error(error_msg)
            return {
                "question_id": gaia_question.question_id,
                "question": gaia_question.question,
                "level": gaia_question.level,
                "agent_response": f"Error: {error_msg}",
                "error": True,
                "timestamp": datetime.utcnow().isoformat(),
                "model_used": self.model_name
            }

    def _process_tool_requests(self, response: str, reasoning_steps: List[str]) -> str:
        """Process tool usage requests from Gemini's response.

        Each line starting with "USE_TOOL:" is replaced by the tool's output
        (or a "Tool Error" line if it is malformed); other lines pass through
        unchanged. Progress notes are appended to ``reasoning_steps``.
        """
        processed_response = []
        for line in response.split('\n'):
            if not line.strip().startswith("USE_TOOL:"):
                processed_response.append(line)
                continue
            try:
                # Parse tool request: "USE_TOOL: calculator with input: 2+2"
                request = line.split("USE_TOOL:", 1)[1].strip()
                name_part, separator, input_part = request.partition("with input:")
                if not separator:
                    raise ValueError("expected 'USE_TOOL: <tool_name> with input: <input>'")
                # The prompt shows the name as [tool_name]; models often copy
                # the brackets or add quoting/emphasis, so strip those.
                tool_name = name_part.strip().strip("[]`'\"*").strip().lower()
                tool_input = input_part.strip()
                reasoning_steps.append(f"π§ Executing {tool_name} with input: {tool_input}")
                # Execute the tool
                tool_result = self._use_tool(tool_name, tool_input)
                # Replace the tool request with the result
                processed_response.append(f"Tool Result ({tool_name}): {tool_result}")
                reasoning_steps.append(f"β {tool_name} executed successfully")
            except Exception as e:
                processed_response.append(f"Tool Error: {str(e)}")
                reasoning_steps.append(f"β Tool execution failed: {str(e)}")
        return '\n'.join(processed_response)

    def _calculate_confidence(self, response: str, tool_results: Dict) -> float:
        """Calculate a heuristic confidence score in [0.0, 1.0].

        Starts at 0.5 and adjusts for response length, tool usage, structure
        markers, uncertainty words and the presence of digits.
        """
        confidence = 0.5  # Base confidence
        # Increase confidence for detailed responses
        if len(response) > 200:
            confidence += 0.1
        # Increase confidence for tool usage
        if tool_results:
            confidence += 0.2
        # Increase confidence for structured responses
        if any(marker in response for marker in ['Step', 'Analysis:', 'Result:', 'Conclusion:']):
            confidence += 0.1
        # Decrease confidence for uncertainty indicators
        uncertainty_words = ['uncertain', 'unclear', 'might', 'possibly', 'approximately', 'estimate']
        response_lower = response.lower()
        if any(word in response_lower for word in uncertainty_words):
            confidence -= 0.1
        # Increase confidence for numerical precision
        if any(char.isdigit() for char in response):
            confidence += 0.1
        return max(0.0, min(1.0, confidence))

    def get_available_tools(self) -> List[str]:
        """Get list of available tool names"""
        return list(self.tools.keys())

    def test_tools(self) -> Dict[str, str]:
        """Test all tools to ensure they're working.

        Runs one smoke-test input per tool (the web search performs a real
        query) and returns a status string per tool name.
        """
        test_results = {}
        for tool_name, tool in self.tools.items():
            try:
                if tool_name == "calculator":
                    result = tool.execute("sqrt(16)")
                elif tool_name == "web_search":
                    result = tool.execute("test search query")
                elif tool_name == "file_analyzer":
                    # Create a temporary test file
                    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
                        f.write("Test file content")
                        temp_path = f.name
                    try:
                        result = tool.execute(temp_path)
                    finally:
                        os.unlink(temp_path)  # Clean up even if the tool raises
                else:
                    result = "Tool available"
                test_results[tool_name] = f"β Working: {result[:100]}..."
            except Exception as e:
                test_results[tool_name] = f"β Error: {str(e)}"
        return test_results

    def get_conversation_history(self, limit: int = 5) -> List[Dict]:
        """Get the ``limit`` most recent results (empty list if ``limit`` <= 0)."""
        # Guard needed: history[-0:] would return the whole list.
        if limit <= 0:
            return []
        return self.conversation_history[-limit:]
# Example usage and testing
if __name__ == "__main__":
    import sys

    # Check for API key
    if not os.getenv("GOOGLE_API_KEY"):
        print("β οΈ Please set your GOOGLE_API_KEY environment variable")
        print("You can get one from: https://makersuite.google.com/app/apikey")
        sys.exit(1)

    # Initialize agent
    print("π Initializing Gemini GAIA Agent...")
    agent = GeminiGAIAAgent(verbose=True)

    # Test tools
    print("\nπ§ Testing tools...")
    tool_results = agent.test_tools()
    for tool, result in tool_results.items():
        print(f" {tool}: {result}")

    # Test with sample questions
    sample_questions = [
        GAIAQuestion(
            question_id="test_001",
            question="What is the square root of 144?",
            level=1
        ),
        GAIAQuestion(
            question_id="test_002",
            question="If I invest $1000 at 5% annual compound interest, how much will I have after 3 years?",
            level=2
        ),
        GAIAQuestion(
            question_id="test_003",
            question="What is the current population of Tokyo according to the latest data?",
            level=2
        )
    ]
    print("\nπ Testing sample questions...")
    for question in sample_questions:
        print(f"\n{'='*60}")
        result = agent.solve_gaia_question(question)
        # Use .get with fallbacks: the "model not initialized" result may carry
        # only question_id / error / timestamp.
        answer = str(result.get('agent_response') or result.get('error') or '')
        print(f"Question: {result.get('question', question.question)}")
        print(f"Level: {result.get('level', question.level)}")
        print(f"Tools Used: {result.get('tools_used', [])}")
        print(f"Confidence: {result.get('confidence_score', 0):.2f}")
        print(f"Answer: {answer[:300]}...")
        if result.get('error'):
            print(f"β Error occurred: {answer}")