"""
Text Processor

Handles text cleaning, chunking, and preprocessing for the NZ Legislation Loophole Analysis.
Optimized for legal/legislative content with specialized cleaning and structuring.
"""

import hashlib
import json
import re
import unicodedata
from typing import Any, Dict, List, Optional, Tuple


class TextProcessor:
    """Clean, chunk and summarise legislation text.

    Instance attributes set by ``__init__``:

    * ``section_pattern`` - matches ``N:`` section markers at line starts.
    * ``act_name_pattern`` - matches ``<words> Act <year>`` (case-insensitive,
      and greedy: the name group takes every word preceding ``Act``).
    * ``date_patterns`` - ``(regex, replacement)`` pairs applied by
      ``clean_text``; a replacement may be a template string or a callable.
    * ``nz_terms`` - term -> canonical spelling, used to normalise casing.
    """

    # Month names recognised by the written-date pattern.
    _MONTH_NAMES = (
        'January|February|March|April|May|June|July|August|'
        'September|October|November|December'
    )

    # Typographic quotes/dashes mapped to ASCII *before* the character filter;
    # otherwise the filter deletes them (turning "5–7" into "57").
    _TYPOGRAPHY_TABLE = str.maketrans({
        '\u2018': "'", '\u2019': "'", '\u201a': "'", '\u201b': "'",
        '\u201c': '"', '\u201d': '"', '\u201e': '"', '\u201f': '"',
        '\u2010': '-', '\u2011': '-', '\u2012': '-',
        '\u2013': '-', '\u2014': '-', '\u2015': '-',
        '`': "'",
    })

    # Characters kept by clean_text. \w is Unicode-aware for str patterns, so
    # precomposed macron vowels (ā, ē, ...) are already covered.
    _ALLOWED_CHARS = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~\`°§'
    _DISALLOWED_RE = re.compile(r'[^' + _ALLOWED_CHARS + ']')
    _CONTROL_RE = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]')

    # Whole-word detectors (substring tests matched "character", "management").
    _LEGAL_TERM_RE = re.compile(
        r'\b(?:section|act|law|regulation|clause|subsection)s?\b', re.IGNORECASE)
    _MAORI_WORD_RE = re.compile(
        r'\b(?:whakapapa|tangata\s+whenua|mana)\b', re.IGNORECASE)
    _MACRON_VOWELS = frozenset('āēīōū')

    def __init__(self):
        """Initialize the text processor with legal-specific patterns."""
        self.section_pattern = re.compile(r'^(\d+):', re.MULTILINE)
        self.act_name_pattern = re.compile(
            r'(\b\w+(?:\s+\w+)*)\s+Act\s+(\d{4})', re.IGNORECASE)
        self.date_patterns = [
            # "12March2020" / "12  march 2020" -> "12 March 2020". The month is
            # captured so that it survives the rewrite.
            (r'\b(\d{1,2})\s*(' + self._MONTH_NAMES + r')\s*(\d{4})\b',
             lambda m: f"{m.group(1)} {m.group(2).capitalize()} {m.group(3)}"),
            # The two numeric formats are re-emitted exactly as matched.
            (r'(\d{1,2})/(\d{1,2})/(\d{4})', r'\1/\2/\3'),
            (r'(\d{4})-(\d{2})-(\d{2})', r'\1-\2-\3'),
        ]

        self.nz_terms = {
            'New Zealand': 'New Zealand',
            'Parliament': 'Parliament',
            'Crown': 'Crown',
            'Government': 'Government',
            'Treaty of Waitangi': 'Treaty of Waitangi',
            'NZB': 'NZB',
            'Her Majesty': 'Her Majesty',
            'Governor-General': 'Governor-General',
        }

    def clean_text(self, text: str, preserve_structure: bool = True) -> str:
        """
        Clean and normalize text for better processing, optimized for legal content.

        Args:
            text: Raw text to clean.
            preserve_structure: When True (default) the ``N:`` section markers
                at line starts are kept; when False their colon is removed.

        Returns:
            Cleaned text with blank lines removed and every line stripped;
            ``""`` for empty or ``None`` input.
        """
        if not text:
            return ""

        # Compose decomposed accents (a + U+0304 -> ā): a bare combining mark
        # is not matched by \w and would be deleted by the filter below.
        text = unicodedata.normalize('NFC', text)
        text = text.translate(self._TYPOGRAPHY_TABLE)

        if not preserve_structure:
            text = self.section_pattern.sub(r'\1', text)

        # Collapse horizontal whitespace; blank lines are dropped at the end.
        text = re.sub(r'[ \t]+', ' ', text)

        text = self._CONTROL_RE.sub('', text)
        text = self._DISALLOWED_RE.sub('', text)

        # Canonical lower-case "section N".
        text = re.sub(r'section\s+(\d+)', r'section \1', text, flags=re.IGNORECASE)

        # re.sub accepts both template strings and callables as replacement.
        for pattern, replacement in self.date_patterns:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

        # Normalise spacing/case around "Act" while keeping the year, so that
        # act references remain detectable in the cleaned text.
        text = self.act_name_pattern.sub(r'\1 Act \2', text)

        text = re.sub(
            r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)',
            'Amendments incorporated', text)

        # Lower-case paragraph letters in references such as "section 5(2)(A)".
        text = re.sub(r'section\s+\d+\(\d+\)\([a-zA-Z]\)',
                      lambda m: m.group(0).lower(), text)

        # Single space between a structural keyword and its number.
        text = re.sub(
            r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))\s+(\d+[a-zA-Z]*)',
            lambda m: f"{m.group(1)} {m.group(2)}", text, flags=re.IGNORECASE)

        # Canonical casing for NZ terms, whole words only ("crowned" is left
        # alone). A callable replacement avoids template escape processing.
        for term, normalized in self.nz_terms.items():
            text = re.sub(r'\b' + re.escape(term) + r'\b',
                          lambda m, canonical=normalized: canonical,
                          text, flags=re.IGNORECASE)

        stripped_lines = (line.strip() for line in text.split('\n'))
        return '\n'.join(line for line in stripped_lines if line).strip()

    def chunk_text(self, text: str, chunk_size: int = 4096, overlap: int = 256,
                   method: str = "sentence") -> List[str]:
        """
        Split text into overlapping chunks for processing.

        Args:
            text: Text to chunk.
            chunk_size: Target maximum size of each chunk, in characters.
            overlap: Overlap between consecutive chunks, in characters;
                negative values are treated as 0.
            method: 'sentence' or 'word'; any other value selects
                character chunking.

        Returns:
            List of text chunks; ``[]`` for empty text and ``[text]`` when the
            text already fits in one chunk.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if not text:
            return []
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        overlap = max(0, overlap)

        if len(text) <= chunk_size:
            return [text]

        if method == "sentence":
            return self._chunk_by_sentence(text, chunk_size, overlap)
        if method == "word":
            return self._chunk_by_word(text, chunk_size, overlap)
        return self._chunk_by_character(text, chunk_size, overlap)

    def _chunk_by_sentence(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Chunk text on sentence boundaries (whitespace following . ! or ?).

        A chunk may exceed ``chunk_size`` when a single sentence (plus the
        carried-over overlap) is longer than the limit.
        """
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        current = ""
        for sentence in sentences:
            if not sentence.strip():
                continue

            candidate = current + sentence + " "
            if current and len(candidate) > chunk_size:
                chunks.append(current.strip())
                # Seed the next chunk with the tail of the one just emitted.
                if overlap > 0 and len(current) > overlap:
                    current = current[-overlap:].strip() + " " + sentence + " "
                else:
                    current = sentence + " "
            else:
                current = candidate

        if current.strip():
            chunks.append(current.strip())
        return chunks

    def _chunk_by_word(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Chunk text on word boundaries without losing or repeating words.

        The character overlap is converted to words at five characters per
        word. A word longer than ``chunk_size`` becomes a chunk of its own.
        """
        words = text.split()
        if not words:
            return []

        overlap_words = max(0, overlap // 5)
        total = len(words)
        chunks = []
        start = 0
        while start < total:
            # Always take one word, then extend while the joined length fits;
            # a running length avoids re-joining the window on every step.
            stop = start + 1
            length = len(words[start])
            while stop < total and length + 1 + len(words[stop]) <= chunk_size:
                length += 1 + len(words[stop])
                stop += 1

            chunks.append(" ".join(words[start:stop]))
            if stop >= total:
                # The text is exhausted; further chunks would only repeat
                # the tail of this one.
                break
            # Step back for the overlap but always advance by one word.
            start = max(start + 1, stop - overlap_words)

        return chunks

    def _chunk_by_character(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Chunk text by character count (simple fallback).

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")

        # Advance at least one character per chunk so that an overlap equal
        # to or larger than chunk_size cannot stall the loop.
        step = max(1, chunk_size - max(0, overlap))
        length = len(text)
        chunks = []
        start = 0
        while start < length:
            end = min(start + chunk_size, length)
            chunks.append(text[start:end])
            if end >= length:
                break
            start += step
        return chunks

    def extract_metadata(self, text: str) -> Dict[str, Any]:
        """Extract metadata from legislation text.

        Returns:
            Dict with keys ``sections`` (ints from ``N:`` line markers),
            ``acts_referenced`` (``"<name> Act"`` strings), ``dates`` (numeric
            d/m/yyyy or d-m-yyyy matches), ``word_count``, ``character_count``,
            ``has_nz_references`` and ``has_maori_terms``.
        """
        nz_indicators = ['New Zealand', 'Parliament', 'Crown', 'Government',
                         'Treaty of Waitangi']

        # Macron vowels are compared in composed form; the listed words must
        # match as whole words ("management" is not "mana").
        lowered = unicodedata.normalize('NFC', text).lower()
        has_maori = (not self._MACRON_VOWELS.isdisjoint(lowered)
                     or self._MAORI_WORD_RE.search(lowered) is not None)

        return {
            'sections': [int(num) for num in self.section_pattern.findall(text)],
            'acts_referenced': [f"{name} Act"
                                for name, _year in self.act_name_pattern.findall(text)],
            'dates': re.findall(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{4}\b', text),
            'word_count': len(text.split()),
            'character_count': len(text),
            'has_nz_references': any(term in text for term in nz_indicators),
            'has_maori_terms': has_maori,
        }

    def calculate_text_hash(self, text: str) -> str:
        """Calculate SHA-256 hash of text for caching."""
        return hashlib.sha256(text.encode('utf-8')).hexdigest()

    def get_chunk_statistics(self, chunks: List[str]) -> Dict[str, Any]:
        """Get count, average/min/max size and total characters of chunks."""
        if not chunks:
            return {
                'total_chunks': 0,
                'avg_chunk_size': 0,
                'min_chunk_size': 0,
                'max_chunk_size': 0,
                'total_characters': 0,
            }

        sizes = [len(chunk) for chunk in chunks]
        total = sum(sizes)
        return {
            'total_chunks': len(chunks),
            'avg_chunk_size': total / len(chunks),
            'min_chunk_size': min(sizes),
            'max_chunk_size': max(sizes),
            'total_characters': total,
        }

    def preprocess_legislation_json(self, json_data: Dict[str, Any]) -> Dict[str, Any]:
        """Preprocess legislation data from JSON format.

        Copies ``id``, ``title``, ``year``, ``source`` and ``text`` from
        ``json_data`` (missing keys default to ``''``), cleans the text and
        extracts metadata from the cleaned text. ``chunks`` and
        ``processing_stats`` are returned empty; this method does not chunk.
        """
        raw_text = json_data.get('text', '')
        cleaned = self.clean_text(raw_text)

        return {
            'id': json_data.get('id', ''),
            'title': json_data.get('title', ''),
            'year': json_data.get('year', ''),
            'source': json_data.get('source', ''),
            'original_text': raw_text,
            'cleaned_text': cleaned,
            'chunks': [],
            'metadata': self.extract_metadata(cleaned),
            'processing_stats': {},
        }

    def batch_process_texts(self, texts: List[str], chunk_size: int = 4096,
                            overlap: int = 256) -> List[Dict[str, Any]]:
        """Clean, chunk (sentence method) and summarise each text in turn."""
        results = []
        for text in texts:
            cleaned = self.clean_text(text)
            chunks = self.chunk_text(cleaned, chunk_size, overlap)
            results.append({
                'original_text': text,
                'cleaned_text': cleaned,
                'chunks': chunks,
                'metadata': self.extract_metadata(cleaned),
                'processing_stats': self.get_chunk_statistics(chunks),
            })
        return results

    def validate_text_quality(self, text: str) -> Dict[str, Any]:
        """Validate and assess text quality for processing.

        The score starts at 100 and loses 50 for text shorter than 10
        characters, 20 when more than 30% of characters are neither word
        characters nor whitespace, and 30 when no legal keyword occurs as a
        whole word. ``is_valid`` is True only when no issue was recorded.
        """
        issues = []
        score = 100

        if len(text.strip()) < 10:
            issues.append("Text too short")
            score -= 50

        special_chars = len(re.findall(r'[^\w\s]', text))
        special_ratio = special_chars / len(text) if text else 0
        if special_ratio > 0.3:
            issues.append("High special character ratio")
            score -= 20

        has_legal_content = self._LEGAL_TERM_RE.search(text) is not None
        if not has_legal_content:
            issues.append("May not be legal content")
            score -= 30

        return {
            'is_valid': not issues,
            'issues': issues,
            'score': score,
            'metrics': {
                'length': len(text),
                'word_count': len(text.split()),
                'special_char_ratio': special_ratio,
                'has_legal_content': has_legal_content,
            },
        }