#!/usr/bin/env python3
"""
Text Processor

Handles text cleaning, chunking, and preprocessing for the NZ Legislation
Loophole Analysis. Optimized for legal/legislative content with specialized
cleaning and structuring.
"""

import re
from typing import List, Dict, Any, Optional, Tuple
import hashlib
import json


class TextProcessor:
    """Advanced text processing for legislation analysis."""

    # Typographic punctuation is mapped to ASCII *before* the allowed-character
    # filter runs; otherwise the filter deletes it (e.g. "5–7" became "57").
    _PUNCTUATION_TABLE = str.maketrans({
        '\u201c': '"', '\u201d': '"', '\u201e': '"', '\u201f': '"',
        '\u2018': "'", '\u2019': "'", '\u201a': "'", '\u201b': "'",
        '`': "'",
        '\u2010': '-', '\u2011': '-', '\u2012': '-', '\u2013': '-',
        '\u2014': '-', '\u2015': '-', '\u2212': '-',
    })

    _HORIZONTAL_WS = re.compile(r'[ \t]+')
    _BLANK_LINE_RUN = re.compile(r'\n\s*\n')
    _EXCESS_NEWLINES = re.compile(r'\n{3,}')
    _CONTROL_CHARS = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]')

    # ``\w`` is Unicode-aware for str patterns, so macronised vowels
    # (ā, ē, ī, ō, ū) already pass this filter without special handling.
    _ALLOWED_CHARS = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~\`°§'
    _DISALLOWED_CHARS = re.compile(r'[^' + _ALLOWED_CHARS + ']')

    _SECTION_NUMBER = re.compile(r'section\s+(\d+)', re.IGNORECASE)
    _AMENDMENT_NOTE = re.compile(
        r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)')
    _SUBSECTION_REF = re.compile(r'section\s+\d+\(\d+\)\([a-zA-Z]\)')
    _STRUCTURAL_REF = re.compile(
        r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))'
        r'\s+(\d+[a-zA-Z]*)',
        re.IGNORECASE)

    _SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+')
    _NUMERIC_DATE = re.compile(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{4}\b')

    # Whole-word matching for Maori terms: a plain substring test for "mana"
    # also fires on "management", "manager", etc. Applied to lowercased text.
    _MAORI_INDICATOR = re.compile(
        r'[āēīōū]|\b(?:whakapapa|tangata\s+whenua|mana)\b')

    def __init__(self):
        """Initialize the text processor with legal-specific patterns."""
        # Legal-specific patterns
        self.section_pattern = re.compile(r'^(\d+):', re.MULTILINE)
        self.act_name_pattern = re.compile(
            r'(\b\w+(?:\s+\w+)*)\s+Act\s+(\d{4})', re.IGNORECASE)
        self.date_patterns = [
            # The month is captured so that it survives normalisation; the
            # replacement yields "D Month YYYY" with single spaces.
            (r'(\d{1,2})\s*(January|February|March|April|May|June|July|August'
             r'|September|October|November|December)\s*(\d{4})',
             lambda m: f"{m.group(1)} {m.group(2).capitalize()} {m.group(3)}"),
            # The two numeric forms are already canonical; these replacements
            # reproduce the match unchanged.
            (r'(\d{1,2})/(\d{1,2})/(\d{4})', r'\1/\2/\3'),
            (r'(\d{4})-(\d{2})-(\d{2})', r'\1-\2-\3'),
        ]

        # NZ-specific legal terms (matched case-insensitively, replaced with
        # the canonical spelling given as the value)
        self.nz_terms = {
            'New Zealand': 'New Zealand',
            'Parliament': 'Parliament',
            'Crown': 'Crown',
            'Government': 'Government',
            'Treaty of Waitangi': 'Treaty of Waitangi',
            'NZB': 'NZB',
            'Her Majesty': 'Her Majesty',
            'Governor-General': 'Governor-General',
        }

    def clean_text(self, text: str, preserve_structure: bool = True) -> str:
        """
        Clean and normalize text for better processing, optimized for legal content

        Args:
            text: Raw text to clean
            preserve_structure: Retained for interface compatibility. Section
                markers such as "1:" are kept in the output regardless of
                this flag, so that ``section_pattern`` still matches the
                cleaned text.

        Returns:
            Cleaned text with one non-empty, stripped line per input line.
            Empty or ``None`` input yields "".
        """
        if not text:
            return ""

        # Map curly quotes, backticks and typographic dashes to ASCII first so
        # the character filter below does not delete them.
        text = text.translate(self._PUNCTUATION_TABLE)

        # Collapse horizontal whitespace and tidy runs of blank lines
        text = self._HORIZONTAL_WS.sub(' ', text)
        text = self._BLANK_LINE_RUN.sub('\n\n', text)
        text = self._EXCESS_NEWLINES.sub('\n\n', text)

        # Drop control characters (tab, LF and CR are kept), then anything
        # outside the allowed character set
        text = self._CONTROL_CHARS.sub('', text)
        text = self._DISALLOWED_CHARS.sub('', text)

        # "Section   5" -> "section 5" (also lowercases the keyword)
        text = self._SECTION_NUMBER.sub(r'section \1', text)

        # Normalize date formats; replacements may be strings or callables
        for pattern, replacement in self.date_patterns:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

        # Normalize act names but keep the year: dropping it would conflate
        # distinct Acts and stop act_name_pattern matching the cleaned text.
        text = self.act_name_pattern.sub(r'\1 Act \2', text)

        # Clean up amendment references
        text = self._AMENDMENT_NOTE.sub('Amendments incorporated', text)

        # Lowercase paragraph letters in references like "section 5(1)(A)"
        text = self._SUBSECTION_REF.sub(lambda m: m.group(0).lower(), text)

        # Single space between a structural keyword and its number
        text = self._STRUCTURAL_REF.sub(
            lambda m: f"{m.group(1)} {m.group(2)}", text)

        # Canonical casing for NZ terms. Word boundaries stop a term from
        # being rewritten inside a longer word ("nzbn" must not become "NZBn").
        for term, normalized in self.nz_terms.items():
            text = re.sub(r'\b' + re.escape(term) + r'\b', normalized, text,
                          flags=re.IGNORECASE)

        # Strip every line and drop the empty ones
        stripped_lines = (line.strip() for line in text.split('\n'))
        return '\n'.join(line for line in stripped_lines if line).strip()

    def chunk_text(self, text: str, chunk_size: int = 4096, overlap: int = 256,
                   method: str = "sentence") -> List[str]:
        """
        Split text into overlapping chunks for processing

        Args:
            text: Text to chunk
            chunk_size: Target size of each chunk in characters (must be > 0)
            overlap: Overlap between chunks in characters; negative values
                are treated as 0
            method: Chunking method ('sentence', 'word', 'character'); any
                other value falls back to character chunking

        Returns:
            List of text chunks. Empty input gives []; text no longer than
            ``chunk_size`` is returned as a single chunk.

        Raises:
            ValueError: If ``chunk_size`` is not positive, or (character
                method only) if ``overlap`` is not smaller than ``chunk_size``.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if not text:
            return []
        if len(text) <= chunk_size:
            return [text]

        overlap = max(0, overlap)
        if method == "sentence":
            return self._chunk_by_sentence(text, chunk_size, overlap)
        if method == "word":
            return self._chunk_by_word(text, chunk_size, overlap)
        return self._chunk_by_character(text, chunk_size, overlap)

    def _chunk_by_sentence(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """
        Chunk text by sentence boundaries.

        Sentences are split on whitespace that follows '.', '!' or '?' (a
        rough approximation). A chunk is closed when adding the next sentence
        would exceed ``chunk_size``; a single sentence longer than
        ``chunk_size`` is kept whole. When a closed chunk is longer than
        ``overlap``, its last ``overlap`` characters are prepended to the
        next chunk.
        """
        chunks = []
        current = ""

        for sentence in self._SENTENCE_BOUNDARY.split(text):
            if not sentence.strip():
                continue

            candidate = current + sentence + " "
            if len(candidate) <= chunk_size or not current:
                current = candidate
                continue

            # The sentence does not fit: close the current chunk first
            chunks.append(current.strip())
            if overlap > 0 and len(current) > overlap:
                current = current[-overlap:].strip() + " " + sentence + " "
            else:
                current = sentence + " "

        if current.strip():
            chunks.append(current.strip())
        return chunks

    def _chunk_by_word(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """
        Chunk text by word boundaries without losing any words.

        Words are packed greedily, joined by single spaces, while the chunk
        stays within ``chunk_size`` characters. A single word longer than
        ``chunk_size`` becomes its own (oversized) chunk rather than being
        dropped. ``overlap`` is given in characters and converted to a word
        count with the rough estimate of five characters per word.
        """
        words = text.split()
        if not words:
            return []

        total = len(words)
        overlap_budget = max(0, overlap // 5)  # Rough word overlap
        chunks = []
        start = 0

        while start < total:
            # Always take at least one word so the loop makes progress
            end = start + 1
            length = len(words[start])
            # Track the joined length incrementally instead of re-joining
            while end < total and length + 1 + len(words[end]) <= chunk_size:
                length += 1 + len(words[end])
                end += 1

            chunks.append(" ".join(words[start:end]))
            if end >= total:
                break  # Last word emitted; no trailing overlap-only chunks

            # Step back by the overlap, but never to or before ``start``
            start = end - min(overlap_budget, end - start - 1)

        return chunks

    def _chunk_by_character(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """
        Chunk text by character count (simple fallback).

        Consecutive chunks start ``chunk_size - overlap`` characters apart;
        a negative ``overlap`` is treated as 0.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is
                not smaller than ``chunk_size`` (the window could not advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        overlap = max(0, overlap)
        if overlap >= chunk_size:
            raise ValueError(
                "overlap must be smaller than chunk_size for character chunking")

        chunks = []
        text_length = len(text)
        stride = chunk_size - overlap
        start = 0
        while start < text_length:
            end = min(start + chunk_size, text_length)
            chunks.append(text[start:end])
            if end >= text_length:
                break
            start += stride
        return chunks

    def extract_metadata(self, text: str) -> Dict[str, Any]:
        """
        Extract metadata from legislation text

        Args:
            text: Text to inspect (raw or cleaned)

        Returns:
            Dict with keys 'sections' (ints from line-leading "N:" markers),
            'acts_referenced' ("<name> Act" strings), 'dates' (numeric
            d/m/yyyy or d-m-yyyy strings), 'word_count', 'character_count',
            'has_nz_references' and 'has_maori_terms'.
        """
        nz_indicators = ('New Zealand', 'Parliament', 'Crown', 'Government',
                         'Treaty of Waitangi')

        return {
            'sections': [int(num) for num in self.section_pattern.findall(text)],
            'acts_referenced': [
                f"{name} Act" for name, _year in self.act_name_pattern.findall(text)
            ],
            # Basic numeric dates only
            'dates': self._NUMERIC_DATE.findall(text),
            'word_count': len(text.split()),
            'character_count': len(text),
            'has_nz_references': any(term in text for term in nz_indicators),
            # Macron vowels anywhere, or known Maori terms as whole words
            'has_maori_terms': bool(self._MAORI_INDICATOR.search(text.lower())),
        }

    def calculate_text_hash(self, text: str) -> str:
        """Calculate SHA-256 hash (hex digest) of the UTF-8 encoded text for caching."""
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def get_chunk_statistics(self, chunks: List[str]) -> Dict[str, Any]:
        """
        Get statistics about text chunks

        Args:
            chunks: List of text chunks

        Returns:
            Dict with 'total_chunks', 'avg_chunk_size', 'min_chunk_size',
            'max_chunk_size' and 'total_characters' (sizes in characters);
            all zero when ``chunks`` is empty.
        """
        if not chunks:
            return {
                'total_chunks': 0,
                'avg_chunk_size': 0,
                'min_chunk_size': 0,
                'max_chunk_size': 0,
                'total_characters': 0,
            }

        sizes = [len(chunk) for chunk in chunks]
        total_characters = sum(sizes)
        return {
            'total_chunks': len(chunks),
            'avg_chunk_size': total_characters / len(chunks),
            'min_chunk_size': min(sizes),
            'max_chunk_size': max(sizes),
            'total_characters': total_characters,
        }

    def preprocess_legislation_json(self, json_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Preprocess legislation data from JSON format

        Args:
            json_data: Mapping that may contain 'id', 'title', 'year',
                'source' and 'text'; missing keys default to ''.

        Returns:
            Dict with the copied fields, 'original_text', 'cleaned_text' and
            'metadata' (extracted from the cleaned text). 'chunks' and
            'processing_stats' are returned empty; this method does not
            chunk the text.
        """
        raw_text = json_data.get('text', '')
        cleaned = self.clean_text(raw_text)

        return {
            'id': json_data.get('id', ''),
            'title': json_data.get('title', ''),
            'year': json_data.get('year', ''),
            'source': json_data.get('source', ''),
            'original_text': raw_text,
            'cleaned_text': cleaned,
            'chunks': [],
            'metadata': self.extract_metadata(cleaned),
            'processing_stats': {},
        }

    def batch_process_texts(self, texts: List[str], chunk_size: int = 4096,
                            overlap: int = 256) -> List[Dict[str, Any]]:
        """
        Process multiple texts in batch

        Each text is cleaned, chunked with the default chunking method, and
        summarised.

        Args:
            texts: Raw texts to process
            chunk_size: Size of each chunk
            overlap: Overlap between chunks

        Returns:
            One dict per input text with 'original_text', 'cleaned_text',
            'chunks', 'metadata' and 'processing_stats'.
        """
        results = []
        for text in texts:
            cleaned = self.clean_text(text)
            chunks = self.chunk_text(cleaned, chunk_size, overlap)
            results.append({
                'original_text': text,
                'cleaned_text': cleaned,
                'chunks': chunks,
                'metadata': self.extract_metadata(cleaned),
                'processing_stats': self.get_chunk_statistics(chunks),
            })
        return results

    def validate_text_quality(self, text: str) -> Dict[str, Any]:
        """
        Validate and assess text quality for processing

        The score starts at 100 and is reduced by 50 for text shorter than
        10 characters (after stripping), by 20 when more than 30% of the
        characters are neither word characters nor whitespace, and by 30
        when no legal keyword occurs in the text.

        Args:
            text: Text to assess

        Returns:
            Dict with 'is_valid' (True only when no issue was found),
            'issues', 'score' and 'metrics'.
        """
        issues = []
        score = 100

        # Check minimum length
        if len(text.strip()) < 10:
            issues.append("Text too short")
            score -= 50

        # Check for excessive special characters
        special_count = len(re.findall(r'[^\w\s]', text))
        special_ratio = special_count / len(text) if text else 0
        if special_ratio > 0.3:
            issues.append("High special character ratio")
            score -= 20

        # Check for legal content indicators (substring match on lowercase text)
        legal_indicators = ['section', 'act', 'law', 'regulation', 'clause', 'subsection']
        lowered = text.lower()
        has_legal_content = any(indicator in lowered for indicator in legal_indicators)
        if not has_legal_content:
            issues.append("May not be legal content")
            score -= 30

        return {
            'is_valid': not issues,
            'issues': issues,
            'score': score,
            'metrics': {
                'length': len(text),
                'word_count': len(text.split()),
                'special_char_ratio': special_ratio,
                'has_legal_content': has_legal_content,
            },
        }