# streamlit_app/core/text_processor.py
#!/usr/bin/env python3
"""
Text Processor
Handles text cleaning, chunking, and preprocessing for the NZ Legislation Loophole Analysis.
Optimized for legal/legislative content with specialized cleaning and structuring.
"""
import re
from typing import List, Dict, Any, Optional, Tuple
import hashlib
import json
class TextProcessor:
    """Advanced text processing for legislation analysis.

    Provides legal-content-aware cleaning, several chunking strategies,
    lightweight metadata extraction, content hashing for caching, and a
    simple quality check, tuned for NZ legislative text.
    """

    def __init__(self):
        """Initialize the text processor with legal-specific patterns."""
        # Section headers such as "1:" or "23:" at the start of a line.
        self.section_pattern = re.compile(r'^(\d+):', re.MULTILINE)
        # Act names followed by a year, e.g. "Crimes Act 1961".
        self.act_name_pattern = re.compile(
            r'(\b\w+(?:\s+\w+)*)\s+Act\s+(\d{4})', re.IGNORECASE)
        # (pattern, replacement) pairs used to normalize date formats.
        # A replacement may be a template string or a callable; re.sub
        # accepts both. The month is captured and kept — the previous
        # version silently dropped it ("5 March 2020" -> "5 2020").
        self.date_patterns = [
            (r'(\d{1,2})\s*(January|February|March|April|May|June|July|'
             r'August|September|October|November|December)\s*(\d{4})',
             lambda m: f"{m.group(1)} {m.group(2)} {m.group(3)}"),
            (r'(\d{1,2})/(\d{1,2})/(\d{4})', r'\1/\2/\3'),
            (r'(\d{4})-(\d{2})-(\d{2})', r'\1-\2-\3'),
        ]
        # Canonical capitalization for NZ-specific legal terms; each value
        # is substituted case-insensitively so casing becomes uniform.
        self.nz_terms = {
            'New Zealand': 'New Zealand',
            'Parliament': 'Parliament',
            'Crown': 'Crown',
            'Government': 'Government',
            'Treaty of Waitangi': 'Treaty of Waitangi',
            'NZB': 'NZB',
            'Her Majesty': 'Her Majesty',
            'Governor-General': 'Governor-General',
        }

    def clean_text(self, text: str, preserve_structure: bool = True) -> str:
        """
        Clean and normalize text for processing, optimized for legal content.

        Args:
            text: Raw text to clean.
            preserve_structure: Whether to preserve legal document structure
                (section headers such as "1:").

        Returns:
            Cleaned text.
        """
        if not text:
            return ""
        if preserve_structure:
            # Keep section headers like "1:" intact, colon included, so the
            # header check below and extract_metadata() on cleaned text can
            # still recognize them. (Previously the colon was stripped,
            # which silently broke both.)
            text = self.section_pattern.sub(r'\1:', text)
        # Collapse runs of spaces/tabs but preserve paragraph breaks.
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n\s*\n', '\n\n', text)
        text = re.sub(r'\n{3,}', '\n\n', text)
        # Remove control characters while keeping ordinary formatting.
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
        # Normalize typographic quotes BEFORE the character filter below;
        # otherwise curly quotes would be deleted instead of converted.
        text = re.sub(r'[\u201c\u201d]', '"', text)
        text = re.sub(r'[\u2018\u2019]', "'", text)
        text = text.replace('`', "'")
        # Drop anything outside the allowed legal-text character set.
        # NOTE: \w is Unicode-aware in Python 3, so Maori macron vowels
        # (ā, ē, ī, ō, ū) already survive this filter — no separate
        # Maori-character pass is needed.
        allowed_chars = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~°§'
        text = re.sub(r'[^' + allowed_chars + ']', '', text)
        # Normalize spacing (and case) in bare section references.
        text = re.sub(r'section\s+(\d+)', r'section \1', text, flags=re.IGNORECASE)
        # Normalize date formats; one re.sub call handles both string
        # templates and callables.
        for pattern, replacement in self.date_patterns:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        # Normalize act names by dropping the trailing year
        # ("Crimes Act 1961" -> "Crimes Act"), matching the format used
        # by extract_metadata()'s acts_referenced list.
        text = self.act_name_pattern.sub(r'\1 Act', text)
        # Collapse boilerplate amendment references. The \(s\) parts match
        # a literal "(s)" suffix, as in "Amendment(s) incorporated ...".
        text = re.sub(
            r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)',
            'Amendments incorporated', text)
        # Lower-case granular references like "section 5(2)(A)".
        # IGNORECASE added for robustness; the bare-section pass above
        # already lower-cased the keyword itself.
        text = re.sub(r'section\s+\d+\(\d+\)\([a-zA-Z]\)',
                      lambda m: m.group(0).lower(), text, flags=re.IGNORECASE)
        # Normalize spacing after structural keywords, preserving their case.
        text = re.sub(
            r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))\s+(\d+[a-zA-Z]*)',
            lambda m: f"{m.group(1)} {m.group(2)}", text, flags=re.IGNORECASE)
        # Normalize capitalization of NZ-specific legal terms.
        for term, normalized in self.nz_terms.items():
            text = re.sub(re.escape(term), normalized, text, flags=re.IGNORECASE)
        # Drop empty lines and trim the rest; section headers were already
        # preserved above, so no special-casing is needed here.
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        return '\n'.join(lines).strip()

    def chunk_text(self, text: str, chunk_size: int = 4096, overlap: int = 256,
                   method: str = "sentence") -> List[str]:
        """
        Split text into overlapping chunks for processing.

        Args:
            text: Text to chunk.
            chunk_size: Maximum size of each chunk, in characters.
            overlap: Overlap between consecutive chunks, in characters.
            method: Chunking method ('sentence', 'word', or 'character';
                anything else falls back to 'character').

        Returns:
            List of text chunks. Empty input yields an empty list; text
            that already fits within chunk_size is returned as one chunk.
        """
        if not text or len(text) <= chunk_size:
            return [text] if text else []
        if method == "sentence":
            return self._chunk_by_sentence(text, chunk_size, overlap)
        if method == "word":
            return self._chunk_by_word(text, chunk_size, overlap)
        return self._chunk_by_character(text, chunk_size, overlap)

    def _chunk_by_sentence(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Chunk text at (approximate) sentence boundaries."""
        # Rough sentence split: whitespace preceded by '.', '!' or '?'.
        sentences = re.split(r'(?<=[.!?])\s+', text)
        chunks: List[str] = []
        current = ""
        for sentence in sentences:
            if not sentence.strip():
                continue
            candidate = current + sentence + " "
            if len(candidate) > chunk_size and current:
                # Current chunk is full — emit it.
                chunks.append(current.strip())
                # Seed the next chunk with the tail of the previous one so
                # context carries across chunk boundaries.
                if overlap > 0 and len(current) > overlap:
                    current = current[-overlap:].strip() + " " + sentence + " "
                else:
                    current = sentence + " "
            else:
                current = candidate
        if current.strip():
            chunks.append(current.strip())
        return chunks

    def _chunk_by_word(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Chunk text at word boundaries."""
        words = text.split()
        if not words:
            return []
        chunks: List[str] = []
        start = 0
        while start < len(words):
            end = start + 1
            chunk_words: List[str] = []
            # Grow the chunk one word at a time until adding the next word
            # would exceed chunk_size characters.
            while end <= len(words):
                if len(" ".join(words[start:end])) > chunk_size:
                    break
                chunk_words = words[start:end]
                end += 1
            if not chunk_words:
                # A single word longer than chunk_size: stop rather than
                # loop forever.
                break
            chunks.append(" ".join(chunk_words))
            # Step forward, re-including a few trailing words as overlap
            # (~5 characters per word heuristic); always advance by >= 1.
            overlap_words = max(0, min(overlap // 5, len(chunk_words)))
            start = max(start + 1, end - overlap_words)
        return chunks

    def _chunk_by_character(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Chunk text by raw character count (simple fallback)."""
        chunks: List[str] = []
        start = 0
        while start < len(text):
            end = min(start + chunk_size, len(text))
            chunks.append(text[start:end])
            if end >= len(text):
                break
            # Step back by `overlap` characters, but always advance by at
            # least one to avoid an infinite loop when overlap >= chunk_size.
            start = max(end - overlap, start + 1)
        return chunks

    def extract_metadata(self, text: str) -> Dict[str, Any]:
        """Extract lightweight metadata from legislation text.

        Returns a dict with section numbers, referenced acts, slash/dash
        dates, size counts, and NZ/Maori content indicator flags.
        """
        return {
            # Numeric section headers ("1:", "2:" ...) found in the text.
            'sections': [int(s) for s in self.section_pattern.findall(text)],
            # Referenced acts, normalized without the year.
            'acts_referenced': [f"{name} Act" for name, _year
                                in self.act_name_pattern.findall(text)],
            # Basic numeric dates like 12/03/2020 or 12-03-2020.
            'dates': re.findall(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{4}\b', text),
            'word_count': len(text.split()),
            'character_count': len(text),
            # Presence of common NZ legal references (case-sensitive).
            'has_nz_references': any(term in text for term in (
                'New Zealand', 'Parliament', 'Crown', 'Government',
                'Treaty of Waitangi')),
            # Presence of Maori macron vowels or common Maori terms.
            'has_maori_terms': any(term in text.lower() for term in (
                'ā', 'ē', 'ī', 'ō', 'ū', 'whakapapa', 'tangata whenua',
                'mana')),
        }

    def calculate_text_hash(self, text: str) -> str:
        """Return the SHA-256 hex digest of the text (used for caching)."""
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def get_chunk_statistics(self, chunks: List[str]) -> Dict[str, Any]:
        """Return size statistics about a list of text chunks."""
        if not chunks:
            return {
                'total_chunks': 0,
                'avg_chunk_size': 0,
                'min_chunk_size': 0,
                'max_chunk_size': 0,
                'total_characters': 0,
            }
        sizes = [len(chunk) for chunk in chunks]
        return {
            'total_chunks': len(chunks),
            'avg_chunk_size': sum(sizes) / len(chunks),
            'min_chunk_size': min(sizes),
            'max_chunk_size': max(sizes),
            'total_characters': sum(sizes),
        }

    def preprocess_legislation_json(self, json_data: Dict[str, Any]) -> Dict[str, Any]:
        """Preprocess one piece of legislation from its JSON record.

        Cleans the text, chunks it (default chunk parameters), and extracts
        metadata and chunk statistics.
        """
        raw_text = json_data.get('text', '')
        cleaned = self.clean_text(raw_text)
        chunks = self.chunk_text(cleaned)
        return {
            'id': json_data.get('id', ''),
            'title': json_data.get('title', ''),
            'year': json_data.get('year', ''),
            'source': json_data.get('source', ''),
            'original_text': raw_text,
            'cleaned_text': cleaned,
            # These two keys were previously declared but left empty; they
            # are now populated, consistent with batch_process_texts().
            'chunks': chunks,
            'metadata': self.extract_metadata(cleaned),
            'processing_stats': self.get_chunk_statistics(chunks),
        }

    def batch_process_texts(self, texts: List[str], chunk_size: int = 4096,
                            overlap: int = 256) -> List[Dict[str, Any]]:
        """Clean, chunk, and analyze multiple texts; one result dict each."""
        results = []
        for text in texts:
            cleaned = self.clean_text(text)
            chunks = self.chunk_text(cleaned, chunk_size, overlap)
            results.append({
                'original_text': text,
                'cleaned_text': cleaned,
                'chunks': chunks,
                'metadata': self.extract_metadata(cleaned),
                'processing_stats': self.get_chunk_statistics(chunks),
            })
        return results

    def validate_text_quality(self, text: str) -> Dict[str, Any]:
        """Assess text quality for processing.

        Returns a dict with 'is_valid' (no issues found), a list of
        'issues', a 'score' starting at 100 with deductions per issue,
        and basic 'metrics'.
        """
        quality: Dict[str, Any] = {
            'is_valid': True,
            'issues': [],
            'score': 100,
            'metrics': {},
        }
        # Reject near-empty input.
        if len(text.strip()) < 10:
            quality['issues'].append("Text too short")
            quality['score'] -= 50
        # Flag text dominated by punctuation/symbols (likely garbage).
        special_chars = len(re.findall(r'[^\w\s]', text))
        special_ratio = special_chars / len(text) if text else 0
        if special_ratio > 0.3:
            quality['issues'].append("High special character ratio")
            quality['score'] -= 20
        # Heuristic: legal documents mention at least one of these terms.
        legal_indicators = ['section', 'act', 'law', 'regulation', 'clause',
                            'subsection']
        has_legal_content = any(ind in text.lower() for ind in legal_indicators)
        if not has_legal_content:
            quality['issues'].append("May not be legal content")
            quality['score'] -= 30
        quality['is_valid'] = not quality['issues']
        quality['metrics'] = {
            'length': len(text),
            'word_count': len(text.split()),
            'special_char_ratio': special_ratio,
            'has_legal_content': has_legal_content,
        }
        return quality