Spaces:
Runtime error
Runtime error
# Standard library
import io
import os
import re
import sqlite3
from pathlib import Path
from typing import Any, Dict, List

# Third-party
import pandas as pd

# Document processing libraries
import PyPDF2
import pdfplumber
from docx import Document
import pytesseract
from PIL import Image

# ML libraries
from sentence_transformers import SentenceTransformer

# Local
from config import Config
class DocumentProcessor:
    """Extract text from PDFs, Word documents, images (OCR), CSV files and
    SQLite databases, and split the result into overlapping word chunks
    suitable for embedding."""

    def __init__(self, config: "Config" = None):
        """Load the sentence-embedding model and configure Tesseract.

        Args:
            config: Optional ``Config`` instance; a default one is created
                when omitted.
        """
        self.config = config or Config()
        # Model download/load can be slow, so announce it up front.
        print(f"Loading embedding model: {self.config.EMBEDDING_MODEL}")
        self.embedding_model = SentenceTransformer(self.config.EMBEDDING_MODEL)
        self._setup_tesseract()

    def _setup_tesseract(self):
        """Point pytesseract at the configured binary when it exists."""
        try:
            if os.path.exists(self.config.TESSERACT_CMD):
                pytesseract.pytesseract.tesseract_cmd = self.config.TESSERACT_CMD
                print("✅ Tesseract OCR configured successfully")
        except Exception as e:
            # OCR is optional; a misconfigured Tesseract must not stop startup.
            print(f"⚠️ Tesseract setup warning: {e}")

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Quote an SQL identifier so table names with spaces/quotes are safe."""
        return '"' + name.replace('"', '""') + '"'

    def _pdf_text_pdfplumber(self, file_path: str) -> str:
        """Per-page text via pdfplumber; pages that fail are skipped with a warning."""
        parts = []
        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                try:
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        parts.append(f"\n[Page {page_num + 1}]\n{page_text}\n")
                except Exception as e:
                    print(f"Warning: Could not extract text from page {page_num + 1}: {e}")
        return "".join(parts)

    def _pdf_text_pypdf2(self, file_path: str) -> str:
        """Per-page text via PyPDF2 (fallback backend)."""
        parts = []
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            for page_num, page in enumerate(pdf_reader.pages):
                try:
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        parts.append(f"\n[Page {page_num + 1}]\n{page_text}\n")
                except Exception as e:
                    print(f"Warning: Could not extract text from page {page_num + 1}: {e}")
        return "".join(parts)

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from a PDF, preferring pdfplumber with a PyPDF2 fallback.

        Unlike the previous version, the fallback also runs when pdfplumber
        succeeds but finds no text, not only when it raises.

        Raises:
            ValueError: if no text can be extracted by either backend.
        """
        text = ""
        try:
            text = self._pdf_text_pdfplumber(file_path)
        except Exception as e:
            print(f"pdfplumber failed, trying PyPDF2: {e}")
        if not text.strip():
            try:
                text = self._pdf_text_pypdf2(file_path)
            except Exception as e:
                print(f"PyPDF2 also failed: {e}")
                raise ValueError(f"Could not extract text from PDF: {e}")
        if not text.strip():
            raise ValueError("No text content found in PDF")
        return text

    def extract_text_from_docx(self, file_path: str) -> str:
        """Extract paragraph and table text from a .docx file.

        Raises:
            ValueError: if the file cannot be read or contains no text.
        """
        try:
            doc = Document(file_path)
            parts = []
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    parts.append(f"{paragraph.text}\n")
            for table_num, table in enumerate(doc.tables):
                parts.append(f"\n[Table {table_num + 1}]\n")
                for row in table.rows:
                    row_text = " | ".join(cell.text.strip() for cell in row.cells)
                    if row_text.strip():
                        parts.append(f"{row_text}\n")
            text = "".join(parts)
            if not text.strip():
                raise ValueError("No text content found in Word document")
            return text
        except ValueError:
            # Re-raise our own message as-is instead of double-wrapping it.
            raise
        except Exception as e:
            raise ValueError(f"Could not process Word document: {e}")

    def extract_text_from_image(self, image_data: bytes) -> str:
        """OCR text out of raw image bytes using Tesseract.

        Tries PSM 6 (uniform block) first, then PSM 3 (automatic page
        segmentation) if nothing was found.

        Raises:
            ValueError: if the image cannot be decoded or OCR fails.
        """
        try:
            image = Image.open(io.BytesIO(image_data))
            if image.mode != 'RGB':
                # Tesseract is most reliable on plain RGB input.
                image = image.convert('RGB')
            text = pytesseract.image_to_string(
                image,
                lang=self.config.OCR_LANGUAGE,
                config='--psm 6'  # Uniform block of text
            )
            if not text.strip():
                text = pytesseract.image_to_string(
                    image,
                    lang=self.config.OCR_LANGUAGE,
                    config='--psm 3'  # Fully automatic page segmentation
                )
            return text.strip()
        except Exception as e:
            raise ValueError(f"OCR failed: {e}")

    def extract_text_from_csv(self, file_path: str) -> str:
        """Render a CSV file as a text summary (shape, columns, sample, stats).

        Raises:
            ValueError: if the file cannot be decoded or parsed.
        """
        try:
            df = None
            # Many user-supplied CSVs are not UTF-8; try common encodings in order.
            for encoding in ('utf-8', 'latin-1', 'cp1252', 'iso-8859-1'):
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            if df is None:
                raise ValueError("Could not read CSV with any supported encoding")
            parts = [
                f"CSV Data from: {Path(file_path).name}\n\n",
                f"Shape: {df.shape[0]} rows, {df.shape[1]} columns\n\n",
                "Columns:\n",
            ]
            for col in df.columns:
                parts.append(f"- {col}\n")
            parts.append("\n")
            parts.append("Sample Data:\n")
            parts.append(df.head(10).to_string(index=False) + "\n\n")
            numeric_cols = df.select_dtypes(include=['number']).columns
            if len(numeric_cols) > 0:
                parts.append("Numeric Summary:\n")
                parts.append(df[numeric_cols].describe().to_string() + "\n\n")
            return "".join(parts)
        except ValueError:
            # Avoid double-wrapping our own encoding error message.
            raise
        except Exception as e:
            raise ValueError(f"Could not process CSV file: {e}")

    def extract_text_from_db(self, file_path: str) -> str:
        """Summarise an SQLite database: tables, schemas, row counts, samples.

        Raises:
            ValueError: if the database cannot be read or contains no tables.
        """
        try:
            conn = sqlite3.connect(file_path)
        except Exception as e:
            raise ValueError(f"Could not process SQLite database: {e}")
        try:
            parts = [f"SQLite Database: {Path(file_path).name}\n\n"]
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()
            if not tables:
                raise ValueError("No tables found in database")
            parts.append(f"Tables found: {len(tables)}\n\n")
            for (table_name,) in tables:
                parts.append(f"=== Table: {table_name} ===\n")
                # Names come from sqlite_master but are still quoted so tables
                # with spaces/quotes work and no raw SQL interpolation happens.
                ident = self._quote_ident(table_name)
                try:
                    cursor.execute(f"PRAGMA table_info({ident})")
                    columns = cursor.fetchall()
                    parts.append("Columns:\n")
                    for col in columns:
                        parts.append(f"- {col[1]} ({col[2]})\n")
                    cursor.execute(f"SELECT COUNT(*) FROM {ident}")
                    row_count = cursor.fetchone()[0]
                    parts.append(f"Row count: {row_count}\n\n")
                    df = pd.read_sql_query(f"SELECT * FROM {ident} LIMIT 10", conn)
                    parts.append("Sample Data:\n")
                    parts.append(df.to_string(index=False) + "\n\n")
                except Exception as e:
                    parts.append(f"Error reading table {table_name}: {e}\n\n")
            return "".join(parts)
        except ValueError:
            raise
        except Exception as e:
            raise ValueError(f"Could not process SQLite database: {e}")
        finally:
            # Close on all paths; the previous version leaked the connection
            # whenever an exception escaped before conn.close().
            conn.close()

    def chunk_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Split *text* into overlapping word chunks with per-chunk metadata.

        Args:
            text: Raw extracted text; cleaned before splitting.
            metadata: Optional base metadata copied into every chunk.

        Returns:
            List of dicts with ``text``, ``metadata``, ``chunk_index`` and
            ``word_count`` keys (empty list for blank input).
        """
        if not text.strip():
            return []
        text = self._clean_text(text)
        words = text.split()
        chunks: List[Dict[str, Any]] = []
        if len(words) <= self.config.CHUNK_SIZE:
            chunks.append({
                'text': text,
                'metadata': metadata or {},
                'chunk_index': 0,
                'word_count': len(words)
            })
            return chunks
        # Guard against CHUNK_OVERLAP >= CHUNK_SIZE: a step of zero raises in
        # range() and a negative step silently produced ZERO chunks before.
        step = max(1, self.config.CHUNK_SIZE - self.config.CHUNK_OVERLAP)
        for start in range(0, len(words), step):
            chunk_words = words[start:start + self.config.CHUNK_SIZE]
            chunk_metadata = (metadata or {}).copy()
            chunk_metadata.update({
                'chunk_index': len(chunks),
                'word_count': len(chunk_words),
                'start_word': start,
                'end_word': start + len(chunk_words),
            })
            chunks.append({
                'text': " ".join(chunk_words),
                'metadata': chunk_metadata,
                # Top-level copies keep the dict shape consistent with the
                # single-chunk case above.
                'chunk_index': chunk_metadata['chunk_index'],
                'word_count': chunk_metadata['word_count'],
            })
            if start + self.config.CHUNK_SIZE >= len(words):
                break
        return chunks

    def _clean_text(self, text: str) -> str:
        """Normalise whitespace and strip characters that upset downstream models."""
        # Collapse all whitespace runs (including newlines) to single spaces.
        text = re.sub(r'\s+', ' ', text)
        # Drop characters outside a conservative allow-list.
        text = re.sub(r'[^\w\s\.,!?;:()\-\'"$%&@#]', ' ', text)
        # Collapse runs of repeated punctuation.
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[-]{3,}', '---', text)
        return text.strip()

    def process_document(self, file_path: str, file_type: str) -> List[str]:
        """Extract text from *file_path* according to *file_type* and chunk it.

        Args:
            file_path: Path to the document on disk.
            file_type: Extension including the leading dot (case-insensitive).

        Returns:
            List of chunk text strings (chunk metadata is dropped for
            backward compatibility with existing callers).

        Raises:
            ValueError: for unsupported types or when no text is extracted.
        """
        try:
            ext = file_type.lower()
            if ext == '.pdf':
                text = self.extract_text_from_pdf(file_path)
            elif ext == '.docx':
                text = self.extract_text_from_docx(file_path)
            elif ext == '.txt':
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
            elif ext in ('.jpg', '.jpeg', '.png'):
                with open(file_path, 'rb') as f:
                    text = self.extract_text_from_image(f.read())
            elif ext == '.csv':
                text = self.extract_text_from_csv(file_path)
            elif ext == '.db':
                text = self.extract_text_from_db(file_path)
            else:
                raise ValueError(f"Unsupported file type: {file_type}")
            if not text or not text.strip():
                raise ValueError("No text content extracted from file")
            metadata = {
                'filename': Path(file_path).name,
                'file_type': file_type,
                'file_size': os.path.getsize(file_path),
            }
            chunks_data = self.chunk_text(text, metadata)
            return [chunk['text'] for chunk in chunks_data]
        except Exception as e:
            print(f"Error processing document {file_path}: {e}")
            raise

    def get_supported_formats(self) -> Dict[str, str]:
        """Map each supported file extension to a human-readable description."""
        return {
            '.pdf': 'PDF documents',
            '.docx': 'Microsoft Word documents',
            '.txt': 'Plain text files',
            '.jpg': 'JPEG images (with OCR)',
            '.jpeg': 'JPEG images (with OCR)',
            '.png': 'PNG images (with OCR)',
            '.csv': 'Comma-separated values',
            '.db': 'SQLite databases'
        }