| |
| """ |
| NZ Legislation Loophole Analysis Dataset Creation Tool |
| |
| This script processes New Zealand legislation text to create a finetuning dataset for AI models |
| that can identify potential loopholes, ambiguities, and unintended consequences in legal text. |
| |
| The script: |
| 1. Loads and cleans NZ legislation text, preserving legal structure and terminology |
| 2. Chunks the text into manageable sections with overlap for context |
| 3. Uses an LLM to analyze each chunk for legal issues |
| 4. Generates a structured dataset for training AI models on legal loophole detection |
| |
| Usage: |
| python trl.py |
| |
| Requirements: |
| - llama-cpp-python with GGUF model support |
| - psutil for memory monitoring |
| - Input file: nz-legislation.txt containing NZ legislation in JSON lines format |
| |
| Output: |
| - JSON dataset saved to nz_legislation_dataset/nz_legislation_loophole_dataset.json |
| """ |
|
|
| import os |
| import json |
| import time |
| import psutil |
| from typing import List, Dict, Any |
| import numpy as np |
| from llama_cpp import Llama |
| import re |
|
|
| |
class ProgressManager:
    """Stub progress tracker: it stores no state and offers no operations yet."""

    def __init__(self) -> None:
        # Intentionally empty; present so callers can already construct one.
        return None
|
|
def show_memory_usage(label: str):
    """Print the resident set size of the current process, in MB, prefixed by *label*."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    print(f"{label}: {rss_bytes / 1024 / 1024:.2f} MB")
|
|
| |
# --- Pipeline configuration -------------------------------------------------
INPUT_FILE: str = "nz-legislation.txt"        # JSON-lines legislation file passed to create_finetuning_dataset()
OUTPUT_DIR: str = "nz_legislation_dataset"    # directory that receives the generated dataset
CHUNK_SIZE: int = 4096                        # default chunk length (characters) for chunk_text()
CHUNK_OVERLAP: int = 256                      # default characters shared by consecutive chunks
BATCH_SIZE: int = 16                          # not referenced anywhere in the visible part of this file
MODEL_PATH: str = "qwen3.gguf"                # local GGUF file used by load_model()'s fallback path
MAX_TOKENS: int = 4096                        # default generation budget for generate_response()

# Import-time side effect: make sure the output directory exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)
|
|
def load_model(progress_manager: ProgressManager = None):
    """Load the GGUF model used for text generation.

    The model is first fetched through ``Llama.from_pretrained``; if that
    raises for any reason, the local file at ``MODEL_PATH`` is loaded instead
    with a reduced set of options.

    Args:
        progress_manager: Optional progress tracker. A fresh ``ProgressManager``
            is created when omitted; it is not otherwise used here yet.

    Returns:
        The loaded ``Llama`` instance.

    Raises:
        Exception: whatever the fallback ``Llama(...)`` constructor raises when
            the local model cannot be loaded either.
    """
    if progress_manager is None:
        progress_manager = ProgressManager()

    print("Loading LLM model...")
    show_memory_usage("Initial memory usage")

    start_time = time.time()
    try:
        # Bind to `model` -- the name that is returned below. The previous
        # code bound `llm` here, so a successful download ended in an
        # UnboundLocalError at `return model`.
        model = Llama.from_pretrained(
            repo_id="DavidAU/Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-GGUF",
            filename="Qwen3-Zero-Coder-Reasoning-0.8B-NEO-EX-D_AU-IQ4_XS-imat.gguf",
            n_ctx=40960,
            n_threads=8,
            verbose=False,
            n_gpu_layers=-1,
            n_batch=4096,
            logits_all=False,
            use_mlock=True,
            use_mmap=True,
        )
    except Exception as e:
        # Deliberately broad: any failure of the hub download/load path
        # falls back to the local model file.
        print(f"Error loading model: {e}")
        print("Trying with basic configuration...")
        model = Llama(
            model_path=MODEL_PATH,
            n_ctx=40960,
            n_threads=8,
            verbose=False,
            n_gpu_layers=-1,
            n_batch=4096,
        )

    load_time = time.time() - start_time
    print(f"LLM model loaded in {load_time:.2f}s")
    show_memory_usage("Memory after model load")

    return model
|
|
def clean_text(text: str) -> str:
    """Clean and normalize legislative text before it is chunked.

    Steps, in order: map typographic quotes to ASCII, collapse whitespace,
    drop control characters, remove characters outside a whitelist, normalize
    a handful of legal phrases (section references, dates, Act citations,
    proper nouns), tidy spacing around punctuation, and finally drop blank
    lines and per-line surrounding whitespace.

    Macron vowels used in te reo Maori survive the whitelist because ``\\w``
    is Unicode-aware for ``str`` patterns.

    Args:
        text: Raw legislation text. ``None`` or an empty string yields ``""``.

    Returns:
        The cleaned text with non-empty lines joined by single newlines.
    """
    if not text:
        return ""

    # Typographic quotes and backticks -> ASCII. This must run BEFORE the
    # whitelist filter below, otherwise curly quotes are deleted outright.
    text = text.translate(str.maketrans({
        '\u201c': '"', '\u201d': '"',
        '\u2018': "'", '\u2019': "'",
        '`': "'",
    }))

    # Whitespace normalization.
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Control characters (tab, LF and CR are kept).
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)

    # Whitelist: word characters (any script), whitespace, common punctuation,
    # plus the degree and section signs (written as escapes to stay
    # encoding-proof).
    allowed_chars = r'\w\s\.\,\!\?\;\:\-\(\)\[\]\{\}\"\'\/\@\#\$\%\^\&\*\+\=\~\`\u00b0\u00a7'
    text = re.sub(r'[^' + allowed_chars + ']', '', text)

    # "Section   12" -> "section 12" (keyword is lower-cased by the replacement).
    text = re.sub(r'section\s+(\d+)', r'section \1', text, flags=re.IGNORECASE)

    # "<day><month>" -> "<day> <Month>". The trailing \b keeps words such as
    # "marching" from being treated as a month.
    for month in ('January', 'February', 'March', 'April', 'June', 'July',
                  'August', 'September', 'October', 'November', 'December'):
        pattern = rf'(\d+)\s*[{month[0]}{month[0].lower()}]{month[1:]}\b'
        text = re.sub(pattern, rf'\1 {month}', text)
    # "May" is special: lower-case "may" is usually the modal verb
    # ("section 12 may apply"), so it is only treated as a month when a
    # four-digit year follows.
    text = re.sub(r'(\d+)\s*May\b', r'\1 May', text)
    text = re.sub(r'(\d+)\s*may(?=\s+\d{4}\b)', r'\1 May', text)

    # Punctuation spacing: no whitespace before, exactly one space after.
    # Punctuation sitting between two digits (1.5, 10:30, 1,000, clause 2.1)
    # is left alone so numbers are not split apart.
    text = re.sub(r'\s+([\.!\?\,\;\:])', r'\1', text)
    text = re.sub(r'([\.!\?\,\;\:])(?!(?<=\d[.,:])\d)\s*', r'\1 ', text)

    # Normalize spacing in Act citations while KEEPING the year; the year is
    # part of the short title ("Crimes Act 1961").
    text = re.sub(r'(\w)\s+Act\s+(\d{4})\b', r'\1 Act \2', text)

    text = re.sub(r'[Aa]mendment\(s\)\s+incorporated\s+in\s+the\s+[Aa]ct\(s\)',
                  'Amendments incorporated', text)
    # Lower-case the paragraph letter in references like "section 5(2)(A)".
    text = re.sub(r'section\s+\d+\(\d+\)\([a-zA-Z]\)', lambda m: m.group(0).lower(), text)

    # Single space between a structural keyword and its number.
    text = re.sub(r'(\b(?:section|part|chapter|article|clause|subsection|paragraph))\s+(\d+[a-zA-Z]*)',
                  r'\1 \2', text, flags=re.IGNORECASE)

    # Canonical capitalization of common proper nouns.
    text = re.sub(r'\b[Nn]ew\s+[Zz]ealand\b', 'New Zealand', text)
    text = re.sub(r'\b[Pp]arliament\b', 'Parliament', text)
    text = re.sub(r'\b[Cc]rown\b', 'Crown', text)
    text = re.sub(r'\b[Gg]overnment\b', 'Government', text)
    text = re.sub(r'\b[Nn][Zz][Bb]\s+(\d+)', r'NZB \1', text)
    text = re.sub(r'treaty\s+of\s+waitangi', 'Treaty of Waitangi', text, flags=re.IGNORECASE)

    # Drop blank lines and surrounding whitespace on every remaining line.
    stripped_lines = (line.strip() for line in text.split('\n'))
    text = '\n'.join(line for line in stripped_lines if line)

    return text.strip()
|
|
| |
# Delimiters the model is instructed to wrap its reasoning and conclusion in.
REASONING_START = "<start_working_out>"
REASONING_END = "<end_working_out>"
SOLUTION_START = "<SOLUTION>"
SOLUTION_END = "</SOLUTION>"


def create_system_prompt(text: str) -> str:
    """Build the system prompt that asks for a loophole analysis of *text*.

    The legislation excerpt is embedded verbatim, followed by the five-part
    analysis outline and the instruction to wrap the reasoning and the
    conclusion in the module's marker strings.
    """
    prompt = f"""You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.

LEGISLATION TEXT:
{text}

TASK: Analyze this legislative text and identify potential loopholes, ambiguities, or unintended consequences.

REASONING: Provide a structured analysis in the following format:

1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent

Write your complete analysis between {REASONING_START} and {REASONING_END}.

Then provide your overall conclusion between {SOLUTION_START} and {SOLUTION_END}.
"""
    return prompt
|
|
def generate_chat_template(system_prompt: str) -> str:
    """Assemble the full prompt string sent to the model.

    The turns are delimited with hard-coded ``<|im_start|>`` / ``<|im_end|>``
    markers (they are not read from the model's metadata). The system turn is
    included only when *system_prompt* is non-empty; a fixed user instruction
    follows, and the string ends with an opened assistant turn so the model
    continues from there.
    """
    user_instruction = (
        "Analyze the given legislative text for loopholes, ambiguities, and unintended consequences. "
        "Provide a structured legal analysis following the specified format."
    )

    segments = ["<|im_start|>system", system_prompt, "<|im_end|>"] if system_prompt else []
    segments += [
        "<|im_start|>user",
        user_instruction,
        "<|im_end|>",
        "<|im_start|>assistant",
        "",  # produces the trailing newline after the assistant header
    ]
    return "\n".join(segments)
|
|
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split *text* into overlapping chunks of at most *chunk_size* characters.

    A chunk that is not the last one is trimmed back to the latest sentence
    end (". ", "! " or "? ") found within its final 100 characters, when there
    is one. The following chunk starts *overlap* characters before the point
    where the previous chunk actually ended, so no text is skipped.

    Args:
        text: Text to split.
        chunk_size: Maximum chunk length in characters.
        overlap: Number of characters shared by consecutive chunks.

    Returns:
        The list of chunks; ``[text]`` when the text already fits in one chunk.

    Raises:
        ValueError: if the text needs splitting and ``chunk_size`` is not
            positive or ``overlap`` is outside ``[0, chunk_size)`` -- values
            that previously caused an endless loop or skipped text.
    """
    if len(text) <= chunk_size:
        return [text]

    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    text_len = len(text)
    chunks = []
    start = 0
    while start < text_len:
        end = min(start + chunk_size, text_len)

        if end < text_len:
            # Look for a sentence boundary in the last 100 characters only.
            window_start = max(start, end - 100)
            boundary = max(
                text.rfind(marker, window_start, end)
                for marker in ('. ', '! ', '? ')
            )
            if boundary != -1:
                end = boundary + 2  # keep the punctuation and the space

        chunks.append(text[start:end])
        if end >= text_len:
            break

        # Step back by `overlap` from the real end of this chunk, but always
        # move forward so the loop is guaranteed to terminate.
        next_start = end - overlap
        start = next_start if next_start > start else end

    return chunks
|
|
def generate_response(model, prompt: str, max_tokens: int = MAX_TOKENS) -> str:
    """Generate the model's completion for *prompt*.

    A first attempt uses the full sampling configuration; if that call raises,
    a second attempt is made with a reduced parameter set. Generation stops at
    ``SOLUTION_END`` only, so the reasoning section AND the conclusion
    requested by the prompt can both be produced.

    Sampling parameters (first attempt):
        - temperature=0.3, top_p=0.85, top_k=50, min_p=0.05
        - repeat_penalty=1.15, presence_penalty=0.1, frequency_penalty=0.1
        - typical_p=0.95, tfs_z=0.95
        - mirostat_mode=2, mirostat_tau=4.0, mirostat_eta=0.15

    NOTE(review): with mirostat enabled, llama.cpp appears to bypass the
    top_k/top_p/min_p/typical_p/tfs_z samplers -- confirm against the
    installed llama-cpp-python version before tuning those values.

    Args:
        model: Callable completion model (called as ``model(prompt, **params)``)
            returning a dict with ``choices[0]['text']``.
        prompt: Fully formatted prompt string.
        max_tokens: Maximum number of tokens to generate.

    Returns:
        The stripped completion text, or ``""`` when both attempts fail.
    """
    # Stopping at the reasoning end marker would cut the answer off before the
    # conclusion, so only the solution end marker is used.
    # NOTE(review): the stop string itself is presumably not included in the
    # returned text -- verify if downstream parsing needs the closing tag.
    stop_sequences = [SOLUTION_END]

    basic_params = dict(
        max_tokens=max_tokens,
        temperature=0.3,
        top_p=0.85,
        top_k=50,
        repeat_penalty=1.15,
        stop=stop_sequences,
    )
    full_params = dict(
        basic_params,
        min_p=0.05,
        presence_penalty=0.1,
        frequency_penalty=0.1,
        typical_p=0.95,
        tfs_z=0.95,
        mirostat_mode=2,
        mirostat_tau=4.0,
        mirostat_eta=0.15,
    )

    try:
        response = model(prompt, **full_params)
        return response['choices'][0]['text'].strip()
    except Exception as e:
        # Deliberately broad: e.g. an installed version rejecting one of the
        # advanced keyword arguments should trigger the simpler retry.
        print(f"Error generating response: {e}")

    try:
        response = model(prompt, **basic_params)
        return response['choices'][0]['text'].strip()
    except Exception as e2:
        print(f"Fallback also failed: {e2}")
        return ""
|
|
def parse_legislation_json(file_path: str) -> List[Dict[str, Any]]:
    """Parse a JSON-lines legislation file.

    Each non-blank line must be a JSON object containing at least the keys
    ``id`` and ``text``. Lines that are not valid JSON, are not objects, or
    lack a required key are reported and skipped; they never affect entries
    that were already accepted.

    Args:
        file_path: Path of the JSON-lines file.

    Returns:
        The accepted entries in file order, or ``[]`` when the file cannot be
        read or decoded.
    """
    legislation_entries: List[Dict[str, Any]] = []

    try:
        # utf-8-sig reads plain UTF-8 too, and strips a leading BOM that would
        # otherwise make the first line unparseable.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            for line_num, raw_line in enumerate(f, 1):
                line = raw_line.strip()
                if not line:
                    continue

                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Warning: Could not parse line {line_num}: {e}")
                    continue

                # A bare number/list/string is valid JSON but not an entry;
                # `'id' in 42` would raise and used to discard everything.
                if isinstance(entry, dict) and 'id' in entry and 'text' in entry:
                    legislation_entries.append(entry)
                else:
                    print(f"Warning: Line {line_num} missing required fields, skipping")
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading legislation file: {e}")
        return []

    print(f"Successfully parsed {len(legislation_entries)} legislation entries")
    return legislation_entries
|
|
# Status glyphs, written as escapes so they survive re-encoding of this file.
_MARK_YES = "\u2705"
_MARK_NO = "\u274c"
_MARK_TICK = "\u2713"
_MARK_BULLET = "\u2022"
_MARK_WARN = "\u26a0\ufe0f"


def _response_flags(response: str) -> Dict[str, bool]:
    """Return keyword heuristics describing what a generated analysis seems to contain."""
    lowered = response.lower()
    return {
        "reasoning": '<start_working_out>' in response or 'reasoning' in lowered,
        "loopholes": 'loophole' in lowered or 'ambiguity' in lowered or 'issue' in lowered,
        "recommendations": 'recommend' in lowered or 'suggest' in lowered,
    }


def _restore_backup(backup_file: str, output_file: str) -> None:
    """Put a previously backed-up dataset back in place, if a backup exists."""
    if os.path.exists(backup_file):
        os.replace(backup_file, output_file)
        print(f" {_MARK_BULLET} Original dataset restored from backup")


def _finalize_dataset(temp_file: str, output_file: str, backup_file: str) -> List[Dict[str, Any]]:
    """Convert the JSONL scratch file into the final JSON array and clean up."""
    if os.path.exists(output_file):
        print("Creating backup of existing dataset...")
        os.replace(output_file, backup_file)

    print("Converting to final JSON format...")
    with open(temp_file, 'r', encoding='utf-8') as temp_f:
        final_dataset = [json.loads(line) for line in temp_f if line.strip()]

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(final_dataset, f, indent=2, ensure_ascii=False)
    print(f"{_MARK_TICK} Final dataset saved to: {output_file}")

    # Remove the backup first: once it is gone, a later failure can no longer
    # cause the error handler to overwrite the fresh output with the old one.
    if os.path.exists(backup_file):
        os.remove(backup_file)
        print(f"{_MARK_TICK} Backup file cleaned up")
    if os.path.exists(temp_file):
        os.remove(temp_file)
        print(f"{_MARK_TICK} Temporary file cleaned up")

    return final_dataset


def create_finetuning_dataset(input_file: str, model, output_file: str = None) -> List[Dict[str, Any]]:
    """Build the loophole-analysis dataset from a JSON-lines legislation file.

    Every entry is cleaned, chunked, and each chunk is sent to the model. Each
    non-empty response is appended immediately to a ``*_temp.jsonl`` scratch
    file so partial progress survives a crash; at the end the scratch file is
    converted to a single JSON array at *output_file*. An existing output file
    is kept as ``*_backup.json`` until the new one has been written, and is
    restored if the run fails or is interrupted.

    Args:
        input_file: Path of the JSON-lines legislation file.
        model: Model object passed through to ``generate_response``.
        output_file: Destination JSON file. Defaults to
            ``nz_legislation_loophole_dataset.json`` inside ``OUTPUT_DIR``.

    Returns:
        The list of dataset entries written to *output_file*, or ``[]`` when
        the input contains no usable entries (nothing is written then).

    Raises:
        KeyboardInterrupt: re-raised after reporting the partial progress.
        Exception: any processing error is reported and re-raised.
    """
    if output_file is None:
        output_file = os.path.join(OUTPUT_DIR, "nz_legislation_loophole_dataset.json")

    # Derive sibling names from the extension-less path. str.replace('.json')
    # would touch every occurrence in the path and, for a name without
    # '.json', make the scratch file identical to the output file.
    base_path, _ = os.path.splitext(output_file)
    temp_file = base_path + '_temp.jsonl'
    backup_file = base_path + '_backup.json'

    print(f"Parsing legislation dataset from {input_file}")
    legislation_entries = parse_legislation_json(input_file)

    if not legislation_entries:
        print("No legislation entries found to process")
        return []

    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    total_entries = len(legislation_entries)
    saved_count = 0
    chunks_processed = 0

    print(f"Processing {total_entries} legislation entries...")
    print(f"Dataset will be saved incrementally to: {temp_file}")

    try:
        with open(temp_file, 'w', encoding='utf-8') as temp_f:
            for entry_num, entry in enumerate(legislation_entries, 1):
                legislation_id = entry.get('id', f'entry_{entry_num}')
                title = entry.get('title', 'Unknown Title')
                year = entry.get('year', 'Unknown Year')
                raw_text = entry.get('text', '')

                print(f"\nProcessing entry {entry_num}/{total_entries}: {title} ({year}) - ID: {legislation_id}")

                cleaned_text = clean_text(raw_text)
                chunks = chunk_text(cleaned_text)

                print(f" - Text length: {len(raw_text)} characters")
                print(f" - Number of chunks: {len(chunks)}")

                for chunk_id, chunk in enumerate(chunks):
                    if not chunk.strip():
                        # Nothing to analyze (e.g. the entry's text was empty).
                        continue
                    chunks_processed += 1

                    system_prompt = create_system_prompt(chunk)
                    full_prompt = generate_chat_template(system_prompt)
                    response = generate_response(model, full_prompt)

                    print(f"\n**Generated Analysis for {title} (Chunk {chunk_id + 1}/{len(chunks)})**:")
                    print(f" Response length: {len(response)} characters")

                    if not response:
                        # generate_response returns "" on failure; an empty
                        # target would only pollute the training data.
                        print(f" {_MARK_WARN} Empty response, chunk skipped")
                        continue

                    preview = response.replace('\n', ' ').strip()
                    print(f" Preview: {preview}")

                    flags = _response_flags(response)
                    marks = {key: (_MARK_YES if hit else _MARK_NO) for key, hit in flags.items()}
                    print(
                        f" Analysis quality: {marks['reasoning']} Reasoning | "
                        f"{marks['loopholes']} Loopholes | "
                        f"{marks['recommendations']} Recommendations"
                    )

                    dataset_entry = {
                        "prompt": full_prompt,
                        "response": response,
                        "legislation_id": legislation_id,
                        "title": title,
                        "year": year,
                        "chunk_id": chunk_id,
                        "total_chunks": len(chunks),
                        "text_length": len(chunk),
                        "original_text_length": len(raw_text),
                    }

                    # One JSON object per line, flushed right away so the
                    # scratch file is usable after an abrupt stop.
                    json.dump(dataset_entry, temp_f, ensure_ascii=False)
                    temp_f.write('\n')
                    temp_f.flush()
                    saved_count += 1

                    if saved_count % 10 == 0:
                        print(f" {_MARK_TICK} Saved {saved_count} entries so far...")

        print(f"\n{_MARK_TICK} All entries processed and saved to temporary file")
        print(f"{_MARK_TICK} Total entries saved: {saved_count}")

        final_dataset = _finalize_dataset(temp_file, output_file, backup_file)

        print("\nDataset creation complete!")
        print(f" {_MARK_BULLET} Processed {total_entries} legislation documents")
        print(f" {_MARK_BULLET} Generated {len(final_dataset)} analysis entries")
        print(f" {_MARK_BULLET} Total chunks processed: {chunks_processed}")

        return final_dataset

    except KeyboardInterrupt:
        print(f"\n{_MARK_WARN} Process interrupted by user")
        print(f" {_MARK_BULLET} Partial dataset saved to: {temp_file}")
        print(f" {_MARK_BULLET} {saved_count} entries saved so far")
        print(f" {_MARK_BULLET} You can resume processing or use the temporary file")
        _restore_backup(backup_file, output_file)
        raise

    except Exception as e:
        print(f"\n{_MARK_NO} Error during processing: {e}")
        print(f" {_MARK_BULLET} Partial dataset saved to: {temp_file}")
        print(f" {_MARK_BULLET} {saved_count} entries saved so far")
        _restore_backup(backup_file, output_file)
        raise
|
|
def main():
    """Run the pipeline: load the model, build the dataset, release the model."""
    print("Starting NZ Legislation Loophole Analysis Dataset Creation")
    print("=" * 60)

    model = load_model()

    try:
        dataset = create_finetuning_dataset(INPUT_FILE, model)
    finally:
        # Release the model even when dataset creation fails or is interrupted.
        if hasattr(model, 'close'):
            model.close()

    if not dataset:
        # Do not claim success when no entries were produced.
        print("\nNo dataset entries were generated.")
        return

    print("\nDataset creation completed successfully!")
    print(f"Output saved to: {os.path.join(OUTPUT_DIR, 'nz_legislation_loophole_dataset.json')}")


if __name__ == "__main__":
    main()
|
|