#!/usr/bin/env python3
"""
LLM Analyzer

Handles LLM model loading, inference, and analysis for the NZ Legislation
Loophole Analysis. Provides optimized prompts and response parsing for
legal text analysis.
"""

import os
import re
import time
import json
from typing import List, Dict, Any, Optional, Tuple

try:
    # llama-cpp-python is only required for actual inference.  Guarding the
    # import keeps the prompt-building / response-parsing / config-validation
    # helpers usable on machines without the native library; load_model()
    # reports the missing dependency instead of the module import crashing.
    from llama_cpp import Llama
except ImportError:  # pragma: no cover - environment dependent
    Llama = None


class LLMAnalyzer:
    """LLM-based analyzer for legislation loophole detection."""

    def __init__(self, model_config: Dict[str, Any]):
        """
        Initialize the LLM analyzer.

        Args:
            model_config: Configuration for the LLM model.  Recognised keys
                include ``context_length``, ``max_tokens``, sampling
                parameters (``temperature``, ``top_p``, ``top_k``,
                ``repeat_penalty``), and either ``repo_id``/``filename``
                (HuggingFace download) or ``path`` (local model file).
        """
        self.model_config = model_config
        self.model = None        # set by load_model()
        self.is_loaded = False

        # Analysis templates: each entry controls the depth label and the
        # focus areas interpolated into the main analysis prompt.
        self.analysis_templates = {
            'standard': {
                'depth': 'Standard',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities',
                                'unintended_consequences'],
            },
            'detailed': {
                'depth': 'Detailed',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities',
                                'unintended_consequences',
                                'implementation_issues'],
            },
            'comprehensive': {
                'depth': 'Comprehensive',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities',
                                'unintended_consequences',
                                'implementation_issues', 'policy_conflicts',
                                'enforcement_challenges'],
            },
        }

        # Prompt templates keyed by analysis task.
        self.prompt_templates = {
            'loophole_analysis': self._get_loophole_analysis_template(),
            'ambiguity_detection': self._get_ambiguity_detection_template(),
            'recommendations': self._get_recommendations_template(),
        }

    def _get_loophole_analysis_template(self) -> str:
        """Get the main loophole analysis prompt template."""
        return """You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.

LEGISLATION TEXT:
{text}

TASK: Analyze this legislative text for potential loopholes, ambiguities, or unintended consequences.

INSTRUCTIONS:
Provide a structured analysis following this format:

1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent

{reasoning_format}
{recommendations_format}

ANALYSIS DEPTH: {depth}
FOCUS AREAS: {focus_areas}
"""

    def _get_ambiguity_detection_template(self) -> str:
        """Get the ambiguity detection prompt template."""
        return """Analyze the following legal text for ambiguities and unclear provisions:

TEXT:
{text}

Identify:
1. Vague terms or phrases
2. Ambiguous references
3. Unclear conditions or requirements
4. Missing definitions
5. Conflicting provisions

Provide specific examples and suggest clarifications.
"""

    def _get_recommendations_template(self) -> str:
        """Get the recommendations prompt template."""
        return """Based on the loopholes and ambiguities identified, provide specific recommendations for:

1. Legislative amendments to close identified loopholes
2. Additional definitions or clarifications needed
3. Implementation guidelines or regulations
4. Monitoring and enforcement mechanisms

Prioritize recommendations by impact and feasibility.
"""

    def load_model(self) -> bool:
        """
        Load the LLM model.

        Returns:
            True if the model loaded successfully, False otherwise.
        """
        if self.is_loaded:
            return True

        if Llama is None:
            # Dependency was not importable at module load time.
            print("Error loading model: llama_cpp is not installed")
            return False

        try:
            print("Loading LLM model...")

            # os.cpu_count() can return None on some platforms; fall back
            # to a conservative thread count before clamping to 8.
            n_threads = min(os.cpu_count() or 4, 8)

            if self.model_config.get('repo_id'):
                # Load from HuggingFace.
                self.model = Llama.from_pretrained(
                    repo_id=self.model_config['repo_id'],
                    filename=self.model_config.get('filename', ''),
                    n_ctx=self.model_config.get('context_length', 40960),
                    n_threads=n_threads,
                    verbose=False,
                    n_gpu_layers=-1,
                    n_batch=4096,
                    logits_all=False,
                    use_mlock=True,
                    use_mmap=True,
                )
            else:
                # Load from a local path.
                model_path = self.model_config.get('path', '')
                if not model_path or not os.path.exists(model_path):
                    print(f"Model path not found: {model_path}")
                    return False
                self.model = Llama(
                    model_path=model_path,
                    n_ctx=self.model_config.get('context_length', 40960),
                    n_threads=n_threads,
                    verbose=False,
                    n_gpu_layers=-1,
                    n_batch=4096,
                )

            self.is_loaded = True
            print("Model loaded successfully")
            return True

        except Exception as e:
            print(f"Error loading model: {e}")
            return False

    def unload_model(self):
        """Unload the model to free memory."""
        if self.model:
            del self.model
            self.model = None
            self.is_loaded = False

    def generate_chat_template(self, system_prompt: str,
                               user_message: str = "") -> str:
        """
        Generate a ChatML-style chat template for the model.

        Args:
            system_prompt: The system prompt.
            user_message: The user message (optional).

        Returns:
            Formatted chat template ending with an open assistant turn so
            the model continues from there.
        """
        chat_messages = []

        if system_prompt:
            chat_messages.append("<|im_start|>system")
            chat_messages.append(system_prompt)
            chat_messages.append("<|im_end|>")

        if user_message:
            chat_messages.append("<|im_start|>user")
            chat_messages.append(user_message)
            chat_messages.append("<|im_end|>")

        # Assistant header with no content: the generation prompt.
        chat_messages.append("<|im_start|>assistant")
        chat_messages.append("")  # Empty for generation

        return "\n".join(chat_messages)

    def analyze_chunk(self, chunk: str, analysis_type: str = 'standard',
                      cache_manager=None) -> Dict[str, Any]:
        """
        Analyze a single text chunk for loopholes and ambiguities.

        Args:
            chunk: Text chunk to analyze.
            analysis_type: Type of analysis to perform
                ('standard', 'detailed', or 'comprehensive').
            cache_manager: Optional cache manager instance; must provide
                ``get(text, model_config, params)`` and
                ``put(text, result, model_config, params)``.

        Returns:
            Analysis results dict, or a dict with an ``'error'`` key on
            failure (chunk text truncated to 100 chars in that case).
        """
        if not self.is_loaded and not self.load_model():
            return {
                'error': 'Model not loaded',
                'chunk': chunk[:100] + "..." if len(chunk) > 100 else chunk
            }

        # Check cache first.
        if cache_manager:
            cached_result = cache_manager.get(
                chunk, self.model_config, {'analysis_type': analysis_type}
            )
            if cached_result:
                return cached_result

        try:
            template_config = self.analysis_templates.get(
                analysis_type, self.analysis_templates['standard'])

            # NOTE(review): the delimiter tags in this instruction appear to
            # have been stripped from the original source ("between and ...");
            # preserved verbatim pending confirmation of the intended markers.
            reasoning_format = """
Write your complete analysis between and .
Then provide your overall conclusion between and .
"""
            recommendations_format = """
**Recommendations**: Provide specific recommendations for addressing identified issues.
""" if template_config['include_recommendations'] else ""

            full_prompt = self.prompt_templates['loophole_analysis'].format(
                text=chunk,
                reasoning_format=reasoning_format,
                recommendations_format=recommendations_format,
                depth=template_config['depth'],
                focus_areas=', '.join(template_config['focus_areas'])
            )

            chat_template = self.generate_chat_template(full_prompt)

            # Time the generation so 'processing_time' is a duration,
            # not a wall-clock timestamp.
            start_time = time.time()
            response = self._generate_response(chat_template)
            elapsed = time.time() - start_time

            structured_response = self._parse_response(response)

            result = {
                'chunk': chunk,
                'analysis_type': analysis_type,
                'model_config': self.model_config,
                'response': response,
                'structured_analysis': structured_response,
                'processing_time': elapsed,
                'chunk_size': len(chunk),
                'word_count': len(chunk.split())
            }

            if cache_manager:
                cache_manager.put(chunk, result, self.model_config,
                                  {'analysis_type': analysis_type})

            return result

        except Exception as e:
            return {
                'error': str(e),
                'chunk': chunk[:100] + "..." if len(chunk) > 100 else chunk
            }

    def _generate_response(self, prompt: str,
                           max_tokens: Optional[int] = None) -> str:
        """
        Generate a response from the model.

        Args:
            prompt: Input prompt.
            max_tokens: Maximum tokens to generate; defaults to the
                model config's ``max_tokens`` (or 4096).

        Returns:
            Generated response text, or "" on error.
        """
        if max_tokens is None:
            max_tokens = self.model_config.get('max_tokens', 4096)

        try:
            response = self.model(
                prompt,
                max_tokens=max_tokens,
                temperature=self.model_config.get('temperature', 0.3),
                top_p=self.model_config.get('top_p', 0.85),
                top_k=self.model_config.get('top_k', 50),
                repeat_penalty=self.model_config.get('repeat_penalty', 1.15),
                # Empty stop strings (left behind by stripped markup in the
                # original) removed: they match everywhere and can truncate
                # generation immediately.
                stop=["<|im_end|>"],
                echo=False
            )
            return response['choices'][0]['text'].strip()
        except Exception as e:
            print(f"Error generating response: {e}")
            return ""

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """
        Parse the LLM response into structured data.

        Args:
            response: Raw LLM response.

        Returns:
            Structured analysis data with per-section content, a
            completeness-based confidence score (0-100), and a derived
            quality label ('high' >= 80, 'medium' >= 60, else 'low').
        """
        structured = {
            'text_meaning': '',
            'key_assumptions': [],
            'exploitable_interpretations': [],
            'critical_loopholes': [],
            'circumvention_strategies': [],
            'recommendations': [],
            'confidence_score': 0,
            'analysis_quality': 'unknown'
        }

        try:
            # Each section runs from its bold header to the next header
            # (or end of text).
            patterns = {
                'text_meaning':
                    r'\*\*Text Meaning\*\*:\s*(.*?)(?=\*\*|$)',
                'key_assumptions':
                    r'\*\*Key Assumptions\*\*:\s*(.*?)(?=\*\*|$)',
                'exploitable_interpretations':
                    r'\*\*Exploitable Interpretations\*\*:\s*(.*?)(?=\*\*|$)',
                'critical_loopholes':
                    r'\*\*Critical Loopholes\*\*:\s*(.*?)(?=\*\*|$)',
                'circumvention_strategies':
                    r'\*\*Circumvention Strategies\*\*:\s*(.*?)(?=\*\*|$)',
                # (duplicate trailing '$' alternative removed)
                'recommendations':
                    r'\*\*Recommendations\*\*:\s*(.*?)(?=\*\*|$)',
            }

            list_keys = ('key_assumptions', 'exploitable_interpretations',
                         'critical_loopholes', 'circumvention_strategies',
                         'recommendations')

            for key, pattern in patterns.items():
                matches = re.findall(pattern, response,
                                     re.DOTALL | re.IGNORECASE)
                if matches:
                    content = matches[0].strip()
                    if key in list_keys:
                        # Split numbered / dashed / bulleted items.
                        items = re.findall(
                            r'(?:\d+\.|-|\•)\s*(.*?)(?=(?:\d+\.|-|\•)|$)',
                            content, re.DOTALL)
                        structured[key] = [item.strip() for item in items
                                           if item.strip()]
                    else:
                        structured[key] = content

            # Confidence: 20 points per populated core section (max 100).
            completeness_score = 0
            if structured['text_meaning']:
                completeness_score += 20
            for key in ('key_assumptions', 'exploitable_interpretations',
                        'critical_loopholes', 'circumvention_strategies'):
                if structured[key]:
                    completeness_score += 20
            structured['confidence_score'] = min(100, completeness_score)

            if structured['confidence_score'] >= 80:
                structured['analysis_quality'] = 'high'
            elif structured['confidence_score'] >= 60:
                structured['analysis_quality'] = 'medium'
            else:
                structured['analysis_quality'] = 'low'

        except Exception as e:
            print(f"Error parsing response: {e}")
            structured['error'] = str(e)

        return structured

    def batch_analyze_chunks(self, chunks: List[str],
                             analysis_type: str = 'standard',
                             cache_manager=None,
                             progress_callback=None) -> List[Dict[str, Any]]:
        """
        Analyze multiple chunks in batch.

        Args:
            chunks: List of text chunks to analyze.
            analysis_type: Type of analysis to perform.
            cache_manager: Optional cache manager instance.
            progress_callback: Optional ``callback(fraction, message)``
                invoked before each chunk.

        Returns:
            List of analysis results, one per chunk, in order.
        """
        results = []
        total_chunks = len(chunks)

        for i, chunk in enumerate(chunks):
            if progress_callback:
                progress = (i + 1) / total_chunks
                progress_callback(
                    progress, f"Analyzing chunk {i + 1}/{total_chunks}")
            results.append(
                self.analyze_chunk(chunk, analysis_type, cache_manager))

        return results

    def get_model_info(self) -> Dict[str, Any]:
        """Get information about the loaded model (status dict)."""
        if not self.is_loaded:
            return {'status': 'not_loaded'}

        try:
            return {
                'status': 'loaded',
                'config': self.model_config,
                'model_type': type(self.model).__name__,
                'context_length': self.model_config.get('context_length',
                                                        'unknown'),
                # NOTE(review): llama_cpp exposes vocab size via a method on
                # some versions; this getattr may return the bound method
                # rather than an int — verify against the installed version.
                'vocab_size': getattr(self.model, 'vocab_size', 'unknown')
            }
        except Exception as e:
            return {'status': 'error', 'error': str(e)}

    def validate_model_config(self) -> Dict[str, Any]:
        """
        Validate the current model configuration.

        Returns:
            Dict with ``is_valid`` (bool), ``issues`` (fatal problems),
            and ``warnings`` (non-fatal problems).
        """
        validation = {
            'is_valid': True,
            'issues': [],
            'warnings': []
        }

        # Required parameters must be present.
        required_params = ['context_length', 'max_tokens']
        for param in required_params:
            if param not in self.model_config:
                validation['issues'].append(
                    f"Missing required parameter: {param}")
                validation['is_valid'] = False

        # Range checks for parameters that are present.
        if 'context_length' in self.model_config:
            if self.model_config['context_length'] < 1024:
                validation['issues'].append(
                    "Context length too small (minimum: 1024)")
                validation['is_valid'] = False

        if 'max_tokens' in self.model_config:
            if self.model_config['max_tokens'] < 64:
                validation['issues'].append(
                    "Max tokens too small (minimum: 64)")
                validation['is_valid'] = False

        if 'temperature' in self.model_config:
            temp = self.model_config['temperature']
            if not (0 <= temp <= 2):
                validation['issues'].append(
                    "Temperature out of valid range (0-2)")
                validation['is_valid'] = False

        # A bad local path is only a warning (HF download may still work).
        if 'path' in self.model_config and self.model_config['path']:
            if not os.path.exists(self.model_config['path']):
                validation['warnings'].append(
                    f"Model file not found: {self.model_config['path']}")

        return validation