agent-gaia / tools.py
dleandro's picture
Tools, Retriever, Systemp prompt and Agent creation
3927a42
# - `Search Engine` (arXiv, Wikipedia, DuckDuckGo)
# - `Calculator` (add, substract, divide, multiply, modulus, etc.)
# - `Access` and `Download Files` from Web
# - `Excel`/`Google Sheets`: Process Downloaded files
import os
import requests
import tempfile
import uuid
import pytesseract
from datetime import datetime, timezone
from PIL import Image
from urllib.parse import urlparse
from typing import Optional
from langchain_core.tools import tool
from langchain_community.document_loaders import (
WikipediaLoader, ArxivLoader, PyPDFLoader
)
from langchain_community.tools import DuckDuckGoSearchResults
#* === MATH TOOLS ===
@tool
def add(a: float, b: float) -> int:
"""
Adds multple integers.
Args:
a (float): Number to add.
b (float): Number to add.
Returns:
int: Sum of the two provided integers.
"""
return a + b
@tool
def subtract(a: int, b: int) -> int:
"""
Subtracts one integer from another.
Args:
a (int): The number from which to subtract.
b (int): The number to subtract.
Returns:
int: The result of a - b.
"""
return a - b
@tool
def multiply(a: float, b: float) -> int:
"""
Multiplies multple integers.
Args:
a (float): First number to multiply.
b (float): Second number to multiply.
Returns:
int: Multiplication of the two provided floats or integers.
"""
return a * b
@tool
def div(a: float, b: float):
"""
Divides two numbers.
Args:
a (int or float): The dividend.
b (int or float): The divisor.
Returns:
float: The result of dividing a by b.
Raises:
ZeroDivisionError: If b is zero.
"""
return a / b
@tool
def modulus(a: int, b: int):
"""
Computes the modulus (remainder) of dividing two integers.
Args:
a (int): The dividend.
b (int): The divisor.
Returns:
int: The remainder when a is divided by b.
Raises:
ZeroDivisionError: If b is zero.
"""
return a % b
@tool
def power(a: float, b: float) -> float:
"""
Raises a number `a` to the power of `b`.
Args:
a (float): Base number.
b (float): Exponent.
Returns:
float: Result of a ** b.
"""
return a**b
#* === SEARCH TOOLS ===
@tool
def wikipedia_search(query: str) -> dict:
"""
Search Wikipedia for a query and return up to 3 formatted results.
Args:
query (str): The topic to search for.
Returns:
dict: A dictionary with the key 'wikipedia_results' containing the formatted documents.
"""
search_docs = WikipediaLoader(query = query,load_max_docs = 3).load()
formatted_docs = "\n\n---\n\n".join(
[
f"Document source='{doc.metadata['source']}' page={doc.metadata.get('page', '')}/>\n"
f"{doc.page_content}\n</Document>"
for doc in search_docs
]
)
return {"wikipedia_results": formatted_docs}
@tool
def search_web(query: str) -> dict:
"""
Performs a web search using DuckDuckGo and returns up to 4 formatted results.
Args:
query (str): The search query to submit to DuckDuckGo.
Returns:
dict: A dictionary with a single key "web_results" containing the formatted search results
as a string. Each result includes the document source and content, separated by "---".
"""
search_docs = DuckDuckGoSearchResults(max_results = 4).invoke(query)
formatted_docs = "\n\n---\n\n".join(
[
f"Document source='{doc.metadata['source']}' page={doc.metadata.get('page', '')}/>\n"
f"{doc.page_content}\n</Document>"
for doc in search_docs
]
)
return {"web_results": formatted_docs}
@tool
def arxiv_search(query: str) -> dict:
"""
Perform a search on the arXiv academic paper repository and return the top results.
Args:
query (str): The search query to use on arXiv.
Returns:
dict: A dictionary containing a string under the key "arxiv_results", which includes
a formatted summary of the top retrieved documents. Each entry contains the
document's source, optional page number, and the first 1000 characters of the content.
"""
search_docs = ArxivLoader(query = query, load_max_docs = 3).load()
formatted_docs = "\n\n---\n\n".join(
[
f"Document source='{doc.metadata['source']}' page={doc.metadata.get('page', '')}/>\n"
f"{doc.page_content[:1000]}\n</Document>"
for doc in search_docs
]
)
return {"arxiv_results": formatted_docs}
#* === FILE PROCESSING TOOLS ===
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
"""
Saves the provided text content to a temporary file and returns its path.
If no filename is provided, a random temporary filename will be generated.
The file is saved in the system's temporary directory.
Args:
content (str): The text content to be written to the file.
filename (Optional[str]): Optional name for the file. If not provided, a temporary name is used.
Returns:
str: A message with the path to the saved file, indicating it is ready for processing.
"""
try:
temp_dir = tempfile.gettempdir()
if filename is None:
temp_file = tempfile.NamedTemporaryFile(delete = False, dir = temp_dir)
filepath = temp_file.name
else:
filepath = os.path.join(temp_dir, filename)
with open(filepath, "w", encoding = "utf-8") as f:
f.write(content)
return f"File saved to {filepath}. It is available to read for processing its contents."
except Exception as e:
return f"Error saving file: {str(e)}"
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
"""
Downloads a file from a given URL and saves it to a temporary directory.
If no filename is provided, it attempts to extract it from the URL. If the URL
does not contain a valid filename, a temporary unique filename will be generated.
Args:
url (str): The URL of the file to download.
filename (Optional[str]): Optional name for the downloaded file.
Returns:
str: A string indicating the path to the downloaded file, or an error message.
"""
try:
# Parse URL to get filename if not provided
if not filename:
path = urlparse(url).path
filename = os.path.basename(path)
if not filename:
ts = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
filename = f"downloaded_{ts}_{uuid.uuid4().hex[:8]}.tmp"
# Create temporary file
temp_dir = tempfile.gettempdir()
filepath = os.path.join(temp_dir, filename)
# Download the file
response = requests.get(url, stream = True)
response.raise_for_status()
# Save the file
with open(filepath, "wb") as f:
for chunk in response.iter_content(chunk_size = 8192):
if chunk:
f.write(chunk)
return f"File downloaded to {filepath}. It is available to read for processing its contents."
except Exception as e:
return f"Error downloading file: {str(e)}"
@ tool
def extract_text_from_image(image_path: str) -> str:
"""
Extracts text content from an image file using Optical Character Recognition (OCR).
Args:
image_path (str): The path to the image file from which text will be extracted.
Returns:
str: Extracted text content. If extraction fails, returns an error message.
"""
try:
# Open image
image = Image.open(image_path)
# Extract text from image
text = pytesseract.image_to_string(image)
return f"Text extracted from image:\n\n{text.strip()}"
except Exception as e:
return f"Error extracting text from image '{image_path}': {str(e)}"
@tool
def pdf_loader(filepath: str) -> dict:
"""
Loads a PDF file from the given file path, parses its contents,
and returns a preview of each page's content (up to 1000 characters per page).
Args:
filepath (str): The full path to the PDF file.
Returns:
dict: A dictionary containing formatted PDF page previews under the key 'pdf_results'.
Each page is separated by "\n\n---\n\n".
"""
try:
pdf_content = PyPDFLoader(file_path=filepath).load()
formatted_content = "\n\n---\n\n".join(
[
f"Document source='{doc.metadata['source']}' page={doc.metadata.get('page', '')}/>\n"
f"{doc.page_content[:1000]}\n</Document>"
for doc in pdf_content
]
)
return {"pdf_results": formatted_content}
except Exception as e:
return {"pdf_results": f"Error reading PDF file: {str(e)}"}