"""
File Processing Tool for GAIA Agent System
Handles multiple file formats: images, audio, Excel/CSV, Python code
"""

import ast
import io
import logging
import mimetypes
import os
import re
import tempfile
from pathlib import Path
from typing import Dict, List, Optional, Any, Union

import pandas as pd
from PIL import Image

from tools import BaseTool

# Module-level logger, named after this module so that handlers and levels
# can be configured per module by the host application.
logger = logging.getLogger(__name__)


class FileProcessingResult:
    """Container for file processing results.

    Attributes:
        file_path: Path of the file that was processed.
        file_type: Short type label assigned by the processor
            (e.g. "image", "data", "text").
        success: Whether processing succeeded.
        content: Extracted content; its shape depends on the handler.
        metadata: Extra details about the file; never ``None``.
    """

    def __init__(self, file_path: str, file_type: str, success: bool,
                 content: Any = None,
                 metadata: Optional[Dict[str, Any]] = None):
        self.file_path = file_path
        self.file_type = file_type
        self.success = success
        self.content = content
        # Test for None explicitly so a caller-supplied (possibly empty) dict
        # is stored as-is instead of being swapped for a fresh one.
        self.metadata = {} if metadata is None else metadata

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(file_path={self.file_path!r}, "
            f"file_type={self.file_type!r}, success={self.success!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict keyed by attribute name."""
        return {
            "file_path": self.file_path,
            "file_type": self.file_type,
            "success": self.success,
            "content": self.content,
            "metadata": self.metadata
        }


class FileProcessorTool(BaseTool):
    """
    File processor tool for multiple file formats.

    Supports images, audio, Excel/CSV/JSON, plain text, and source-code
    analysis. Every handler returns a plain dict that always contains
    ``success`` and ``message``; on success the serialized
    ``FileProcessingResult`` is stored under ``result``, on failure the dict
    carries ``file_path`` and usually ``error_type`` instead.
    """

    def __init__(self):
        super().__init__("file_processor")

        # Extension sets consulted (in this order) by _detect_file_type.
        self.image_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}
        self.audio_extensions = {'.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'}
        self.data_extensions = {'.csv', '.xlsx', '.xls', '.json', '.txt'}
        self.code_extensions = {'.py', '.js', '.java', '.cpp', '.c', '.html', '.css'}

    def _execute_impl(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        Execute file processing operations based on input type.

        Args:
            input_data: Can be:
                - str: File path to process (type is auto-detected)
                - dict: {"file_path": str, "operation": str, "options": dict}
                  where operation is one of "auto" (default),
                  "analyze_image", "analyze_audio", "process_data",
                  "analyze_code" or "process_text".

        Returns:
            The result dict produced by the selected handler.

        Raises:
            ValueError: If input_data is neither str nor dict, or the
                requested operation is not recognised.
        """
        if isinstance(input_data, str):
            return self._process_file(input_data)

        if not isinstance(input_data, dict):
            raise ValueError(f"Unsupported input type: {type(input_data)}")

        file_path = input_data.get("file_path", "")
        operation = input_data.get("operation", "auto")
        # `or {}` also covers an explicit "options": None, which would
        # otherwise fail on ** unpacking below.
        options = input_data.get("options") or {}

        handlers = {
            "auto": self._process_file,
            "analyze_image": self._analyze_image,
            "analyze_audio": self._analyze_audio,
            "process_data": self._process_data_file,
            "analyze_code": self._analyze_code,
            "process_text": self._process_text_file,
        }
        # Guard the lookup so an unhashable operation value still ends in
        # ValueError rather than a TypeError from the dict.
        handler = handlers.get(operation) if isinstance(operation, str) else None
        if handler is None:
            raise ValueError(f"Unknown operation: {operation}")
        return handler(file_path, **options)

    def _process_file(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Auto-detect file type and process accordingly.

        Args:
            file_path: Path of the file to process.
            **options: Forwarded unchanged to the type-specific handler.

        Returns:
            The handler's result dict, or a failure dict when the file does
            not exist, its type is unsupported, or any exception is raised.
        """
        try:
            if not os.path.exists(file_path):
                return {
                    "success": False,
                    "message": f"File not found: {file_path}",
                    "error_type": "file_not_found"
                }

            file_extension = Path(file_path).suffix.lower()
            file_type = self._detect_file_type(file_path, file_extension)

            logger.info("Processing %s file: %s", file_type, file_path)

            handlers = {
                "image": self._analyze_image,
                "audio": self._analyze_audio,
                "data": self._process_data_file,
                "code": self._analyze_code,
                "text": self._process_text_file,
            }
            handler = handlers.get(file_type)
            if handler is None:
                return {
                    "success": False,
                    "message": f"Unsupported file type: {file_type}",
                    "file_path": file_path,
                    "detected_type": file_type
                }
            return handler(file_path, **options)

        except Exception as e:
            return {
                "success": False,
                "message": f"File processing failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _detect_file_type(self, file_path: str, extension: str) -> str:
        """Detect file type based on extension and MIME type.

        Args:
            file_path: Path used for the MIME-type guess when the extension
                is not in any known set.
            extension: Lower-cased file suffix including the leading dot.

        Returns:
            One of "image", "audio", "data", "code", "text" or "unknown".
        """
        if extension in self.image_extensions:
            return "image"
        if extension in self.audio_extensions:
            return "audio"
        if extension in self.data_extensions:
            # NOTE: '.txt' is listed in data_extensions, so it is classified
            # here; _process_data_file routes it on to the text handler.
            return "data"
        if extension in self.code_extensions:
            return "code"
        if extension in {'.txt', '.md', '.rst'}:
            return "text"

        # Unknown extension: fall back to the stdlib MIME-type guess.
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type:
            if mime_type.startswith('image/'):
                return "image"
            if mime_type.startswith('audio/'):
                return "audio"
            if mime_type.startswith('text/'):
                return "text"

        return "unknown"

    def _analyze_image(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Analyze image files and extract metadata.

        Collects format, mode, dimensions and file size, up to ten EXIF
        entries when the image object exposes them, and colour counts for
        RGB/RGBA images that contain at most ten distinct colours.

        Args:
            file_path: Path of the image to open.
            **options: Accepted for interface parity; currently unused.

        Returns:
            Success dict with the serialized result, or a failure dict if
            the image cannot be opened or analysed.
        """
        try:
            with Image.open(file_path) as img:
                metadata = {
                    "format": img.format,
                    "mode": img.mode,
                    "size": img.size,
                    "width": img.width,
                    "height": img.height,
                    "file_size": os.path.getsize(file_path)
                }

                # Not every image class provides _getexif; call it once and
                # reuse the result instead of decoding EXIF twice.
                exif_getter = getattr(img, '_getexif', None)
                exif = exif_getter() if callable(exif_getter) else None
                if exif:
                    metadata["exif_data"] = dict(list(exif.items())[:10])

                if img.mode in ['RGB', 'RGBA']:
                    # getcolors returns None when the image has more than
                    # maxcolors distinct colours, so this only yields data
                    # for very small palettes.
                    colors = img.getcolors(maxcolors=10)
                    if colors:
                        dominant_colors = sorted(colors, reverse=True)[:5]
                        metadata["dominant_colors"] = [
                            {"count": count, "rgb": color}
                            for count, color in dominant_colors
                        ]

                content_description = self._describe_image_content(img, metadata)

                result = FileProcessingResult(
                    file_path=file_path,
                    file_type="image",
                    success=True,
                    content=content_description,
                    metadata=metadata
                )

                return {
                    "success": True,
                    "result": result.to_dict(),
                    "message": f"Successfully analyzed image: {img.width}x{img.height} {img.format}"
                }

        except Exception as e:
            return {
                "success": False,
                "message": f"Image analysis failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _describe_image_content(self, img: Image.Image, metadata: Dict[str, Any]) -> str:
        """Generate basic description of image content.

        Args:
            img: Open image whose size, format and mode are described.
            metadata: Metadata dict; only "file_size" (bytes) is read.

        Returns:
            Sentence fragments joined with ". ".
        """
        description_parts = []

        width, height = img.size
        if width > height:
            orientation = "landscape"
        elif height > width:
            orientation = "portrait"
        else:
            orientation = "square"

        description_parts.append(f"{orientation} {img.format} image")
        description_parts.append(f"Dimensions: {width} x {height} pixels")

        mode_descriptions = {
            'RGB': "Full color RGB image",
            'RGBA': "RGB image with transparency",
            'L': "Grayscale image",
            '1': "Black and white image",
        }
        if img.mode in mode_descriptions:
            description_parts.append(mode_descriptions[img.mode])

        file_size = metadata.get("file_size", 0)
        if file_size > 0:
            size_mb = file_size / (1024 * 1024)
            if size_mb >= 1:
                description_parts.append(f"File size: {size_mb:.1f} MB")
            else:
                size_kb = file_size / 1024
                description_parts.append(f"File size: {size_kb:.1f} KB")

        return ". ".join(description_parts)

    def _analyze_audio(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Analyze audio files (basic metadata for now).

        Only the file extension and size are reported; the audio stream
        itself is not decoded.

        Args:
            file_path: Path of the audio file.
            **options: Accepted for interface parity; currently unused.

        Returns:
            Success dict with the serialized result and a "note" key, or a
            failure dict if the file cannot be inspected.
        """
        try:
            file_size = os.path.getsize(file_path)
            file_extension = Path(file_path).suffix.lower()

            metadata = {
                "file_extension": file_extension,
                "file_size": file_size,
                "file_size_mb": round(file_size / (1024 * 1024), 2)
            }

            content_description = f"Audio file ({file_extension}) - {metadata['file_size_mb']} MB"

            result = FileProcessingResult(
                file_path=file_path,
                file_type="audio",
                success=True,
                content=content_description,
                metadata=metadata
            )

            return {
                "success": True,
                "result": result.to_dict(),
                "message": f"Audio file detected: {metadata['file_size_mb']} MB {file_extension}",
                "note": "Full audio transcription requires additional setup"
            }

        except Exception as e:
            return {
                "success": False,
                "message": f"Audio analysis failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _process_data_file(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Process Excel, CSV, and other data files.

        CSV, Excel and JSON files are loaded into a DataFrame and
        summarised; any other extension is handed to the plain-text handler.

        Args:
            file_path: Path of the data file.
            **options: Forwarded to the text handler for non-tabular files;
                otherwise unused.

        Returns:
            Success dict whose content holds a description, the first five
            rows, and all rows when there are at most 100; or a failure dict
            if loading or summarising raises.
        """
        try:
            file_extension = Path(file_path).suffix.lower()

            if file_extension == '.csv':
                df = pd.read_csv(file_path)
            elif file_extension in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            elif file_extension == '.json':
                df = pd.read_json(file_path)
            else:
                # Non-tabular (e.g. '.txt'): reuse the text handler so the
                # latin-1 decoding fallback applies here as well.
                return self._process_text_file(file_path, **options)

            # Convert pandas/numpy scalars and dtypes to native Python types
            # so the metadata can be serialized without custom encoders.
            metadata = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "column_count": len(df.columns),
                "row_count": len(df),
                "data_types": {column: str(dtype) for column, dtype in df.dtypes.items()},
                "memory_usage": int(df.memory_usage(deep=True).sum()),
                "has_missing_values": bool(df.isnull().any().any())
            }

            numeric_columns = df.select_dtypes(include=['number']).columns.tolist()
            if numeric_columns:
                metadata["numeric_columns"] = numeric_columns
                metadata["numeric_stats"] = df[numeric_columns].describe().to_dict()

            sample_data = df.head(5).to_dict(orient='records')

            content_description = self._describe_data_content(df, metadata)

            result = FileProcessingResult(
                file_path=file_path,
                file_type="data",
                success=True,
                content={
                    "description": content_description,
                    "sample_data": sample_data,
                    # Full dump only for small tables to keep results compact.
                    "full_data": df.to_dict(orient='records') if len(df) <= 100 else None
                },
                metadata=metadata
            )

            return {
                "success": True,
                "result": result.to_dict(),
                "message": f"Successfully processed data file: {df.shape[0]} rows, {df.shape[1]} columns"
            }

        except Exception as e:
            return {
                "success": False,
                "message": f"Data file processing failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _describe_data_content(self, df: pd.DataFrame, metadata: Dict[str, Any]) -> str:
        """Generate description of data file content.

        Args:
            df: Loaded table.
            metadata: Metadata dict; "numeric_columns" and
                "has_missing_values" are read when present.

        Returns:
            Sentence fragments joined with ". ".
        """
        description_parts = []

        rows, cols = df.shape
        description_parts.append(f"Data table with {rows} rows and {cols} columns")

        # Column labels are not guaranteed to be strings (e.g. integer
        # labels), so stringify before joining.
        column_labels = [str(label) for label in df.columns.tolist()]
        if cols <= 10:
            column_names = ", ".join(column_labels)
            description_parts.append(f"Columns: {column_names}")
        else:
            description_parts.append(f"Columns include: {', '.join(column_labels[:5])}... and {cols-5} more")

        numeric_cols = len(metadata.get("numeric_columns", []))
        if numeric_cols > 0:
            description_parts.append(f"{numeric_cols} numeric columns")

        if metadata.get("has_missing_values"):
            description_parts.append("Contains missing values")

        return ". ".join(description_parts)

    def _analyze_code(self, file_path: str, **options) -> Dict[str, Any]:
        """
        Analyze code files (focusing on Python for now).

        Reads the file as UTF-8 and dispatches to the Python analyser for
        '.py' files or the generic analyser for everything else.

        Args:
            file_path: Path of the source file.
            **options: Accepted for interface parity; currently unused.

        Returns:
            The analyser's result dict, or a failure dict if reading or
            analysis raises.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                code_content = f.read()

            file_extension = Path(file_path).suffix.lower()

            if file_extension == '.py':
                return self._analyze_python_code(code_content, file_path)
            return self._analyze_generic_code(code_content, file_path, file_extension)

        except Exception as e:
            return {
                "success": False,
                "message": f"Code analysis failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _analyze_python_code(self, code_content: str, file_path: str) -> Dict[str, Any]:
        """Analyze Python code structure and content.

        Parses the source with ``ast`` and records functions (sync and
        async), classes and imports, plus simple line counts.

        Args:
            code_content: Full source text.
            file_path: Path reported back in the result.

        Returns:
            Success dict with the serialized result, or a failure dict with
            error_type "syntax_error" when the source does not parse.
        """
        try:
            tree = ast.parse(code_content)

            functions = []
            classes = []
            imports = []

            for node in ast.walk(tree):
                # async def is a distinct node type and must be matched
                # explicitly, otherwise coroutines are not counted.
                if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    functions.append({
                        "name": node.name,
                        "line": node.lineno,
                        "args": [arg.arg for arg in node.args.args]
                    })
                elif isinstance(node, ast.ClassDef):
                    classes.append({
                        "name": node.name,
                        "line": node.lineno
                    })
                elif isinstance(node, ast.Import):
                    imports.extend(alias.name for alias in node.names)
                elif isinstance(node, ast.ImportFrom):
                    # module is None for "from . import x".
                    module = node.module or ""
                    imports.extend(f"{module}.{alias.name}" for alias in node.names)

            lines = code_content.split('\n')
            metadata = {
                "total_lines": len(lines),
                "non_empty_lines": len([line for line in lines if line.strip()]),
                "comment_lines": len([line for line in lines if line.strip().startswith('#')]),
                "function_count": len(functions),
                "class_count": len(classes),
                "import_count": len(imports),
                "functions": functions[:10],
                "classes": classes[:10],
                # Sorted so the output is deterministic across runs.
                "imports": sorted(set(imports))
            }

            content_description = self._describe_python_code(metadata)

            if len(code_content) > 1000:
                code_snippet = code_content[:1000] + "..."
            else:
                code_snippet = code_content

            result = FileProcessingResult(
                file_path=file_path,
                file_type="python_code",
                success=True,
                content={
                    "description": content_description,
                    "code_snippet": code_snippet,
                    "full_code": code_content
                },
                metadata=metadata
            )

            return {
                "success": True,
                "result": result.to_dict(),
                "message": f"Python code analyzed: {metadata['function_count']} functions, {metadata['class_count']} classes"
            }

        except SyntaxError as e:
            return {
                "success": False,
                "message": f"Python syntax error: {str(e)}",
                "file_path": file_path,
                "error_type": "syntax_error"
            }

    def _describe_python_code(self, metadata: Dict[str, Any]) -> str:
        """Generate description of Python code.

        Args:
            metadata: Dict from which "total_lines", "non_empty_lines",
                "function_count", "class_count" and "imports" are read,
                each with an empty/zero default.

        Returns:
            Sentence fragments joined with ". ".
        """
        description_parts = []

        total_lines = metadata.get("total_lines", 0)
        non_empty_lines = metadata.get("non_empty_lines", 0)
        description_parts.append(f"Python file with {total_lines} total lines ({non_empty_lines} non-empty)")

        func_count = metadata.get("function_count", 0)
        class_count = metadata.get("class_count", 0)

        if func_count > 0:
            description_parts.append(f"{func_count} functions defined")
        if class_count > 0:
            description_parts.append(f"{class_count} classes defined")

        imports = metadata.get("imports", [])
        if imports:
            if len(imports) <= 5:
                description_parts.append(f"Imports: {', '.join(imports)}")
            else:
                description_parts.append(f"Imports {len(imports)} modules including: {', '.join(imports[:3])}...")

        return ". ".join(description_parts)

    def _analyze_generic_code(self, code_content: str, file_path: str, extension: str) -> Dict[str, Any]:
        """Analyze non-Python code files.

        Args:
            code_content: Full source text.
            file_path: Path reported back in the result.
            extension: File suffix including the leading dot.

        Returns:
            Success dict with line counts and a snippet of up to 500
            characters.
        """
        lines = code_content.split('\n')

        metadata = {
            "file_extension": extension,
            "total_lines": len(lines),
            "non_empty_lines": len([line for line in lines if line.strip()]),
            # Length of the decoded text in characters, not bytes on disk.
            "file_size": len(code_content),
        }

        content_description = f"{extension.upper()} code file with {metadata['total_lines']} lines"

        if len(code_content) > 500:
            code_snippet = code_content[:500] + "..."
        else:
            code_snippet = code_content

        result = FileProcessingResult(
            file_path=file_path,
            file_type="code",
            success=True,
            content={
                "description": content_description,
                "code_snippet": code_snippet
            },
            metadata=metadata
        )

        return {
            "success": True,
            "result": result.to_dict(),
            "message": f"Code file analyzed: {metadata['total_lines']} lines of {extension.upper()} code"
        }

    def _process_text_file(self, file_path: str, **options) -> Dict[str, Any]:
        """Process plain text files.

        Reads the file as UTF-8 and retries as latin-1 when decoding fails.

        Args:
            file_path: Path of the text file.
            **options: Accepted for interface parity; currently unused.

        Returns:
            Result dict from _process_text_content, or a failure dict if the
            file cannot be read.
        """
        try:
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            except UnicodeDecodeError:
                # latin-1 maps every byte to a code point, so this retry
                # cannot itself fail on decoding.
                with open(file_path, 'r', encoding='latin-1') as f:
                    content = f.read()

            return self._process_text_content(content, file_path)

        except Exception as e:
            return {
                "success": False,
                "message": f"Text file processing failed: {str(e)}",
                "file_path": file_path,
                "error_type": type(e).__name__
            }

    def _process_text_content(self, content: str, file_path: str) -> Dict[str, Any]:
        """Process text content and extract metadata.

        Args:
            content: Decoded text.
            file_path: Path reported back in the result.

        Returns:
            Success dict with the full text, a preview of up to 500
            characters, and character/word/line statistics.
        """
        lines = content.split('\n')
        words = content.split()

        metadata = {
            "character_count": len(content),
            "word_count": len(words),
            "line_count": len(lines),
            "non_empty_lines": len([line for line in lines if line.strip()]),
            "average_line_length": sum(len(line) for line in lines) / max(len(lines), 1)
        }

        preview = content[:500] + "..." if len(content) > 500 else content

        result = FileProcessingResult(
            file_path=file_path,
            file_type="text",
            success=True,
            content={
                "text": content,
                "preview": preview
            },
            metadata=metadata
        )

        return {
            "success": True,
            "result": result.to_dict(),
            "message": f"Text file processed: {metadata['word_count']} words, {metadata['line_count']} lines"
        }


def test_file_processor_tool():
    """Test the file processor tool with various file types.

    Writes a small CSV and a small Python script into a temporary
    directory, runs each through ``tool.execute`` and prints the outcome.
    The directory and its files are removed on exit, even if a test raises.
    """
    tool = FileProcessorTool()

    csv_content = """name,age,city
John,25,New York
Jane,30,San Francisco
Bob,35,Chicago"""

    py_content = """#!/usr/bin/env python3
import os
import sys

def hello_world():
    '''Simple greeting function'''
    return "Hello, World!"

class TestClass:
    def __init__(self):
        self.value = 42

    def get_value(self):
        return self.value

if __name__ == "__main__":
    print(hello_world())
"""

    fixtures = [
        ("test_data.csv", csv_content),
        ("test_script.py", py_content),
    ]

    print("🧪 Testing File Processor Tool...")

    # A private temporary directory avoids clobbering or racing on fixed
    # /tmp paths and works on platforms without /tmp.
    with tempfile.TemporaryDirectory() as tmp_dir:
        test_files = []
        for file_name, text in fixtures:
            fixture_path = os.path.join(tmp_dir, file_name)
            with open(fixture_path, 'w', encoding='utf-8') as f:
                f.write(text)
            test_files.append(fixture_path)

        for i, file_path in enumerate(test_files, 1):
            print(f"\n--- Test {i}: {file_path} ---")
            try:
                result = tool.execute(file_path)

                if result.success:
                    file_result = result.result['result']
                    print(f"✅ Success: {file_result['file_type']} file")
                    print(f" Message: {result.result.get('message', 'No message')}")
                    if 'metadata' in file_result:
                        metadata = file_result['metadata']
                        print(f" Metadata: {list(metadata.keys())}")
                else:
                    print(f"❌ Error: {result.result.get('message', 'Unknown error')}")

                print(f" Execution time: {result.execution_time:.3f}s")

            except Exception as e:
                # Keep going so one failing fixture does not hide the others.
                print(f"❌ Exception: {str(e)}")


if __name__ == "__main__":
    # Run the smoke test only when this module is executed as a script.
    test_file_processor_tool()