# DataAnalystAgent / tools.py
# (Hugging Face repository header — KarthikMuraliM, "Initial commit from
# existing project", commit c0897c1 — commented out so the module parses.)
# tools.py
import asyncio
import base64
import io
import json
import mimetypes
import subprocess
import sys
from pathlib import Path
from typing import Dict, List

import numpy as np
import openai
import pandas as pd
import requests
import tabula
import wikipedia
from bs4 import BeautifulSoup
from geopy.geocoders import Nominatim
from PIL import Image
from playwright.async_api import async_playwright
from sqlalchemy import create_engine, text
# OpenAI function-calling schemas for every tool the agent can invoke.
# Each entry pairs a tool name with a JSON-Schema description of its
# parameters; the matching callable implementations live below in this module.
TOOL_DEFINITIONS = [
    {
        "type": "function",
        "function": {
            "name": "fetch_url",
            "description": "Fetches the text content from a given URL. Use this for scraping websites or getting data from online sources.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "The complete URL to fetch content from.",
                    },
                },
                "required": ["url"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "python_interpreter",
            "description": (
                "Executes Python code in an isolated environment for data analysis, manipulation, and visualization. "
                "The environment has pandas, matplotlib, numpy, and scikit-learn available. "
                "The code can access user-uploaded files directly by their filename (e.g., pd.read_csv('data.csv')). "
                "To return a plot, save it as 'output.png'. All print() output is captured as the result."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "code": {
                        "type": "string",
                        "description": "The Python code to execute.",
                    },
                },
                "required": ["code"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_dataframe_info",
            "description": "Reads a data file (like a .csv or .parquet) and returns a JSON summary including column names, data types, non-null counts, and descriptive statistics (mean, std, min, max, etc.). This is the best first step for understanding any dataset.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The filename of the data file to analyze (e.g., 'data.csv').",
                    },
                },
                "required": ["file_path"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "calculate_correlation",
            "description": "Computes the Pearson correlation coefficient between two specific numerical columns in a given data file. The name of this function is `calculate_correlation`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The filename of the data file (e.g., 'data.csv')."},
                    "column1": {"type": "string", "description": "The name of the first column."},
                    "column2": {"type": "string", "description": "The name of the second column."},
                },
                "required": ["file_path", "column1", "column2"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "create_pivot_table",
            "description": "Generates a pivot table to summarize data. This function takes a file and the names of the columns to use for the index, columns, and values of the pivot table. The name of this function is `create_pivot_table`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The filename of the data file (e.g., 'data.csv')."},
                    "index": {"type": "string", "description": "The name of the column to use as the pivot table's index (rows)."},
                    "columns": {"type": "string", "description": "The name of the column to use as the pivot table's columns."},
                    "values": {"type": "string", "description": "The name of the column to aggregate as the values in the pivot table."},
                },
                "required": ["file_path", "index", "columns", "values"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "run_sql_query",
            "description": "Executes a SQL query against a database (like SQLite or DuckDB) and returns the result as JSON. The name of this function is `run_sql_query`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The SQL query to execute.",
                    },
                    "db_connection_string": {
                        "type": "string",
                        "description": "The SQLAlchemy connection string for the database. For an uploaded SQLite file named 'my_db.db', use 'sqlite:///my_db.db'. For a DuckDB file, use 'duckdb:///my_db.duckdb'.",
                    },
                },
                "required": ["query", "db_connection_string"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_sentiment",
            "description": "Analyzes a piece of text (like a movie review) to determine if its sentiment is positive, negative, or neutral. The name of this function is `get_sentiment`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "text_to_analyze": {
                        "type": "string",
                        "description": "The text content to be analyzed.",
                    },
                },
                "required": ["text_to_analyze"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "scrape_wikipedia_summary",
            "description": "Fetches the clean text summary from a Wikipedia page. Use this tool specifically for getting information from Wikipedia. The name of this function is `scrape_wikipedia_summary`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The title or search query for the Wikipedia page (e.g., 'Python (programming language)').",
                    },
                },
                "required": ["query"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "scrape_pdf_tables",
            "description": "Extracts all tabular data from a PDF document and returns it as a list of JSON objects. Use this for any PDF that contains tables. The name of this function is `scrape_pdf_tables`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The filename of the PDF file to process (e.g., 'report.pdf').",
                    },
                },
                "required": ["file_path"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "analyze_image_content",
            "description": "Analyzes an uploaded image file (e.g., a PNG or JPG) and answers a specific question about its contents. Use this to identify objects, read text, or describe scenes in an image. The name of this function is `analyze_image_content`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "image_path": {"type": "string", "description": "The filename of the image to analyze (e.g., 'chart.png')."},
                    "prompt": {"type": "string", "description": "The specific question to ask about the image (e.g., 'What is the title of this chart?', 'Is there a cat in this picture?')."},
                },
                "required": ["image_path", "prompt"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "geocode_address",
            "description": "Finds the geographic coordinates (latitude and longitude) for a given street address, city, or landmark. Uses the Nominatim service. The name of this function is `geocode_address`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "address": {
                        "type": "string",
                        "description": "The address or place name to geocode (e.g., '1600 Amphitheatre Parkway, Mountain View, CA' or 'Tokyo Tower').",
                    },
                },
                "required": ["address"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "scrape_dynamic_site",
            "description": "Renders a JavaScript-heavy website and saves the complete HTML to a file named 'scraped_page.html'. This is the first step in a two-step process. After calling this, use the 'parse_html' tool to extract specific data from the saved file. The name of this function is `scrape_dynamic_site`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "The URL of the dynamic website to scrape."},
                },
                "required": ["url"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "parse_html",
            "description": "Extracts specific data from an HTML file (like one saved by 'scrape_dynamic_site') using CSS selectors. Provide a dictionary where keys are desired data names and values are the CSS selectors to find that data. The name of this function is `parse_html`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {"type": "string", "description": "The local filename of the HTML file to parse (e.g., 'scraped_page.html')."},
                    "selectors": {
                        "type": "object",
                        "description": "A JSON object of 'data_name': 'css_selector' pairs. For example: {\"titles\": \"h2.product-title\", \"prices\": \".price-tag\"}",
                    },
                },
                "required": ["file_path", "selectors"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_bbc_weather",
            "description": "Fetches the weather forecast for a location using its BBC Weather ID. Can provide a 3-day summary or a detailed hour-by-hour forecast. The name of this function is `get_bbc_weather`.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location_id": {
                        "type": "string",
                        "description": "The numerical ID for the location (e.g., '2643743' for London).",
                    },
                    "report_type": {
                        "type": "string",
                        "description": "The type of report to generate. Use 'summary' for a 3-day overview or 'detailed' for an hour-by-hour forecast.",
                        # 'enum' helps the LLM choose a valid option
                        "enum": ["summary", "detailed"],
                    },
                },
                "required": ["location_id"],
            },
        },
    },
]
def get_bbc_weather(location_id: str, report_type: str = 'summary') -> str:
    """
    Fetch the BBC Weather forecast for a given location ID.

    ``report_type`` 'summary' (the default) yields a per-day overview;
    'detailed' yields the hour-by-hour timeseries. Returns a JSON string,
    or an error message on failure.
    """
    print(f"Executing Tool 'get_bbc_weather' for ID: {location_id}, Type: {report_type}")
    url = f"https://weather-broker-cdn.api.bbci.co.uk/en/forecast/aggregated/{location_id}"
    try:
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        payload = response.json()
        forecasts = payload.get("forecasts", [])
        if not forecasts:
            return "Error: Forecast data not found in the API response."
        report = forecasts[0]
        place = report.get("location", {}).get("name")
        issued = report.get("issuedAt")
        if report_type == 'detailed':
            # Hour-by-hour slots live under report['detailed']['reports'].
            hours = [
                {
                    "timestamp": slot.get("localDate"),
                    "temperature_c": slot.get("temperatureC"),
                    "feels_like_temp_c": slot.get("feelsLikeTempC"),
                    "wind_speed_mph": slot.get("windSpeedMph"),
                    "wind_direction": slot.get("windDirectionAbbreviation"),
                    "precipitation_probability_percent": slot.get("precipitationProbabilityInPercent"),
                    "weather_type": slot.get("weatherType"),
                }
                for slot in report.get("detailed", {}).get("reports", [])
            ]
            return json.dumps(
                {"location_name": place, "issued_at": issued, "detailed_forecast": hours},
                indent=2,
            )
        # Any other report_type falls back to the per-day summary.
        days = [
            {
                "date": day.get("localDate"),
                "condition": day.get("weatherType"),
                "max_temp_c": day.get("maxTempC"),
                "min_temp_c": day.get("minTempC"),
            }
            for day in report.get("summary", {}).get("reports", [])
        ]
        return json.dumps(
            {"location_name": place, "issued_at": issued, "daily_summary": days},
            indent=2,
        )
    except Exception as e:
        return f"An error occurred while processing weather data. Error: {e}"
def parse_html(file_path: str, selectors: Dict[str, str], work_dir: str) -> str:
    """
    Extract data from a local HTML file using CSS selectors.

    Each entry in ``selectors`` maps a result name to a CSS selector; the
    stripped text of every matching element is collected into a list under
    that name. Returns the collected data as a pretty-printed JSON string,
    or an error message on failure.
    """
    print(f"Executing Tool 'parse_html' for file: {file_path}")
    target = Path(work_dir) / file_path
    if not target.exists():
        return f"Error: HTML file not found at {target}"
    try:
        markup = target.read_text(encoding="utf-8")
        soup = BeautifulSoup(markup, "lxml")
        # One list of stripped text values per requested selector.
        results = {
            name: [node.get_text(strip=True) for node in soup.select(css)]
            for name, css in selectors.items()
        }
        return json.dumps(results, indent=2)
    except Exception as e:
        return f"Failed to parse HTML file {file_path}. Error: {e}"
async def scrape_dynamic_site(url: str, work_dir: str) -> str:
    """
    Render a JavaScript-heavy page in headless Chromium and persist the
    final DOM to 'scraped_page.html' inside ``work_dir``.

    Returns a JSON status string (with the saved filename) on success, or
    an error message on failure.
    """
    print(f"Executing Tool 'scrape_dynamic_site' for url: {url}")
    out_file = Path(work_dir) / "scraped_page.html"
    try:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch()
            page = await browser.new_page()
            # Wait for network-idle (30s cap) so late-loading JS content is included.
            await page.goto(url, wait_until='networkidle', timeout=30000)
            rendered = await page.content()
            await browser.close()
        # Persist the fully rendered HTML for the parse_html tool.
        out_file.write_text(rendered, encoding="utf-8")
        return json.dumps({
            "status": "success",
            "url": url,
            "saved_to": out_file.name  # just the filename, not the full path
        })
    except Exception as e:
        return f"Failed to scrape dynamic site {url}. Error: {e}"
def geocode_address(address: str) -> str:
    """
    Resolve a street address or place name to latitude/longitude using the
    Nominatim (OpenStreetMap) service.

    Returns a JSON string with the coordinates and the resolved address, or
    an error message when the lookup fails or finds nothing.
    """
    print(f"Executing Tool 'geocode_address' for address: {address}")
    try:
        # Nominatim asks each application to identify itself with a
        # distinct user_agent.
        locator = Nominatim(user_agent="data_analyst_agent_v1")
        match = locator.geocode(address)
        if match is None:
            return f"Error: Could not find coordinates for the address '{address}'."
        return json.dumps(
            {
                "address": address,
                "latitude": match.latitude,
                "longitude": match.longitude,
                "full_address_found": match.address,
            },
            indent=2,
        )
    except Exception as e:
        return f"Failed to geocode address. Error: {e}"
def analyze_image_content(image_path: str, prompt: str, work_dir: str, client: openai.Client) -> str:
    """
    Analyzes the content of an image file using a multimodal LLM and answers
    a question about it.

    Args:
        image_path: Filename of the image inside ``work_dir``.
        prompt: The question to ask about the image.
        work_dir: Directory containing the uploaded files.
        client: OpenAI-compatible client used for the multimodal call.

    Returns:
        A JSON string with the image name and the model's analysis, or an
        error string on failure.
    """
    print(f"Executing Tool 'analyze_image_content' for file: {image_path}")
    full_path = Path(work_dir) / image_path
    if not full_path.exists():
        return f"Error: Image file not found at {full_path}"
    try:
        # Validate that the file really is an image. Use a context manager:
        # Image.open is lazy and would otherwise leak the file handle.
        with Image.open(full_path) as img:
            img.verify()
        # Derive the real MIME type for the data URI; the previous
        # hard-coded image/jpeg mislabeled PNGs and other formats.
        mime_type, _ = mimetypes.guess_type(str(full_path))
        mime_type = mime_type or "image/jpeg"
        with open(full_path, "rb") as image_file:
            base64_image = base64.b64encode(image_file.read()).decode('utf-8')
        # Call the multimodal model with the prompt and inline image.
        response = client.chat.completions.create(
            model="openai/gpt-4.1-nano",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                    ],
                }
            ],
            max_tokens=500,  # Allow for a reasonably detailed description
        )
        description = response.choices[0].message.content
        return json.dumps({"image": image_path, "analysis": description})
    except Exception as e:
        return f"Failed to analyze image. Error: {e}"
def scrape_wikipedia_summary(query: str) -> str:
    """
    Look up a Wikipedia article and return its lead-section summary.

    Returns a JSON string containing the query and summary; missing-page
    and disambiguation conditions are reported as error strings.
    """
    print(f"Executing Tool 'scrape_wikipedia_summary' for query: {query}")
    try:
        # auto_suggest lets the library correct near-miss titles.
        article_summary = wikipedia.summary(query, auto_suggest=True)
        return json.dumps({"query": query, "summary": article_summary}, indent=2)
    except wikipedia.exceptions.PageError:
        return f"Error: Could not find a Wikipedia page for the query '{query}'."
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Error: The query '{query}' is ambiguous. It could refer to any of the following: {e.options}"
    except Exception as e:
        return f"Failed to scrape Wikipedia. Error: {e}"
def scrape_pdf_tables(file_path: str, work_dir: str) -> str:
    """
    Pull every table out of a PDF (all pages) via tabula.

    Returns a JSON object whose 'extracted_tables' field holds one
    split-oriented JSON string per detected table, or an error message.
    Requires a Java runtime (tabula dependency).
    """
    print(f"Executing Tool 'scrape_pdf_tables' for file: {file_path}")
    pdf_file = Path(work_dir) / file_path
    if not pdf_file.exists():
        return f"Error: PDF file not found at {pdf_file}"
    try:
        # tabula returns one DataFrame per detected table.
        frames = tabula.read_pdf(pdf_file, pages='all', multiple_tables=True)
        if not frames:
            return "No tables were found in the PDF file."
        serialized = [frame.to_json(orient='split') for frame in frames]
        return json.dumps({"file_name": file_path, "extracted_tables": serialized})
    except Exception as e:
        return f"Failed to scrape tables from PDF. Make sure Java is installed on the system. Error: {e}"
def get_sentiment(text_to_analyze: str, client: openai.Client) -> str:
    """
    Classify a piece of text as 'positive', 'negative', or 'neutral' using a
    small LLM as a constrained classifier.

    Returns a JSON string with the text and its sentiment, or an error
    message when the model produces anything other than the three labels.
    """
    print(f"Executing Tool 'get_sentiment'")
    try:
        # A strict system prompt turns the chat model into a 3-way classifier.
        completion = client.chat.completions.create(
            model="openai/gpt-5-nano",  # Use a fast and cheap model for this simple task
            messages=[
                {"role": "system", "content": "You are a sentiment analysis tool. Classify the user's text as 'positive', 'negative', or 'neutral'. Respond with only one of these three words and nothing else."},
                {"role": "user", "content": text_to_analyze}
            ],
            max_tokens=5,  # one word is all we need
            temperature=0.0  # deterministic output
        )
        label = completion.choices[0].message.content.lower().strip()
        if label in ("positive", "negative", "neutral"):
            return json.dumps({"text": text_to_analyze, "sentiment": label})
        return "Error: Could not determine a valid sentiment."
    except Exception as e:
        return f"Failed to get sentiment. Error: {e}"
def run_sql_query(query: str, db_connection_string: str) -> str:
    """
    Executes a SQL query against a specified database and returns the result.

    Supports file-based databases like SQLite and DuckDB. For SQLite, the
    connection string should be 'sqlite:///path/to/database.db'; the path
    should be relative to the agent's working directory.

    Args:
        query: The SQL statement to execute.
        db_connection_string: A SQLAlchemy connection URL.

    Returns:
        The result rows as a JSON string (orient='records'), or an error
        string on failure.
    """
    print(f"Executing Tool 'run_sql_query'")
    engine = None
    try:
        engine = create_engine(db_connection_string)
        with engine.connect() as connection:
            result_df = pd.read_sql_query(sql=text(query), con=connection)
        return result_df.to_json(orient="records")
    except Exception as e:
        return f"Failed to execute SQL query. Error: {e}"
    finally:
        # Dispose the engine so its connection pool (and the underlying
        # file handles for SQLite/DuckDB) is released between tool calls.
        if engine is not None:
            engine.dispose()
def calculate_correlation(file_path: str, column1: str, column2: str, work_dir: str) -> str:
    """
    Calculates the Pearson correlation coefficient between two columns of a
    CSV or Parquet file.

    Args:
        file_path: Filename of the data file inside ``work_dir``.
        column1: Name of the first numeric column.
        column2: Name of the second numeric column.
        work_dir: Directory containing the uploaded files.

    Returns:
        A JSON string with the correlation, or an error string when the
        file or either column is missing or the file type is unsupported.
    """
    print(f"Executing Tool 'calculate_correlation' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        lower_name = file_path.lower()
        if lower_name.endswith('.csv'):
            df = pd.read_csv(full_path)
        elif lower_name.endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type."
        if column1 not in df.columns or column2 not in df.columns:
            return f"Error: One or both columns ('{column1}', '{column2}') not found in the file."
        # Cast the numpy scalar to a plain float so json.dumps always
        # serializes it cleanly.
        correlation = float(df[column1].corr(df[column2]))
        result = {
            "file_name": file_path,
            "column_1": column1,
            "column_2": column2,
            "pearson_correlation": correlation
        }
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Failed to calculate correlation. Error: {e}"
def create_pivot_table(file_path: str, index: str, columns: str, values: str, work_dir: str) -> str:
    """
    Creates a pivot table (sum aggregation) from a CSV or Parquet file.

    Args:
        file_path: Filename of the data file inside ``work_dir``.
        index: Column used for the pivot table's index (rows).
        columns: Column used for the pivot table's columns.
        values: Column aggregated into the cell values.
        work_dir: Directory containing the uploaded files.

    Returns:
        The pivot table as a JSON string (orient='split'), or an error
        string on failure.
    """
    print(f"Executing Tool 'create_pivot_table' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        lower_name = file_path.lower()
        if lower_name.endswith('.csv'):
            df = pd.read_csv(full_path)
        elif lower_name.endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type."
        # Pass the string 'sum': supplying np.sum as aggfunc is deprecated
        # in modern pandas and emits a FutureWarning.
        pivot_table = pd.pivot_table(df, values=values, index=index, columns=columns, aggfunc="sum")
        return pivot_table.to_json(orient="split")
    except Exception as e:
        return f"Failed to create pivot table. Error: {e}"
def get_dataframe_info(file_path: str, work_dir: str) -> str:
    """
    Summarize a CSV or Parquet file: column names, dtypes, and non-null
    counts (via DataFrame.info) plus descriptive statistics (via describe).

    Returns a JSON string with 'file_name', 'info', and
    'statistical_summary' (itself a split-oriented JSON string), or an
    error string on failure.
    """
    print(f"Executing Tool 'get_dataframe_info' for file: {file_path}")
    full_path = Path(work_dir) / file_path
    if not full_path.exists():
        return f"Error: Data file not found at {full_path}"
    try:
        lower_name = file_path.lower()
        if lower_name.endswith('.csv'):
            df = pd.read_csv(full_path)
        elif lower_name.endswith('.parquet'):
            df = pd.read_parquet(full_path)
        else:
            return "Error: Unsupported file type. Only .csv and .parquet are supported."
        # df.info() prints to a stream, so capture it in a buffer.
        buffer = io.StringIO()
        df.info(buf=buffer)
        return json.dumps(
            {
                "file_name": file_path,
                "info": buffer.getvalue(),
                "statistical_summary": df.describe(include='all').to_json(orient="split"),
            },
            indent=2,
        )
    except Exception as e:
        return f"Failed to get DataFrame info. Error: {e}"
def fetch_url(url: str) -> str:
    """Fetch the text content of a URL, routed through the AI Pipe proxy."""
    print(f"Executing Tool 'fetch_url' with URL: {url}")
    try:
        # All outbound requests go via the aipipe.org proxy.
        resp = requests.get(f"https://aipipe.org/proxy/{url}", timeout=30)
        resp.raise_for_status()
        return resp.text
    except requests.RequestException as e:
        return f"Error: Failed to fetch URL {url}. Reason: {e}"
def python_interpreter(code: str, work_dir: str) -> str:
    """
    Executes Python code in a sandboxed subprocess within a specific working
    directory. The code can access any files within its ``work_dir``.

    If the code generates 'output.png', the image is base64 encoded and
    prepended to the returned text as a data URI.

    Args:
        code: The Python source to run.
        work_dir: Directory the script is written to and executed in.

    Returns:
        The script's stdout (optionally preceded by an 'image_output:' data
        URI), or an error string describing the failure.
    """
    print(f"Executing Tool 'python_interpreter' in directory: {work_dir}")
    work_path = Path(work_dir)
    script_path = work_path / "agent_script.py"
    plot_path = work_path / "output.png"
    with open(script_path, "w") as f:
        f.write(code)
    print("\n\n--- 📜 DECODING: SCRIPT TO EXECUTE 📜 ---")
    print(code)
    print("------------------------------------------\n")
    try:
        # Use the same interpreter running the agent so the subprocess sees
        # the same installed packages.
        python_executable = sys.executable
        print(f"--- [DEBUG] EXECUTING SUBPROCESS WITH PYTHON FROM: {python_executable} ---")
        process = subprocess.run(
            [python_executable, str(script_path)],
            cwd=work_path,  # Run the script from within the temp directory
            capture_output=True,
            text=True,
            timeout=1000,
            check=False,  # non-zero exits are handled explicitly below
        )
        print("\n\n--- 📤 DECODING: SCRIPT RAW OUTPUT 📤 ---")
        print(f"Return Code: {process.returncode}")
        print("--- STDOUT ---")
        print(process.stdout)
        print("--- STDERR ---")
        print(process.stderr)
        print("------------------------------------------\n")
        if process.returncode != 0:
            return f"SCRIPT FAILED with return code {process.returncode}:\nSTDOUT:\n{process.stdout}\nSTDERR:\n{process.stderr}"
        stdout = process.stdout
        # If the script produced a plot, return it as a data URI ahead of
        # the captured text output.
        if plot_path.exists():
            with open(plot_path, "rb") as img_file:
                img_base64 = base64.b64encode(img_file.read()).decode('utf-8')
            plot_uri = f"data:image/png;base64,{img_base64}"
            return f"image_output:\n{plot_uri}\n\ntext_output:\n{stdout}"
        return stdout
    except subprocess.TimeoutExpired:
        return "Error: The Python script took too long to execute."
    except Exception as e:
        return f"An unexpected error occurred: {e}"