import PyPDF2
import pdfplumber
from docx import Document
from pathlib import Path
from typing import Any, Dict, List
import re
import warnings
import logging

# Suppress PyPDF2 warnings about font descriptors
warnings.filterwarnings('ignore', category=UserWarning, module='PyPDF2')
logging.getLogger('PyPDF2').setLevel(logging.ERROR)


class DocumentProcessor:
    """Process various document types and extract text content."""

    def __init__(self):
        self.supported_formats = ['.pdf', '.txt', '.docx']

    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """
        Process a single file and extract its content.

        Args:
            file_path: Path to the file

        Returns:
            Dictionary containing file metadata and content
        """
        suffix = file_path.suffix.lower()

        if suffix == '.pdf':
            content = self._extract_pdf(file_path)
        elif suffix == '.txt':
            content = self._extract_txt(file_path)
        elif suffix == '.docx':
            content = self._extract_docx(file_path)
        else:
            raise ValueError(f"Unsupported file format: {suffix}")

        return {
            'filename': file_path.name,
            'path': str(file_path),
            'content': content,
            'format': suffix
        }

    def _extract_pdf(self, file_path: Path) -> str:
        """Extract text from PDF using pdfplumber with a PyPDF2 fallback."""
        text = ""
        try:
            # Primary: use pdfplumber (better for complex PDF layouts)
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
        except Exception:
            # Fallback: use PyPDF2 with warnings suppressed
            try:
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore")
                    with open(file_path, 'rb') as file:
                        pdf_reader = PyPDF2.PdfReader(file)
                        for page in pdf_reader.pages:
                            try:
                                page_text = page.extract_text()
                                if page_text:
                                    text += page_text + "\n"
                            except Exception:
                                continue  # Skip problematic pages
            except Exception as exc:
                raise ValueError(
                    f"Could not extract text from PDF: {file_path.name}"
                ) from exc

        return self._clean_text(text)

    def _extract_txt(self, file_path: Path) -> str:
        """Extract text from a TXT file."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                text = file.read()
        except UnicodeDecodeError:
            # Retry with a permissive single-byte encoding
            with open(file_path, 'r', encoding='latin-1') as file:
                text = file.read()
        return self._clean_text(text)

    def _extract_docx(self, file_path: Path) -> str:
        """Extract text from a DOCX file."""
        doc = Document(file_path)
        text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
        return self._clean_text(text)

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Normalize whitespace, but preserve paragraph breaks ("\n\n"):
        # _semantic_chunk relies on them to find section boundaries
        text = re.sub(r'\n\s*\n', '\n\n', text)
        text = re.sub(r'[ \t]+', ' ', text)
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s.,!?;:()\-\'\"]+', '', text)
        return text.strip()

    def chunk_text(self, text: str, chunk_size: int = 512, overlap: int = 50,
                   semantic: bool = True) -> List[str]:
        """
        Split text into chunks using semantic or simple chunking.

        Args:
            text: The text to chunk
            chunk_size: Target size of each chunk in characters
            overlap: Number of overlapping characters between chunks
            semantic: Use semantic chunking (by headers/concepts) if True

        Returns:
            List of text chunks
        """
        if semantic:
            return self._semantic_chunk(text, chunk_size, overlap)
        return self._simple_chunk(text, chunk_size, overlap)

    def _semantic_chunk(self, text: str, target_size: int = 512,
                        overlap: int = 50) -> List[str]:
        """
        Chunk text by detecting headers and logical sections.
        Well suited to lecture slides and other structured documents.
""" chunks = [] # Split by common header patterns # Pattern 1: Lines that are ALL CAPS or Title Case followed by newline # Pattern 2: Lines starting with numbers like "1.", "1.1", etc. # Pattern 3: Lines with clear visual separators # First, split by double newlines (paragraphs) sections = text.split('\n\n') current_chunk = "" current_header = "" for section in sections: section = section.strip() if not section: continue # Check if this looks like a header is_header = self._is_likely_header(section) if is_header and len(current_chunk) > 100: # Save previous chunk and start new one with this header if current_chunk: chunks.append(current_chunk.strip()) current_chunk = section + "\n\n" current_header = section else: # Add to current chunk potential_chunk = current_chunk + section + "\n\n" # If chunk is getting too large, split it if len(potential_chunk) > target_size * 1.5: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = section + "\n\n" else: current_chunk = potential_chunk # Add final chunk if current_chunk: chunks.append(current_chunk.strip()) # If semantic chunking produced too few chunks, fall back to simple chunking if len(chunks) < len(text) / (target_size * 2): return self._simple_chunk(text, target_size, overlap) return chunks def _is_likely_header(self, text: str) -> bool: """Detect if text is likely a header/title.""" # Too long to be a header if len(text) > 200: return False # Single line headers if '\n' not in text: # ALL CAPS if text.isupper() and len(text.split()) <= 10: return True # Title Case if text.istitle() and len(text.split()) <= 10: return True # Numbered sections like "1.", "1.1", "Chapter 1" if re.match(r'^(\d+\.)+\s+', text) or re.match(r'^(Chapter|Section|Part)\s+\d+', text, re.IGNORECASE): return True return False def _simple_chunk(self, text: str, chunk_size: int = 512, overlap: int = 50) -> List[str]: """ Split text into overlapping chunks (original method). """ chunks = [] start = 0 text_length = len(text) while start < text_length: end = start + chunk_size chunk = text[start:end] # Try to break at sentence boundary if end < text_length: last_period = chunk.rfind('.') last_newline = chunk.rfind('\n') break_point = max(last_period, last_newline) if break_point > chunk_size * 0.5: # At least 50% through the chunk chunk = chunk[:break_point + 1] end = start + break_point + 1 chunks.append(chunk.strip()) start = end - overlap return chunks