Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import requests | |
| import pandas as pd | |
| from pathlib import Path | |
| from typing import Optional, Union, Dict, Any, List | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # Simple tool-based agent without LangGraph for now | |
class SimpleAgent:
    """Single-step, tool-using agent driven by an LLM callable.

    The agent asks the LLM once whether a tool is required. If the reply
    contains one of the ``NEED_*`` directives, the matching tool is run and
    the LLM is asked a second time with the tool output; otherwise the first
    reply is returned as the answer.
    """

    def __init__(self, llm):
        """Store the LLM and register the available tools.

        Args:
            llm: Callable taking a prompt string and returning the reply text.
        """
        self.llm = llm
        self.tools = {
            'search_web': self.search_web,
            'search_wikipedia': self.search_wikipedia,
            'execute_python': self.execute_python,
            'read_excel_file': self.read_excel_file,
            'read_text_file': self.read_text_file,
        }

    def search_web(self, query: str) -> str:
        """Query the DuckDuckGo JSON API and summarise the response.

        Args:
            query: Free-text search query.

        Returns:
            The abstract plus up to three related topics, one per line, or a
            human-readable message when nothing was found or the call failed.
        """
        try:
            # Let requests build the query string so that characters such as
            # '&', '#', '+' or spaces in the query are percent-encoded.
            response = requests.get(
                "https://api.duckduckgo.com/",
                params={
                    "q": query,
                    "format": "json",
                    "no_html": 1,
                    "skip_disambig": 1,
                },
                timeout=10,
            )
            if response.status_code != 200:
                return f"Search failed with status code {response.status_code}"

            data = response.json()
            results = []
            if data.get("AbstractText"):
                results.append(f"Abstract: {data['AbstractText']}")
            for topic in (data.get("RelatedTopics") or [])[:3]:
                # Entries without a "Text" field are skipped.
                if isinstance(topic, dict) and topic.get("Text"):
                    results.append(f"Related: {topic['Text']}")

            if results:
                return "\n".join(results)
            return f"Search performed for '{query}' but no specific results found."
        except Exception as e:
            # Tools report failures as text so the LLM can still respond.
            return f"Search error: {str(e)}"

    def search_wikipedia(self, query: str) -> str:
        """Fetch the English Wikipedia page summary for ``query``.

        Args:
            query: Page title; spaces are converted to underscores.

        Returns:
            The first 500 characters of the page extract, or a message
            describing why no extract is available.
        """
        try:
            from urllib.parse import quote

            # The title is a single path segment: encode '/', '?', '#', etc.
            title = quote(query.replace(" ", "_"), safe="")
            search_url = "https://en.wikipedia.org/api/rest_v1/page/summary/" + title
            response = requests.get(search_url, timeout=10)
            if response.status_code != 200:
                return f"Wikipedia search failed for '{query}'"

            extract = response.json().get("extract", "")
            if extract:
                return f"Wikipedia: {extract[:500]}..."
            return f"Wikipedia page found for '{query}' but no extract available."
        except Exception as e:
            return f"Wikipedia search error: {str(e)}"

    def execute_python(self, code: str) -> str:
        """Execute Python source and return whatever it printed.

        Args:
            code: Python source text. Only a small whitelist of builtins plus
                the ``math`` and ``json`` modules are exposed to it.

        Returns:
            Captured standard output, a fixed message when nothing was
            printed, or an error message when execution raised.
        """
        try:
            import contextlib
            import io
            import json
            import math

            # SECURITY NOTE: the code comes from the model. A reduced
            # ``__builtins__`` is NOT a real sandbox (object introspection can
            # escape it); run this only in an isolated environment.
            safe_globals = {
                '__builtins__': {
                    'print': print, 'len': len, 'str': str, 'int': int, 'float': float,
                    'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, 'set': set,
                    'range': range, 'sum': sum, 'max': max, 'min': min, 'abs': abs,
                    'round': round, 'sorted': sorted, 'enumerate': enumerate, 'zip': zip,
                },
                'math': math,
                'json': json,
            }
            captured = io.StringIO()
            # redirect_stdout restores sys.stdout even if the code raises.
            with contextlib.redirect_stdout(captured):
                exec(code, safe_globals)
            output = captured.getvalue()
            return output if output else "Code executed successfully (no output)"
        except Exception as e:
            return f"Python execution error: {str(e)}"

    def read_excel_file(self, file_path: str, sheet_name: Optional[str] = None) -> str:
        """Render an Excel sheet as text.

        Args:
            file_path: Path of the workbook.
            sheet_name: Sheet name, or a string of digits that is used as a
                sheet index. ``None`` selects the first sheet.

        Returns:
            A header with the row/column counts followed by the table. Sheets
            with more than 20 rows show only the first and last 10 rows.
            Errors are returned as text.
        """
        try:
            if not Path(file_path).exists():
                return f"Error: File not found at {file_path}"

            if sheet_name is None:
                sheet_name = 0
            elif sheet_name and sheet_name.isdigit():
                sheet_name = int(sheet_name)

            df = pd.read_excel(file_path, sheet_name=sheet_name)
            result = f"Excel file with {len(df)} rows and {len(df.columns)} columns:\n\n"
            if len(df) > 20:
                result += "First 10 rows:\n" + df.head(10).to_string(index=False)
                result += f"\n\n... ({len(df) - 20} rows omitted) ...\n\n"
                result += "Last 10 rows:\n" + df.tail(10).to_string(index=False)
            else:
                result += df.to_string(index=False)
            return result
        except Exception as e:
            return f"Error reading Excel file: {str(e)}"

    def read_text_file(self, file_path: str) -> str:
        """Read a text file, trying several encodings.

        Args:
            file_path: Path of the file to read.

        Returns:
            ``"File content (<encoding> encoding):\\n\\n<text>"`` for the first
            encoding that decodes the file, or an error message.
        """
        try:
            file_path_obj = Path(file_path)
            if not file_path_obj.exists():
                return f"Error: File not found at {file_path}"

            with open(file_path_obj, 'rb') as raw:
                head = raw.read(2)

            # UTF-16 is attempted only when a byte-order mark is present:
            # without one, most even-length byte strings "successfully" decode
            # as UTF-16 and produce garbage. iso-8859-1 accepts every byte
            # sequence, so it has to be the last fallback.
            encodings = ['utf-8', 'cp1252', 'iso-8859-1']
            if head in (b'\xff\xfe', b'\xfe\xff'):
                encodings.insert(0, 'utf-16')

            for encoding in encodings:
                try:
                    with open(file_path_obj, 'r', encoding=encoding) as f:
                        content = f.read()
                except UnicodeDecodeError:
                    continue
                return f"File content ({encoding} encoding):\n\n{content}"
            return "Error: Could not decode file with any standard encoding"
        except Exception as e:
            return f"Error reading file: {str(e)}"

    @staticmethod
    def _directive_argument(response: str, marker: str) -> str:
        """Return the stripped text that follows ``marker`` in ``response``."""
        return response.split(marker)[1].strip()

    @staticmethod
    def _first_line(text: str) -> str:
        """Return the first line of ``text`` stripped, or '' when empty."""
        lines = text.splitlines()
        return lines[0].strip() if lines else ""

    @staticmethod
    def _strip_code_fence(code: str) -> str:
        """Return the contents of a leading Markdown code fence, if any."""
        fenced = re.match(r"```[A-Za-z0-9_+-]*[ \t]*\n(.*?)```", code, re.DOTALL)
        return fenced.group(1).strip() if fenced else code

    def run(self, question: str) -> str:
        """Answer ``question``, using at most one tool.

        Args:
            question: The task text given to the LLM.

        Returns:
            The LLM's direct reply, or its second reply after being shown the
            output of the tool it requested.
        """
        prompt = (
            "\n"
            f"Question: {question}\n"
            "Think step by step. If this question requires:\n"
            '- Web search for current information, say "NEED_SEARCH: <search query>"\n'
            '- Mathematical calculation, say "NEED_PYTHON: <python code>"\n'
            '- Wikipedia lookup, say "NEED_WIKI: <search term>"\n'
            '- File analysis (if file path mentioned), say "NEED_FILE: <file_path>"\n'
            "Otherwise, provide a direct answer.\n"
            "Your response:"
        )
        direct_response = self.llm(prompt)

        # Queries, titles and paths are single-line arguments: anything the
        # model writes on later lines is commentary, not part of the argument.
        if "NEED_SEARCH:" in direct_response:
            search_query = self._first_line(
                self._directive_argument(direct_response, "NEED_SEARCH:")
            )
            search_result = self.search_web(search_query)
            return self.llm(f"Question: {question}\n\nSearch results: {search_result}\n\nFinal answer:")

        if "NEED_PYTHON:" in direct_response:
            # Code may span several lines and is often wrapped in a fence.
            code = self._strip_code_fence(
                self._directive_argument(direct_response, "NEED_PYTHON:")
            )
            exec_result = self.execute_python(code)
            return self.llm(f"Question: {question}\n\nCalculation result: {exec_result}\n\nFinal answer:")

        if "NEED_WIKI:" in direct_response:
            wiki_query = self._first_line(
                self._directive_argument(direct_response, "NEED_WIKI:")
            )
            wiki_result = self.search_wikipedia(wiki_query)
            return self.llm(f"Question: {question}\n\nWikipedia info: {wiki_result}\n\nFinal answer:")

        if "NEED_FILE:" in direct_response:
            file_path = self._first_line(
                self._directive_argument(direct_response, "NEED_FILE:")
            ).strip('`"\'')
            # Extension check is case-insensitive so "REPORT.XLSX" is handled.
            if file_path.lower().endswith(('.xlsx', '.xls')):
                file_content = self.read_excel_file(file_path)
            else:
                file_content = self.read_text_file(file_path)
            return self.llm(f"Question: {question}\n\nFile content: {file_content}\n\nFinal answer:")

        return direct_response
class OpenRouterLLM:
    """Callable wrapper around the OpenRouter chat-completions endpoint.

    Calling an instance sends one system message plus the prompt as the user
    message and returns the cleaned reply text. Failures are returned as
    strings starting with ``"Error:"`` or ``"API Error:"`` rather than raised.
    """

    # Lead-in phrases removed from the start of a reply by _clean_answer.
    _ANSWER_PREFIXES = (
        "Answer:", "The answer is:", "Final answer:", "Result:",
        "Solution:", "Based on", "Therefore", "In conclusion",
    )

    def __init__(self, model: str = "deepseek/deepseek-v3.1-terminus"):
        """Read the API key from the environment and remember the model.

        Args:
            model: OpenRouter model identifier sent with every request.
        """
        # OPENROUTER_API_KEY takes precedence; "my_key" is the fallback name.
        self.api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("my_key")
        self.model = model
        self.base_url = "https://openrouter.ai/api/v1/chat/completions"

    def __call__(self, prompt: str, max_tokens: int = 1500, temperature: float = 0.1) -> str:
        """Send ``prompt`` to the model and return the cleaned reply.

        Args:
            prompt: Text sent as the user message.
            max_tokens: Value of the ``max_tokens`` request field.
            temperature: Value of the ``temperature`` request field.

        Returns:
            The reply after ``_clean_answer``, or an error string when the
            key is missing/malformed, the HTTP status is not 200, the reply
            has no content, or the request raised.
        """
        if not self.api_key or not self.api_key.startswith('sk-or-v1-'):
            return "Error: Invalid OpenRouter API key"

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": "You are a helpful AI assistant. Provide direct, accurate answers. For GAIA evaluation, be precise and concise."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        try:
            response = requests.post(self.base_url, headers=headers, json=payload, timeout=30)
            if response.status_code != 200:
                return f"API Error: {response.status_code}"

            choices = response.json().get("choices") or []
            if not choices:
                return "Error: No response content received"
            # A null message or content is reported as "no content" instead
            # of surfacing as an AttributeError message from .strip().
            content = (choices[0].get("message") or {}).get("content")
            if content is None:
                return "Error: No response content received"
            return self._clean_answer(content.strip())
        except Exception as e:
            return f"Error: {str(e)}"

    def _clean_answer(self, answer: str) -> str:
        """Strip a leading boilerplate phrase and, for short replies, quotes.

        Args:
            answer: Raw reply text.

        Returns:
            The reply without one leading phrase from ``_ANSWER_PREFIXES``
            (matched case-insensitively). Replies of at most three words also
            lose surrounding quotes and periods.
        """
        answer = answer.strip()
        lowered = answer.lower()
        for prefix in self._ANSWER_PREFIXES:
            if not lowered.startswith(prefix.lower()):
                continue
            remainder = answer[len(prefix):]
            # Word-boundary guard: "Based on" must not match "Based only".
            if prefix[-1].isalnum() and remainder[:1].isalnum():
                continue
            # Also drop the separator left behind ("Therefore, 42" -> "42").
            answer = remainder.lstrip(" \t:,").strip()
            break

        # Remove quotes and periods from short answers
        if len(answer.split()) <= 3:
            answer = answer.strip('"\'.')
        return answer
class GaiaAgent:
    """Entry point for GAIA tasks: wires an OpenRouter LLM into a SimpleAgent."""

    def __init__(self):
        """Create the LLM wrapper and the tool agent that uses it."""
        print("Initializing GaiaAgent with OpenRouter DeepSeek...")
        # Initialize the LLM
        self.llm = OpenRouterLLM(model="deepseek/deepseek-v3.1-terminus")
        # Initialize the agent with tools
        self.agent = SimpleAgent(self.llm)
        print("GaiaAgent initialized successfully!")

    def __call__(self, task_id: str, question: str) -> str:
        """Answer one task.

        Args:
            task_id: Identifier used only in the progress messages.
            question: Task text passed to the agent.

        Returns:
            The cleaned answer, or ``"Agent error: <message>"`` if anything
            raised while processing.
        """
        try:
            print(f"Processing task {task_id}: {question[:100]}...")
            enhanced_question = self._enhance_question_with_file_analysis(question)
            answer = self.agent.run(enhanced_question)
            clean_answer = self._clean_final_answer(answer)
            print(f"Agent answer for {task_id}: {clean_answer}")
            return clean_answer
        except Exception as e:
            # Top-level boundary: report the failure as the answer text.
            error_msg = f"Agent error: {str(e)}"
            print(f"Error processing task {task_id}: {error_msg}")
            return error_msg

    def _enhance_question_with_file_analysis(self, question: str) -> str:
        """Return ``question`` unchanged; hook for file-aware prompt changes.

        The question is scanned for file references, but the result does not
        alter the prompt: a referenced path is already part of the question
        text that is handed to the agent.

        Args:
            question: Task text.

        Returns:
            ``question`` exactly as passed in.
        """
        file_patterns = (
            r'/tmp/gaia_cached_files/[^\s]+',
            r'saved locally at:\s*([^\s]+)',
            r'file.*?\.xlsx?',
            r'file.*?\.csv',
            r'file.*?\.txt',
        )
        if not any(re.search(pattern, question, re.IGNORECASE) for pattern in file_patterns):
            return question
        # File reference found: nothing is appended to the question here.
        return question

    def _clean_final_answer(self, answer: str) -> str:
        """Reduce an LLM reply to the bare answer text.

        Keeps only what follows the last "final answer:" marker (matched
        case-insensitively), drops one leading filler phrase, and trims
        surrounding whitespace, quotes and punctuation.

        Args:
            answer: Reply text from the agent.

        Returns:
            The cleaned answer.
        """
        answer = answer.strip()

        # Split on the marker itself, not on ':' - otherwise answers that
        # contain a colon ("12:30", URLs, ratios) are truncated.
        answer = re.split(r"final answer:", answer, flags=re.IGNORECASE)[-1].strip()

        # Remove common unnecessary phrases
        cleanup_phrases = (
            "based on the", "according to", "the answer is", "therefore",
            "in conclusion", "as a result", "so the answer is",
        )
        lowered = answer.lower()
        for phrase in cleanup_phrases:
            if lowered.startswith(phrase):
                answer = answer[len(phrase):].strip()
                break

        # Whitespace is part of the strip set so that removing a leading
        # separator (", 5") does not leave a leading space behind.
        return answer.strip(" \t\r\n.,;:\"'")