Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import json | |
| import base64 | |
| import requests | |
| import wikipediaapi | |
| import numpy as np | |
| import pandas as pd | |
| from typing import Dict, Any, List, Optional, Union | |
| from PIL import Image | |
| import pytesseract | |
| import io | |
| from datetime import datetime | |
| import ast | |
| import operator | |
| import math | |
| from functools import reduce | |
| import speech_recognition as sr | |
| from pydub import AudioSegment | |
| import tempfile | |
| import logging | |
# Module-level logger shared by every tool for error reporting.
logger = logging.getLogger(__name__)
# Initialize Wikipedia API (arguments are user-agent string, then language code)
wiki_wiki = wikipediaapi.Wikipedia('GAIA-Agent/1.0', 'en')
# Tool implementations
def web_search_tool(query: str, num_results: int = 5) -> str:
    """Search the web using DuckDuckGo.

    Args:
        query: Search query string.
        num_results: Maximum number of results to format (default 5).

    Returns:
        Numbered results with title/link/snippet, or an error message.
    """
    try:
        from duckduckgo_search import DDGS
        ddgs = DDGS()
        results = list(ddgs.text(query, max_results=num_results))
        if not results:
            return "No search results found."
        formatted_results = []
        for i, result in enumerate(results):
            # DDGS.text() returns the URL under 'href'; old releases used
            # 'link' (the original indexed 'link' and raised KeyError).
            link = result.get('href') or result.get('link', '')
            formatted_results.append(
                f"{i+1}. {result.get('title', '')}\n"
                f"   Link: {link}\n"
                f"   Snippet: {result.get('body', '')}"
            )
        return "\n\n".join(formatted_results)
    except Exception as e:
        logger.error(f"Web search error: {str(e)}")
        return f"Web search failed: {str(e)}"
def wikipedia_tool(query: str) -> str:
    """Search and get content from Wikipedia.

    Tries a direct page lookup first; on miss, falls back to a DuckDuckGo
    site-restricted search and attempts to resolve the first Wikipedia hit.

    Args:
        query: Page title or free-text search query.

    Returns:
        Page title/summary/URL, formatted search results, or an error message.
    """
    try:
        # Try to get the page directly by title.
        page = wiki_wiki.page(query)
        if page.exists():
            # Truncate summary to the first 1000 characters.
            summary = page.summary[:1000] if len(page.summary) > 1000 else page.summary
            return f"Title: {page.title}\n\nSummary: {summary}...\n\nURL: {page.fullurl}"
        else:
            # Search for candidate pages via DuckDuckGo.
            from duckduckgo_search import DDGS
            ddgs = DDGS()
            search_query = f"site:wikipedia.org {query}"
            results = list(ddgs.text(search_query, max_results=3))
            if results:
                first_result = results[0]
                # DDGS.text() returns the URL under 'href'; old releases used
                # 'link' (the original indexed 'link' and raised KeyError).
                link = first_result.get('href') or first_result.get('link', '')
                if 'wikipedia.org/wiki/' in link:
                    page_title = link.split('/wiki/')[-1].replace('_', ' ')
                    page = wiki_wiki.page(page_title)
                    if page.exists():
                        summary = page.summary[:1000] if len(page.summary) > 1000 else page.summary
                        return f"Title: {page.title}\n\nSummary: {summary}...\n\nURL: {page.fullurl}"
                # Could not resolve a page: return raw search results instead.
                formatted_results = []
                for result in results:
                    formatted_results.append(f"- {result['title']}: {result['body'][:200]}...")
                return "Wikipedia search results:\n" + "\n".join(formatted_results)
            return "No Wikipedia results found."
    except Exception as e:
        logger.error(f"Wikipedia error: {str(e)}")
        # Fallback to plain web search on any failure.
        return web_search_tool(f"site:wikipedia.org {query}", num_results=3)
def calculator_tool(expression: str) -> str:
    """Evaluate mathematical expressions safely.

    Only names from the math module plus a small whitelist of builtins
    (abs, round, min, max, sum, len, sorted) may appear; attribute access
    is rejected outright to keep dunder escapes out of the sandbox.

    Args:
        expression: A Python arithmetic expression, e.g. "sqrt(16) + 2".

    Returns:
        The result as a string, or "Calculation failed: ..." on error.
    """
    try:
        # Whitelist of callable/constant names available to the expression.
        allowed_names = {
            k: v for k, v in math.__dict__.items() if not k.startswith("__")
        }
        allowed_names.update({
            "abs": abs, "round": round, "min": min, "max": max,
            "sum": sum, "len": len, "sorted": sorted
        })
        # Parse ONCE and reuse the tree for both the safety check and the
        # compile step (the original parsed the expression twice).
        node = ast.parse(expression, mode='eval')
        for n in ast.walk(node):
            if isinstance(n, ast.Name) and n.id not in allowed_names:
                raise ValueError(f"Unsafe operation: {n.id}")
            # Attribute chains (e.g. ().__class__) can escape an empty
            # __builtins__ sandbox; no whitelisted use needs attributes.
            if isinstance(n, ast.Attribute):
                raise ValueError("Unsafe operation: attribute access")
        result = eval(compile(node, '<string>', 'eval'),
                      {"__builtins__": {}}, allowed_names)
        return str(result)
    except Exception as e:
        logger.error(f"Calculator error: {str(e)}")
        return f"Calculation failed: {str(e)}"
def python_repl_tool(code: str) -> str:
    """Execute Python code in a subprocess with a 10-second timeout.

    Args:
        code: Python source to run.

    Returns:
        Captured stdout (plus stderr if any), a timeout notice, or an
        error message.
    """
    import subprocess
    import sys
    import tempfile
    temp_filename = None
    try:
        # Write the code to a temporary script file.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_filename = f.name
        # Run with the same interpreter as the host process ('python' on
        # PATH may be a different, or missing, interpreter).
        result = subprocess.run(
            [sys.executable, temp_filename],
            capture_output=True,
            text=True,
            timeout=10
        )
        output = result.stdout
        if result.stderr:
            output += f"\nErrors:\n{result.stderr}"
        return output if output else "Code executed successfully with no output."
    except subprocess.TimeoutExpired:
        return "Code execution timed out (10 second limit)"
    except Exception as e:
        logger.error(f"Python REPL error: {str(e)}")
        return f"Code execution failed: {str(e)}"
    finally:
        # The original unlinked only on the success path, leaking the temp
        # file on timeout or any other exception.
        if temp_filename and os.path.exists(temp_filename):
            os.unlink(temp_filename)
def image_analysis_tool(image_path: str, query: str = "") -> str:
    """Analyze an image: report basic properties, run OCR, and optionally
    check whether *query* appears in the extracted text.

    Accepts either a filesystem path or a base64 data URI.
    """
    try:
        if image_path.startswith('data:'):
            # Base64 data URI: decode the payload after the first comma.
            _, payload = image_path.split(',', 1)
            img = Image.open(io.BytesIO(base64.b64decode(payload)))
        else:
            # If the path does not exist, try to resolve it by basename
            # against the UPLOADED_FILES environment variable.
            known_uploads = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
            if known_uploads and not os.path.exists(image_path):
                wanted = os.path.basename(image_path)
                for candidate in known_uploads:
                    if os.path.basename(candidate) == wanted:
                        image_path = candidate
                        break
            img = Image.open(image_path)

        # OCR pass over the whole image.
        text = pytesseract.image_to_string(img)

        width, height = img.size
        mode = img.mode
        result = f"Image properties: {width}x{height}, {mode} mode\n\n"
        result += f"OCR Text:\n{text}\n" if text.strip() else "No text detected in image.\n"

        # Answer the caller's query with a simple substring check.
        if query:
            result += f"\nRegarding '{query}': "
            found = query.lower() in text.lower()
            result += "Found in image text." if found else "Not found in image text."
        return result
    except Exception as e:
        logger.error(f"Image analysis error: {str(e)}")
        return f"Image analysis failed: {str(e)}"
def file_reader_tool(file_path: str, query: str = "") -> str:
    """Read and analyze various file types.

    Supports text/markup, CSV, Excel, image and audio files; bare filenames
    are resolved by basename against the UPLOADED_FILES environment variable.

    Args:
        file_path: Path (or basename of an uploaded file) to read.
        query: Optional hint about what to look for in the file.

    Returns:
        A human-readable summary of the file, or an error message.
    """
    try:
        # Resolve against uploaded files when the given path does not exist.
        uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
        if uploaded_files and not os.path.exists(file_path):
            for uploaded_path in uploaded_files:
                if os.path.basename(uploaded_path) == os.path.basename(file_path):
                    file_path = uploaded_path
                    break
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext in ['.txt', '.md', '.py', '.json', '.xml', '.html']:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            return f"File content:\n{content[:2000]}{'...' if len(content) > 2000 else ''}"
        elif file_ext == '.csv':
            df = _read_csv_robust(file_path)
            if df is None:
                return "Failed to parse CSV file with multiple encoding/delimiter attempts"
            info = f"CSV file with {len(df)} rows and {len(df.columns)} columns.\n"
            # Column labels may be non-string (e.g. ints); stringify first.
            info += f"Columns: {', '.join(map(str, df.columns))}\n\n"
            info += f"First 5 rows:\n{df.head().to_string()}\n\n"
            info += f"Data types:\n{df.dtypes.to_string()}"
            # If the query looks temporal, run the project's temporal analyzer.
            if query and any(word in query.lower() for word in ['month', 'year', 'date', 'january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december']):
                from search_strategies import DataAnalysisStrategy
                temporal_result = DataAnalysisStrategy.analyze_for_temporal_data(df, query)
                if temporal_result is not None:
                    info += f"\n\nTemporal analysis result:\n{temporal_result.head(10).to_string()}"
            return info
        elif file_ext in ['.xlsx', '.xls']:
            df = pd.read_excel(file_path)
            info = f"Excel file with {len(df)} rows and {len(df.columns)} columns.\n"
            info += f"Columns: {', '.join(map(str, df.columns))}\n\n"
            info += f"First 5 rows:\n{df.head().to_string()}"
            return info
        elif file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']:
            return image_analysis_tool(file_path, query)
        elif file_ext in ['.mp3', '.wav', '.m4a']:
            return audio_analysis_tool(file_path)
        else:
            return f"Unsupported file type: {file_ext}"
    except Exception as e:
        logger.error(f"File reader error: {str(e)}")
        return f"Failed to read file: {str(e)}"


def _read_csv_robust(file_path: str):
    """Try several encodings/delimiters; return a DataFrame or None.

    Prefers a parse yielding more than one column, but keeps the first
    single-column parse as a fallback (the original code reported valid
    single-column CSVs as parse failures, and its bare ``except`` could
    swallow KeyboardInterrupt).
    """
    fallback = None
    for encoding in ['utf-8', 'latin1', 'iso-8859-1', 'cp1252']:
        for delimiter in [',', ';', '\t', '|']:
            try:
                df = pd.read_csv(file_path, encoding=encoding, delimiter=delimiter)
            except Exception:
                continue
            if len(df.columns) > 1:
                return df
            if fallback is None:
                fallback = df
    return fallback
def audio_analysis_tool(audio_path: str) -> str:
    """Analyze an audio file and transcribe speech via Google recognition.

    Non-WAV inputs are converted to a temporary WAV first; bare filenames
    are resolved by basename against the UPLOADED_FILES environment variable.

    Args:
        audio_path: Path (or basename of an uploaded file) to the audio.

    Returns:
        The transcription, a "could not understand" notice, or an error message.
    """
    wav_path = None
    try:
        recognizer = sr.Recognizer()
        # Resolve against uploaded files when the given path does not exist.
        uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
        if uploaded_files and not os.path.exists(audio_path):
            for uploaded_path in uploaded_files:
                if os.path.basename(uploaded_path) == os.path.basename(audio_path):
                    audio_path = uploaded_path
                    break
        # Convert to WAV if needed (speech_recognition only reads WAV/AIFF/FLAC).
        if not audio_path.endswith('.wav'):
            audio = AudioSegment.from_file(audio_path)
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                audio.export(tmp_file.name, format='wav')
                wav_path = tmp_file.name
        else:
            wav_path = audio_path
        # Perform speech recognition.
        with sr.AudioFile(wav_path) as source:
            audio_data = recognizer.record(source)
        try:
            text = recognizer.recognize_google(audio_data)
            result = f"Transcribed text: {text}"
        except sr.UnknownValueError:
            result = "Could not understand audio"
        except sr.RequestError as e:
            result = f"Speech recognition error: {str(e)}"
        return result
    except Exception as e:
        logger.error(f"Audio analysis error: {str(e)}")
        return f"Audio analysis failed: {str(e)}"
    finally:
        # The original unlinked the converted temp WAV only on the success
        # path, leaking it whenever recognition or file loading raised.
        if wav_path and wav_path != audio_path and os.path.exists(wav_path):
            os.unlink(wav_path)
def data_analysis_tool(file_path: str, operation: str, **kwargs) -> str:
    """Perform data analysis operations on CSV/Excel files.

    Args:
        file_path: Path (or basename of an uploaded file) to the data file.
        operation: One of: sum, mean, count, groupby, filter, describe, info.
        **kwargs: Operation parameters (column, value, group_column,
            agg_column, agg_func, condition).

    Returns:
        The operation result as a string, or an error message.
    """
    try:
        # Resolve against uploaded files when the given path does not exist.
        uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
        if uploaded_files and not os.path.exists(file_path):
            for uploaded_path in uploaded_files:
                if os.path.basename(uploaded_path) == os.path.basename(file_path):
                    file_path = uploaded_path
                    break
        # Load data by extension.
        if file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        else:
            df = pd.read_excel(file_path)
        # Perform the requested operation.
        if operation == "sum":
            column = kwargs.get('column')
            if column and column in df.columns:
                result = df[column].sum()
                return f"Sum of {column}: {result}"
            return f"Column '{column}' not found"
        elif operation == "mean":
            column = kwargs.get('column')
            if column and column in df.columns:
                result = df[column].mean()
                return f"Mean of {column}: {result}"
            return f"Column '{column}' not found"
        elif operation == "count":
            column = kwargs.get('column')
            value = kwargs.get('value')
            if column and column in df.columns:
                # Test against None, not truthiness: the original `if value:`
                # sent falsy values (0, False, "") down the value_counts path.
                if value is not None:
                    result = len(df[df[column] == value])
                    return f"Count of {column}={value}: {result}"
                else:
                    result = df[column].value_counts()
                    return f"Value counts for {column}:\n{result.to_string()}"
            return f"Column '{column}' not found"
        elif operation == "groupby":
            group_column = kwargs.get('group_column')
            agg_column = kwargs.get('agg_column')
            agg_func = kwargs.get('agg_func', 'sum')
            if group_column and agg_column:
                result = df.groupby(group_column)[agg_column].agg(agg_func)
                return f"Grouped results:\n{result.to_string()}"
            return "Missing group_column or agg_column"
        elif operation == "filter":
            condition = kwargs.get('condition')
            if condition:
                filtered_df = df.query(condition)
                return f"Filtered data ({len(filtered_df)} rows):\n{filtered_df.head().to_string()}"
            return "Missing filter condition"
        elif operation == "describe":
            return f"Data description:\n{df.describe().to_string()}"
        elif operation == "info":
            # df.info prints to a buffer, not a return value.
            buffer = io.StringIO()
            df.info(buf=buffer)
            return buffer.getvalue()
        return "Operation not recognized or missing parameters."
    except Exception as e:
        logger.error(f"Data analysis error: {str(e)}")
        return f"Data analysis failed: {str(e)}"
# Tool schemas for function calling.
# JSON-Schema-style parameter specs, keyed by tool name; each entry mirrors
# the signature of the corresponding *_tool function defined above.
tool_schemas = {
    "web_search": {
        "name": "web_search",
        "description": "Search the web for current information",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "num_results": {"type": "integer", "description": "Number of results", "default": 5}
            },
            "required": ["query"]
        }
    },
    "wikipedia": {
        "name": "wikipedia",
        "description": "Search Wikipedia for information",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Wikipedia search query"}
            },
            "required": ["query"]
        }
    },
    "calculator": {
        "name": "calculator",
        "description": "Perform mathematical calculations",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Mathematical expression to evaluate"}
            },
            "required": ["expression"]
        }
    },
    "python_repl": {
        "name": "python_repl",
        "description": "Execute Python code",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to execute"}
            },
            "required": ["code"]
        }
    },
    "image_analysis": {
        "name": "image_analysis",
        "description": "Analyze images with OCR and computer vision",
        "parameters": {
            "type": "object",
            "properties": {
                "image_path": {"type": "string", "description": "Path to image file"},
                "query": {"type": "string", "description": "What to look for in the image", "default": ""}
            },
            "required": ["image_path"]
        }
    },
    "file_reader": {
        "name": "file_reader",
        "description": "Read and analyze various file types",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to file"},
                "query": {"type": "string", "description": "What to look for", "default": ""}
            },
            "required": ["file_path"]
        }
    },
    "data_analysis": {
        "name": "data_analysis",
        "description": "Perform data analysis on CSV/Excel files",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to data file"},
                "operation": {"type": "string", "description": "Operation: sum, mean, count, groupby, filter, describe, info"},
                "kwargs": {"type": "object", "description": "Additional parameters for the operation"}
            },
            "required": ["file_path", "operation"]
        }
    }
}
def get_all_tools() -> Dict[str, Any]:
    """Return all available tools"""
    from langchain.tools import Tool
    # (name, implementation, description) triples for every registered tool.
    specs = [
        ("web_search", web_search_tool, "Search the web for current information"),
        ("wikipedia", wikipedia_tool, "Search Wikipedia for information"),
        ("calculator", calculator_tool, "Perform mathematical calculations"),
        ("python_repl", python_repl_tool, "Execute Python code"),
        ("image_analysis", image_analysis_tool, "Analyze images with OCR"),
        ("file_reader", file_reader_tool, "Read various file types"),
        ("data_analysis", data_analysis_tool, "Analyze data files"),
    ]
    return {
        name: Tool(name=name, func=func, description=desc)
        for name, func, desc in specs
    }