"""
LLM Analyzer

Handles LLM model loading, inference, and analysis for the NZ Legislation Loophole Analysis.
Provides optimized prompts and response parsing for legal text analysis.
"""
|
|
import json
import os
import re
import time
from typing import Any, Dict, List, Optional, Tuple

from llama_cpp import Llama
|
|
class LLMAnalyzer:
    """LLM-based analyzer for legislation loophole detection.

    Wraps a llama.cpp model (via ``llama_cpp.Llama``), builds structured
    analysis prompts, runs inference, and parses the model's markdown-style
    response into structured fields.
    """

    def __init__(self, model_config: Dict[str, Any]):
        """
        Initialize the LLM analyzer.

        Args:
            model_config: Configuration for the LLM model. Recognised keys:
                'repo_id' / 'filename' (HuggingFace download), 'path' (local
                GGUF file), 'context_length', 'max_tokens', 'temperature',
                'top_p', 'top_k', 'repeat_penalty'.
        """
        self.model_config = model_config
        self.model = None        # lazily created Llama instance
        self.is_loaded = False   # guards against double-loading

        # Preset analysis depths: each adds more focus areas to the prompt.
        self.analysis_templates = {
            'standard': {
                'depth': 'Standard',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities', 'unintended_consequences']
            },
            'detailed': {
                'depth': 'Detailed',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities', 'unintended_consequences', 'implementation_issues']
            },
            'comprehensive': {
                'depth': 'Comprehensive',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities', 'unintended_consequences',
                                'implementation_issues', 'policy_conflicts', 'enforcement_challenges']
            }
        }

        # Reusable prompt templates, keyed by task.
        self.prompt_templates = {
            'loophole_analysis': self._get_loophole_analysis_template(),
            'ambiguity_detection': self._get_ambiguity_detection_template(),
            'recommendations': self._get_recommendations_template()
        }

    def _get_loophole_analysis_template(self) -> str:
        """Get the main loophole analysis prompt template.

        Placeholders: {text}, {reasoning_format}, {recommendations_format},
        {depth}, {focus_areas}.
        """
        return """You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.

LEGISLATION TEXT:
{text}

TASK: Analyze this legislative text for potential loopholes, ambiguities, or unintended consequences.

INSTRUCTIONS:
Provide a structured analysis following this format:

1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent

{reasoning_format}
{recommendations_format}

ANALYSIS DEPTH: {depth}
FOCUS AREAS: {focus_areas}
"""

    def _get_ambiguity_detection_template(self) -> str:
        """Get the ambiguity detection prompt template (placeholder: {text})."""
        return """Analyze the following legal text for ambiguities and unclear provisions:

TEXT: {text}

Identify:
1. Vague terms or phrases
2. Ambiguous references
3. Unclear conditions or requirements
4. Missing definitions
5. Conflicting provisions

Provide specific examples and suggest clarifications.
"""

    def _get_recommendations_template(self) -> str:
        """Get the recommendations prompt template (no placeholders)."""
        return """Based on the loopholes and ambiguities identified, provide specific recommendations for:

1. Legislative amendments to close identified loopholes
2. Additional definitions or clarifications needed
3. Implementation guidelines or regulations
4. Monitoring and enforcement mechanisms

Prioritize recommendations by impact and feasibility.
"""

    def load_model(self) -> bool:
        """
        Load the LLM model (HuggingFace repo or local GGUF path).

        Returns:
            True if model loaded successfully, False otherwise.
        """
        if self.is_loaded:
            return True

        try:
            print("Loading LLM model...")

            # os.cpu_count() may return None; fall back to a sane default
            # before clamping so min() never sees None (TypeError).
            n_threads = min(os.cpu_count() or 4, 8)

            if self.model_config.get('repo_id'):
                self.model = Llama.from_pretrained(
                    repo_id=self.model_config['repo_id'],
                    # Pass None (library default: match any file) rather than
                    # '' which would be treated as a literal filename pattern.
                    filename=self.model_config.get('filename') or None,
                    n_ctx=self.model_config.get('context_length', 40960),
                    n_threads=n_threads,
                    verbose=False,
                    n_gpu_layers=-1,   # offload all layers to GPU if available
                    n_batch=4096,
                    logits_all=False,
                    use_mlock=True,
                    use_mmap=True,
                )
            else:
                # Local model file path.
                model_path = self.model_config.get('path', '')
                if not model_path or not os.path.exists(model_path):
                    print(f"Model path not found: {model_path}")
                    return False

                self.model = Llama(
                    model_path=model_path,
                    n_ctx=self.model_config.get('context_length', 40960),
                    n_threads=n_threads,
                    verbose=False,
                    n_gpu_layers=-1,
                    n_batch=4096,
                )

            self.is_loaded = True
            print("Model loaded successfully")
            return True

        except Exception as e:
            print(f"Error loading model: {e}")
            return False

    def unload_model(self):
        """Unload the model to free memory."""
        if self.model:
            del self.model
            self.model = None
            self.is_loaded = False

    def generate_chat_template(self, system_prompt: str, user_message: str = "") -> str:
        """
        Generate a ChatML-style chat template for the model.

        Args:
            system_prompt: The system prompt.
            user_message: The user message (optional).

        Returns:
            Formatted chat template ending with an open assistant turn.
        """
        chat_messages = []

        if system_prompt:
            chat_messages.append("<|im_start|>system")
            chat_messages.append(system_prompt)
            chat_messages.append("<|im_end|>")

        if user_message:
            chat_messages.append("<|im_start|>user")
            chat_messages.append(user_message)
            chat_messages.append("<|im_end|>")

        # Open the assistant turn so generation continues from here.
        chat_messages.append("<|im_start|>assistant")
        chat_messages.append("")

        return "\n".join(chat_messages)

    def analyze_chunk(self, chunk: str, analysis_type: str = 'standard',
                      cache_manager=None) -> Dict[str, Any]:
        """
        Analyze a single text chunk for loopholes and ambiguities.

        Args:
            chunk: Text chunk to analyze.
            analysis_type: Type of analysis to perform ('standard',
                'detailed', or 'comprehensive'; unknown values fall back
                to 'standard').
            cache_manager: Cache manager instance for caching results.

        Returns:
            Analysis results dict, or a dict with an 'error' key on failure.
        """
        if not self.is_loaded and not self.load_model():
            return {
                'error': 'Model not loaded',
                'chunk': chunk[:100] + "..." if len(chunk) > 100 else chunk
            }

        # Return a cached result when available.
        if cache_manager:
            cached_result = cache_manager.get(
                chunk,
                self.model_config,
                {'analysis_type': analysis_type}
            )
            if cached_result:
                return cached_result

        try:
            # Unknown analysis types fall back to the standard preset.
            template_config = self.analysis_templates.get(analysis_type, self.analysis_templates['standard'])

            reasoning_format = """
Write your complete analysis between <start_working_out> and <end_working_out>.

Then provide your overall conclusion between <SOLUTION> and </SOLUTION>.
"""

            recommendations_format = """
**Recommendations**: Provide specific recommendations for addressing identified issues.
""" if template_config['include_recommendations'] else ""

            full_prompt = self.prompt_templates['loophole_analysis'].format(
                text=chunk,
                reasoning_format=reasoning_format,
                recommendations_format=recommendations_format,
                depth=template_config['depth'],
                focus_areas=', '.join(template_config['focus_areas'])
            )

            chat_template = self.generate_chat_template(full_prompt)

            # Record elapsed wall-clock time for the inference itself
            # (previously this stored a raw timestamp, not a duration).
            start_time = time.time()
            response = self._generate_response(chat_template)
            elapsed = time.time() - start_time

            structured_response = self._parse_response(response)

            result = {
                'chunk': chunk,
                'analysis_type': analysis_type,
                'model_config': self.model_config,
                'response': response,
                'structured_analysis': structured_response,
                'processing_time': elapsed,
                'chunk_size': len(chunk),
                'word_count': len(chunk.split())
            }

            # Persist for future identical requests.
            if cache_manager:
                cache_manager.put(chunk, result, self.model_config, {'analysis_type': analysis_type})

            return result

        except Exception as e:
            return {
                'error': str(e),
                'chunk': chunk[:100] + "..." if len(chunk) > 100 else chunk
            }

    def _generate_response(self, prompt: str, max_tokens: Optional[int] = None) -> str:
        """
        Generate a response from the model.

        Args:
            prompt: Input prompt.
            max_tokens: Maximum tokens to generate; defaults to the
                configured 'max_tokens' (or 4096).

        Returns:
            Generated response text, or "" on error.
        """
        if max_tokens is None:
            max_tokens = self.model_config.get('max_tokens', 4096)

        try:
            response = self.model(
                prompt,
                max_tokens=max_tokens,
                temperature=self.model_config.get('temperature', 0.3),
                top_p=self.model_config.get('top_p', 0.85),
                top_k=self.model_config.get('top_k', 50),
                repeat_penalty=self.model_config.get('repeat_penalty', 1.15),
                # Stop at end-of-reasoning / end-of-solution markers or turn end.
                stop=["<end_working_out>", "</SOLUTION>", "<|im_end|>"],
                echo=False
            )

            return response['choices'][0]['text'].strip()

        except Exception as e:
            print(f"Error generating response: {e}")
            return ""

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """
        Parse the LLM response into structured data.

        Extracts the '**Section**:' headings the prompt asks for, splits
        list-like sections into items, and derives a completeness-based
        confidence score.

        Args:
            response: Raw LLM response.

        Returns:
            Structured analysis data.
        """
        structured = {
            'text_meaning': '',
            'key_assumptions': [],
            'exploitable_interpretations': [],
            'critical_loopholes': [],
            'circumvention_strategies': [],
            'recommendations': [],
            'confidence_score': 0,
            'analysis_quality': 'unknown'
        }

        try:
            # Each section runs from its '**Heading**:' to the next '**' or
            # the end of the response.
            patterns = {
                'text_meaning': r'\*\*Text Meaning\*\*:\s*(.*?)(?=\*\*|$)',
                'key_assumptions': r'\*\*Key Assumptions\*\*:\s*(.*?)(?=\*\*|$)',
                'exploitable_interpretations': r'\*\*Exploitable Interpretations\*\*:\s*(.*?)(?=\*\*|$)',
                'critical_loopholes': r'\*\*Critical Loopholes\*\*:\s*(.*?)(?=\*\*|$)',
                'circumvention_strategies': r'\*\*Circumvention Strategies\*\*:\s*(.*?)(?=\*\*|$)',
                'recommendations': r'\*\*Recommendations\*\*:\s*(.*?)(?=\*\*|$)',
            }

            list_keys = {'key_assumptions', 'exploitable_interpretations',
                         'critical_loopholes', 'circumvention_strategies', 'recommendations'}

            for key, pattern in patterns.items():
                matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE)
                if matches:
                    content = matches[0].strip()
                    if key in list_keys:
                        # Split numbered ('1.'), dashed ('-'), or bulleted ('•') items.
                        items = re.findall(r'(?:\d+\.|-|\•)\s*(.*?)(?=(?:\d+\.|-|\•)|$)',
                                           content, re.DOTALL)
                        structured[key] = [item.strip() for item in items if item.strip()]
                    else:
                        structured[key] = content

            # Completeness: 20 points per populated core section (max 100).
            completeness_score = 0
            if structured['text_meaning']:
                completeness_score += 20
            for key in ['key_assumptions', 'exploitable_interpretations',
                        'critical_loopholes', 'circumvention_strategies']:
                if structured[key]:
                    completeness_score += 20

            structured['confidence_score'] = min(100, completeness_score)

            if structured['confidence_score'] >= 80:
                structured['analysis_quality'] = 'high'
            elif structured['confidence_score'] >= 60:
                structured['analysis_quality'] = 'medium'
            else:
                structured['analysis_quality'] = 'low'

        except Exception as e:
            print(f"Error parsing response: {e}")
            structured['error'] = str(e)

        return structured

    def batch_analyze_chunks(self, chunks: List[str], analysis_type: str = 'standard',
                             cache_manager=None, progress_callback=None) -> List[Dict[str, Any]]:
        """
        Analyze multiple chunks in batch.

        Args:
            chunks: List of text chunks to analyze.
            analysis_type: Type of analysis to perform.
            cache_manager: Cache manager instance.
            progress_callback: Callback ``(fraction, message)`` for progress updates.

        Returns:
            List of analysis results, in input order.
        """
        results = []
        total_chunks = len(chunks)

        for i, chunk in enumerate(chunks):
            if progress_callback:
                progress = (i + 1) / total_chunks
                progress_callback(progress, f"Analyzing chunk {i + 1}/{total_chunks}")

            result = self.analyze_chunk(chunk, analysis_type, cache_manager)
            results.append(result)

        return results

    def get_model_info(self) -> Dict[str, Any]:
        """Get information about the loaded model (status dict)."""
        if not self.is_loaded:
            return {'status': 'not_loaded'}

        try:
            return {
                'status': 'loaded',
                'config': self.model_config,
                'model_type': type(self.model).__name__,
                'context_length': self.model_config.get('context_length', 'unknown'),
                'vocab_size': getattr(self.model, 'vocab_size', 'unknown')
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }

    def validate_model_config(self) -> Dict[str, Any]:
        """Validate the current model configuration.

        Returns:
            Dict with 'is_valid' (bool), 'issues' (blocking), and
            'warnings' (non-blocking).
        """
        validation = {
            'is_valid': True,
            'issues': [],
            'warnings': []
        }

        # Required parameters must be present.
        required_params = ['context_length', 'max_tokens']
        for param in required_params:
            if param not in self.model_config:
                validation['issues'].append(f"Missing required parameter: {param}")
                validation['is_valid'] = False

        # Range checks on present numeric parameters.
        if 'context_length' in self.model_config:
            if self.model_config['context_length'] < 1024:
                validation['issues'].append("Context length too small (minimum: 1024)")
                validation['is_valid'] = False

        if 'max_tokens' in self.model_config:
            if self.model_config['max_tokens'] < 64:
                validation['issues'].append("Max tokens too small (minimum: 64)")
                validation['is_valid'] = False

        if 'temperature' in self.model_config:
            temp = self.model_config['temperature']
            if not (0 <= temp <= 2):
                validation['issues'].append("Temperature out of valid range (0-2)")
                validation['is_valid'] = False

        # Missing model file is a warning only: 'repo_id' may be used instead.
        if 'path' in self.model_config and self.model_config['path']:
            if not os.path.exists(self.model_config['path']):
                validation['warnings'].append(f"Model file not found: {self.model_config['path']}")

        return validation
|
|