| |
| """ |
| File Processing Tool for GAIA Agent System |
| Handles multiple file formats: images, audio, Excel/CSV, Python code |
| """ |
|
|
| import os |
| import re |
| import io |
| import logging |
| import mimetypes |
| from typing import Dict, List, Optional, Any, Union |
| from pathlib import Path |
| import pandas as pd |
| from PIL import Image |
| import ast |
|
|
| from tools import BaseTool |
|
|
| logger = logging.getLogger(__name__) |
|
|
class FileProcessingResult:
    """Value object holding the outcome of processing one file.

    Bundles the source path, the detected file type, a success flag, the
    extracted content, and any format-specific metadata so handlers can
    return a uniform shape.
    """

    # Attribute names serialized by to_dict(), in output order.
    _FIELDS = ("file_path", "file_type", "success", "content", "metadata")

    def __init__(self, file_path: str, file_type: str, success: bool,
                 content: Any = None, metadata: Dict[str, Any] = None):
        self.file_path = file_path
        self.file_type = file_type
        self.success = success
        self.content = content
        # Fall back to a fresh empty dict so callers can always index into it.
        self.metadata = metadata or {}

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of this result (JSON-friendly keys)."""
        return {field: getattr(self, field) for field in self._FIELDS}
|
|
class FileProcessorTool(BaseTool):
    """
    File processor tool for multiple file formats.

    Auto-detects the file type from its extension (falling back to MIME
    sniffing) and dispatches to a format-specific handler. Supports images,
    audio, Excel/CSV/JSON data, source code, and plain text.
    """

    def __init__(self):
        super().__init__("file_processor")

        # Extension sets consulted, in order, by _detect_file_type().
        self.image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
        self.audio_extensions = {'.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'}
        # FIX: '.txt' removed from the data set. Previously .txt files were
        # routed through _process_data_file(), which reads strictly UTF-8;
        # routing them as "text" uses _process_text_file()'s latin-1 fallback.
        self.data_extensions = {'.csv', '.xlsx', '.xls', '.json'}
        self.code_extensions = {'.py', '.js', '.java', '.cpp', '.c', '.html', '.css'}

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute file processing operations based on input type.

        Args:
            input_data: Can be:
                - str: File path to process (auto-detected type)
                - dict: {"file_path": str, "operation": str, "options": dict}
                  where operation is one of "auto", "analyze_image",
                  "process_data", "analyze_code".

        Returns:
            A result dict with at least a "success" key.

        Raises:
            ValueError: On an unknown operation or unsupported input type.
        """
        if isinstance(input_data, str):
            return self._process_file(input_data)

        elif isinstance(input_data, dict):
            file_path = input_data.get("file_path", "")
            operation = input_data.get("operation", "auto")
            options = input_data.get("options", {})

            if operation == "auto":
                return self._process_file(file_path, **options)
            elif operation == "analyze_image":
                return self._analyze_image(file_path, **options)
            elif operation == "process_data":
                return self._process_data_file(file_path, **options)
            elif operation == "analyze_code":
                return self._analyze_code(file_path, **options)
            else:
                raise ValueError(f"Unknown operation: {operation}")
        else:
            raise ValueError(f"Unsupported input type: {type(input_data)}")

    def _process_file(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Auto-detect file type and process accordingly.

        Never raises: all errors are converted into {"success": False, ...}
        dicts so callers get a uniform shape.
        """
        try:
            if not os.path.exists(file_path):
                return {
                    "success": False,
                    "message": f"File not found: {file_path}",
                    "error_type": "file_not_found"
                }

            file_extension = Path(file_path).suffix.lower()
            file_type = self._detect_file_type(file_path, file_extension)

            logger.info(f"Processing {file_type} file: {file_path}")

            # Dispatch to the format-specific handler.
            if file_type == "image":
                return self._analyze_image(file_path, **options)
            elif file_type == "audio":
                return self._analyze_audio(file_path, **options)
            elif file_type == "data":
                return self._process_data_file(file_path, **options)
            elif file_type == "code":
                return self._analyze_code(file_path, **options)
            elif file_type == "text":
                return self._process_text_file(file_path, **options)
            else:
                return {
                    "success": False,
                    "message": f"Unsupported file type: {file_type}",
                    "file_path": file_path,
                    "detected_type": file_type
                }

        except Exception as e:
            return {
                "success": False,
                "message": f"File processing failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _detect_file_type(self, file_path: str, extension: str) -> str:
        """Detect file type based on extension, falling back to MIME type.

        Returns one of: "image", "audio", "data", "code", "text", "unknown".
        """
        if extension in self.image_extensions:
            return "image"
        elif extension in self.audio_extensions:
            return "audio"
        elif extension in self.data_extensions:
            return "data"
        elif extension in self.code_extensions:
            return "code"
        elif extension in {'.txt', '.md', '.rst'}:
            return "text"
        else:
            # Unknown extension: let the MIME database take a guess.
            mime_type, _ = mimetypes.guess_type(file_path)
            if mime_type:
                if mime_type.startswith('image/'):
                    return "image"
                elif mime_type.startswith('audio/'):
                    return "audio"
                elif mime_type.startswith('text/'):
                    return "text"

            return "unknown"

    def _analyze_image(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Analyze an image file: dimensions, mode, EXIF sample, dominant colors.
        """
        try:
            with Image.open(file_path) as img:
                metadata = {
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "width": img.width,
                    "height": img.height,
                    "file_size": os.path.getsize(file_path)
                }

                # FIX: fetch EXIF once instead of calling the private
                # _getexif() twice (the original called it in the condition
                # and again in the body).
                exif = img._getexif() if hasattr(img, '_getexif') else None
                if exif:
                    # Keep only a small sample to bound result size.
                    metadata["exif_data"] = dict(list(exif.items())[:10])

                if img.mode in ['RGB', 'RGBA']:
                    # FIX: getcolors() returns None when the image has more
                    # distinct colors than maxcolors; the original value of
                    # 10 made this branch a no-op for virtually any real
                    # photo. 2**16 keeps it cheap while actually working
                    # for most images.
                    colors = img.getcolors(maxcolors=2 ** 16)
                    if colors:
                        # Sort by pixel count only (avoid comparing color
                        # tuples on ties).
                        dominant_colors = sorted(
                            colors, key=lambda item: item[0], reverse=True
                        )[:5]
                        metadata["dominant_colors"] = [
                            {"count": count, "rgb": color}
                            for count, color in dominant_colors
                        ]

                content_description = self._describe_image_content(img, metadata)

                result = FileProcessingResult(
                    file_path=file_path,
                    file_type="image",
                    success=True,
                    content=content_description,
                    metadata=metadata
                )

                return {
                    "success": True,
                    "result": result.to_dict(),
                    "message": f"Successfully analyzed image: {img.width}x{img.height} {img.format}"
                }

        except Exception as e:
            return {
                "success": False,
                "message": f"Image analysis failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _describe_image_content(self, img: Image.Image, metadata: Dict[str, Any]) -> str:
        """Generate a basic textual description of image content."""
        description_parts = []

        # Orientation from aspect ratio.
        width, height = img.size
        if width > height:
            orientation = "landscape"
        elif height > width:
            orientation = "portrait"
        else:
            orientation = "square"

        description_parts.append(f"{orientation} {img.format} image")
        description_parts.append(f"Dimensions: {width} x {height} pixels")

        # Color mode summary.
        if img.mode == 'RGB':
            description_parts.append("Full color RGB image")
        elif img.mode == 'RGBA':
            description_parts.append("RGB image with transparency")
        elif img.mode == 'L':
            description_parts.append("Grayscale image")
        elif img.mode == '1':
            description_parts.append("Black and white image")

        # Human-readable file size.
        file_size = metadata.get("file_size", 0)
        if file_size > 0:
            size_mb = file_size / (1024 * 1024)
            if size_mb >= 1:
                description_parts.append(f"File size: {size_mb:.1f} MB")
            else:
                size_kb = file_size / 1024
                description_parts.append(f"File size: {size_kb:.1f} KB")

        return ". ".join(description_parts)

    def _analyze_audio(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Analyze audio files (basic size/extension metadata only).

        Full transcription/duration analysis would require an audio library;
        this handler intentionally reports only filesystem-level facts.
        """
        try:
            file_size = os.path.getsize(file_path)
            file_extension = Path(file_path).suffix.lower()

            metadata = {
                "file_extension": file_extension,
                "file_size": file_size,
                "file_size_mb": round(file_size / (1024 * 1024), 2)
            }

            content_description = f"Audio file ({file_extension}) - {metadata['file_size_mb']} MB"

            result = FileProcessingResult(
                file_path=file_path,
                file_type="audio",
                success=True,
                content=content_description,
                metadata=metadata
            )

            return {
                "success": True,
                "result": result.to_dict(),
                "message": f"Audio file detected: {metadata['file_size_mb']} MB {file_extension}",
                "note": "Full audio transcription requires additional setup"
            }

        except Exception as e:
            return {
                "success": False,
                "message": f"Audio analysis failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _process_data_file(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Process Excel, CSV, and JSON data files via pandas.

        Unrecognized extensions fall back to plain-text processing.
        """
        try:
            file_extension = Path(file_path).suffix.lower()

            if file_extension == '.csv':
                df = pd.read_csv(file_path)
            elif file_extension in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            elif file_extension == '.json':
                df = pd.read_json(file_path)
            else:
                # Not a tabular format we know: treat as text.
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return self._process_text_content(content, file_path)

            # FIX: coerce numpy dtypes/scalars to plain Python values so the
            # metadata dict stays serializable (str dtypes, int bytes, bool).
            metadata = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "column_count": len(df.columns),
                "row_count": len(df),
                "data_types": {col: str(dtype) for col, dtype in df.dtypes.items()},
                "memory_usage": int(df.memory_usage(deep=True).sum()),
                "has_missing_values": bool(df.isnull().any().any())
            }

            # Summary stats for numeric columns only.
            numeric_columns = df.select_dtypes(include=['number']).columns.tolist()
            if numeric_columns:
                metadata["numeric_columns"] = numeric_columns
                metadata["numeric_stats"] = df[numeric_columns].describe().to_dict()

            sample_data = df.head(5).to_dict(orient='records')

            content_description = self._describe_data_content(df, metadata)

            result = FileProcessingResult(
                file_path=file_path,
                file_type="data",
                success=True,
                content={
                    "description": content_description,
                    "sample_data": sample_data,
                    # Only embed the full table for small datasets.
                    "full_data": df.to_dict(orient='records') if len(df) <= 100 else None
                },
                metadata=metadata
            )

            return {
                "success": True,
                "result": result.to_dict(),
                "message": f"Successfully processed data file: {df.shape[0]} rows, {df.shape[1]} columns"
            }

        except Exception as e:
            return {
                "success": False,
                "message": f"Data file processing failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _describe_data_content(self, df: pd.DataFrame, metadata: Dict[str, Any]) -> str:
        """Generate a short description of a tabular data file."""
        description_parts = []

        rows, cols = df.shape
        description_parts.append(f"Data table with {rows} rows and {cols} columns")

        # List all columns when few; otherwise show a sample.
        if cols <= 10:
            column_names = ", ".join(df.columns.tolist())
            description_parts.append(f"Columns: {column_names}")
        else:
            description_parts.append(f"Columns include: {', '.join(df.columns.tolist()[:5])}... and {cols-5} more")

        numeric_cols = len(metadata.get("numeric_columns", []))
        if numeric_cols > 0:
            description_parts.append(f"{numeric_cols} numeric columns")

        if metadata.get("has_missing_values"):
            description_parts.append("Contains missing values")

        return ". ".join(description_parts)

    def _analyze_code(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Analyze code files (AST-based for Python, line counts otherwise).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()

            file_extension = Path(file_path).suffix.lower()

            if file_extension == '.py':
                return self._analyze_python_code(code_content, file_path)
            else:
                return self._analyze_generic_code(code_content, file_path, file_extension)

        except Exception as e:
            return {
                "success": False,
                "message": f"Code analysis failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _analyze_python_code(self, code_content: str, file_path: str) -> Dict[str, Any]:
        """Analyze Python code structure (functions, classes, imports)."""
        try:
            tree = ast.parse(code_content)

            functions = []
            classes = []
            imports = []

            for node in ast.walk(tree):
                # FIX: also count `async def` functions, which the original
                # silently skipped (ast.AsyncFunctionDef is a distinct node).
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    functions.append({
                        "name": node.name,
                        "line": node.lineno,
                        "args": [arg.arg for arg in node.args.args]
                    })
                elif isinstance(node, ast.ClassDef):
                    classes.append({
                        "name": node.name,
                        "line": node.lineno
                    })
                elif isinstance(node, (ast.Import, ast.ImportFrom)):
                    if isinstance(node, ast.Import):
                        for alias in node.names:
                            imports.append(alias.name)
                    else:
                        # "from X import Y" recorded as "X.Y" (module may be
                        # empty for relative imports).
                        module = node.module or ""
                        for alias in node.names:
                            imports.append(f"{module}.{alias.name}")

            lines = code_content.split('\n')
            metadata = {
                "total_lines": len(lines),
                "non_empty_lines": len([line for line in lines if line.strip()]),
                "comment_lines": len([line for line in lines if line.strip().startswith('#')]),
                "function_count": len(functions),
                "class_count": len(classes),
                "import_count": len(imports),
                # Cap the listings so huge files don't bloat the result.
                "functions": functions[:10],
                "classes": classes[:10],
                "imports": list(set(imports))
            }

            content_description = self._describe_python_code(metadata)

            result = FileProcessingResult(
                file_path=file_path,
                file_type="python_code",
                success=True,
                content={
                    "description": content_description,
                    "code_snippet": code_content[:1000] + "..." if len(code_content) > 1000 else code_content,
                    "full_code": code_content
                },
                metadata=metadata
            )

            return {
                "success": True,
                "result": result.to_dict(),
                "message": f"Python code analyzed: {metadata['function_count']} functions, {metadata['class_count']} classes"
            }

        except SyntaxError as e:
            return {
                "success": False,
                "message": f"Python syntax error: {str(e)}",
                "file_path": file_path,
                "error_type": "syntax_error"
            }

    def _describe_python_code(self, metadata: Dict[str, Any]) -> str:
        """Generate a short description of analyzed Python code."""
        description_parts = []

        total_lines = metadata.get("total_lines", 0)
        non_empty_lines = metadata.get("non_empty_lines", 0)
        description_parts.append(f"Python file with {total_lines} total lines ({non_empty_lines} non-empty)")

        func_count = metadata.get("function_count", 0)
        class_count = metadata.get("class_count", 0)

        if func_count > 0:
            description_parts.append(f"{func_count} functions defined")
        if class_count > 0:
            description_parts.append(f"{class_count} classes defined")

        imports = metadata.get("imports", [])
        if imports:
            if len(imports) <= 5:
                description_parts.append(f"Imports: {', '.join(imports)}")
            else:
                description_parts.append(f"Imports {len(imports)} modules including: {', '.join(imports[:3])}...")

        return ". ".join(description_parts)

    def _analyze_generic_code(self, code_content: str, file_path: str, extension: str) -> Dict[str, Any]:
        """Analyze non-Python code files with simple line/size counts."""
        lines = code_content.split('\n')

        metadata = {
            "file_extension": extension,
            "total_lines": len(lines),
            "non_empty_lines": len([line for line in lines if line.strip()]),
            "file_size": len(code_content),
        }

        content_description = f"{extension.upper()} code file with {metadata['total_lines']} lines"

        result = FileProcessingResult(
            file_path=file_path,
            file_type="code",
            success=True,
            content={
                "description": content_description,
                "code_snippet": code_content[:500] + "..." if len(code_content) > 500 else code_content
            },
            metadata=metadata
        )

        return {
            "success": True,
            "result": result.to_dict(),
            "message": f"Code file analyzed: {metadata['total_lines']} lines of {extension.upper()} code"
        }

    def _process_text_file(self, file_path: str, **options) -> Dict[str, Any]:
        """Process plain text files, falling back to latin-1 on decode errors."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            return self._process_text_content(content, file_path)

        except UnicodeDecodeError:
            # latin-1 maps every byte, so this read cannot raise a decode error.
            try:
                with open(file_path, 'r', encoding='latin-1') as f:
                    content = f.read()
                return self._process_text_content(content, file_path)
            except Exception as e:
                return {
                    "success": False,
                    "message": f"Text file processing failed: {str(e)}",
                    "file_path": file_path,
                    "error_type": type(e).__name__
                }

    def _process_text_content(self, content: str, file_path: str) -> Dict[str, Any]:
        """Process text content and extract basic counting metadata."""
        lines = content.split('\n')
        words = content.split()

        metadata = {
            "character_count": len(content),
            "word_count": len(words),
            "line_count": len(lines),
            "non_empty_lines": len([line for line in lines if line.strip()]),
            # max(..., 1) guards the empty-content division.
            "average_line_length": sum(len(line) for line in lines) / max(len(lines), 1)
        }

        preview = content[:500] + "..." if len(content) > 500 else content

        result = FileProcessingResult(
            file_path=file_path,
            file_type="text",
            success=True,
            content={
                "text": content,
                "preview": preview
            },
            metadata=metadata
        )

        return {
            "success": True,
            "result": result.to_dict(),
            "message": f"Text file processed: {metadata['word_count']} words, {metadata['line_count']} lines"
        }
|
|
def test_file_processor_tool():
    """Smoke-test the file processor tool with generated CSV and Python files.

    Creates temporary fixture files, runs the tool on each, prints the
    outcome, and removes the fixtures afterwards.
    """
    # Local import keeps the module's top-level dependencies unchanged.
    import tempfile

    tool = FileProcessorTool()

    test_files = []

    # --- CSV fixture ---
    csv_content = """name,age,city
John,25,New York
Jane,30,San Francisco
Bob,35,Chicago"""

    # FIX: use the platform temp directory instead of a hard-coded /tmp
    # path, which does not exist on Windows.
    csv_path = os.path.join(tempfile.gettempdir(), "test_data.csv")
    with open(csv_path, 'w') as f:
        f.write(csv_content)
    test_files.append(csv_path)

    # --- Python source fixture ---
    py_content = """#!/usr/bin/env python3
import os
import sys

def hello_world():
    '''Simple greeting function'''
    return "Hello, World!"

class TestClass:
    def __init__(self):
        self.value = 42
    
    def get_value(self):
        return self.value

if __name__ == "__main__":
    print(hello_world())
"""

    py_path = os.path.join(tempfile.gettempdir(), "test_script.py")
    with open(py_path, 'w') as f:
        f.write(py_content)
    test_files.append(py_path)

    print("🧪 Testing File Processor Tool...")

    for i, file_path in enumerate(test_files, 1):
        print(f"\n--- Test {i}: {file_path} ---")
        try:
            result = tool.execute(file_path)

            if result.success:
                file_result = result.result['result']
                print(f"✅ Success: {file_result['file_type']} file")
                print(f"   Message: {result.result.get('message', 'No message')}")
                if 'metadata' in file_result:
                    metadata = file_result['metadata']
                    print(f"   Metadata: {list(metadata.keys())}")
            else:
                print(f"❌ Error: {result.result.get('message', 'Unknown error')}")

            print(f"   Execution time: {result.execution_time:.3f}s")

        except Exception as e:
            print(f"❌ Exception: {str(e)}")

    # Best-effort cleanup of fixtures.
    for file_path in test_files:
        try:
            os.remove(file_path)
        except OSError:
            # FIX: was a bare `except:` that swallowed everything, including
            # KeyboardInterrupt; only ignore filesystem errors here.
            pass
|
|
| if __name__ == "__main__": |
| |
| test_file_processor_tool() |