|
|
"""
Text chunking module

Intelligently splits legal documents into meaningful chunks
"""
|
|
|
|
|
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from .config import (
    CHUNK_OVERLAP_WORDS,
    CHUNK_SIZE_MAX_WORDS,
    CHUNK_SIZE_MIN_WORDS,
    CHUNK_SIZE_TARGET_WORDS,
    COMPILED_SECTION_PATTERNS,
)
from .models import ChunkMetadata, DocumentChunk
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class LegalDocumentChunker:
    """Chunks legal documents with section/article awareness.

    A document is first split on lines that match one of
    ``COMPILED_SECTION_PATTERNS``. Each section is then emitted as a single
    chunk when it has at most ``max_words`` words, or cut into overlapping
    windows of about ``target_words`` words otherwise.
    """

    def __init__(
        self,
        min_words: int = CHUNK_SIZE_MIN_WORDS,
        max_words: int = CHUNK_SIZE_MAX_WORDS,
        target_words: int = CHUNK_SIZE_TARGET_WORDS,
        overlap_words: int = CHUNK_OVERLAP_WORDS
    ):
        """
        Initialize chunker

        Args:
            min_words: Minimum words per chunk (stored on the instance; the
                methods of this class do not currently enforce it)
            max_words: Maximum words a section may have and still be emitted
                as one chunk
            target_words: Window size, in words, used when a section has to
                be split
            overlap_words: Words shared between consecutive chunks of a
                split section
        """
        self.min_words = min_words
        self.max_words = max_words
        self.target_words = target_words
        self.overlap_words = overlap_words

    def chunk_document(
        self,
        text: str,
        source_file: str,
        pages_data: Optional[List[Dict[str, Any]]] = None
    ) -> List[DocumentChunk]:
        """
        Chunk a document into meaningful pieces

        Args:
            text: Full document text
            source_file: Source filename; its stem is used to build chunk IDs
            pages_data: Optional page data for page number tracking
                (accepted for interface compatibility; not used here)

        Returns:
            List of DocumentChunk objects, numbered consecutively across
            all sections
        """
        logger.info(f"Chunking document: {source_file}")

        sections = self._split_by_sections(text)

        all_chunks: List[DocumentChunk] = []
        chunk_counter = 0

        for section_title, section_text in sections:
            section_chunks = self._chunk_section(
                section_text,
                section_title,
                source_file,
                chunk_counter
            )
            all_chunks.extend(section_chunks)
            # Advance by the number of chunks actually produced so IDs stay
            # unique and contiguous across sections.
            chunk_counter += len(section_chunks)

        logger.info(f"Created {len(all_chunks)} chunks from {source_file}")

        return all_chunks

    def _split_by_sections(self, text: str) -> List[Tuple[Optional[str], str]]:
        """
        Split text by sections/articles

        Text that precedes the first detected heading is returned as a
        section whose title is ``None``. A heading line is kept as the first
        line of its own section text.

        Returns:
            List of (section_title, section_text) tuples
        """
        sections: List[Tuple[Optional[str], str]] = []
        current_section: Optional[str] = None
        current_text: List[str] = []

        for line in text.split('\n'):
            section_match = self._detect_section(line)

            if section_match:
                # A new heading closes whatever was being accumulated.
                if current_text:
                    sections.append((current_section, '\n'.join(current_text)))

                current_section = section_match
                current_text = [line]
            else:
                current_text.append(line)

        # Flush the final section.
        if current_text:
            sections.append((current_section, '\n'.join(current_text)))

        # Defensive fallback: treat the whole text as one untitled section.
        if not sections:
            sections.append((None, text))

        logger.info(f"Detected {len(sections)} sections in document")

        return sections

    def _detect_section(self, line: str) -> Optional[str]:
        """
        Detect if a line contains a section/article marker

        Patterns are tried in the order given by ``COMPILED_SECTION_PATTERNS``
        and the first one that matches wins.

        Returns:
            Section title if detected, None otherwise. When the pattern has
            at least two capture groups and the first two both matched, the
            title is "<group 1>. <group 2>"; otherwise it is the full
            matched text.
        """
        for pattern in COMPILED_SECTION_PATTERNS:
            match = pattern.search(line)
            if not match:
                continue

            groups = match.groups()
            # An optional group that did not participate is None; falling
            # back to the matched text avoids titles such as "5. None".
            if len(groups) >= 2 and groups[0] is not None and groups[1] is not None:
                return f"{groups[0]}. {groups[1]}"

            return match.group(0)

        return None

    def _chunk_section(
        self,
        section_text: str,
        section_title: Optional[str],
        source_file: str,
        start_counter: int
    ) -> List[DocumentChunk]:
        """
        Chunk a single section into appropriate sizes

        Sections with at most ``max_words`` words become one chunk holding
        the original text. Longer sections are cut into windows of up to
        ``target_words`` words, each window preferably ending at the last
        sentence terminator found in its second half, with consecutive
        windows sharing up to ``overlap_words`` words.

        Args:
            section_text: Text of the section
            section_title: Title/identifier of the section
            source_file: Source filename
            start_counter: Starting chunk number

        Returns:
            List of chunks for this section; empty when the section
            contains no words
        """
        words = section_text.split()
        word_count = len(words)

        # A whitespace-only section (e.g. a blank preamble before the first
        # heading) carries no content, so it produces no chunk and consumes
        # no chunk number.
        if word_count == 0:
            return []

        stem = Path(source_file).stem

        if word_count <= self.max_words:
            chunk = self._create_chunk(
                text=section_text,
                chunk_id=f"{stem}_chunk_{start_counter:04d}",
                source_file=source_file,
                article_section=section_title
            )
            return [chunk]

        chunks: List[DocumentChunk] = []
        start_idx = 0
        chunk_num = start_counter

        while start_idx < word_count:
            end_idx = min(start_idx + self.target_words, word_count)

            # Only reachable when target_words is not positive.
            if end_idx <= start_idx:
                logger.warning(f"Chunking issue: end_idx ({end_idx}) <= start_idx ({start_idx}), breaking")
                break

            if end_idx < word_count:
                # Prefer to end on a sentence boundary, but only if that
                # keeps more than half of the window's characters.
                window_text = ' '.join(words[start_idx:end_idx])
                last_period = max(
                    window_text.rfind('. '),
                    window_text.rfind('! '),
                    window_text.rfind('? ')
                )

                if last_period > len(window_text) * 0.5:
                    words_before_period = window_text[:last_period + 1].split()
                    new_end_idx = start_idx + len(words_before_period)

                    if new_end_idx > start_idx:
                        end_idx = new_end_idx

            chunk_text = ' '.join(words[start_idx:end_idx])
            chunks.append(
                self._create_chunk(
                    text=chunk_text,
                    chunk_id=f"{stem}_chunk_{chunk_num:04d}",
                    source_file=source_file,
                    article_section=section_title
                )
            )
            chunk_num += 1

            # Stop once the section end is reached. Stepping back by the
            # overlap here would only re-emit ever-shorter tails of the
            # chunk that was just produced.
            if end_idx >= word_count:
                break

            # The overlap is capped at one word less than the chunk length
            # (and floored at zero), so the next start is always at least
            # start_idx + 1 and the loop is guaranteed to terminate.
            overlap = max(0, min(self.overlap_words, end_idx - start_idx - 1))
            start_idx = end_idx - overlap

        return chunks

    def _create_chunk(
        self,
        text: str,
        chunk_id: str,
        source_file: str,
        article_section: Optional[str] = None
    ) -> DocumentChunk:
        """
        Create a DocumentChunk object

        Args:
            text: Chunk text
            chunk_id: Identifier assigned to the chunk
            source_file: Source filename recorded in the metadata
            article_section: Section title recorded in the metadata, if any

        Returns:
            DocumentChunk whose metadata carries the whitespace-separated
            word count and the character count of ``text``
        """
        words = text.split()

        metadata = ChunkMetadata(
            source_file=source_file,
            article_section=article_section,
            word_count=len(words),
            char_count=len(text)
        )

        return DocumentChunk(
            chunk_id=chunk_id,
            text=text,
            metadata=metadata
        )
|
|
|