Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Simplified PDF Processor for Hugging Face Spaces

This module provides comprehensive PDF processing functionality for the RAG system.

## Overview
The PDF processor handles the complete pipeline from raw PDF files to structured,
searchable document chunks. It includes:
- **Text Extraction**: Robust PDF text extraction with error handling
- **Text Cleaning**: Intelligent preprocessing and normalization
- **Metadata Extraction**: Document title, author, and file information
- **Smart Chunking**: Multiple chunk sizes for optimal retrieval
- **Query Preprocessing**: Text normalization for search queries

## Architecture
The processor follows a modular design:
1. **Text Extraction**: Raw PDF to text conversion
2. **Text Cleaning**: Noise removal and normalization
3. **Metadata Extraction**: Document information extraction
4. **Chunking**: Intelligent text segmentation
5. **Query Processing**: Search query optimization

## Usage Example
```python
processor = SimplePDFProcessor()
processed_doc = processor.process_document("document.pdf", [100, 400])
print(f"Processed {len(processed_doc.chunks)} chunks")
```
"""
| import os | |
| import re | |
| import uuid | |
| from typing import List, Dict, Optional | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import pypdf | |
| from loguru import logger | |
| # ============================================================================= | |
| # DATA STRUCTURES | |
| # ============================================================================= | |
@dataclass
class DocumentChunk:
    """
    A processed document chunk together with its provenance metadata.

    Note: the ``@dataclass`` decorator is required — ``_create_chunks``
    constructs instances with keyword arguments, which a plain class with
    bare annotations would reject.

    Attributes:
        text: The cleaned and processed text content.
        doc_id: Unique identifier for the source document.
        filename: Name of the source PDF file.
        chunk_id: Unique identifier for this specific chunk.
        chunk_size: Target size used for chunking (in approximate tokens).
    """

    text: str
    doc_id: str
    filename: str
    chunk_id: str
    chunk_size: int
@dataclass
class ProcessedDocument:
    """
    A completely processed PDF document.

    Note: the ``@dataclass`` decorator is required — ``process_document``
    constructs instances with keyword arguments, which a plain class with
    bare annotations would reject.

    Attributes:
        filename: Name of the PDF file.
        title: Extracted or inferred document title.
        author: Extracted or inferred document author.
        chunks: List of processed document chunks.
    """

    filename: str
    title: str
    author: str
    chunks: List[DocumentChunk]
| # ============================================================================= | |
| # MAIN PDF PROCESSOR CLASS | |
| # ============================================================================= | |
class SimplePDFProcessor:
    """
    Simplified PDF processor for Hugging Face Spaces.

    Orchestrates the full pipeline from a raw PDF file to structured,
    searchable chunks:

    1. Text extraction      (``_extract_text``)
    2. Text cleaning        (``_clean_text``)
    3. Metadata extraction  (``_extract_metadata``)
    4. Chunking             (``_create_chunks`` / ``_split_into_sentences``)

    It also provides ``preprocess_query`` for normalizing search queries.
    """

    def __init__(self) -> None:
        """Initialize the processor with its default settings."""
        # Common English stop words stripped from queries in preprocess_query().
        self.stop_words = {
            "the", "a", "an", "and", "or", "but", "in", "on", "at", "to",
            "for", "of", "with", "by", "is", "are", "was", "were", "be",
            "been", "being", "have", "has", "had", "do", "does", "did",
            "will", "would", "could", "should", "may", "might", "can",
            "this", "that", "these", "those",
        }

    def process_document(
        self, file_path: str, chunk_sizes: Optional[List[int]] = None
    ) -> ProcessedDocument:
        """
        Process a PDF document through the complete pipeline.

        Workflow: extract raw text, clean it, extract metadata, then create
        one set of chunks per requested chunk size (all sharing one doc_id).

        Args:
            file_path: Path to the PDF file to process.
            chunk_sizes: Chunk sizes (approximate tokens) to create.
                Defaults to [100, 400].

        Returns:
            ProcessedDocument object with metadata and all chunks.

        Raises:
            Exception: Propagated if any processing step fails.
        """
        if chunk_sizes is None:
            chunk_sizes = [100, 400]  # Default small + medium chunk sizes

        try:
            text = self._extract_text(file_path)
            cleaned_text = self._clean_text(text)
            metadata = self._extract_metadata(file_path)

            # One chunking pass per requested size, all under a single doc_id.
            doc_id = str(uuid.uuid4())
            chunks: List[DocumentChunk] = []
            for chunk_size in chunk_sizes:
                chunks.extend(
                    self._create_chunks(
                        cleaned_text, chunk_size, doc_id, metadata["filename"]
                    )
                )

            return ProcessedDocument(
                filename=metadata["filename"],
                title=metadata["title"],
                author=metadata["author"],
                chunks=chunks,
            )
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {e}")
            raise

    def _extract_text(self, file_path: str) -> str:
        """
        Extract text content from every page of a PDF file.

        Args:
            file_path: Path to the PDF file.

        Returns:
            Concatenated page text, with a newline after each page that
            produced text. Pages yielding empty/None text are skipped.

        Raises:
            Exception: Propagated if the file cannot be read or parsed.
        """
        try:
            with open(file_path, "rb") as file:
                pdf_reader = pypdf.PdfReader(file)
                # join() instead of repeated += avoids quadratic string builds
                # on large documents.
                page_texts = (page.extract_text() for page in pdf_reader.pages)
                return "".join(t + "\n" for t in page_texts if t)
        except Exception as e:
            logger.error(f"Error extracting text from {file_path}: {e}")
            raise

    def _clean_text(self, text: str) -> str:
        """
        Clean and normalize extracted text.

        Steps (order matters — the page-number pattern needs newlines, so it
        must run *before* whitespace collapsing; the previous implementation
        collapsed whitespace first, which made the newline-anchored patterns
        dead code):
        1. Strip special characters while keeping common punctuation.
        2. Drop standalone page numbers at line ends (extraction artifacts).
        3. Collapse all remaining whitespace runs to single spaces.

        Args:
            text: Raw extracted text from PDF.

        Returns:
            Cleaned, single-spaced text.
        """
        # Keep letters, digits, whitespace, and common punctuation only.
        text = re.sub(r"[^\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}]", "", text)
        # Remove standalone page numbers at line ends — must run while
        # newlines are still present.
        text = re.sub(r"\b\d+\b(?=\s*\n)", "", text)
        # Collapse all whitespace (spaces, tabs, newlines) to single spaces.
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    def _extract_metadata(self, file_path: str) -> Dict[str, str]:
        """
        Extract filename, title, and author metadata from a PDF.

        Falls back to the filename stem for the title and "Unknown" for the
        author when PDF metadata is missing or unreadable.

        Args:
            file_path: Path to the PDF file.

        Returns:
            Dictionary with keys "filename", "title", and "author".
        """
        path = Path(file_path)
        try:
            with open(file_path, "rb") as file:
                info = pypdf.PdfReader(file).metadata
                return {
                    "filename": path.name,
                    "title": info.get("/Title", path.stem) if info else path.stem,
                    "author": info.get("/Author", "Unknown") if info else "Unknown",
                }
        except Exception as e:
            logger.warning(f"Error extracting metadata from {file_path}: {e}")
            # Best-effort fallback derived from the filename alone.
            return {
                "filename": path.name,
                "title": path.stem,
                "author": "Unknown",
            }

    def _create_chunks(
        self, text: str, chunk_size: int, doc_id: str, filename: str
    ) -> List[DocumentChunk]:
        """
        Split clean text into chunks of roughly ``chunk_size`` tokens.

        Sentences are used as natural boundaries; token counts are
        approximated by whitespace-separated word counts.

        Args:
            text: Clean text to chunk.
            chunk_size: Target chunk size in (approximate) tokens.
            doc_id: Unique document identifier.
            filename: Source filename.

        Returns:
            List of DocumentChunk objects.
        """
        chunks: List[DocumentChunk] = []
        chunk_index = 0

        def flush(buffer: str) -> None:
            """Emit the buffered text as a chunk if it is non-empty."""
            nonlocal chunk_index
            body = buffer.strip()
            if body:
                chunks.append(
                    DocumentChunk(
                        text=body,
                        doc_id=doc_id,
                        filename=filename,
                        # Bug fix: include chunk_size so IDs stay unique when
                        # the same document is chunked at multiple sizes
                        # (the old f"{doc_id}_{n}" collided across sizes).
                        chunk_id=f"{doc_id}_{chunk_size}_{chunk_index}",
                        chunk_size=chunk_size,
                    )
                )
                chunk_index += 1

        current_chunk = ""
        for sentence in self._split_into_sentences(text):
            # Rough token estimate: one token per whitespace-separated word.
            estimated_tokens = len(sentence.split())
            if len(current_chunk.split()) + estimated_tokens <= chunk_size:
                current_chunk += sentence + " "
            else:
                flush(current_chunk)
                # Start a new chunk with the sentence that did not fit.
                current_chunk = sentence + " "
        flush(current_chunk)  # Don't drop the trailing partial chunk.
        return chunks

    def _split_into_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences on '.', '!', '?' boundaries.

        Fragments of 3 words or fewer are dropped as likely noise. Note the
        terminating punctuation is consumed by the split.

        Args:
            text: Text to split into sentences.

        Returns:
            List of stripped sentence strings.
        """
        fragments = (s.strip() for s in re.split(r"[.!?]+", text))
        # Keep only fragments with meaningful content (more than 3 words).
        return [s for s in fragments if len(s.split()) > 3]

    def preprocess_query(self, query: str) -> str:
        """
        Normalize a search query for better retrieval performance.

        Lowercases, strips punctuation, and removes stop words.

        Args:
            query: Raw query string from the user.

        Returns:
            Preprocessed query string optimized for search.
        """
        # Lowercase for case-insensitive matching; drop punctuation.
        query = re.sub(r"[^\w\s]", "", query.lower())
        # Remove stop words so retrieval focuses on meaningful terms.
        return " ".join(w for w in query.split() if w not in self.stop_words)