Spaces:
Sleeping
Sleeping
| """ | |
| ParagraphChunker.py | |
| A module for paragraph-level document chunking with token counting and preprocessing. | |
| Features: | |
| - Paragraph-based document splitting | |
| - Content validation | |
| - Multi-level delimiter detection | |
| - Smart paragraph boundary detection | |
| """ | |
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import spacy
from langchain_core.documents import Document

from core.BaseChunker import BaseChunker
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
class ParagraphChunker(BaseChunker):
    """Handles document chunking at the paragraph level with token counting.

    Splits each page (or text file) into paragraphs, validates them against
    character/token thresholds, and wraps every surviving paragraph in a
    langchain ``Document`` with analysis metadata.
    """

    PARAGRAPH_MIN_LENGTH = 50  # Minimum characters for a valid paragraph

    def __init__(self, model_name=None, embedding_model=None):
        """
        Initialize paragraph chunker with specified models.

        Args:
            model_name: Name of the model for tokenization
            embedding_model: Model for generating embeddings
        """
        super().__init__(model_name, embedding_model)
        # Per-document log of skipped/dropped paragraphs (reset on each run).
        self.page_stats = []

        # Initialize spaCy for NLP tasks; attempt a one-shot model install
        # if the model is missing, then retry the load.
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except Exception as e:
            logger.error(f"Error loading spaCy model: {e}")
            import subprocess
            logger.info("Installing spaCy model...")
            # Use the *current* interpreter (sys.executable), not whatever
            # "python" happens to resolve to on PATH, so the model is
            # installed into this environment.
            result = subprocess.run(
                [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                capture_output=True,
            )
            if result.returncode != 0:
                # Surface the failure before the retry below raises.
                logger.error(
                    "spaCy model download failed (exit %d): %s",
                    result.returncode,
                    result.stderr.decode(errors="replace"),
                )
            self.nlp = spacy.load("en_core_web_sm")

    def _split_into_paragraphs(self, text: str) -> List[str]:
        """
        Split text into paragraphs using length and punctuation heuristics.

        Args:
            text: The text content to split

        Returns:
            List of cleaned paragraphs, each at least PARAGRAPH_MIN_LENGTH
            characters after whitespace normalization.
        """
        # Normalize line endings. CRLF must be collapsed first: a bare
        # text.replace('\r', '\n') would turn every Windows "\r\n" into
        # "\n\n", creating a spurious paragraph break at every single line.
        text = text.replace('\r\n', '\n').replace('\r', '\n')

        # First, try double line breaks (the normal paragraph delimiter).
        paragraphs = text.split('\n\n')

        # If that fails (PDF extraction flattened the structure), rebuild
        # paragraphs from spaCy sentence boundaries.
        if len(paragraphs) <= 3:
            logger.info("PDF extraction flattened structure. Reconstructing from sentences...")
            doc = self.nlp(text)
            paragraphs = []
            current_para = []
            current_length = 0

            for sent in doc.sents:
                sent_text = sent.text.strip()
                if not sent_text:
                    continue

                current_para.append(sent_text)
                current_length += len(sent_text)

                # Close the paragraph once it is long enough (300-600 chars is
                # typical), the sentence ends with terminal punctuation, and we
                # have at least two sentences of content.
                should_end_paragraph = (
                    current_length > 300 and
                    sent_text.endswith(('.', '!', '?')) and
                    len(current_para) >= 2
                )
                if should_end_paragraph:
                    paragraphs.append(' '.join(current_para))
                    current_para = []
                    current_length = 0

            # Flush the trailing partial paragraph.
            if current_para:
                paragraphs.append(' '.join(current_para))

            logger.info("Reconstructed %d paragraphs using length heuristics", len(paragraphs))

        # Collapse internal whitespace and drop fragments below the minimum length.
        cleaned_paragraphs = []
        for para in paragraphs:
            clean_para = ' '.join(para.split())
            if len(clean_para) >= self.PARAGRAPH_MIN_LENGTH:
                cleaned_paragraphs.append(clean_para)

        logger.info("Final paragraph count: %d", len(cleaned_paragraphs))
        return cleaned_paragraphs

    def _process_single_paragraph(self, content: str, page_number: int,
                                  para_number: int, preprocess: bool) -> Optional[Document]:
        """
        Process a single paragraph with analysis and metadata.

        Args:
            content: The paragraph content
            page_number: The page number
            para_number: The paragraph number
            preprocess: Whether to preprocess the text

        Returns:
            Document object with processed content and metadata, or None if
            the paragraph fails the length or token-count thresholds.
        """
        # First check character length
        if len(content.strip()) < self.PARAGRAPH_MIN_LENGTH:
            self.page_stats.append(f"Paragraph {para_number} on page {page_number} is too short.")
            return None

        # Optionally preprocess the text
        if preprocess:
            content = self.preprocess_text(content)

        # Analyze the paragraph and generate metadata
        stats = self.analyze_text(content)

        # Check token threshold
        if stats["token_count"] < self.TOKEN_THRESHOLD:
            self.page_stats.append(
                f"Paragraph {para_number} on page {page_number} dropped: "
                f"only {stats['token_count']} tokens"
            )
            return None

        metadata = {
            "page": page_number,
            "paragraph": para_number,
            "char_count": stats["char_count"],
            "token_count": stats["token_count"],
            "sentence_count": stats["sentence_count"],
            "word_count": stats["word_count"],
            # NOTE(review): key says "has_ocr" but the value comes from the
            # analyzer's "has_content" flag — confirm this mapping is intended.
            "has_ocr": str(stats.get("has_content", True))
        }

        return Document(page_content=content, metadata=metadata)

    def paragraph_process_document(self, file_path: str, preprocess: bool = False) -> List[Document]:
        """
        Process PDF document paragraph by paragraph with analysis.

        Args:
            file_path: Path to the PDF file
            preprocess: Whether to preprocess paragraph text

        Returns:
            List of Document objects, one per valid paragraph

        Raises:
            Exception: re-raises anything thrown while loading or processing.
        """
        try:
            self.page_stats = []  # Reset stats for this document
            raw_pages = self.load_document(file_path)
            processed_paragraphs = []

            logger.info("Processing document with %d pages", len(raw_pages))

            for page_idx, page in enumerate(raw_pages):
                paragraphs = self._split_into_paragraphs(page.page_content)
                logger.info("Page %d: Found %d paragraphs", page_idx + 1, len(paragraphs))

                for para_idx, paragraph in enumerate(paragraphs):
                    processed_para = self._process_single_paragraph(
                        paragraph,
                        page_idx + 1,
                        para_idx + 1,
                        preprocess
                    )
                    # Explicit None check: a falsy-but-valid Document must
                    # not be silently dropped by a truthiness test.
                    if processed_para is not None:
                        processed_paragraphs.append(processed_para)

            # Output skipped paragraphs for transparency
            if self.page_stats:
                logger.info("\n".join(self.page_stats))

            logger.info("Processed %d valid paragraphs", len(processed_paragraphs))
            return processed_paragraphs

        except Exception as e:
            logger.error(f"Error in paragraph_process_document: {e}")
            raise

    def process_document(self, file_path: str, preprocess: bool = True) -> List[Document]:
        """
        Process document using paragraph chunking strategy (implements abstract method).

        Args:
            file_path: Path to the PDF file
            preprocess: Whether to preprocess paragraph text

        Returns:
            List of Document objects, one per valid paragraph
        """
        return self.paragraph_process_document(file_path, preprocess)

    def process_text_file(self, file_path: str, preprocess: bool = False) -> List[Document]:
        """
        Process text file directly, preserving paragraph structure.

        Args:
            file_path: Path to the text file
            preprocess: Whether to preprocess paragraph text

        Returns:
            List of Document objects, one per valid paragraph

        Raises:
            Exception: re-raises anything thrown while loading or processing.
        """
        try:
            # Load the text file directly
            content = self.load_text_file(file_path)

            # Clean the text using the same logic as PDF conversion
            content = self.clean_text_for_processing(content)

            # Split into paragraphs using double line breaks
            paragraphs = content.split('\n\n')
            logger.info("Found %d paragraphs in text file: %s", len(paragraphs), file_path)

            processed_paragraphs = []
            file_name = Path(file_path).name

            for para_idx, paragraph in enumerate(paragraphs):
                paragraph = paragraph.strip()
                if paragraph:
                    processed_para = self._process_single_paragraph_from_text(
                        paragraph,
                        file_path,
                        file_name,
                        para_idx + 1,
                        preprocess
                    )
                    # Explicit None check, consistent with the PDF path above.
                    if processed_para is not None:
                        processed_paragraphs.append(processed_para)

            logger.info("Processed %d valid paragraphs from text file", len(processed_paragraphs))
            return processed_paragraphs

        except Exception as e:
            logger.error(f"Error processing text file: {e}")
            raise

    def _process_single_paragraph_from_text(self, content: str, file_path: str,
                                            file_name: str, para_number: int,
                                            preprocess: bool) -> Optional[Document]:
        """
        Process a single paragraph from text file with analysis and metadata.

        Args:
            content: The paragraph content
            file_path: Full path to the source file
            file_name: Name of the source file
            para_number: The paragraph number
            preprocess: Whether to preprocess the text

        Returns:
            Document object with processed content and metadata, or None if
            the paragraph is too short or fails content validation.
        """
        # First check character length
        if len(content.strip()) < self.PARAGRAPH_MIN_LENGTH:
            logger.debug(f"Paragraph {para_number} too short ({len(content)} chars), skipping")
            return None

        # Preprocess if requested (headers/footers are irrelevant for plain text)
        if preprocess:
            content = self.preprocess_text(content, remove_headers_footers=False)

        # Analyze the paragraph
        analysis = self.analyze_text(content)

        # Validate content quality
        if not self.is_content_valid(content):
            logger.debug(f"Paragraph {para_number} failed content validation, skipping")
            return None

        # Create metadata
        metadata = {
            "source": file_path,
            "file_name": file_name,
            "file_type": "txt",
            "paragraph": para_number,
            "char_count": analysis["char_count"],
            "token_count": analysis["token_count"],
            "sentence_count": analysis["sentence_count"],
            "word_count": analysis["word_count"],
            "chunk_type": "paragraph",
            "processing_timestamp": datetime.now().isoformat(),
        }

        # Create and return document
        doc = Document(page_content=content, metadata=metadata)
        logger.debug(f"Created paragraph {para_number}: {analysis['char_count']} chars, {analysis['token_count']} tokens")
        return doc