# NOTE: repository page header captured together with this file (not part of the module):
#   shekkari21's picture
#   added session and memory
#   64462d2
"""File operation tools for the agent framework."""
import zipfile
import sys
import os
import base64
from pathlib import Path
from dotenv import load_dotenv
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from agent_framework import tool
# Load environment variables
load_dotenv()
# Import optional dependencies
# Each optional dependency is imported once at module load. The matching
# *_AVAILABLE flag is what the helpers below check before touching the package.
# All four imports use the same policy: a missing package is silent, any other
# import-time failure is reported as a warning, and neither aborts module import.
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
except Exception as e:
    print(f"Warning: pandas import failed with: {e}")
    PANDAS_AVAILABLE = False

try:
    import openpyxl
    OPENPYXL_AVAILABLE = True
except ImportError:
    OPENPYXL_AVAILABLE = False
except Exception as e:
    print(f"Warning: openpyxl import failed with: {e}")
    OPENPYXL_AVAILABLE = False

try:
    import fitz  # pymupdf
    PYPDF_AVAILABLE = True
except ImportError:
    PYPDF_AVAILABLE = False
except Exception as e:
    # A broken install can fail with something other than ImportError;
    # treat it as "not available" instead of crashing the whole module.
    print(f"Warning: pymupdf import failed with: {e}")
    PYPDF_AVAILABLE = False

try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
except Exception as e:
    print(f"Warning: openai import failed with: {e}")
    OPENAI_AVAILABLE = False
@tool
def unzip_file(zip_path: str, extract_to: str = None) -> str:
    """Extract a zip file to the specified directory.

    Args:
        zip_path: Path to the zip file to extract
        extract_to: Directory to extract to. If None, creates a folder with the zip filename.

    Returns:
        String describing the extraction results, including file count and contents list.

    Example:
        result = unzip_file("archive.zip", "extracted/")
    """
    # Failures are reported as returned "Error..." strings, never raised.
    zip_path = Path(zip_path)
    if not zip_path.exists():
        return f"Error: File not found: {zip_path}"

    # Default extraction path: create folder with zip filename
    if extract_to is None:
        extract_to = zip_path.parent / zip_path.stem
    else:
        extract_to = Path(extract_to)

    try:
        # Open the archive before creating anything on disk, so an invalid zip
        # does not leave an empty target directory behind.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            file_list = zip_ref.namelist()
            # Inside the try so that mkdir failures (permissions, a regular
            # file already at that path) are reported like any other error.
            extract_to.mkdir(parents=True, exist_ok=True)
            zip_ref.extractall(extract_to)
    except Exception as e:
        return f"Error extracting zip file: {str(e)}"

    # Format results; only the first entries are listed to keep the output short
    max_listed = 20
    parts = [
        f"Successfully extracted {len(file_list)} files to {extract_to}/\n\n",
        "Contents:\n",
    ]
    parts.extend(f" - {name}\n" for name in file_list[:max_listed])
    if len(file_list) > max_listed:
        parts.append(f" ... and {len(file_list) - max_listed} more files\n")
    return "".join(parts)
@tool
def list_files(path: str = ".") -> str:
    """List files and directories in the given path."""
    # Entries whose name starts with "." are skipped. Directories come first
    # (marked with a trailing "/"), then files; each group keeps the order
    # produced by sorting the directory's Path entries.
    target = Path(path)
    if not target.exists():
        return f"Path not found: {target}"
    if not target.is_dir():
        return f"Not a directory: {target}"

    folders = []
    regular = []
    for entry in sorted(target.iterdir()):
        name = entry.name
        if name.startswith('.'):
            continue
        if entry.is_dir():
            folders.append(f"{name}/")
        else:
            regular.append(name)

    listing = "".join(f" {label}\n" for label in folders + regular)
    return f"Directory: {target}\n" + listing
# Helper function - not exposed as tool (starts with _)
def _read_text_file(file_path: str, start_line: int, end_line: int) -> str:
    """Return a line-numbered excerpt of a UTF-8 text file.

    Args:
        file_path: Path of the file to read.
        start_line: 1-based number of the first line to include. Values below 1
            are treated as 1.
        end_line: 1-based number of the last line to include (inclusive), or -1
            to read to the end of the file. Values past the end are clamped.

    Returns:
        The selected lines joined by newlines, each formatted as
        "<line number, width 4> | <line without trailing whitespace>".
        An empty string if the range selects nothing.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Adjust line numbers (1-indexed to 0-indexed)
    start_idx = max(0, start_line - 1)
    end_idx = len(lines) if end_line == -1 else min(end_line, len(lines))

    # Number from the clamped start, not from the raw start_line; otherwise
    # start_line <= 0 would label the first line of the file as 0 or negative.
    return '\n'.join(
        f"{line_no:4d} | {line.rstrip()}"
        for line_no, line in enumerate(lines[start_idx:end_idx], start=start_idx + 1)
    )
# Helper function - not exposed as tool
def _read_csv(file_path: str) -> str:
    """Render a CSV file as plain text using pandas.

    Returns a header with the file path and the row/column counts followed by
    the table printed without the index column. If pandas is unavailable, or
    anything fails while loading or formatting, an "Error..." string is
    returned instead of raising.
    """
    if PANDAS_AVAILABLE:
        try:
            frame = pd.read_csv(file_path)
            n_rows, n_cols = frame.shape
            header = (
                f"CSV file: {file_path}\n"
                f"Shape: {n_rows} rows x {n_cols} columns\n\n"
            )
            return header + frame.to_string(index=False)
        except Exception as exc:
            return f"Error reading CSV file: {str(exc)}"
    return "Error: pandas is required for CSV reading. Install with: pip install pandas"
# Helper function - not exposed as tool
def _read_excel(file_path: str) -> str:
    """Render an Excel workbook as plain text using pandas.

    Args:
        file_path: Path of the .xlsx / .xls file. The extension is compared
            case-insensitively.

    Returns:
        A header with the file path and the row/column counts followed by the
        table printed without the index column (pandas' default sheet
        selection is used). Missing dependencies and read failures are
        reported as "Error..." strings instead of being raised.
    """
    if not PANDAS_AVAILABLE:
        return "Error: pandas is required for Excel reading. Install with: pip install pandas openpyxl"

    # Callers dispatch on a lower-cased suffix, so "REPORT.XLSX" reaches this
    # function too; compare case-insensitively to give it the same openpyxl
    # guard and engine selection as "report.xlsx".
    is_xlsx = file_path.lower().endswith('.xlsx')

    # Check for openpyxl specifically for .xlsx files
    if is_xlsx and not OPENPYXL_AVAILABLE:
        return ("Error: openpyxl package is not installed. "
                "To read .xlsx files, install it with: pip install openpyxl or uv pip install openpyxl. "
                "The package is listed in pyproject.toml but may not be installed in the current environment.")

    try:
        # Explicitly use openpyxl for .xlsx files
        if is_xlsx:
            df = pd.read_excel(file_path, engine='openpyxl')
        else:
            df = pd.read_excel(file_path)

        # Use to_string() instead of to_markdown() to avoid tabulate dependency
        result = f"Excel file: {file_path}\n"
        result += f"Shape: {df.shape[0]} rows x {df.shape[1]} columns\n\n"
        result += df.to_string(index=False)
        return result
    except ImportError as e:
        # pandas raises ImportError lazily when an engine package is missing
        error_msg = str(e).lower()
        if 'openpyxl' in error_msg:
            return ("Error: openpyxl is required for .xlsx files. "
                    "Install with: pip install openpyxl or uv pip install openpyxl. "
                    "Then restart the Python environment.")
        if 'tabulate' in error_msg:
            # Fallback if somehow to_string fails too
            return f"Error: tabulate dependency issue. {str(e)}"
        return f"Error: Missing dependency. {str(e)}. Install required packages: pip install pandas openpyxl"
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
# Suffixes (lower-case, with leading dot) that read_file treats as plain text.
TEXT_EXTENSIONS = ['.txt', '.py', '.js', '.json', '.md', '.html',
                   '.css', '.xml', '.yaml', '.yml', '.log', '.sh']
# Suffixes handled by the pandas-based readers.
SPREADSHEET_EXTENSIONS = ['.xlsx', '.xls', '.csv']


@tool
def read_file(file_path: str, start_line: int = 1, end_line: int = -1) -> str:
    """Read file content. Supports txt, py, json, md, csv, xlsx."""
    # start_line / end_line are only forwarded to the plain-text reader;
    # the CSV and Excel readers always return the whole table.
    target = Path(file_path)
    if not target.exists():
        return f"File not found: {file_path}"

    suffix = target.suffix.lower()
    if suffix not in TEXT_EXTENSIONS:
        if suffix == '.csv':
            return _read_csv(file_path)
        if suffix in SPREADSHEET_EXTENSIONS:
            return _read_excel(file_path)

    # Known text suffixes and every unrecognised suffix are read as text.
    return _read_text_file(file_path, start_line, end_line)
# Suffixes (lower-case, with leading dot) routed to each media analyzer.
IMAGE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp']
AUDIO_EXTENSIONS = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.webm']
PDF_EXTENSIONS = ['.pdf']


@tool
def read_media_file(file_path: str, query: str) -> str:
    """Analyze an image, audio, or PDF file using LLM."""
    # file_path's suffix (case-insensitive) selects the analyzer; query is
    # passed to it unchanged. An unsupported suffix or a missing file yields
    # an explanatory string instead of an exception.
    path = Path(file_path)
    ext = path.suffix.lower()

    if ext in IMAGE_EXTENSIONS:
        analyze = _analyze_image
    elif ext in AUDIO_EXTENSIONS:
        analyze = _analyze_audio
    elif ext in PDF_EXTENSIONS:
        analyze = _analyze_pdf
    else:
        return f"Unsupported media format: {ext}"

    # Same message as read_file, so a wrong path gives a readable answer
    # rather than an exception raised from inside the analyzer.
    if not path.exists():
        return f"File not found: {file_path}"

    return analyze(file_path, query)
# Helper function - not exposed as tool
def _analyze_image(file_path: str, query: str) -> str:
    """Send an image together with ``query`` to the "gpt-4o" chat model.

    The file is embedded as a base64 data URL whose media type is derived from
    the file suffix ("jpg" is mapped to "image/jpeg"; every other suffix is
    used as-is). Returns the text of the first choice in the response, or an
    error string when the openai package is unavailable.
    """
    if not OPENAI_AVAILABLE:
        return "Error: openai is required for image analysis. Install with: pip install openai"

    raw_bytes = Path(file_path).read_bytes()
    encoded = base64.b64encode(raw_bytes).decode("utf-8")

    suffix = Path(file_path).suffix.lower().lstrip('.')
    if suffix == "jpg":
        mime = "image/jpeg"
    else:
        mime = f"image/{suffix}"

    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": query},
            {"type": "image_url", "image_url": {
                "url": f"data:{mime};base64,{encoded}"
            }},
        ],
    }

    # The API key is read from the environment at call time.
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[user_message],
    )
    return completion.choices[0].message.content
# Helper function - not exposed as tool
def _analyze_audio(file_path: str, query: str) -> str:
    """Send an audio file together with ``query`` to "gpt-4o-audio-preview".

    The file content is base64-encoded and attached as an ``input_audio`` part
    whose ``format`` is the lower-cased file suffix without the dot. Returns
    the text of the first choice in the response, or an error string when the
    openai package is unavailable.
    """
    if not OPENAI_AVAILABLE:
        return "Error: openai is required for audio analysis. Install with: pip install openai"

    raw_bytes = Path(file_path).read_bytes()
    encoded = base64.b64encode(raw_bytes).decode("utf-8")

    # NOTE(review): the suffix is passed straight through as the audio format;
    # the API may accept only some of the suffixes routed here — confirm
    # against the OpenAI documentation.
    audio_format = Path(file_path).suffix.lower().lstrip('.')

    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": query},
            {"type": "input_audio", "input_audio": {
                "data": encoded,
                "format": audio_format
            }},
        ],
    }

    # The API key is read from the environment at call time.
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    completion = client.chat.completions.create(
        model="gpt-4o-audio-preview",
        messages=[user_message],
    )
    return completion.choices[0].message.content
# Helper function - not exposed as tool
def _analyze_pdf(file_path: str, query: str) -> str:
    """Send a PDF's text and page images together with ``query`` to "gpt-4o".

    Text is extracted from every page, but only the first 3000 characters are
    included in the prompt. The first 5 pages are additionally rendered to PNG
    at 2x zoom and attached as images.

    Args:
        file_path: Path of the PDF to analyze.
        query: Question or instruction placed at the start of the prompt.

    Returns:
        The text of the first choice in the model response, or an error string
        when pymupdf or openai is unavailable.
    """
    if not PYPDF_AVAILABLE:
        return "Error: pymupdf is required for PDF analysis. Install with: pip install pymupdf"
    if not OPENAI_AVAILABLE:
        return "Error: openai is required for PDF analysis. Install with: pip install openai"

    max_image_pages = 5    # only the first pages are rendered as images
    max_text_chars = 3000  # cap on the extracted text placed in the prompt

    text_parts = []
    images = []
    # The context manager closes the document even if rendering fails.
    with fitz.open(file_path) as doc:
        # One pass over the pages: collect text from all of them and render
        # the leading ones, without relying on slicing the document object.
        for page_no, page in enumerate(doc):
            text_parts.append(page.get_text())
            if page_no < max_image_pages:
                pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
                img_bytes = pix.tobytes("png")
                images.append(base64.b64encode(img_bytes).decode('utf-8'))
    text_content = "".join(text_parts)

    # Build content with text and images
    content = [{
        "type": "text",
        "text": f"{query}\n\nExtracted text:\n{text_content[:max_text_chars]}"
    }]
    content.extend(
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{img_b64}"}
        }
        for img_b64 in images
    )

    # The API key is read from the environment at call time.
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}]
    )
    return response.choices[0].message.content
@tool(
    name="delete_file",
    description="Delete a file from the filesystem",
    requires_confirmation=True,
    confirmation_message="The agent wants to delete a file. Arguments: {arguments}. "
                         "This action cannot be undone. Do you approve?"
)
def delete_file(filename: str) -> str:
    """Delete the specified file."""
    # Returns a success message, or a "File not found" / "Error deleting file"
    # string when the removal fails — matching how the other tools in this
    # module report problems. Uses the module-level `os` import.
    try:
        os.remove(filename)
    except FileNotFoundError:
        return f"File not found: {filename}"
    except OSError as e:
        # Covers directories, permission problems and other OS-level failures.
        return f"Error deleting file: {str(e)}"
    return f"Successfully deleted {filename}"