import os
import io
import re
import sqlite3

import pandas as pd
from typing import List, Dict, Any
from pathlib import Path

# Document processing libraries
import PyPDF2
import pdfplumber
from docx import Document
import pytesseract
from PIL import Image

# ML libraries
from sentence_transformers import SentenceTransformer

from config import Config


class DocumentProcessor:
    """Handle document processing for various file types"""

    def __init__(self, config: Config = None):
        self.config = config or Config()

        # Initialize embedding model
        print(f"Loading embedding model: {self.config.EMBEDDING_MODEL}")
        self.embedding_model = SentenceTransformer(self.config.EMBEDDING_MODEL)

        # Configure Tesseract if available
        self._setup_tesseract()

    def _setup_tesseract(self):
        """Setup Tesseract OCR configuration"""
        try:
            if os.path.exists(self.config.TESSERACT_CMD):
                pytesseract.pytesseract.tesseract_cmd = self.config.TESSERACT_CMD
                print("✅ Tesseract OCR configured successfully")
        except Exception as e:
            print(f"⚠️ Tesseract setup warning: {e}")

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Extract text from PDF using multiple methods"""
        text = ""
        try:
            # Primary method: pdfplumber
            with pdfplumber.open(file_path) as pdf:
                for page_num, page in enumerate(pdf.pages):
                    try:
                        page_text = page.extract_text()
                        if page_text and page_text.strip():
                            text += f"\n[Page {page_num + 1}]\n{page_text}\n"
                    except Exception as e:
                        print(f"Warning: Could not extract text from page {page_num + 1}: {e}")
        except Exception as e:
            print(f"pdfplumber failed, trying PyPDF2: {e}")
            # Fallback method: PyPDF2
            try:
                with open(file_path, 'rb') as file:
                    pdf_reader = PyPDF2.PdfReader(file)
                    for page_num, page in enumerate(pdf_reader.pages):
                        try:
                            page_text = page.extract_text()
                            if page_text and page_text.strip():
                                text += f"\n[Page {page_num + 1}]\n{page_text}\n"
                        except Exception as e:
                            print(f"Warning: Could not extract text from page {page_num + 1}: {e}")
            except Exception as e:
                print(f"PyPDF2 also failed: {e}")
                raise ValueError(f"Could not extract text from PDF: {e}")

        if not text.strip():
            raise ValueError("No text content found in PDF")

        return text

    def extract_text_from_docx(self, file_path: str) -> str:
        """Extract text from Word document"""
        try:
            doc = Document(file_path)
            text = ""

            # Extract paragraph text
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text += f"{paragraph.text}\n"

            # Extract table text, if any
            for table_num, table in enumerate(doc.tables):
                text += f"\n[Table {table_num + 1}]\n"
                for row in table.rows:
                    row_text = " | ".join(cell.text.strip() for cell in row.cells)
                    if row_text.strip():
                        text += f"{row_text}\n"

            if not text.strip():
                raise ValueError("No text content found in Word document")

            return text

        except Exception as e:
            raise ValueError(f"Could not process Word document: {e}")

    def extract_text_from_image(self, image_data: bytes) -> str:
        """Extract text from image using OCR"""
        try:
            # Open image
            image = Image.open(io.BytesIO(image_data))

            # Convert to RGB if necessary
            if image.mode != 'RGB':
                image = image.convert('RGB')

            # Perform OCR
            text = pytesseract.image_to_string(
                image,
                lang=self.config.OCR_LANGUAGE,
                config='--psm 6'  # Uniform block of text
            )

            if not text.strip():
                # Try a different PSM mode
                text = pytesseract.image_to_string(
                    image,
                    lang=self.config.OCR_LANGUAGE,
                    config='--psm 3'  # Fully automatic page segmentation
                )

            return text.strip()

        except Exception as e:
            raise ValueError(f"OCR failed: {e}")

    def extract_text_from_csv(self, file_path: str) -> str:
        """Extract text from CSV file"""
        try:
            # Try several encodings, since CSV files in the wild vary
            encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
            df = None
            for encoding in encodings:
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue

            if df is None:
                raise ValueError("Could not read CSV with any supported encoding")

            # Convert DataFrame to text
            text = f"CSV Data from: {Path(file_path).name}\n\n"
            text += f"Shape: {df.shape[0]} rows, {df.shape[1]} columns\n\n"

            # Add column information
            text += "Columns:\n"
            for col in df.columns:
                text += f"- {col}\n"
            text += "\n"

            # Add sample data (first few rows)
            text += "Sample Data:\n"
            text += df.head(10).to_string(index=False) + "\n\n"

            # Add summary statistics for numeric columns
            numeric_cols = df.select_dtypes(include=['number']).columns
            if len(numeric_cols) > 0:
                text += "Numeric Summary:\n"
                text += df[numeric_cols].describe().to_string() + "\n\n"

            return text

        except Exception as e:
            raise ValueError(f"Could not process CSV file: {e}")

    def extract_text_from_db(self, file_path: str) -> str:
        """Extract text from SQLite database"""
        try:
            conn = sqlite3.connect(file_path)
            text = f"SQLite Database: {Path(file_path).name}\n\n"

            # Get all table names
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()

            if not tables:
                raise ValueError("No tables found in database")

            text += f"Tables found: {len(tables)}\n\n"

            for (table_name,) in tables:
                text += f"=== Table: {table_name} ===\n"
                try:
                    # Identifiers cannot be bound as SQL parameters; the names
                    # come from sqlite_master, and quoting guards against
                    # unusual table names.
                    quoted = f'"{table_name}"'

                    # Get table schema
                    cursor.execute(f"PRAGMA table_info({quoted})")
                    columns = cursor.fetchall()
                    text += "Columns:\n"
                    for col in columns:
                        text += f"- {col[1]} ({col[2]})\n"

                    # Get row count
                    cursor.execute(f"SELECT COUNT(*) FROM {quoted}")
                    row_count = cursor.fetchone()[0]
                    text += f"Row count: {row_count}\n\n"

                    # Get sample data
                    df = pd.read_sql_query(f"SELECT * FROM {quoted} LIMIT 10", conn)
                    text += "Sample Data:\n"
                    text += df.to_string(index=False) + "\n\n"

                except Exception as e:
                    text += f"Error reading table {table_name}: {e}\n\n"

            conn.close()
            return text

        except Exception as e:
            raise ValueError(f"Could not process SQLite database: {e}")

    def chunk_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """Split text into chunks with overlap and metadata"""
        if not text.strip():
            return []

        # Clean text
        text = self._clean_text(text)

        chunks = []
        words = text.split()

        if len(words) <= self.config.CHUNK_SIZE:
            # If text fits within the chunk size, return it as a single chunk,
            # with the same metadata shape as the multi-chunk branch
            chunks.append({
                'text': text,
                'metadata': {
                    **(metadata or {}),
                    'chunk_index': 0,
                    'word_count': len(words),
                    'start_word': 0,
                    'end_word': len(words)
                }
            })
        else:
            # Split into overlapping chunks; guard against a non-positive
            # step in case CHUNK_OVERLAP >= CHUNK_SIZE
            step = max(1, self.config.CHUNK_SIZE - self.config.CHUNK_OVERLAP)
            for i in range(0, len(words), step):
                chunk_words = words[i:i + self.config.CHUNK_SIZE]
                chunk_body = " ".join(chunk_words)

                chunk_metadata = (metadata or {}).copy()
                chunk_metadata.update({
                    'chunk_index': len(chunks),
                    'word_count': len(chunk_words),
                    'start_word': i,
                    'end_word': i + len(chunk_words)
                })

                chunks.append({
                    'text': chunk_body,
                    'metadata': chunk_metadata
                })

                # Stop once this chunk has reached the end of the text
                if i + self.config.CHUNK_SIZE >= len(words):
                    break

        return chunks

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        # Collapse excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove special characters that might cause issues
        text = re.sub(r'[^\w\s\.,!?;:()\-\'"$%&@#]', ' ', text)

        # Collapse excessive punctuation
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[-]{3,}', '---', text)

        return text.strip()
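    # Chunking example (illustrative): with CHUNK_SIZE=500 and CHUNK_OVERLAP=50
    # (hypothetical config values), a 1,200-word document yields chunks starting
    # at words 0, 450 and 900 (500, 500 and 300 words long), so consecutive
    # chunks share 50 words of context.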
    def process_document(self, file_path: str, file_type: str) -> List[str]:
        """Process document based on file type and return text chunks"""
        try:
            # Extract text based on file type
            file_type = file_type.lower()
            if file_type == '.pdf':
                text = self.extract_text_from_pdf(file_path)
            elif file_type == '.docx':
                text = self.extract_text_from_docx(file_path)
            elif file_type == '.txt':
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
            elif file_type in ['.jpg', '.jpeg', '.png']:
                with open(file_path, 'rb') as f:
                    text = self.extract_text_from_image(f.read())
            elif file_type == '.csv':
                text = self.extract_text_from_csv(file_path)
            elif file_type == '.db':
                text = self.extract_text_from_db(file_path)
            else:
                raise ValueError(f"Unsupported file type: {file_type}")

            if not text or not text.strip():
                raise ValueError("No text content extracted from file")

            # Create metadata
            metadata = {
                'filename': Path(file_path).name,
                'file_type': file_type,
                'file_size': os.path.getsize(file_path)
            }

            # Chunk the text
            chunks_data = self.chunk_text(text, metadata)

            # Return just the text chunks for backward compatibility
            return [chunk['text'] for chunk in chunks_data]

        except Exception as e:
            print(f"Error processing document {file_path}: {e}")
            raise

    def get_supported_formats(self) -> Dict[str, str]:
        """Get supported file formats"""
        return {
            '.pdf': 'PDF documents',
            '.docx': 'Microsoft Word documents',
            '.txt': 'Plain text files',
            '.jpg': 'JPEG images (with OCR)',
            '.jpeg': 'JPEG images (with OCR)',
            '.png': 'PNG images (with OCR)',
            '.csv': 'Comma-separated values',
            '.db': 'SQLite databases'
        }
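
# --- Usage sketch (illustrative only) ---
# A minimal example of the intended call flow, assuming a Config that exposes
# EMBEDDING_MODEL, TESSERACT_CMD, OCR_LANGUAGE, CHUNK_SIZE and CHUNK_OVERLAP,
# and a hypothetical sample.pdf on disk; adapt the path to your setup.
if __name__ == "__main__":
    processor = DocumentProcessor()
    print("Supported formats:", processor.get_supported_formats())

    sample = "sample.pdf"  # hypothetical input file
    if os.path.exists(sample):
        chunks = processor.process_document(sample, Path(sample).suffix)
        print(f"Extracted {len(chunks)} chunk(s)")
        if chunks:
            print("First chunk preview:", chunks[0][:200])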