Spaces:
Sleeping
Sleeping
| """File operation tools for the agent framework.""" | |
| import zipfile | |
| import sys | |
| import os | |
| import base64 | |
| from pathlib import Path | |
| from dotenv import load_dotenv | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from agent_framework import tool | |
| # Load environment variables | |
| load_dotenv() | |
| # Import optional dependencies | |
| try: | |
| import pandas as pd | |
| PANDAS_AVAILABLE = True | |
| except ImportError: | |
| PANDAS_AVAILABLE = False | |
| except Exception as e: | |
| print(f"Warning: pandas import failed with: {e}") | |
| PANDAS_AVAILABLE = False | |
| try: | |
| import openpyxl | |
| OPENPYXL_AVAILABLE = True | |
| except ImportError: | |
| OPENPYXL_AVAILABLE = False | |
| except Exception as e: | |
| print(f"Warning: openpyxl import failed with: {e}") | |
| OPENPYXL_AVAILABLE = False | |
| try: | |
| import fitz # pymupdf | |
| PYPDF_AVAILABLE = True | |
| except ImportError: | |
| PYPDF_AVAILABLE = False | |
| try: | |
| from openai import OpenAI | |
| OPENAI_AVAILABLE = True | |
| except ImportError: | |
| OPENAI_AVAILABLE = False | |
def unzip_file(zip_path: str, extract_to: str = None) -> str:
    """Extract a zip archive into a directory.

    Args:
        zip_path: Path to the zip file to extract.
        extract_to: Directory to extract into. If None, a folder named after
            the archive (its filename without the extension) is used, located
            next to the archive.

    Returns:
        A summary with the number of archive entries and up to the first 20
        entry names, or a message starting with "Error" when the archive is
        missing, cannot be opened as a zip file, or cannot be extracted.

    Example:
        result = unzip_file("archive.zip", "extracted/")
    """
    zip_path = Path(zip_path)
    if not zip_path.exists():
        return f"Error: File not found: {zip_path}"

    # Default extraction path: create folder with zip filename
    if extract_to is None:
        extract_to = zip_path.parent / zip_path.stem
    else:
        extract_to = Path(extract_to)

    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            file_list = zip_ref.namelist()
            # Create the target only after the archive opened successfully, so
            # an invalid zip does not leave an empty directory behind. Doing it
            # inside the try also turns an unusable target (e.g. an existing
            # regular file) into an error message instead of an exception.
            extract_to.mkdir(parents=True, exist_ok=True)
            zip_ref.extractall(extract_to)
    except Exception as e:
        return f"Error extracting zip file: {str(e)}"

    # Format results
    parts = [
        f"Successfully extracted {len(file_list)} files to {extract_to}/\n\n",
        "Contents:\n",
    ]
    parts.extend(f" - {name}\n" for name in file_list[:20])
    if len(file_list) > 20:
        parts.append(f" ... and {len(file_list) - 20} more files\n")
    return "".join(parts)
def list_files(path: str = ".") -> str:
    """List the non-hidden entries of a directory, directories first.

    Args:
        path: Directory to list. Defaults to the current directory.

    Returns:
        A "Directory: <path>" header followed by one line per entry.
        Directory names carry a trailing "/". Entries whose name starts with
        "." are omitted. If the path does not exist or is not a directory, a
        short message saying so is returned instead.
    """
    root = Path(path)
    if not root.exists():
        return f"Path not found: {root}"
    if not root.is_dir():
        return f"Not a directory: {root}"

    folder_names = []
    file_names = []
    for entry in sorted(root.iterdir()):
        if entry.name.startswith('.'):
            continue
        if entry.is_dir():
            folder_names.append(f"{entry.name}/")
        else:
            file_names.append(f"{entry.name}")

    # Directories are listed ahead of files; each group keeps its sorted order.
    listing = "".join(f" {name}\n" for name in folder_names + file_names)
    return f"Directory: {root}\n" + listing
# Helper function - not exposed as tool (starts with _)
def _read_text_file(file_path: str, start_line: int, end_line: int) -> str:
    """Return a numbered listing of lines from a UTF-8 text file.

    Args:
        file_path: Path of the file to read.
        start_line: First line to include (1-indexed). Values below 1 are
            treated as 1.
        end_line: Last line to include (1-indexed, inclusive). -1 means
            "through the end of the file"; values past the end are clamped.

    Returns:
        The selected lines joined by newlines, each formatted as
        "<line number, width 4> | <line without trailing whitespace>".
        An empty string when the range selects no lines.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    # Adjust line numbers (1-indexed to 0-indexed)
    start_idx = max(0, start_line - 1)
    end_idx = len(lines) if end_line == -1 else min(end_line, len(lines))

    # Number from the clamped start index rather than the raw start_line, so
    # the labels stay correct when the caller passes start_line < 1.
    return '\n'.join(
        f"{number:4d} | {line.rstrip()}"
        for number, line in enumerate(lines[start_idx:end_idx], start=start_idx + 1)
    )
# Helper function - not exposed as tool
def _read_csv(file_path: str) -> str:
    """Load a CSV file with pandas and render it as plain text.

    Args:
        file_path: Path of the CSV file.

    Returns:
        The file path, the table shape (rows x columns) and the table
        rendered by DataFrame.to_string(index=False). An error message is
        returned when pandas is unavailable or reading fails.
    """
    if not PANDAS_AVAILABLE:
        return "Error: pandas is required for CSV reading. Install with: pip install pandas"

    try:
        frame = pd.read_csv(file_path)
        row_count, column_count = frame.shape
        sections = (
            f"CSV file: {file_path}\n",
            f"Shape: {row_count} rows x {column_count} columns\n\n",
            frame.to_string(index=False),
        )
        return "".join(sections)
    except Exception as exc:
        return f"Error reading CSV file: {exc}"
# Helper function - not exposed as tool
def _read_excel(file_path: str) -> str:
    """Load an Excel file with pandas and render it as plain text.

    Args:
        file_path: Path of the spreadsheet file.

    Returns:
        The file path, the table shape (rows x columns) and the table
        rendered by DataFrame.to_string(index=False). An error message is
        returned when a required package is missing or reading fails.
    """
    if not PANDAS_AVAILABLE:
        return "Error: pandas is required for Excel reading. Install with: pip install pandas openpyxl"

    # Compare the suffix case-insensitively: read_file dispatches on a
    # lower-cased suffix, so "REPORT.XLSX" must take the same path here.
    is_xlsx = Path(file_path).suffix.lower() == '.xlsx'

    # Check for openpyxl specifically for .xlsx files
    if is_xlsx and not OPENPYXL_AVAILABLE:
        return ("Error: openpyxl package is not installed. "
                "To read .xlsx files, install it with: pip install openpyxl or uv pip install openpyxl. "
                "The package is listed in pyproject.toml but may not be installed in the current environment.")

    try:
        # Explicitly use openpyxl for .xlsx files
        if is_xlsx:
            df = pd.read_excel(file_path, engine='openpyxl')
        else:
            df = pd.read_excel(file_path)

        # Use to_string() instead of to_markdown() to avoid tabulate dependency
        result = f"Excel file: {file_path}\n"
        result += f"Shape: {df.shape[0]} rows x {df.shape[1]} columns\n\n"
        result += df.to_string(index=False)
        return result
    except ImportError as e:
        error_msg = str(e).lower()
        if 'openpyxl' in error_msg:
            return ("Error: openpyxl is required for .xlsx files. "
                    "Install with: pip install openpyxl or uv pip install openpyxl. "
                    "Then restart the Python environment.")
        if 'tabulate' in error_msg:
            # Fallback if somehow to_string fails too
            return f"Error: tabulate dependency issue. {str(e)}"
        return f"Error: Missing dependency. {str(e)}. Install required packages: pip install pandas openpyxl"
    except Exception as e:
        return f"Error reading Excel file: {str(e)}"
TEXT_EXTENSIONS = ['.txt', '.py', '.js', '.json', '.md', '.html',
                   '.css', '.xml', '.yaml', '.yml', '.log', '.sh']
SPREADSHEET_EXTENSIONS = ['.xlsx', '.xls', '.csv']


def read_file(file_path: str, start_line: int = 1, end_line: int = -1) -> str:
    """Read file content. Supports txt, py, json, md, csv, xlsx.

    The reader is chosen from the lower-cased file extension: ".csv" goes to
    the CSV reader, the other spreadsheet extensions go to the Excel reader,
    and every other extension (listed in TEXT_EXTENSIONS or not) is read as
    UTF-8 text.

    Args:
        file_path: Path of the file to read.
        start_line: First line to return (1-indexed); used for text files only.
        end_line: Last line to return (inclusive), or -1 for the end of the
            file; used for text files only.

    Returns:
        The rendered file content, or a message describing why the file could
        not be read (missing path, directory, undecodable or unreadable file).
    """
    path = Path(file_path)
    if not path.exists():
        return f"File not found: {file_path}"
    # A directory passes exists() but cannot be opened as a file.
    if path.is_dir():
        return f"Not a file: {file_path}"

    ext = path.suffix.lower()
    if ext == '.csv':
        return _read_csv(file_path)
    if ext in SPREADSHEET_EXTENSIONS:
        return _read_excel(file_path)

    # Known text extensions and unknown extensions are both read as text.
    # Report undecodable or unreadable files as a message, matching the
    # error-string style of the CSV and Excel readers.
    try:
        return _read_text_file(file_path, start_line, end_line)
    except UnicodeDecodeError:
        return f"Error: {file_path} is not valid UTF-8 text and cannot be read as a text file"
    except OSError as e:
        return f"Error reading file: {e}"
IMAGE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp']
AUDIO_EXTENSIONS = ['.mp3', '.wav', '.m4a', '.flac', '.ogg', '.webm']
PDF_EXTENSIONS = ['.pdf']


def read_media_file(file_path: str, query: str) -> str:
    """Analyze an image, audio, or PDF file using LLM.

    The analyzer is chosen from the lower-cased file extension.

    Args:
        file_path: Path of the media file.
        query: Question or instruction passed to the analyzer with the file.

    Returns:
        The analyzer's answer, "Unsupported media format: <ext>" for an
        extension in none of the three extension lists, or
        "File not found: <path>" when the extension is supported but the path
        is not an existing file.
    """
    path = Path(file_path)
    ext = path.suffix.lower()

    if ext not in IMAGE_EXTENSIONS + AUDIO_EXTENSIONS + PDF_EXTENSIONS:
        return f"Unsupported media format: {ext}"

    # The analyzers open the file directly, so report a missing file as a
    # message here instead of letting them raise.
    if not path.is_file():
        return f"File not found: {file_path}"

    if ext in IMAGE_EXTENSIONS:
        return _analyze_image(file_path, query)
    if ext in AUDIO_EXTENSIONS:
        return _analyze_audio(file_path, query)
    return _analyze_pdf(file_path, query)
# Helper function - not exposed as tool
def _analyze_image(file_path: str, query: str) -> str:
    """Send an image and a text query to the "gpt-4o" chat model.

    The image is embedded in the request as a base64 data URL. Its media type
    is derived from the file extension ("jpg" becomes "image/jpeg", anything
    else becomes "image/<extension>"). The API key is read from the
    OPENAI_API_KEY environment variable.

    Args:
        file_path: Path of the image file.
        query: Text sent alongside the image.

    Returns:
        The message content of the first response choice, or an error message
        when the openai package is unavailable.
    """
    if not OPENAI_AVAILABLE:
        return "Error: openai is required for image analysis. Install with: pip install openai"

    with open(file_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded_image = base64.b64encode(raw_bytes).decode("utf-8")

    suffix = Path(file_path).suffix.lower().lstrip('.')
    if suffix == "jpg":
        media_type = "image/jpeg"
    else:
        media_type = f"image/{suffix}"

    text_part = {"type": "text", "text": query}
    image_part = {
        "type": "image_url",
        "image_url": {"url": f"data:{media_type};base64,{encoded_image}"},
    }

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": [text_part, image_part]}],
    )
    return response.choices[0].message.content
# Helper function - not exposed as tool
def _analyze_audio(file_path: str, query: str) -> str:
    """Send an audio file and a text query to the "gpt-4o-audio-preview" model.

    The audio is sent base64-encoded as an "input_audio" content part whose
    format is the file extension. The API key is read from the OPENAI_API_KEY
    environment variable.

    Args:
        file_path: Path of the audio file.
        query: Text sent alongside the audio.

    Returns:
        The message content of the first response choice, or an error message
        when the audio format cannot be sent or the openai package is
        unavailable.
    """
    ext = Path(file_path).suffix.lower().lstrip('.')

    # The OpenAI API reference lists only "wav" and "mp3" as accepted values
    # for input_audio.format. Reject other extensions up front instead of
    # encoding and uploading a file the API will refuse.
    supported_formats = ("mp3", "wav")
    if ext not in supported_formats:
        return (f"Error: audio format '{ext}' is not supported for analysis. "
                f"Supported formats: {', '.join(supported_formats)}")

    if not OPENAI_AVAILABLE:
        return "Error: openai is required for audio analysis. Install with: pip install openai"

    with open(file_path, "rb") as f:
        audio_data = base64.b64encode(f.read()).decode("utf-8")

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o-audio-preview",
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "input_audio", "input_audio": {
                    "data": audio_data,
                    "format": ext
                }}
            ]
        }]
    )
    return response.choices[0].message.content
# Helper function - not exposed as tool
def _analyze_pdf(file_path: str, query: str, max_pages: int = 5,
                 max_text_chars: int = 3000) -> str:
    """Send a PDF's extracted text and page renderings to the "gpt-4o" model.

    The request contains the query, up to max_text_chars characters of text
    extracted from the document, and PNG renderings of the first max_pages
    pages. The API key is read from the OPENAI_API_KEY environment variable.

    Args:
        file_path: Path of the PDF file.
        query: Text sent alongside the document content.
        max_pages: Number of leading pages rendered as images.
        max_text_chars: Maximum number of extracted-text characters included.

    Returns:
        The message content of the first response choice, or an error message
        when pymupdf or openai is unavailable.
    """
    if not PYPDF_AVAILABLE:
        return "Error: pymupdf is required for PDF analysis. Install with: pip install pymupdf"
    if not OPENAI_AVAILABLE:
        return "Error: openai is required for PDF analysis. Install with: pip install openai"

    text_parts = []
    text_length = 0
    images = []

    doc = fitz.open(file_path)
    try:
        # One pass over the pages: stop once enough text has been collected
        # and the leading pages have been rendered, rather than extracting
        # text from the whole document and discarding most of it.
        for index, page in enumerate(doc):
            need_text = text_length < max_text_chars
            need_image = index < max_pages
            if not (need_text or need_image):
                break
            if need_text:
                page_text = page.get_text()
                text_parts.append(page_text)
                text_length += len(page_text)
            if need_image:
                # Matrix(2, 2) scales the rendering by 2 on both axes.
                pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
                img_bytes = pix.tobytes("png")
                images.append(base64.b64encode(img_bytes).decode('utf-8'))
    finally:
        # Always release the document handle, even if rendering fails.
        doc.close()

    text_content = "".join(text_parts)[:max_text_chars]

    # Build content with text and images
    content = [{
        "type": "text",
        "text": f"{query}\n\nExtracted text:\n{text_content}"
    }]
    for img_b64 in images:
        content.append({
            "type": "image_url",
            "image_url": {"url": f"data:image/png;base64,{img_b64}"}
        })

    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}]
    )
    return response.choices[0].message.content
def delete_file(filename: str) -> str:
    """Delete the specified file.

    Args:
        filename: Path of the file to delete.

    Returns:
        "Successfully deleted <filename>" on success, otherwise a message
        starting with "Error" (missing file, directory, or any other OS-level
        failure), so failures are reported as text like the other file tools.
    """
    try:
        os.remove(filename)
    except FileNotFoundError:
        return f"Error: File not found: {filename}"
    except OSError as e:
        # Covers directories, permission problems and other OS-level failures.
        return f"Error deleting {filename}: {e}"
    return f"Successfully deleted {filename}"