# AI_Toolkit/src/core/ParagraphChunker.py
# Uploaded by NavyDevilDoc ("Upload 10 files", commit c0f31c1, verified).
"""
ParagraphChunker.py
A module for paragraph-level document chunking with token counting and preprocessing.
Features:
- Paragraph-based document splitting
- Content validation
- Multi-level delimiter detection
- Smart paragraph boundary detection
"""
import logging
import spacy
from typing import List, Optional
from pathlib import Path
from datetime import datetime
from langchain_core.documents import Document
from core.BaseChunker import BaseChunker
logger = logging.getLogger(__name__)
class ParagraphChunker(BaseChunker):
    """Handles document chunking at the paragraph level with token counting.

    Splits PDF pages or plain-text files into paragraph-sized
    ``Document`` objects, validating each paragraph by character length
    and token count, and attaching per-paragraph analysis metadata.
    """

    # Minimum characters for a valid paragraph; shorter spans are dropped.
    PARAGRAPH_MIN_LENGTH = 50

    def __init__(self, model_name=None, embedding_model=None):
        """
        Initialize paragraph chunker with specified models.

        Args:
            model_name: Name of the model for tokenization
            embedding_model: Model for generating embeddings
        """
        super().__init__(model_name, embedding_model)
        self.page_stats = []  # notes about paragraphs skipped during processing
        # Initialize spaCy for sentence segmentation; install the model on demand.
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError as e:  # spacy.load raises OSError when the model is absent
            logger.error(f"Error loading spaCy model: {e}")
            import subprocess
            import sys
            logger.info("Installing spaCy model...")
            # Use the running interpreter (sys.executable) rather than whatever
            # "python" happens to be on PATH, and fail loudly (check=True) if
            # the download itself fails instead of silently retrying the load.
            subprocess.run(
                [sys.executable, "-m", "spacy", "download", "en_core_web_sm"],
                capture_output=True,
                check=True,
            )
            self.nlp = spacy.load("en_core_web_sm")

    def _split_into_paragraphs(self, text: str) -> List[str]:
        """
        Split text into paragraphs using length and punctuation heuristics.

        Args:
            text: The text content to split

        Returns:
            List of cleaned paragraphs, each at least PARAGRAPH_MIN_LENGTH chars
        """
        # Normalize carriage returns so '\n\n' splitting behaves on all platforms.
        text = text.replace('\r', '\n')
        # First, try double line breaks (the normal paragraph delimiter).
        paragraphs = text.split('\n\n')
        # If that yields almost nothing, the PDF extraction likely flattened the
        # layout — rebuild paragraphs from spaCy sentence boundaries instead.
        if len(paragraphs) <= 3:
            logger.info("PDF extraction flattened structure. Reconstructing from sentences...")
            doc = self.nlp(text)
            paragraphs = []
            current_para = []
            current_length = 0
            for sent in doc.sents:
                sent_text = sent.text.strip()
                if not sent_text:
                    continue
                # Accumulate the sentence into the paragraph under construction.
                current_para.append(sent_text)
                current_length += len(sent_text)
                # Close the paragraph once it is long enough (300+ chars, within
                # the typical 300-600 char paragraph range), ends on real
                # sentence-final punctuation, and holds at least two sentences.
                should_end_paragraph = (
                    current_length > 300 and
                    sent_text.endswith(('.', '!', '?')) and
                    len(current_para) >= 2
                )
                if should_end_paragraph:
                    paragraphs.append(' '.join(current_para))
                    current_para = []
                    current_length = 0
            # Flush any trailing sentences as a final paragraph.
            if current_para:
                paragraphs.append(' '.join(current_para))
            logger.info(f"Reconstructed {len(paragraphs)} paragraphs using length heuristics")
        # Collapse internal whitespace and drop too-short paragraphs.
        cleaned_paragraphs = []
        for para in paragraphs:
            clean_para = ' '.join(para.split())
            if len(clean_para) >= self.PARAGRAPH_MIN_LENGTH:
                cleaned_paragraphs.append(clean_para)
        logger.info(f"Final paragraph count: {len(cleaned_paragraphs)}")
        return cleaned_paragraphs

    def _process_single_paragraph(self, content: str, page_number: int,
                                  para_number: int, preprocess: bool) -> Optional[Document]:
        """
        Process a single paragraph with analysis and metadata.

        Args:
            content: The paragraph content
            page_number: The page number
            para_number: The paragraph number
            preprocess: Whether to preprocess the text

        Returns:
            Document object with processed content and metadata, or None if
            the paragraph fails the length or token-count checks
        """
        # First check character length against the class minimum.
        if len(content.strip()) < self.PARAGRAPH_MIN_LENGTH:
            self.page_stats.append(f"Paragraph {para_number} on page {page_number} is too short.")
            return None
        # Optionally preprocess the text before analysis.
        if preprocess:
            content = self.preprocess_text(content)
        # Analyze the paragraph and generate metadata.
        stats = self.analyze_text(content)
        # Drop paragraphs below the inherited token threshold, recording why.
        if stats["token_count"] < self.TOKEN_THRESHOLD:
            self.page_stats.append(
                f"Paragraph {para_number} on page {page_number} dropped: "
                f"only {stats['token_count']} tokens"
            )
            return None
        metadata = {
            "page": page_number,
            "paragraph": para_number,
            "char_count": stats["char_count"],
            "token_count": stats["token_count"],
            "sentence_count": stats["sentence_count"],
            "word_count": stats["word_count"],
            # NOTE(review): key is "has_ocr" but the value comes from
            # analyze_text's "has_content" flag — confirm this mapping is intended.
            "has_ocr": str(stats.get("has_content", True))
        }
        return Document(page_content=content, metadata=metadata)

    def paragraph_process_document(self, file_path: str, preprocess: bool = False) -> List[Document]:
        """
        Process PDF document paragraph by paragraph with analysis.

        Args:
            file_path: Path to the PDF file
            preprocess: Whether to preprocess paragraph text

        Returns:
            List of Document objects, one per valid paragraph

        Raises:
            Exception: re-raised from load_document / paragraph processing
        """
        try:
            self.page_stats = []  # Reset stats for this document
            raw_pages = self.load_document(file_path)
            processed_paragraphs = []
            logger.info(f"Processing document with {len(raw_pages)} pages")
            for page_idx, page in enumerate(raw_pages):
                paragraphs = self._split_into_paragraphs(page.page_content)
                logger.info(f"Page {page_idx+1}: Found {len(paragraphs)} paragraphs")
                for para_idx, paragraph in enumerate(paragraphs):
                    # Page and paragraph numbers are 1-based in metadata.
                    processed_para = self._process_single_paragraph(
                        paragraph,
                        page_idx + 1,
                        para_idx + 1,
                        preprocess
                    )
                    if processed_para:
                        processed_paragraphs.append(processed_para)
            # Output skipped paragraphs for transparency.
            if self.page_stats:
                logger.info("\n".join(self.page_stats))
            logger.info(f"Processed {len(processed_paragraphs)} valid paragraphs")
            return processed_paragraphs
        except Exception as e:
            logger.error(f"Error in paragraph_process_document: {e}")
            raise

    def process_document(self, file_path: str, preprocess: bool = True) -> List[Document]:
        """
        Process document using paragraph chunking strategy (implements abstract method).

        Args:
            file_path: Path to the PDF file
            preprocess: Whether to preprocess paragraph text

        Returns:
            List of Document objects, one per valid paragraph
        """
        return self.paragraph_process_document(file_path, preprocess)

    def process_text_file(self, file_path: str, preprocess: bool = False) -> List[Document]:
        """
        Process text file directly, preserving paragraph structure.

        Args:
            file_path: Path to the text file
            preprocess: Whether to preprocess paragraph text

        Returns:
            List of Document objects, one per valid paragraph

        Raises:
            Exception: re-raised from file loading / paragraph processing
        """
        try:
            # Load the text file directly.
            content = self.load_text_file(file_path)
            # Clean the text using the same logic as PDF conversion.
            content = self.clean_text_for_processing(content)
            # Split into paragraphs using double line breaks.
            paragraphs = content.split('\n\n')
            logger.info(f"Found {len(paragraphs)} paragraphs in text file: {file_path}")
            processed_paragraphs = []
            file_name = Path(file_path).name
            for para_idx, paragraph in enumerate(paragraphs):
                paragraph = paragraph.strip()
                if paragraph:
                    processed_para = self._process_single_paragraph_from_text(
                        paragraph,
                        file_path,
                        file_name,
                        para_idx + 1,
                        preprocess
                    )
                    if processed_para:
                        processed_paragraphs.append(processed_para)
            logger.info(f"Processed {len(processed_paragraphs)} valid paragraphs from text file")
            return processed_paragraphs
        except Exception as e:
            logger.error(f"Error processing text file: {e}")
            raise

    def _process_single_paragraph_from_text(self, content: str, file_path: str,
                                            file_name: str, para_number: int,
                                            preprocess: bool) -> Optional[Document]:
        """
        Process a single paragraph from text file with analysis and metadata.

        Args:
            content: The paragraph content
            file_path: Full path to the source file
            file_name: Name of the source file
            para_number: The paragraph number
            preprocess: Whether to preprocess the text

        Returns:
            Document object with processed content and metadata, or None if
            the paragraph is too short or fails content validation
        """
        # First check character length against the class minimum.
        if len(content.strip()) < self.PARAGRAPH_MIN_LENGTH:
            logger.debug(f"Paragraph {para_number} too short ({len(content)} chars), skipping")
            return None
        # Preprocess if requested; plain text files keep headers/footers.
        if preprocess:
            content = self.preprocess_text(content, remove_headers_footers=False)
        # Analyze the paragraph.
        analysis = self.analyze_text(content)
        # Validate content quality via the inherited validator.
        if not self.is_content_valid(content):
            logger.debug(f"Paragraph {para_number} failed content validation, skipping")
            return None
        # Create metadata (richer than the PDF path: includes source + timestamp).
        metadata = {
            "source": file_path,
            "file_name": file_name,
            "file_type": "txt",
            "paragraph": para_number,
            "char_count": analysis["char_count"],
            "token_count": analysis["token_count"],
            "sentence_count": analysis["sentence_count"],
            "word_count": analysis["word_count"],
            "chunk_type": "paragraph",
            "processing_timestamp": datetime.now().isoformat(),
        }
        # Create and return document.
        doc = Document(page_content=content, metadata=metadata)
        logger.debug(f"Created paragraph {para_number}: {analysis['char_count']} chars, {analysis['token_count']} tokens")
        return doc