Prasanthkumar's picture
Upload 4 files
f64893d verified
import os
import uuid
import requests
import tempfile
from PIL import Image
import pytesseract
import pandas as pd
from urllib.parse import urlparse
from langchain_core.tools import tool
from typing import Optional
import logging
import pandasql as psql
# ------------------- 🔧 Logger Setup -------------------
def setup_logger():
logger = logging.getLogger("FileToolLogger")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
logger = setup_logger()
# ------------------- 📄 Save Content to File -------------------
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
"""
Save content to a file and return the path.
Args:
content (str): the content to save to the file
filename (str, optional): the name of the file. If not provided, a random name file will be created.
"""
temp_dir = tempfile.gettempdir()
if filename is None:
temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
filepath = temp_file.name
else:
filepath = os.path.join(temp_dir, filename)
with open(filepath, "w") as f:
f.write(content)
return f"File saved to {filepath}. You can read this file to process its contents."
# ------------------- 📄 Save Content to File -------------------
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
"""
Download a file from a URL and save it to a temporary location.
Args:
url (str): the URL of the file to download.
filename (str, optional): the name of the file. If not provided, a random name file will be created.
"""
try:
# Parse URL to get filename if not provided
if not filename:
path = urlparse(url).path
filename = os.path.basename(path)
if not filename:
filename = f"downloaded_{uuid.uuid4().hex[:8]}"
# Create temporary file
temp_dir = tempfile.gettempdir()
filepath = os.path.join(temp_dir, filename)
# Download the file
response = requests.get(url, stream=True)
response.raise_for_status()
# Save the file
with open(filepath, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return f"File downloaded to {filepath}. You can read this file to process its contents."
except Exception as e:
return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
"""
Extract text from an image using OCR library pytesseract (if available).
Args:
image_path (str): the path to the image file.
"""
try:
# Open the image
image = Image.open(image_path)
# Extract text from the image
text = pytesseract.image_to_string(image)
return f"Extracted text from image:\n\n{text}"
except Exception as e:
return f"Error extracting text from image: {str(e)}"
@tool
def analyze_csv_file(file_path: str, query: Optional[str] = None) -> str:
"""
Analyze a CSV file using pandas and answer a question about it.
Args:
file_path (str): the path to the CSV file.
query (str): Question about the data
"""
if not os.path.isfile(file_path) or not file_path.endswith((".csv")):
return "Invalid or missing csv file."
try :
df = pd.read_csv(file_path)
columns = df.columns
result = [f"CSV loaded with shape: {df.shape}", f" Columns: {', '.join(columns)}"]
if query:
result.append(f"\n Query: {query}")
result_df = psql.sqldf(query, {"df": df})
result.append("Query Result:\n" + result_df.to_string(index=False))
else:
result.append("\nSummary:\n" + str(df.describe(include='all')))
return "\n".join(result)
except Exception as e:
return f"Error analyzing CSV file: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: Optional[str] = None) -> str:
"""
Analyze a excel file using pandas and answer a question about it.
Args:
file_path (str): the path to the xls or xlsx file.
query (str): Question about the data
"""
if not os.path.isfile(file_path) or not file_path.endswith((".xls", ".xlsx")):
return "Invalid or missing Excel file."
try :
df = pd.read_excel(file_path)
columns = df.columns
result = [f"CSV loaded with shape: {df.shape}", f" Columns: {', '.join(columns)}"]
if query:
result.append(f"\n Query: {query}")
result_df = psql.sqldf(query, {"df": df})
result.append("Query Result:\n" + result_df.to_string(index=False))
else:
result.append("\nSummary:\n" + str(df.describe(include='all')))
return "\n".join(result)
except Exception as e:
return f"Error analyzing Excel file: {str(e)}"