Final-Agent-Course / tools.py
Chitranshu-9's picture
Final agent submission
84c9c5b
Raw
History Blame Contribute Delete
34.1 kB
from langchain_core.tools import tool
from langchain_tavily import TavilySearch
from langchain_community.document_loaders import WikipediaLoader
from youtube_transcript_api import YouTubeTranscriptApi
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
from typing import List, Dict, Any, Optional
from urllib.parse import urlparse
from code_interpreter_26042026 import CodeInterpreter
from tool_logger_26042026 import log_tool
from bs4 import BeautifulSoup
from dotenv import load_dotenv
interpreter_instance = CodeInterpreter()
from image_proccessing_26042026 import *
import os
import re
import cmath
import requests
import tempfile
import subprocess
import pytesseract
import numpy as np
import pandas as pd
load_dotenv()
@tool
def run_python_from_url(file_url: str) -> str:
    """
    Download and execute a Python script from a URL.
    Use this when:
    - A Python file needs to be executed remotely
    - The task explicitly involves running external scripts
    Do NOT use for:
    - Simple calculations
    - Inline code execution (use execute_code_multilang instead)
    Input: URL to .py file
    Output: execution result
    """
    import sys

    # SECURITY NOTE(review): this executes whatever the URL serves, with the
    # privileges of the current process. Only point it at trusted sources.
    tmp_path = None
    try:
        # Download file; the timeout keeps a stalled server from hanging the agent.
        response = requests.get(file_url, timeout=30)
        print("response ", response, "file_url ", file_url)
        response.raise_for_status()
        # Save to temp file. delete=False because the child process has to
        # reopen the script by name after this handle is closed.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as tmp:
            tmp.write(response.content)
            tmp_path = tmp.name
        # Execute with the interpreter that is running this module, so the
        # script sees the same environment; fall back to PATH lookup if unknown.
        result = subprocess.run(
            [sys.executable or "python", tmp_path],
            capture_output=True,
            text=True,
            timeout=10,
        )
        # stdout wins; stderr is only reported when stdout is empty.
        output = result.stdout.strip() or result.stderr.strip()
        log_tool("run_python_from_url", file_url, output)
        return output if output else "No output"
    except Exception as e:
        log_tool("run_python_from_url", file_url, f"ERROR: {str(e)}")
        return f"ERROR: {str(e)}"
    finally:
        # The temp script was created with delete=False, so remove it ourselves.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
@tool
def fetch_task_file(task_id: str) -> str:
    """
    MUST USE when a question mentions an attached file.
    Input:
    - task_id (string)
    Returns:
    - local file path of downloaded file
    After using this tool:
    - You should analyze or execute the file depending on type
    """
    try:
        url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
        # Bounded wait so an unresponsive scoring server cannot block the agent.
        response = requests.get(url, timeout=30)
        print("fetch task file ", response, "taskid ", task_id, "url ", url)
        if response.status_code != 200:
            error_msg = f"ERROR: {response.text}"
            log_tool("fetch_task_file", task_id, error_msg)
            return error_msg
        download_dir = "./downloadsHuggingFace"
        # The file is stored under the task id. basename() strips any directory
        # components so the write cannot escape download_dir.
        filename = os.path.basename(f"{task_id}") or "task_file"
        filepath = f"{download_dir}/{filename}"
        os.makedirs(download_dir, exist_ok=True)
        with open(filepath, "wb") as f:
            f.write(response.content)
        log_tool("fetch_task_file", task_id, filepath)
        return filepath
    except Exception as e:
        log_tool("fetch_task_file", task_id, f"ERROR: {str(e)}")
        return f"ERROR: {str(e)}"
# def web_search_raw(query: str) -> str:
# import requests
# from bs4 import BeautifulSoup
# import urllib.parse
# url = "https://duckduckgo.com/html/"
# headers = {"User-Agent": "Mozilla/5.0"}
# params = {"q": query}
# res = requests.get(url, params=params, headers=headers, timeout=10)
# soup = BeautifulSoup(res.text, "html.parser")
# print("web search response ", res.text, "query ",query)
# results = []
# for r in soup.select(".result__body")[:7]:
# title = r.select_one(".result__title")
# snippet = r.select_one(".result__snippet")
# link = r.select_one("a.result__a")
# if not link:
# continue
# raw_url = link.get("href", "")
# if "uddg=" in raw_url:
# raw_url = urllib.parse.unquote(raw_url.split("uddg=")[-1])
# results.append({
# "title": title.get_text(strip=True) if title else "",
# "snippet": snippet.get_text(strip=True) if snippet else "",
# "url": raw_url
# })
# return results
def duckduckgo_search(query: str):
    """Query the JSON endpoint at api.duckduckgo.com and return its abstract.

    Returns a list holding one ``{"title", "snippet", "url"}`` dict when the
    response carries a non-empty ``AbstractText``; otherwise an empty list.
    """
    import requests

    response = requests.get(
        "https://api.duckduckgo.com/",
        params={"q": query, "format": "json", "no_html": 1},
        timeout=10,
    )
    payload = response.json()
    # No abstract means there is nothing to report.
    if not payload.get("AbstractText"):
        return []
    return [
        {
            "title": payload.get("Heading", ""),
            "snippet": payload.get("AbstractText", ""),
            "url": payload.get("AbstractURL", ""),
        }
    ]
def web_search_raw(query: str):
    """Scrape the DuckDuckGo HTML results page for *query*.

    Returns:
        A list of up to five ``{"title", "snippet", "url"}`` dicts, or the
        string ``"BOT_DETECTED"`` when the page text contains "anomaly" or
        "captcha" (callers must check for that sentinel before iterating).
    """
    import requests
    from bs4 import BeautifulSoup
    import urllib.parse
    import random
    import time

    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/122.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/121.0.0.0 Safari/537.36",
        "Mozilla/5.0 (X11; Linux x86_64) Gecko/20100101 Firefox/123.0",
    ]
    url = "https://duckduckgo.com/html/"
    params = {"q": query}
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml",
        "Connection": "keep-alive",
    }
    # delay to avoid detection
    time.sleep(random.uniform(1.5, 3.5))
    # Context manager releases the session's connection pool when done.
    with requests.Session() as session:
        res = session.get(url, params=params, headers=headers, timeout=10)
    # detect bot block
    page_text = res.text.lower()
    if "anomaly" in page_text or "captcha" in page_text:
        return "BOT_DETECTED"
    soup = BeautifulSoup(res.text, "html.parser")
    results = []
    for r in soup.select(".result__body")[:5]:
        title = r.select_one(".result__title")
        snippet = r.select_one(".result__snippet")
        link = r.select_one("a.result__a")
        if not link:
            continue
        raw_url = link.get("href", "")
        if "uddg=" in raw_url:
            # Result links are redirects carrying the target in the `uddg`
            # query parameter, possibly followed by other parameters. Parse
            # the query string so only the target URL is kept (a plain split
            # would leave trailing "&..." parameters attached).
            query_params = urllib.parse.parse_qs(urllib.parse.urlsplit(raw_url).query)
            targets = query_params.get("uddg")
            if targets:
                raw_url = targets[0]
            else:
                # `uddg=` appeared outside the query string; fall back to the
                # simple split-and-unquote extraction.
                raw_url = urllib.parse.unquote(raw_url.split("uddg=")[-1])
        results.append({
            "title": title.get_text(strip=True) if title else "",
            "snippet": snippet.get_text(strip=True) if snippet else "",
            "url": raw_url,
        })
    return results
@tool
def search(query: str) -> str:
    """Search the web for real-world information.
    Use this when:
    - The question requires external knowledge (people, places, events, facts)
    - The question involves current or recent information
    - You are unsure about factual accuracy
    Do NOT use this when:
    - The answer can be derived from reasoning or math
    - The task involves files, images, or code execution
    Input: search query string
    Output: list of web results with titles and snippets"""
    # NOTE(review): the docstring above is presumably what @tool exposes to the
    # agent as the tool description, so it is kept agent-facing.
    try:
        client = TavilySearch(
            max_results=5,
            topic="general",
            search_depth="basic",  # cheaper + faster
            include_answer=True,  # gives direct answer
        )
        response = client.invoke({"query": query})

        # An error payload short-circuits before any formatting.
        if isinstance(response, dict) and response.get("error"):
            return f"Search Error: {response['error']}"

        hits = response.get("results", [])
        quick_answer = response.get("answer", None)
        if not hits:
            return "No relevant results found."

        # Optional direct answer first, then one section per hit.
        sections = [f"🔎 QUICK ANSWER:\n{quick_answer}\n"] if quick_answer else []
        sections.extend(
            f"Title: {hit.get('title','')}\n"
            f"Snippet: {hit.get('content','')}\n"
            f"Source: {hit.get('url','')}\n"
            for hit in hits
        )
        final_output = "\n---\n".join(sections)

        log_tool("web_search", query, {"results": final_output})
        return final_output
    except Exception as e:
        error_msg = f"Search failed: {str(e)}"
        log_tool("web_search", query, error_msg)
        return error_msg
# @tool
# def search(query: str) -> str:
# """Search the web for real-world information.
# Use this when:
# - The question requires external knowledge (people, places, events, facts)
# - The question involves current or recent information
# - You are unsure about factual accuracy
# Do NOT use this when:
# - The answer can be derived from reasoning or math
# - The task involves files, images, or code execution
# Input: search query string
# Output: list of web results with titles and snippets"""
# results = web_search_raw(query)
# print("search results ", results)
# if not results or results == "BOT_DETECTED":
# # return "No results found."
# print("⚠️ DuckDuckGo blocked → fallback")
# return duckduckgo_search(query)
# formatted = []
# for r in results:
# formatted.append(
# f"{r.get('title','')}\n{r.get('snippet','')}\n{r.get('link','')}"
# )
# log_tool("search", query, {"web_results": formatted})
# return "\n\n".join(formatted)
@tool
def youtube_transcript(url: str) -> str:
    """Extract transcript text from a YouTube video.
    Use this when:
    - The user asks about content from a specific YouTube video
    - You need to summarize or analyze a video
    Do NOT use this for:
    - General web search
    - Non-YouTube links
    Input: YouTube video URL
    Output: transcript text"""
    # Tried in this order; the first pattern that matches supplies the id:
    # watch?v=..., youtu.be/..., shorts/..., embed/...
    id_patterns = (
        r"v=([^&]+)",
        r"youtu\.be/([^?&]+)",
        r"shorts/([^?&]+)",
        r"embed/([^?&]+)",
    )
    try:
        video_id = None
        for pattern in id_patterns:
            found = re.search(pattern, url)
            if found:
                video_id = found.group(1)
                break
        if not video_id:
            raise ValueError("Could not extract video ID from URL")

        snippets = YouTubeTranscriptApi().fetch(video_id)
        text = " ".join(snippet.text for snippet in snippets)
        print("youtube transcript text ", text)
        # The full transcript is logged; only the first 5000 chars are returned.
        log_tool("youtube_transcript", url, text)
        if not text:
            return "No transcript available"
        return text[:5000]
    except Exception as e:
        print("youtube transcript err ", e)
        log_tool("youtube_transcript", url, f"Transcript error: {str(e)}")
        return f"Transcript error: {str(e)}"
# @tool
# def wiki_search(query: str) -> str:
# """Search Wikipedia (safe fallback)"""
# try:
# import wikipedia
# log_tool("wiki_search", query, wikipedia.summary(query, sentences=2))
# return wikipedia.summary(query, sentences=2)
# except Exception as e:
# log_tool("wiki_search", query, f"ERROR: {str(e)}")
# return f"ERROR: {str(e)}"
# @tool
# def wiki_search(query: str) -> str:
# """
# Search Wikipedia and return detailed page content including discography when available.
# """
# try:
# import wikipedia
# # Get best page
# page = wikipedia.page(query, auto_suggest=True)
# content = page.content[:5000] # limit for LLM
# log_tool("wiki_search", query, content)
# return content
# except Exception as e:
# log_tool("wiki_search", query, f"ERROR: {str(e)}")
# return f"ERROR: {str(e)}"
@tool
def wiki_search(query: str) -> str:
    """
    Wikipedia search with strict size control to avoid token overflow.
    """
    # Strategy: (1) direct page lookup, (2) on a disambiguation page try its
    # first options, (3) otherwise fall back to a title search and stitch
    # together short excerpts. Every path caps the returned text.
    try:
        import wikipedia

        MAX_CHARS = 5000  # SAFE LIMIT

        def trim(text):
            return text[:MAX_CHARS]

        # ---------- DIRECT ----------
        try:
            page = wikipedia.page(query, auto_suggest=True)
            content = trim(page.content)
            log_tool("wiki_direct", query, content)
            return f"### {page.title}\n{content}"
        except wikipedia.DisambiguationError as e:
            for option in e.options[:5]:
                try:
                    page = wikipedia.page(option, auto_suggest=False)
                    content = trim(page.content)
                    log_tool("wiki_disambiguation", query, option)
                    return f"### {page.title}\n{content}"
                except Exception:
                    # Best effort: try the next candidate. `Exception` (not a
                    # bare except) lets KeyboardInterrupt/SystemExit through.
                    continue
        except Exception:
            # Direct lookup failed for another reason; use the search fallback.
            pass

        # ---------- SEARCH ----------
        results = wikipedia.search(query)
        combined = ""
        for title in results[:3]:  # reduce results
            try:
                page = wikipedia.page(title, auto_suggest=False)
                chunk = f"\n\n### {title}\n{page.content[:1500]}"
                # stop before overflow
                if len(combined) + len(chunk) > MAX_CHARS:
                    break
                combined += chunk
            except Exception:
                continue

        if not combined:
            # Give the caller an explicit message instead of an empty string.
            not_found = f"No Wikipedia content found for: {query}"
            log_tool("wiki_search_fallback", query, not_found)
            return not_found

        log_tool("wiki_search_fallback", query, combined[:1000])
        return combined
    except Exception as e:
        log_tool("wiki_error", query, str(e))
        return f"ERROR: {str(e)}"
@tool
def wiki_discography(query: str) -> str:
    """
    Search Wikipedia specifically for discography or album lists of an artist.
    """
    # Looks up the page titled "<query> discography" and returns its full,
    # untruncated text; any failure is reported as an "ERROR: ..." string.
    try:
        import wikipedia

        page_title = " ".join((query, "discography"))
        article_text = wikipedia.page(page_title, auto_suggest=True).content
        log_tool("wiki_discography", query, article_text)
        return article_text
    except Exception as err:
        failure = f"ERROR: {str(err)}"
        log_tool("wiki_discography", query, failure)
        return failure
### =============== MATHEMATICAL TOOLS =============== ###
@tool
def multiply(a: float, b: float) -> float:
    """
    Perform precise multiplication.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: two numbers
    Output: result
    """
    # Plain product of the two operands.
    product = a * b
    return product
@tool
def add(a: float, b: float) -> float:
    """
    Perform precise addition.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: two numbers
    Output: result
    """
    # Plain sum of the two operands.
    total = a + b
    return total
@tool
def subtract(a: float, b: float) -> float:
    """
    Perform precise subtraction.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: two numbers
    Output: result
    """
    # Return annotation is float to match the float inputs and the sibling
    # arithmetic tools (the difference of two floats is not an int).
    return a - b
@tool
def divide(a: float, b: float) -> float:
    """
    Perform precise division.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: two numbers (dividend a, divisor b)
    Output: result of a / b
    Raises: ValueError if b is zero
    """
    # Reject a zero divisor up front with a clear message instead of letting
    # ZeroDivisionError propagate.
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
@tool
def modulus(a: int, b: int) -> int:
    """
    Perform precise modulus.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: two numbers
    Output: result
    """
    # Remainder of a divided by b, using Python's % operator.
    remainder = a % b
    return remainder
@tool
def power(a: float, b: float) -> float:
    """
    Perform precise power.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: two numbers
    Output: result
    """
    # Two-argument pow() is the functional spelling of a ** b.
    return pow(a, b)
@tool
def square_root(a: float) -> float | complex:
    """
    Perform precise square_root.
    Use this when:
    - Exact arithmetic is required
    Do NOT use for:
    - Estimation or reasoning
    Input: one number
    Output: result (a complex number when the input is negative)
    """
    # Non-negative inputs have a real root.
    if a >= 0:
        return a**0.5
    # Negative inputs: cmath.sqrt returns the complex root.
    return cmath.sqrt(a)
### ============== IMAGE PROCESSING AND GENERATION TOOLS =============== ###
@tool
def analyze_image(image_base64: str) -> Dict[str, Any]:
    """
    Analyze image properties (size, colors, brightness).
    Use this when:
    - You need metadata or insights about an image
    Do NOT use for:
    - editing or transforming images
    Input: base64 image
    Output: analysis data
    """
    # decode_image / save_image / encode_image are not defined in this file;
    # presumably they come from the star import of the image-processing module.
    try:
        img = decode_image(image_base64)
        width, height = img.size
        mode = img.mode
        if mode in ("RGB", "RGBA"):
            arr = np.array(img)
            # Keep only the three colour channels: for RGBA the fourth value
            # is alpha and must not influence "average_rgb" or "brightness".
            avg_rgb = arr.mean(axis=(0, 1))[:3]
            dominant = ["Red", "Green", "Blue"][int(np.argmax(avg_rgb))]
            color_analysis = {
                "average_rgb": avg_rgb.tolist(),
                # Plain float rather than a numpy scalar, so the result is
                # made of built-in types only.
                "brightness": float(avg_rgb.mean()),
                "dominant_color": dominant,
            }
        else:
            color_analysis = {"note": f"No color analysis for mode {mode}"}
        # Small preview (at most 100x100) returned alongside the statistics.
        thumbnail = img.copy()
        thumbnail.thumbnail((100, 100))
        thumb_path = save_image(thumbnail, "thumbnails")
        thumbnail_base64 = encode_image(thumb_path)
        analysis = {
            "dimensions": (width, height),
            "mode": mode,
            "color_analysis": color_analysis,
            "thumbnail": thumbnail_base64,
        }
        # Log a shallow copy so the returned dict is a distinct object.
        log_tool("analyze_image", image_base64, dict(analysis))
        return analysis
    except Exception as e:
        log_tool("analyze_image", image_base64, f"ERROR: {str(e)}")
        return {"error": str(e)}
@tool
def transform_image(
    image_base64: str, operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Modify an image (resize, rotate, crop, etc.)
    Use this when:
    - The task requires changing an image
    Input: image + operation
    Output: transformed image
    """
    # Each supported operation maps to a callable taking the decoded image and
    # returning the transformed one. Defaults are read from `params` lazily,
    # i.e. only when that operation is actually selected.
    try:
        img = decode_image(image_base64)
        params = params or {}

        def _flip(source):
            if params.get("direction", "horizontal") == "horizontal":
                return source.transpose(Image.FLIP_LEFT_RIGHT)
            return source.transpose(Image.FLIP_TOP_BOTTOM)

        handlers = {
            "resize": lambda source: source.resize(
                (
                    params.get("width", source.width // 2),
                    params.get("height", source.height // 2),
                )
            ),
            "rotate": lambda source: source.rotate(
                params.get("angle", 90), expand=True
            ),
            "crop": lambda source: source.crop(
                (
                    params.get("left", 0),
                    params.get("top", 0),
                    params.get("right", source.width),
                    params.get("bottom", source.height),
                )
            ),
            "flip": _flip,
            "adjust_brightness": lambda source: ImageEnhance.Brightness(
                source
            ).enhance(params.get("factor", 1.5)),
            "adjust_contrast": lambda source: ImageEnhance.Contrast(
                source
            ).enhance(params.get("factor", 1.5)),
            "blur": lambda source: source.filter(
                ImageFilter.GaussianBlur(params.get("radius", 2))
            ),
            "sharpen": lambda source: source.filter(ImageFilter.SHARPEN),
            "grayscale": lambda source: source.convert("L"),
        }

        # Non-string operations can never match a key; treat them as unknown.
        handler = handlers.get(operation) if isinstance(operation, str) else None
        if handler is None:
            return {"error": f"Unknown operation: {operation}"}

        img = handler(img)
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        log_tool("transform_image", {"operation": operation, "params": params}, result_base64)
        return {"transformed_image": result_base64}
    except Exception as e:
        log_tool("transform_image", {"operation": operation, "params": params}, f"ERROR: {str(e)}")
        return {"error": str(e)}
@tool
def draw_on_image(
    image_base64: str, drawing_type: str, params: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Draw shapes (rectangle, circle, line) or text onto an image.
    Use this when:
    - Annotation or overlay is needed
    Args:
    image_base64 (str): Base64 encoded input image
    drawing_type (str): Drawing type
    params (Dict[str, Any]): Drawing parameters
    Returns:
    Dictionary with result image (base64)
    """
    # Required keys per drawing type (a missing key is reported via the
    # generic error path):
    #   rectangle: left, top, right, bottom   circle: x, y, radius
    #   line: start_x, start_y, end_x, end_y  text: x, y
    # Optional: color (default "red"), width (2), font_size (20), text ("Text").
    try:
        canvas = decode_image(image_base64)
        pen = ImageDraw.Draw(canvas)
        color = params.get("color", "red")

        if drawing_type == "rectangle":
            box = [params[key] for key in ("left", "top", "right", "bottom")]
            pen.rectangle(box, outline=color, width=params.get("width", 2))
        elif drawing_type == "circle":
            cx, cy, radius = (params[key] for key in ("x", "y", "radius"))
            bounds = (cx - radius, cy - radius, cx + radius, cy + radius)
            pen.ellipse(bounds, outline=color, width=params.get("width", 2))
        elif drawing_type == "line":
            endpoints = tuple(
                params[key] for key in ("start_x", "start_y", "end_x", "end_y")
            )
            pen.line(endpoints, fill=color, width=params.get("width", 2))
        elif drawing_type == "text":
            # Prefer Arial at the requested size; use PIL's built-in font when
            # the TrueType file cannot be loaded.
            try:
                font = ImageFont.truetype("arial.ttf", params.get("font_size", 20))
            except IOError:
                font = ImageFont.load_default()
            anchor = (params["x"], params["y"])
            pen.text(anchor, params.get("text", "Text"), fill=color, font=font)
        else:
            return {"error": f"Unknown drawing type: {drawing_type}"}

        result_base64 = encode_image(save_image(canvas))
        log_tool("draw_on_image", {"drawing_type": drawing_type, "params": params}, result_base64)
        return {"result_image": result_base64}
    except Exception as e:
        log_tool("draw_on_image", {"drawing_type": drawing_type, "params": params}, f"ERROR: {str(e)}")
        return {"error": str(e)}
@tool
def generate_simple_image(
    image_type: str,
    width: int = 500,
    height: int = 500,
    params: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Generate a simple image. Supported image_type values: "gradient", "noise".
    Args:
    image_type (str): "gradient" or "noise"
    width (int), height (int)
    params (Dict[str, Any], optional): for "gradient": direction
    ("horizontal" by default, anything else means vertical), start_color and
    end_color as (r, g, b)
    Returns:
    Dictionary with generated image (base64), or {"error": ...}
    """
    try:
        params = params or {}
        if image_type == "gradient":
            direction = params.get("direction", "horizontal")
            start_color = params.get("start_color", (255, 0, 0))
            end_color = params.get("end_color", (0, 0, 255))
            img = Image.new("RGB", (width, height))
            draw = ImageDraw.Draw(img)

            def color_at(step, total):
                # Linear interpolation between the two colours, per channel.
                return tuple(
                    int(
                        start_color[channel]
                        + (end_color[channel] - start_color[channel]) * step / total
                    )
                    for channel in range(3)
                )

            if direction == "horizontal":
                # One vertical line per column.
                for x in range(width):
                    draw.line([(x, 0), (x, height)], fill=color_at(x, width))
            else:
                # One horizontal line per row.
                for y in range(height):
                    draw.line([(0, y), (width, y)], fill=color_at(y, height))
        elif image_type == "noise":
            # Uniform random value per pixel and channel.
            noise_array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            img = Image.fromarray(noise_array, "RGB")
        else:
            return {"error": f"Unsupported image_type {image_type}"}
        result_path = save_image(img)
        result_base64 = encode_image(result_path)
        log_tool("generate_simple_image", {"image_type": image_type, "params": params}, result_base64)
        return {"generated_image": result_base64}
    except Exception as e:
        log_tool("generate_simple_image", {"image_type": image_type, "params": params}, f"ERROR: {str(e)}")
        return {"error": str(e)}
@tool
def combine_images(
    images_base64: List[str], operation: str, params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Combine multiple images. Supported operation: "stack".
    Args:
    images_base64 (List[str]): List of base64 images
    operation (str): Combination type ("stack")
    params (Dict[str, Any], optional): direction - "horizontal" (default)
    places images side by side; any other value stacks them top to bottom
    Returns:
    Dictionary with combined image (base64), or {"error": ...}
    """
    try:
        images = [decode_image(b64) for b64 in images_base64]
        params = params or {}
        if operation == "stack":
            if not images:
                # Explicit message instead of the opaque max() failure an
                # empty list would otherwise trigger.
                error_msg = "No images provided to combine"
                log_tool("combine_images", {"operation": operation, "params": params}, f"ERROR: {error_msg}")
                return {"error": error_msg}
            direction = params.get("direction", "horizontal")
            if direction == "horizontal":
                # Canvas as wide as all images together, as tall as the tallest.
                total_width = sum(img.width for img in images)
                max_height = max(img.height for img in images)
                new_img = Image.new("RGB", (total_width, max_height))
                x = 0
                for img in images:
                    new_img.paste(img, (x, 0))
                    x += img.width
            else:
                # Canvas as wide as the widest image, as tall as all together.
                max_width = max(img.width for img in images)
                total_height = sum(img.height for img in images)
                new_img = Image.new("RGB", (max_width, total_height))
                y = 0
                for img in images:
                    new_img.paste(img, (0, y))
                    y += img.height
        else:
            return {"error": f"Unsupported combination operation {operation}"}
        result_path = save_image(new_img)
        result_base64 = encode_image(result_path)
        log_tool("combine_images", {"operation": operation, "params": params}, result_base64)
        return {"combined_image": result_base64}
    except Exception as e:
        log_tool("combine_images", {"operation": operation, "params": params}, f"ERROR: {str(e)}")
        return {"error": str(e)}
### =============== CODE INTERPRETER TOOLS =============== ###
@tool
def execute_code_multilang(code: str, language: str = "python") -> str:
    """Execute code in Python, Bash, SQL, C, or Java.
    Use this when:
    - The task requires code execution
    - Logic is complex or requires runtime evaluation
    - Data processing or scripting is needed
    Do NOT use for:
    - Simple math (use math tools)
    - General reasoning
    Input: code + language
    Output: execution result (stdout, errors, etc.)
    """
    # Delegates to the module-level interpreter instance and renders its
    # result dict (status / stdout / stderr / result / dataframes / plots)
    # as a Markdown report.
    supported_languages = ["python", "bash", "sql", "c", "java"]
    language = language.lower()
    if language not in supported_languages:
        return f"❌ Unsupported language: {language}. Supported languages are: {', '.join(supported_languages)}"

    def fenced(heading, body):
        # Heading followed by the body inside a Markdown code fence.
        return heading + "\n```\n" + body + "\n```"

    result = interpreter_instance.execute_code(code, language=language)
    lines = []
    if result["status"] != "success":
        lines.append(f"❌ Code execution failed in **{language.upper()}**")
        if result.get("stderr"):
            lines.append(fenced("\n**Error Log:**", result["stderr"].strip()))
    else:
        lines.append(f"✅ Code executed successfully in **{language.upper()}**")
        if result.get("stdout"):
            lines.append(fenced("\n**Standard Output:**", result["stdout"].strip()))
        if result.get("stderr"):
            lines.append(
                fenced("\n**Standard Error (if any):**", result["stderr"].strip())
            )
        if result.get("result") is not None:
            lines.append(
                fenced("\n**Execution Result:**", str(result["result"]).strip())
            )
        if result.get("dataframes"):
            for df_info in result["dataframes"]:
                lines.append(
                    f"\n**DataFrame `{df_info['name']}` (Shape: {df_info['shape']})**"
                )
                preview = pd.DataFrame(df_info["head"])
                lines.append(fenced("First 5 rows:", str(preview)))
        if result.get("plots"):
            lines.append(
                f"\n**Generated {len(result['plots'])} plot(s)** (Image data returned separately)"
            )

    report = "\n".join(lines)
    log_tool("execute_code_multilang", {"code": code, "language": language}, report)
    return report
### =============== DOCUMENT PROCESSING TOOLS =============== ###
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file.
    Use this when:
    - You need to persist generated data for further processing
    Input: content + optional filename
    Output: file path
    """
    # Despite the name, this only writes the file; the returned message tells
    # the caller where to read it from.
    temp_dir = tempfile.gettempdir()
    if filename is None:
        # Reserve a unique name, then close the handle straight away: the file
        # is reopened below for writing, and a lingering handle would leak
        # (and would block the reopen on platforms that lock open files).
        with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as temp_file:
            filepath = temp_file.name
    else:
        filepath = os.path.join(temp_dir, filename)
    with open(filepath, "w") as f:
        f.write(content)
    log_tool("save_and_read_file", {"content": content, "filename": filename}, f"File saved to {filepath}")
    return f"File saved to {filepath}. You can read this file to process its contents."
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Args:
    url (str): the URL of the file to download.
    filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    # Imported locally because this file has no explicit `import uuid`;
    # without it the random-name fallback below depends on a star import
    # happening to provide the name.
    import uuid

    try:
        # Derive the filename from the URL path when none is provided.
        if not filename:
            path = urlparse(url).path
            filename = os.path.basename(path)
            if not filename:
                filename = f"downloaded_{uuid.uuid4().hex[:8]}"
        # Target path inside the system temp directory.
        temp_dir = tempfile.gettempdir()
        filepath = os.path.join(temp_dir, filename)
        # Stream the download. The timeout bounds connect/read waits and the
        # context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        log_tool("download_file_from_url", url, f"File downloaded to {filepath}")
        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        log_tool("download_file_from_url", url, f"ERROR: {str(e)}")
        return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using OCR.
    Use this when:
    - The image contains readable text
    Do NOT use for:
    - Visual analysis (use analyze_image instead)
    Input: image path
    Output: extracted text
    """
    try:
        # Open the image as a context manager so the underlying file handle
        # is closed once OCR has finished (or failed).
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        log_tool("extract_text_from_image", image_path, text)
        return f"Extracted text from image:\n\n{text}"
    except Exception as e:
        log_tool("extract_text_from_image", image_path, f"ERROR: {str(e)}")
        return f"Error extracting text from image: {str(e)}"
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas.
    Use this when:
    - The task involves structured tabular data
    - The user asks questions about CSV content
    Do NOT use for:
    - Non-tabular data
    Input: CSV path + query
    Output: summary and statistics
    """
    # NOTE: `query` is accepted but not used; the report is always the row and
    # column counts, the column names, and DataFrame.describe().
    try:
        frame = pd.read_csv(file_path)
        report = "".join(
            [
                f"CSV file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n",
                f"Columns: {', '.join(frame.columns)}\n\n",
                "Summary statistics:\n",
                str(frame.describe()),
            ]
        )
        log_tool("analyze_csv_file", file_path, report)
        return report
    except Exception as err:
        log_tool("analyze_csv_file", file_path, f"ERROR: {str(err)}")
        return f"Error analyzing CSV file: {str(err)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file.
    Use this when:
    - The task involves spreadsheet data
    Input: Excel file path + query
    Output: analysis and summary
    """
    # NOTE: `query` is not used for the analysis itself; it is only recorded
    # in the log entry. Log input is a dict so each value stays labelled
    # (a set would lose which value is which).
    log_input = {"file_path": file_path, "query": query}
    try:
        # Read the Excel file
        df = pd.read_excel(file_path)
        result = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        # Spreadsheet headers may be numbers or dates; str() each label so the
        # join cannot fail on non-string column names.
        result += f"Columns: {', '.join(str(column) for column in df.columns)}\n\n"
        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())
        log_tool("analyze_excel_tool", log_input, result)
        return result
    except Exception as e:
        log_tool("analyze_excel_tool", log_input, f"ERROR: {str(e)}")
        return f"Error analyzing Excel file: {str(e)}"