Spaces:
Runtime error
Runtime error
| import os | |
| import PyPDF2 | |
| import docx | |
| import pandas as pd | |
| import json | |
| import csv | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| from pathlib import Path | |
| from config.settings import Settings | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class FileProcessor:
    """Extract text content and lightweight summaries from files on disk.

    Each supported extension is routed through ``self.supported_extensions``
    to a private ``_process_*`` handler that returns a string. Apart from
    ``_process_text``, the handlers never raise: on failure they log the
    problem and return an ``"Error processing ...: <reason>"`` string instead,
    so callers always receive text.
    """

    def __init__(self):
        """Build the extension -> handler dispatch table."""
        # NOTE(review): '.doc' is routed to the same handler as '.docx';
        # confirm that legacy binary .doc files are really expected to load.
        self.supported_extensions = {
            '.txt': self._process_text,
            '.pdf': self._process_pdf,
            '.docx': self._process_docx,
            '.doc': self._process_docx,
            '.csv': self._process_csv,
            '.xlsx': self._process_excel,
            '.xls': self._process_excel,
            '.json': self._process_json,
            '.py': self._process_code,
            '.js': self._process_code,
            '.html': self._process_code,
            '.css': self._process_code,
            '.md': self._process_text,
        }

    def process_file(self, file_path: str) -> Dict[str, Any]:
        """Process a single file and return its extracted content.

        Args:
            file_path: Location of the file to read.

        Returns:
            On success, a dict with the keys ``filename``, ``extension``
            (lower-cased suffix), ``size_mb``, ``content`` and ``metadata``.
            On any failure (missing file, file larger than
            ``Settings.MAX_FILE_SIZE_MB``, unsupported extension, or an
            exception raised while processing) a dict with a single
            ``error`` key describing the problem.
        """
        try:
            path = Path(file_path)
            if not path.exists():
                return {'error': f'File not found: {path}'}

            # Size is compared in megabytes against the configured limit.
            file_size = path.stat().st_size / (1024 * 1024)
            if file_size > Settings.MAX_FILE_SIZE_MB:
                return {'error': f'File too large: {file_size:.1f}MB (max: {Settings.MAX_FILE_SIZE_MB}MB)'}

            extension = path.suffix.lower()
            if extension not in self.supported_extensions:
                return {'error': f'Unsupported file type: {extension}'}

            handler = self.supported_extensions[extension]
            return {
                'filename': path.name,
                'extension': extension,
                'size_mb': file_size,
                'content': handler(path),
                'metadata': self._extract_metadata(path),
            }
        except Exception as e:
            logger.error("Error processing file %s: %s", file_path, e)
            return {'error': str(e)}

    def _process_text(self, file_path: Path) -> str:
        """Return the full text of a plain-text file.

        The file is decoded as UTF-8 first; if that fails it is re-read as
        Latin-1, which accepts any byte sequence.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            with open(file_path, 'r', encoding='latin-1') as f:
                return f.read()

    def _process_pdf(self, file_path: Path) -> str:
        """Return the text of every non-empty PDF page, labelled by page number.

        Pages whose extraction fails are logged and skipped. If the document
        itself cannot be read, an ``"Error processing PDF: ..."`` string is
        returned instead of raising.
        """
        try:
            page_texts = []
            with open(file_path, 'rb') as f:
                reader = PyPDF2.PdfReader(f)
                for page_num, page in enumerate(reader.pages, start=1):
                    try:
                        # Guard against a None result so an empty page is
                        # skipped quietly rather than reported as a failure.
                        text = page.extract_text() or ""
                    except Exception as e:
                        logger.warning("Error extracting page %s: %s", page_num, e)
                        continue
                    if text.strip():
                        page_texts.append(f"--- Page {page_num} ---\n{text}")
            return "\n\n".join(page_texts)
        except Exception as e:
            logger.error("Error processing PDF: %s", e)
            return f"Error processing PDF: {str(e)}"

    def _process_docx(self, file_path: Path) -> str:
        """Return the non-empty paragraphs of a Word document, then its tables.

        Tables are appended after all paragraphs; each row is rendered as its
        cell texts joined with ``" | "``. On failure an
        ``"Error processing DOCX: ..."`` string is returned.
        """
        try:
            doc = docx.Document(file_path)
            sections = [p.text for p in doc.paragraphs if p.text.strip()]

            for table in doc.tables:
                rows = [
                    " | ".join(cell.text.strip() for cell in row.cells)
                    for row in table.rows
                ]
                if rows:
                    sections.append("\n--- Table ---\n" + "\n".join(rows))

            return "\n\n".join(sections)
        except Exception as e:
            logger.error("Error processing DOCX: %s", e)
            return f"Error processing DOCX: {str(e)}"

    def _process_csv(self, file_path: Path) -> str:
        """Return a textual overview of a CSV file.

        The overview lists row/column counts, column names, the first five
        rows, the column dtypes and, when numeric columns exist,
        ``DataFrame.describe()`` statistics. On failure an
        ``"Error processing CSV: ..."`` string is returned.
        """
        try:
            try:
                df = pd.read_csv(file_path)
            except UnicodeDecodeError:
                # Same fallback as _process_text: Latin-1 decodes any bytes.
                df = pd.read_csv(file_path, encoding='latin-1')

            has_numeric = len(df.select_dtypes(include=['number']).columns) > 0
            info_parts = [
                "CSV File Analysis:",
                f"Rows: {len(df)}",
                f"Columns: {len(df.columns)}",
                f"Column Names: {', '.join(map(str, df.columns))}",
                "",
                "First 5 rows:",
                df.head().to_string(),
                "",
                "Data Types:",
                df.dtypes.to_string(),
                "",
                "Basic Statistics:",
                df.describe().to_string() if has_numeric else "No numeric columns",
            ]
            return "\n".join(info_parts)
        except Exception as e:
            logger.error("Error processing CSV: %s", e)
            return f"Error processing CSV: {str(e)}"

    def _process_excel(self, file_path: Path) -> str:
        """Return a textual overview of every sheet in an Excel workbook.

        For each sheet the overview gives its dimensions, column names and
        first three rows. On failure an ``"Error processing Excel: ..."``
        string is returned.
        """
        try:
            # The context manager closes the workbook handle, and reading each
            # sheet from the open workbook avoids re-opening the file.
            with pd.ExcelFile(file_path) as workbook:
                sheet_names = workbook.sheet_names
                content_parts = [
                    f"Excel File: {file_path.name}",
                    f"Sheets: {', '.join(map(str, sheet_names))}",
                ]
                for sheet_name in sheet_names:
                    df = pd.read_excel(workbook, sheet_name=sheet_name)
                    content_parts.append(f"\n--- Sheet: {sheet_name} ---")
                    content_parts.append(f"Rows: {len(df)}, Columns: {len(df.columns)}")
                    # Header cells may be numbers or dates, so convert each
                    # label to str before joining.
                    content_parts.append(f"Columns: {', '.join(map(str, df.columns))}")
                    content_parts.append("\nFirst 3 rows:")
                    content_parts.append(df.head(3).to_string())
            return "\n".join(content_parts)
        except Exception as e:
            logger.error("Error processing Excel: %s", e)
            return f"Error processing Excel: {str(e)}"

    def _process_json(self, file_path: Path) -> str:
        """Return a readable summary of a JSON document.

        Objects are summarised by key count, key names and pretty-printed
        content cut to 2000 characters (with a trailing ``"..."`` when cut).
        Arrays are summarised by length, first-item type and their first
        three items. Any other value is shown via ``str()``. On failure an
        ``"Error processing JSON: ..."`` string is returned.
        """
        try:
            # 'utf-8-sig' reads plain UTF-8 unchanged and also strips a
            # leading byte-order mark, which json.load would otherwise reject.
            with open(file_path, 'r', encoding='utf-8-sig') as f:
                data = json.load(f)

            if isinstance(data, dict):
                formatted = json.dumps(data, indent=2, ensure_ascii=False)
                # Decide truncation from the text that is actually emitted.
                if len(formatted) > 2000:
                    formatted = formatted[:2000] + "..."
                content_parts = [
                    f"JSON Object with {len(data)} keys:",
                    f"Keys: {', '.join(data)}",
                    "",
                    "Content (formatted):",
                    formatted,
                ]
            elif isinstance(data, list):
                content_parts = [
                    f"JSON Array with {len(data)} items",
                    f"First item type: {type(data[0]).__name__}" if data else "Empty array",
                    "",
                    "Content (first 3 items):",
                    json.dumps(data[:3], indent=2, ensure_ascii=False),
                ]
            else:
                content_parts = [
                    f"JSON {type(data).__name__}:",
                    str(data),
                ]
            return "\n".join(content_parts)
        except Exception as e:
            logger.error("Error processing JSON: %s", e)
            return f"Error processing JSON: {str(e)}"

    def _process_code(self, file_path: Path) -> str:
        """Return a source file's text prefixed with simple line statistics.

        The "complexity" label is derived purely from the number of non-empty
        lines: more than 100 is High, more than 50 is Medium, otherwise Low.
        On failure an ``"Error processing code file: ..."`` string is returned.
        """
        try:
            content = self._process_text(file_path)
            lines = content.split('\n')
            non_empty = sum(1 for line in lines if line.strip())

            if non_empty > 100:
                complexity = 'High'
            elif non_empty > 50:
                complexity = 'Medium'
            else:
                complexity = 'Low'

            analysis_parts = [
                "Code File Analysis:",
                f"Language: {file_path.suffix[1:].upper()}",
                f"Total lines: {len(lines)}",
                f"Non-empty lines: {non_empty}",
                f"Estimated complexity: {complexity}",
                "",
                "Content:",
                content,
            ]
            return "\n".join(analysis_parts)
        except Exception as e:
            logger.error("Error processing code file: %s", e)
            return f"Error processing code file: {str(e)}"

    def _extract_metadata(self, file_path: Path) -> Dict[str, Any]:
        """Return size, timestamps, suffix and stem for ``file_path``.

        The timestamps are the raw ``st_ctime`` / ``st_mtime`` values from
        ``Path.stat()``. Returns an empty dict if the file cannot be stat'ed.
        """
        try:
            stat = file_path.stat()
            return {
                'size_bytes': stat.st_size,
                'created': stat.st_ctime,
                'modified': stat.st_mtime,
                'extension': file_path.suffix,
                'name': file_path.stem,
            }
        except Exception as e:
            logger.error("Error extracting metadata: %s", e)
            return {}

    def process_multiple_files(self, file_paths: List[str]) -> List[Dict[str, Any]]:
        """Run ``process_file`` on each path and return the results in order."""
        return [self.process_file(file_path) for file_path in file_paths]

    def extract_key_information(self, content: str, file_type: str) -> Dict[str, Any]:
        """Compute simple statistics about already-extracted content.

        Args:
            content: Text previously produced by one of the handlers.
            file_type: Extension including the dot, e.g. ``'.py'``.

        Returns:
            A dict with ``word_count``, ``char_count``, ``line_count`` and
            ``file_type``. Spreadsheet types additionally get
            ``numeric_values_found``; ``'.py'`` additionally gets up to ten
            ``functions`` and ``classes`` names. On failure a dict with a
            single ``error`` key is returned.
        """
        import re  # local import: the module header does not import re

        try:
            key_info = {
                'word_count': len(content.split()),
                'char_count': len(content),
                'line_count': len(content.split('\n')),
                'file_type': file_type,
            }

            if file_type in ('.csv', '.xlsx', '.xls'):
                key_info['numeric_values_found'] = len(re.findall(r'\d+', content))
            elif file_type == '.py':
                # \b keeps words such as "undef" or "subclass" from being
                # mistaken for the def/class keywords. Other code types
                # ('.js', '.html', '.css') have no extra extraction.
                key_info['functions'] = re.findall(r'\bdef\s+(\w+)', content)[:10]
                key_info['classes'] = re.findall(r'\bclass\s+(\w+)', content)[:10]

            return key_info
        except Exception as e:
            logger.error("Error extracting key information: %s", e)
            return {'error': str(e)}

    def save_processed_content(self, content: str, output_path: str) -> bool:
        """Write ``content`` to ``output_path`` as UTF-8.

        Returns:
            ``True`` when the file was written, ``False`` if any error
            occurred (the error is logged).
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(content)
            logger.info("Saved processed content to: %s", output_path)
            return True
        except Exception as e:
            logger.error("Error saving content: %s", e)
            return False

    def get_supported_formats(self) -> List[str]:
        """Return the supported extensions in dispatch-table order."""
        return list(self.supported_extensions)

    def format_file_summary_for_llm(self, file_result: Dict[str, Any]) -> str:
        """Render a ``process_file`` result as a short text summary.

        Error results become ``"Error processing file: <reason>"``. Otherwise
        the summary lists filename, type, size and the content cut to 1000
        characters (with a trailing ``"..."`` when cut).
        """
        if 'error' in file_result:
            return f"Error processing file: {file_result['error']}"

        content = file_result['content']
        preview = content[:1000] + "..." if len(content) > 1000 else content
        summary_parts = [
            f"File: {file_result['filename']}",
            f"Type: {file_result['extension']}",
            f"Size: {file_result['size_mb']:.2f} MB",
            "",
            "Content Summary:",
            preview,
        ]
        return "\n".join(summary_parts)