""" ParagraphChunker.py A module for paragraph-level document chunking with token counting and preprocessing. Features: - Paragraph-based document splitting - Content validation - Multi-level delimiter detection - Smart paragraph boundary detection """ import logging import spacy from typing import List, Optional from pathlib import Path from datetime import datetime from langchain_core.documents import Document from core.BaseChunker import BaseChunker logger = logging.getLogger(__name__) class ParagraphChunker(BaseChunker): """Handles document chunking at the paragraph level with token counting.""" PARAGRAPH_MIN_LENGTH = 50 # Minimum characters for a valid paragraph def __init__(self, model_name=None, embedding_model=None): """ Initialize paragraph chunker with specified models. Args: model_name: Name of the model for tokenization embedding_model: Model for generating embeddings """ super().__init__(model_name, embedding_model) self.page_stats = [] # Initialize spaCy for NLP tasks try: self.nlp = spacy.load("en_core_web_sm") except Exception as e: logger.error(f"Error loading spaCy model: {e}") import subprocess logger.info("Installing spaCy model...") subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], capture_output=True) self.nlp = spacy.load("en_core_web_sm") def _split_into_paragraphs(self, text: str) -> List[str]: """ Split text into paragraphs using length and punctuation heuristics. Args: text: The text content to split Returns: List of paragraphs """ # Pre-clean the text text = text.replace('\r', '\n') # First, try double line breaks paragraphs = text.split('\n\n') # If that fails (PDF extraction issue), use sentence-based reconstruction if len(paragraphs) <= 3: print(f"PDF extraction flattened structure. Reconstructing from sentences...") # Use spaCy for sentence detection doc = self.nlp(text) paragraphs = [] current_para = [] current_length = 0 for sent in doc.sents: sent_text = sent.text.strip() if not sent_text: continue # Add sentence to current paragraph current_para.append(sent_text) current_length += len(sent_text) # Check if we should end the current paragraph should_end_paragraph = ( # Paragraph is getting long (300-600 chars is typical) current_length > 300 and # Current sentence ends with proper punctuation sent_text.endswith(('.', '!', '?')) and # We have substantial content len(current_para) >= 2 ) if should_end_paragraph: paragraphs.append(' '.join(current_para)) current_para = [] current_length = 0 # Add the last paragraph if current_para: paragraphs.append(' '.join(current_para)) print(f"Reconstructed {len(paragraphs)} paragraphs using length heuristics") # Clean and filter paragraphs cleaned_paragraphs = [] for para in paragraphs: clean_para = ' '.join(para.split()) if len(clean_para) >= self.PARAGRAPH_MIN_LENGTH: cleaned_paragraphs.append(clean_para) print(f"Final paragraph count: {len(cleaned_paragraphs)}") return cleaned_paragraphs def _process_single_paragraph(self, content: str, page_number: int, para_number: int, preprocess: bool) -> Optional[Document]: """ Process a single paragraph with analysis and metadata. Args: content: The paragraph content page_number: The page number para_number: The paragraph number preprocess: Whether to preprocess the text Returns: Document object with processed content and metadata, or None if paragraph is invalid """ # First check character length if len(content.strip()) < self.PARAGRAPH_MIN_LENGTH: self.page_stats.append(f"Paragraph {para_number} on page {page_number} is too short.") return None # Optionally preprocess the text if preprocess: content = self.preprocess_text(content) # Analyze the paragraph and generate metadata stats = self.analyze_text(content) # Check token threshold if stats["token_count"] < self.TOKEN_THRESHOLD: self.page_stats.append( f"Paragraph {para_number} on page {page_number} dropped: " f"only {stats['token_count']} tokens" ) return None metadata = { "page": page_number, "paragraph": para_number, "char_count": stats["char_count"], "token_count": stats["token_count"], "sentence_count": stats["sentence_count"], "word_count": stats["word_count"], "has_ocr": str(stats.get("has_content", True)) } return Document(page_content=content, metadata=metadata) def paragraph_process_document(self, file_path: str, preprocess: bool = False) -> List[Document]: """ Process PDF document paragraph by paragraph with analysis. Args: file_path: Path to the PDF file preprocess: Whether to preprocess paragraph text Returns: List of Document objects, one per valid paragraph """ try: self.page_stats = [] # Reset stats for this document raw_pages = self.load_document(file_path) processed_paragraphs = [] logger.info(f"Processing document with {len(raw_pages)} pages") for page_idx, page in enumerate(raw_pages): paragraphs = self._split_into_paragraphs(page.page_content) logger.info(f"Page {page_idx+1}: Found {len(paragraphs)} paragraphs") for para_idx, paragraph in enumerate(paragraphs): processed_para = self._process_single_paragraph( paragraph, page_idx + 1, para_idx + 1, preprocess ) if processed_para: processed_paragraphs.append(processed_para) # Output skipped paragraphs for transparency if self.page_stats: logger.info("\n".join(self.page_stats)) logger.info(f"Processed {len(processed_paragraphs)} valid paragraphs") return processed_paragraphs except Exception as e: logger.error(f"Error in paragraph_process_document: {e}") raise def process_document(self, file_path: str, preprocess: bool = True) -> List[Document]: """ Process document using paragraph chunking strategy (implements abstract method). Args: file_path: Path to the PDF file preprocess: Whether to preprocess paragraph text Returns: List of Document objects, one per valid paragraph """ return self.paragraph_process_document(file_path, preprocess) def process_text_file(self, file_path: str, preprocess: bool = False) -> List[Document]: """ Process text file directly, preserving paragraph structure. Args: file_path: Path to the text file preprocess: Whether to preprocess paragraph text Returns: List of Document objects, one per valid paragraph """ try: # Load the text file directly content = self.load_text_file(file_path) # Clean the text using the same logic as PDF conversion content = self.clean_text_for_processing(content) # Split into paragraphs using double line breaks paragraphs = content.split('\n\n') logger.info(f"Found {len(paragraphs)} paragraphs in text file: {file_path}") processed_paragraphs = [] file_name = Path(file_path).name for para_idx, paragraph in enumerate(paragraphs): paragraph = paragraph.strip() if paragraph: processed_para = self._process_single_paragraph_from_text( paragraph, file_path, file_name, para_idx + 1, preprocess ) if processed_para: processed_paragraphs.append(processed_para) logger.info(f"Processed {len(processed_paragraphs)} valid paragraphs from text file") return processed_paragraphs except Exception as e: logger.error(f"Error processing text file: {e}") raise def _process_single_paragraph_from_text(self, content: str, file_path: str, file_name: str, para_number: int, preprocess: bool) -> Optional[Document]: """ Process a single paragraph from text file with analysis and metadata. Args: content: The paragraph content file_path: Full path to the source file file_name: Name of the source file para_number: The paragraph number preprocess: Whether to preprocess the text Returns: Document object with processed content and metadata, or None if paragraph is invalid """ # First check character length if len(content.strip()) < self.PARAGRAPH_MIN_LENGTH: logger.debug(f"Paragraph {para_number} too short ({len(content)} chars), skipping") return None # Preprocess if requested if preprocess: content = self.preprocess_text(content, remove_headers_footers=False) # Analyze the paragraph analysis = self.analyze_text(content) # Validate content quality if not self.is_content_valid(content): logger.debug(f"Paragraph {para_number} failed content validation, skipping") return None # Create metadata metadata = { "source": file_path, "file_name": file_name, "file_type": "txt", "paragraph": para_number, "char_count": analysis["char_count"], "token_count": analysis["token_count"], "sentence_count": analysis["sentence_count"], "word_count": analysis["word_count"], "chunk_type": "paragraph", "processing_timestamp": datetime.now().isoformat(), } # Create and return document doc = Document(page_content=content, metadata=metadata) logger.debug(f"Created paragraph {para_number}: {analysis['char_count']} chars, {analysis['token_count']} tokens") return doc