# tools.py — uploaded by LuisZermeno, revision f64ef80 (verified)
import os
import re
import json
import base64
import requests
import wikipediaapi
import numpy as np
import pandas as pd
from typing import Dict, Any, List, Optional, Union
from PIL import Image
import pytesseract
import io
from datetime import datetime
import ast
import operator
import math
from functools import reduce
import speech_recognition as sr
from pydub import AudioSegment
import tempfile
import logging
logger = logging.getLogger(__name__)
# Initialize Wikipedia API
wiki_wiki = wikipediaapi.Wikipedia('GAIA-Agent/1.0', 'en')
# Tool implementations
def web_search_tool(query: str, num_results: int = 5) -> str:
    """Search the web using DuckDuckGo.

    Args:
        query: Search query string.
        num_results: Maximum number of results to fetch (default 5).

    Returns:
        A numbered list of results (title, link, snippet) separated by
        blank lines, or an error/"no results" message string.
    """
    try:
        from duckduckgo_search import DDGS
        ddgs = DDGS()
        results = list(ddgs.text(query, max_results=num_results))
        if not results:
            return "No search results found."
        formatted_results = []
        for i, result in enumerate(results):
            # DDGS.text() returns the URL under 'href'; the original
            # hard-coded 'link' (an older key name) and raised KeyError
            # on every result. Accept both for version tolerance.
            url = result.get('href') or result.get('link', '')
            formatted_results.append(
                f"{i+1}. {result.get('title', '')}\n"
                f" Link: {url}\n"
                f" Snippet: {result.get('body', '')}"
            )
        return "\n\n".join(formatted_results)
    except Exception as e:
        logger.error(f"Web search error: {str(e)}")
        return f"Web search failed: {str(e)}"
def wikipedia_tool(query: str) -> str:
    """Search and get content from Wikipedia.

    Tries a direct page lookup first; on a miss, runs a site-restricted
    DuckDuckGo search to find the page title, and finally falls back to
    a plain web search if everything else fails.

    Args:
        query: Topic or page title to look up.

    Returns:
        Page title/summary/URL, formatted search results, or an error
        message string.
    """
    try:
        # Try to get page directly
        page = wiki_wiki.page(query)
        if page.exists():
            # Trim long summaries to the first 1000 characters.
            summary = page.summary[:1000] if len(page.summary) > 1000 else page.summary
            return f"Title: {page.title}\n\nSummary: {summary}...\n\nURL: {page.fullurl}"
        # No direct hit: search wikipedia.org pages via DuckDuckGo.
        from duckduckgo_search import DDGS
        ddgs = DDGS()
        search_query = f"site:wikipedia.org {query}"
        results = list(ddgs.text(search_query, max_results=3))
        if results:
            # Try to extract a Wikipedia page title from the first result.
            # DDGS.text() returns the URL under 'href' — the original read
            # 'link', raising KeyError here and always dropping to the
            # generic web-search fallback. Accept both key names.
            first_result = results[0]
            url = first_result.get('href') or first_result.get('link', '')
            if 'wikipedia.org/wiki/' in url:
                page_title = url.split('/wiki/')[-1].replace('_', ' ')
                page = wiki_wiki.page(page_title)
                if page.exists():
                    summary = page.summary[:1000] if len(page.summary) > 1000 else page.summary
                    return f"Title: {page.title}\n\nSummary: {summary}...\n\nURL: {page.fullurl}"
            # Couldn't resolve a page — return the raw search results.
            formatted_results = []
            for result in results:
                formatted_results.append(f"- {result['title']}: {result['body'][:200]}...")
            return "Wikipedia search results:\n" + "\n".join(formatted_results)
        return "No Wikipedia results found."
    except Exception as e:
        logger.error(f"Wikipedia error: {str(e)}")
        # Fallback to web search
        return web_search_tool(f"site:wikipedia.org {query}", num_results=3)
def calculator_tool(expression: str) -> str:
    """Safely evaluate a mathematical expression string.

    Supports all public names from the ``math`` module plus a few numeric
    builtins (abs, round, min, max, sum, len, sorted).

    Args:
        expression: Expression to evaluate, e.g. ``"sqrt(16) + 2"``.

    Returns:
        The result as a string, or an error message on failure.
    """
    try:
        # Whitelist of names the expression is allowed to reference.
        allowed_names = {
            k: v for k, v in math.__dict__.items() if not k.startswith("__")
        }
        allowed_names.update({
            "abs": abs, "round": round, "min": min, "max": max,
            "sum": sum, "len": len, "sorted": sorted
        })
        # Parse once; reuse the tree for both validation and execution
        # (the original re-parsed the expression for compile()).
        node = ast.parse(expression, mode='eval')
        # Safety check: reject unknown names AND attribute access.
        # Attribute access must be blocked because expressions such as
        # "().__class__" contain no Name nodes and would otherwise slip
        # past the whitelist straight into eval().
        for n in ast.walk(node):
            if isinstance(n, ast.Name) and n.id not in allowed_names:
                raise ValueError(f"Unsafe operation: {n.id}")
            if isinstance(n, ast.Attribute):
                raise ValueError("Unsafe operation: attribute access")
        result = eval(compile(node, '<string>', 'eval'),
                      {"__builtins__": {}}, allowed_names)
        return str(result)
    except Exception as e:
        logger.error(f"Calculator error: {str(e)}")
        return f"Calculation failed: {str(e)}"
def python_repl_tool(code: str) -> str:
    """Execute Python code in a subprocess and return its output.

    The code is written to a temporary file and run with the current
    interpreter under a 10-second timeout.

    Args:
        code: Python source code to execute.

    Returns:
        Captured stdout (plus stderr when present), a success placeholder
        when there is no output, or an error message.
    """
    import subprocess
    import sys
    import tempfile
    temp_filename = None
    try:
        # Create temporary file holding the code to run.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            temp_filename = f.name
        # sys.executable guarantees the same interpreter even when no
        # bare "python" binary exists on PATH (the original used 'python').
        result = subprocess.run(
            [sys.executable, temp_filename],
            capture_output=True,
            text=True,
            timeout=10
        )
        output = result.stdout
        if result.stderr:
            output += f"\nErrors:\n{result.stderr}"
        return output if output else "Code executed successfully with no output."
    except subprocess.TimeoutExpired:
        return "Code execution timed out (10 second limit)"
    except Exception as e:
        logger.error(f"Python REPL error: {str(e)}")
        return f"Code execution failed: {str(e)}"
    finally:
        # Always remove the temp file — the original skipped cleanup on
        # timeout and on any exception, leaking files.
        if temp_filename and os.path.exists(temp_filename):
            os.unlink(temp_filename)
def image_analysis_tool(image_path: str, query: str = "") -> str:
    """Run OCR over an image and report its basic properties.

    Accepts either a filesystem path (resolved against the UPLOADED_FILES
    environment list when missing) or a base64 data URI. If *query* is
    given, reports whether it appears in the OCR text.
    """
    try:
        if image_path.startswith('data:'):
            # Inline data URI: decode the base64 payload directly.
            _, payload = image_path.split(',', 1)
            image = Image.open(io.BytesIO(base64.b64decode(payload)))
        else:
            # Resolve the path against uploaded files when it is missing.
            uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
            if uploaded_files and not os.path.exists(image_path):
                wanted = os.path.basename(image_path)
                for candidate in uploaded_files:
                    if os.path.basename(candidate) == wanted:
                        image_path = candidate
                        break
            image = Image.open(image_path)
        # OCR pass plus basic image metadata.
        text = pytesseract.image_to_string(image)
        width, height = image.size
        parts = [f"Image properties: {width}x{height}, {image.mode} mode\n\n"]
        if text.strip():
            parts.append(f"OCR Text:\n{text}\n")
        else:
            parts.append("No text detected in image.\n")
        if query:
            # Simple substring check of the query against the OCR output.
            verdict = (
                "Found in image text."
                if query.lower() in text.lower()
                else "Not found in image text."
            )
            parts.append(f"\nRegarding '{query}': {verdict}")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Image analysis error: {str(e)}")
        return f"Image analysis failed: {str(e)}"
def file_reader_tool(file_path: str, query: str = "") -> str:
    """Read and analyze various file types.

    Dispatches on file extension: plain text/markup is returned truncated
    to 2000 chars, CSV/Excel files get a structural summary via pandas,
    images are delegated to image_analysis_tool and audio to
    audio_analysis_tool. The path is resolved against the UPLOADED_FILES
    environment list when it does not exist as given.

    Args:
        file_path: Path (or basename of an uploaded file) to read.
        query: Optional question about the content; currently only used
            to trigger temporal analysis on CSV files.

    Returns:
        A human-readable summary string, or an error message.
    """
    try:
        # Check uploaded files (JSON list of paths in the environment).
        uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
        if uploaded_files and not os.path.exists(file_path):
            # Try to find the file in uploaded files by matching basename.
            for uploaded_path in uploaded_files:
                if os.path.basename(uploaded_path) == os.path.basename(file_path):
                    file_path = uploaded_path
                    break
        if not os.path.exists(file_path):
            return f"File not found: {file_path}"
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext in ['.txt', '.md', '.py', '.json', '.xml', '.html']:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # Truncate to 2000 chars to keep the response bounded.
            return f"File content:\n{content[:2000]}{'...' if len(content) > 2000 else ''}"
        elif file_ext in ['.csv']:
            # Try multiple encodings and delimiters; the first combination
            # yielding more than one column is treated as a successful
            # parse. NOTE(review): a legitimately single-column CSV will
            # be rejected by this heuristic.
            encodings = ['utf-8', 'latin1', 'iso-8859-1', 'cp1252']
            delimiters = [',', ';', '\t', '|']
            df = None
            for encoding in encodings:
                for delimiter in delimiters:
                    try:
                        df = pd.read_csv(file_path, encoding=encoding, delimiter=delimiter)
                        if len(df.columns) > 1:  # Successful parse
                            break
                    except:
                        continue
                if df is not None and len(df.columns) > 1:
                    break
            if df is None:
                return "Failed to parse CSV file with multiple encoding/delimiter attempts"
            info = f"CSV file with {len(df)} rows and {len(df.columns)} columns.\n"
            info += f"Columns: {', '.join(df.columns)}\n\n"
            info += f"First 5 rows:\n{df.head().to_string()}\n\n"
            info += f"Data types:\n{df.dtypes.to_string()}"
            # If the query mentions dates/months, run the project's
            # temporal analysis helper and append its result.
            if query and any(word in query.lower() for word in ['month', 'year', 'date', 'january', 'february', 'march', 'april', 'may', 'june', 'july', 'august', 'september', 'october', 'november', 'december']):
                from search_strategies import DataAnalysisStrategy
                temporal_result = DataAnalysisStrategy.analyze_for_temporal_data(df, query)
                if temporal_result is not None:
                    info += f"\n\nTemporal analysis result:\n{temporal_result.head(10).to_string()}"
            return info
        elif file_ext in ['.xlsx', '.xls']:
            df = pd.read_excel(file_path)
            info = f"Excel file with {len(df)} rows and {len(df.columns)} columns.\n"
            info += f"Columns: {', '.join(df.columns)}\n\n"
            info += f"First 5 rows:\n{df.head().to_string()}"
            return info
        elif file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']:
            # Delegate to the OCR/image tool.
            return image_analysis_tool(file_path, query)
        elif file_ext in ['.mp3', '.wav', '.m4a']:
            # Delegate to the speech-transcription tool.
            return audio_analysis_tool(file_path)
        else:
            return f"Unsupported file type: {file_ext}"
    except Exception as e:
        logger.error(f"File reader error: {str(e)}")
        return f"Failed to read file: {str(e)}"
def audio_analysis_tool(audio_path: str) -> str:
    """Transcribe speech from an audio file.

    Resolves the path against the UPLOADED_FILES environment list when
    missing, converts non-WAV input to a temporary WAV via pydub, then
    runs Google speech recognition over it.
    """
    try:
        recognizer = sr.Recognizer()
        # Resolve against uploaded files when the given path is missing.
        uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
        if uploaded_files and not os.path.exists(audio_path):
            wanted = os.path.basename(audio_path)
            for candidate in uploaded_files:
                if os.path.basename(candidate) == wanted:
                    audio_path = candidate
                    break
        # Speech recognition needs WAV input; convert anything else.
        wav_path = audio_path
        if not audio_path.endswith('.wav'):
            segment = AudioSegment.from_file(audio_path)
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file:
                segment.export(tmp_file.name, format='wav')
                wav_path = tmp_file.name
        # Perform speech recognition on the WAV data.
        with sr.AudioFile(wav_path) as source:
            audio_data = recognizer.record(source)
            try:
                result = f"Transcribed text: {recognizer.recognize_google(audio_data)}"
            except sr.UnknownValueError:
                result = "Could not understand audio"
            except sr.RequestError as e:
                result = f"Speech recognition error: {str(e)}"
        # Remove the temporary WAV if we created one.
        if wav_path != audio_path and os.path.exists(wav_path):
            os.unlink(wav_path)
        return result
    except Exception as e:
        logger.error(f"Audio analysis error: {str(e)}")
        return f"Audio analysis failed: {str(e)}"
def data_analysis_tool(file_path: str, operation: str, **kwargs) -> str:
    """Perform data analysis operations on CSV/Excel files.

    Args:
        file_path: Path to a .csv file (anything else is read as Excel);
            resolved against the UPLOADED_FILES environment list when missing.
        operation: One of "sum", "mean", "count", "groupby", "filter",
            "describe", "info".
        **kwargs: Operation parameters — ``column``, ``value``,
            ``group_column``, ``agg_column``, ``agg_func``, ``condition``.

    Returns:
        A human-readable result string, or an error message.
    """
    try:
        # Resolve against uploaded files when the given path is missing.
        uploaded_files = json.loads(os.environ.get("UPLOADED_FILES", "[]"))
        if uploaded_files and not os.path.exists(file_path):
            for uploaded_path in uploaded_files:
                if os.path.basename(uploaded_path) == os.path.basename(file_path):
                    file_path = uploaded_path
                    break
        # Load data
        if file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        else:
            df = pd.read_excel(file_path)
        # Perform requested operation
        if operation == "sum":
            column = kwargs.get('column')
            if column and column in df.columns:
                result = df[column].sum()
                return f"Sum of {column}: {result}"
            return f"Column '{column}' not found"
        elif operation == "mean":
            column = kwargs.get('column')
            if column and column in df.columns:
                result = df[column].mean()
                return f"Mean of {column}: {result}"
            return f"Column '{column}' not found"
        elif operation == "count":
            column = kwargs.get('column')
            value = kwargs.get('value')
            if column and column in df.columns:
                # "is not None" so falsy lookup values (0, "", False)
                # still count matching rows — the original `if value:`
                # silently fell through to value_counts for them.
                if value is not None:
                    result = len(df[df[column] == value])
                    return f"Count of {column}={value}: {result}"
                else:
                    result = df[column].value_counts()
                    return f"Value counts for {column}:\n{result.to_string()}"
            return f"Column '{column}' not found"
        elif operation == "groupby":
            group_column = kwargs.get('group_column')
            agg_column = kwargs.get('agg_column')
            agg_func = kwargs.get('agg_func', 'sum')
            if group_column and agg_column:
                result = df.groupby(group_column)[agg_column].agg(agg_func)
                return f"Grouped results:\n{result.to_string()}"
            return "Missing group_column or agg_column"
        elif operation == "filter":
            condition = kwargs.get('condition')
            if condition:
                # pandas query-expression syntax, e.g. "a > 1".
                filtered_df = df.query(condition)
                return f"Filtered data ({len(filtered_df)} rows):\n{filtered_df.head().to_string()}"
            return "Missing filter condition"
        elif operation == "describe":
            return f"Data description:\n{df.describe().to_string()}"
        elif operation == "info":
            # df.info() prints; capture it via a StringIO buffer.
            buffer = io.StringIO()
            df.info(buf=buffer)
            return buffer.getvalue()
        return "Operation not recognized or missing parameters."
    except Exception as e:
        logger.error(f"Data analysis error: {str(e)}")
        return f"Data analysis failed: {str(e)}"
# Tool schemas for function calling.
# JSON-Schema-style parameter descriptions, keyed by tool name; each entry
# mirrors one of the *_tool implementations above. "required" lists the
# parameters without defaults.
tool_schemas = {
    "web_search": {
        "name": "web_search",
        "description": "Search the web for current information",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "num_results": {"type": "integer", "description": "Number of results", "default": 5}
            },
            "required": ["query"]
        }
    },
    "wikipedia": {
        "name": "wikipedia",
        "description": "Search Wikipedia for information",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Wikipedia search query"}
            },
            "required": ["query"]
        }
    },
    "calculator": {
        "name": "calculator",
        "description": "Perform mathematical calculations",
        "parameters": {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Mathematical expression to evaluate"}
            },
            "required": ["expression"]
        }
    },
    "python_repl": {
        "name": "python_repl",
        "description": "Execute Python code",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to execute"}
            },
            "required": ["code"]
        }
    },
    "image_analysis": {
        "name": "image_analysis",
        "description": "Analyze images with OCR and computer vision",
        "parameters": {
            "type": "object",
            "properties": {
                "image_path": {"type": "string", "description": "Path to image file"},
                "query": {"type": "string", "description": "What to look for in the image", "default": ""}
            },
            "required": ["image_path"]
        }
    },
    "file_reader": {
        "name": "file_reader",
        "description": "Read and analyze various file types",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to file"},
                "query": {"type": "string", "description": "What to look for", "default": ""}
            },
            "required": ["file_path"]
        }
    },
    "data_analysis": {
        "name": "data_analysis",
        "description": "Perform data analysis on CSV/Excel files",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to data file"},
                "operation": {"type": "string", "description": "Operation: sum, mean, count, groupby, filter, describe, info"},
                "kwargs": {"type": "object", "description": "Additional parameters for the operation"}
            },
            "required": ["file_path", "operation"]
        }
    }
}
def get_all_tools() -> Dict[str, Any]:
    """Return all available tools as a name -> langchain Tool mapping."""
    from langchain.tools import Tool
    # (name, implementation, description) for each exposed tool.
    specs = [
        ("web_search", web_search_tool, "Search the web for current information"),
        ("wikipedia", wikipedia_tool, "Search Wikipedia for information"),
        ("calculator", calculator_tool, "Perform mathematical calculations"),
        ("python_repl", python_repl_tool, "Execute Python code"),
        ("image_analysis", image_analysis_tool, "Analyze images with OCR"),
        ("file_reader", file_reader_tool, "Read various file types"),
        ("data_analysis", data_analysis_tool, "Analyze data files"),
    ]
    return {
        name: Tool(name=name, func=func, description=desc)
        for name, func, desc in specs
    }