# streamlit_app/core/llm_analyzer.py
#!/usr/bin/env python3
"""
LLM Analyzer
Handles LLM model loading, inference, and analysis for the NZ Legislation Loophole Analysis.
Provides optimized prompts and response parsing for legal text analysis.
"""
import os
import time
from typing import List, Dict, Any, Optional, Tuple
import json
from llama_cpp import Llama
import re
class LLMAnalyzer:
    """LLM-based analyzer for legislation loophole detection.

    Wraps a llama.cpp model (``llama_cpp.Llama``) and provides:
    prompt templates for loophole/ambiguity analysis, per-chunk and batch
    analysis with optional caching, structured parsing of model output,
    and validation of the model configuration.
    """

    def __init__(self, model_config: Dict[str, Any]):
        """
        Initialize the LLM analyzer.

        Args:
            model_config: Configuration for the LLM model. Recognised keys
                include 'repo_id', 'filename', 'path', 'context_length',
                'max_tokens', 'temperature', 'top_p', 'top_k',
                'repeat_penalty'.
        """
        self.model_config = model_config
        self.model = None          # lazily created Llama instance
        self.is_loaded = False     # guards against double-loading

        # Analysis depth presets, keyed by analysis type name.
        self.analysis_templates = {
            'standard': {
                'depth': 'Standard',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities', 'unintended_consequences']
            },
            'detailed': {
                'depth': 'Detailed',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities', 'unintended_consequences', 'implementation_issues']
            },
            'comprehensive': {
                'depth': 'Comprehensive',
                'include_recommendations': True,
                'focus_areas': ['loopholes', 'ambiguities', 'unintended_consequences',
                                'implementation_issues', 'policy_conflicts', 'enforcement_challenges']
            }
        }

        # Prompt templates, built once at construction time.
        self.prompt_templates = {
            'loophole_analysis': self._get_loophole_analysis_template(),
            'ambiguity_detection': self._get_ambiguity_detection_template(),
            'recommendations': self._get_recommendations_template()
        }

    def _get_loophole_analysis_template(self) -> str:
        """Get the main loophole analysis prompt template.

        Placeholders: {text}, {reasoning_format}, {recommendations_format},
        {depth}, {focus_areas} — filled in by ``analyze_chunk``.
        """
        return """You are a legal expert analyzing New Zealand legislation for loopholes and ambiguities.
LEGISLATION TEXT:
{text}
TASK: Analyze this legislative text for potential loopholes, ambiguities, or unintended consequences.
INSTRUCTIONS:
Provide a structured analysis following this format:
1. **Text Meaning**: Explain what the text means and its intended purpose
2. **Key Assumptions**: Identify any assumptions the text makes that could be exploited
3. **Exploitable Interpretations**: Discuss how the text could be interpreted or applied in ways that circumvent its intended purpose
4. **Critical Loopholes**: Identify specific loopholes, ambiguities, or unintended consequences that could be used to bypass the legislation
5. **Circumvention Strategies**: Suggest practical methods or scenarios for exploiting these loopholes to achieve objectives contrary to the legislation's intent
{reasoning_format}
{recommendations_format}
ANALYSIS DEPTH: {depth}
FOCUS AREAS: {focus_areas}
"""

    def _get_ambiguity_detection_template(self) -> str:
        """Get the ambiguity detection prompt template (placeholder: {text})."""
        return """Analyze the following legal text for ambiguities and unclear provisions:
TEXT: {text}
Identify:
1. Vague terms or phrases
2. Ambiguous references
3. Unclear conditions or requirements
4. Missing definitions
5. Conflicting provisions
Provide specific examples and suggest clarifications.
"""

    def _get_recommendations_template(self) -> str:
        """Get the recommendations prompt template (no placeholders)."""
        return """Based on the loopholes and ambiguities identified, provide specific recommendations for:
1. Legislative amendments to close identified loopholes
2. Additional definitions or clarifications needed
3. Implementation guidelines or regulations
4. Monitoring and enforcement mechanisms
Prioritize recommendations by impact and feasibility.
"""

    def load_model(self) -> bool:
        """
        Load the LLM model. Idempotent: returns immediately if loaded.

        Prefers downloading from the HuggingFace Hub when 'repo_id' is
        configured; otherwise falls back to a local GGUF file at 'path'.

        Returns:
            True if model loaded successfully, False otherwise.
        """
        if self.is_loaded:
            return True
        # os.cpu_count() can return None on some platforms; fall back to 4
        # so min() does not raise TypeError.
        n_threads = min(os.cpu_count() or 4, 8)
        try:
            print("Loading LLM model...")
            if self.model_config.get('repo_id'):
                # Try to load from HuggingFace
                self.model = Llama.from_pretrained(
                    repo_id=self.model_config['repo_id'],
                    filename=self.model_config.get('filename', ''),
                    n_ctx=self.model_config.get('context_length', 40960),
                    n_threads=n_threads,
                    verbose=False,
                    n_gpu_layers=-1,  # offload all layers to GPU when available
                    n_batch=4096,
                    logits_all=False,
                    use_mlock=True,   # pin weights in RAM to avoid swapping
                    use_mmap=True,
                )
            else:
                # Load from local path
                model_path = self.model_config.get('path', '')
                if not model_path or not os.path.exists(model_path):
                    print(f"Model path not found: {model_path}")
                    return False
                self.model = Llama(
                    model_path=model_path,
                    n_ctx=self.model_config.get('context_length', 40960),
                    n_threads=n_threads,
                    verbose=False,
                    n_gpu_layers=-1,
                    n_batch=4096,
                )
            self.is_loaded = True
            print("Model loaded successfully")
            return True
        except Exception as e:
            print(f"Error loading model: {e}")
            return False

    def unload_model(self):
        """Unload the model to free memory."""
        if self.model:
            del self.model
            self.model = None
            self.is_loaded = False

    def generate_chat_template(self, system_prompt: str, user_message: str = "") -> str:
        """
        Generate a ChatML-style template for the model.

        Args:
            system_prompt: The system prompt.
            user_message: The user message (optional).

        Returns:
            Formatted chat template ending with an open assistant turn so
            the model continues from there.
        """
        chat_messages = []
        if system_prompt:
            chat_messages.append("<|im_start|>system")
            chat_messages.append(system_prompt)
            chat_messages.append("<|im_end|>")
        if user_message:
            chat_messages.append("<|im_start|>user")
            chat_messages.append(user_message)
            chat_messages.append("<|im_end|>")
        # Open assistant turn; trailing empty entry leaves the prompt ready
        # for generation.
        chat_messages.append("<|im_start|>assistant")
        chat_messages.append("")
        return "\n".join(chat_messages)

    def analyze_chunk(self, chunk: str, analysis_type: str = 'standard',
                      cache_manager = None) -> Dict[str, Any]:
        """
        Analyze a single text chunk for loopholes and ambiguities.

        Args:
            chunk: Text chunk to analyze.
            analysis_type: Analysis preset name ('standard', 'detailed',
                'comprehensive'); unknown names fall back to 'standard'.
            cache_manager: Optional cache manager for result caching.

        Returns:
            Analysis results dict, or a dict with an 'error' key (plus a
            truncated 'chunk' preview) on failure.
        """
        if not self.is_loaded and not self.load_model():
            return {
                'error': 'Model not loaded',
                'chunk': chunk[:100] + "..." if len(chunk) > 100 else chunk
            }
        # Check cache first — avoids re-running inference for repeated chunks.
        if cache_manager:
            cached_result = cache_manager.get(
                chunk,
                self.model_config,
                {'analysis_type': analysis_type}
            )
            if cached_result:
                return cached_result
        try:
            start_time = time.time()  # so processing_time is a duration, not a timestamp
            # Unknown analysis types degrade gracefully to 'standard'.
            template_config = self.analysis_templates.get(analysis_type, self.analysis_templates['standard'])
            reasoning_format = """
Write your complete analysis between <start_working_out> and <end_working_out>.
Then provide your overall conclusion between <SOLUTION> and </SOLUTION>.
"""
            recommendations_format = """
**Recommendations**: Provide specific recommendations for addressing identified issues.
""" if template_config['include_recommendations'] else ""
            full_prompt = self.prompt_templates['loophole_analysis'].format(
                text=chunk,
                reasoning_format=reasoning_format,
                recommendations_format=recommendations_format,
                depth=template_config['depth'],
                focus_areas=', '.join(template_config['focus_areas'])
            )
            chat_template = self.generate_chat_template(full_prompt)
            response = self._generate_response(chat_template)
            structured_response = self._parse_response(response)
            result = {
                'chunk': chunk,
                'analysis_type': analysis_type,
                'model_config': self.model_config,
                'response': response,
                'structured_analysis': structured_response,
                # Elapsed seconds for this chunk (was previously a raw
                # time.time() timestamp, which the key name misrepresented).
                'processing_time': time.time() - start_time,
                'chunk_size': len(chunk),
                'word_count': len(chunk.split())
            }
            if cache_manager:
                cache_manager.put(chunk, result, self.model_config, {'analysis_type': analysis_type})
            return result
        except Exception as e:
            return {
                'error': str(e),
                'chunk': chunk[:100] + "..." if len(chunk) > 100 else chunk
            }

    def _generate_response(self, prompt: str, max_tokens: Optional[int] = None) -> str:
        """
        Generate a response from the model.

        Args:
            prompt: Input prompt (already chat-templated).
            max_tokens: Maximum tokens to generate; defaults to the
                configured 'max_tokens' (4096 if unset).

        Returns:
            Generated response text, or "" on error.
        """
        if max_tokens is None:
            max_tokens = self.model_config.get('max_tokens', 4096)
        try:
            response = self.model(
                prompt,
                max_tokens=max_tokens,
                temperature=self.model_config.get('temperature', 0.3),
                top_p=self.model_config.get('top_p', 0.85),
                top_k=self.model_config.get('top_k', 50),
                repeat_penalty=self.model_config.get('repeat_penalty', 1.15),
                # Stop at the end of the structured-reasoning markers or turn.
                stop=["<end_working_out>", "</SOLUTION>", "<|im_end|>"],
                echo=False
            )
            return response['choices'][0]['text'].strip()
        except Exception as e:
            print(f"Error generating response: {e}")
            return ""

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """
        Parse the LLM response into structured data.

        Extracts the bold-headed sections produced by the analysis prompt,
        splits list-like sections into items, and derives a completeness
        score (20 points per populated core section, capped at 100).

        Args:
            response: Raw LLM response text.

        Returns:
            Structured analysis dict; includes an 'error' key if parsing
            itself raised.
        """
        structured = {
            'text_meaning': '',
            'key_assumptions': [],
            'exploitable_interpretations': [],
            'critical_loopholes': [],
            'circumvention_strategies': [],
            'recommendations': [],
            'confidence_score': 0,
            'analysis_quality': 'unknown'
        }
        try:
            # Each section runs from its "**Header**:" to the next "**" or
            # end of text.
            patterns = {
                'text_meaning': r'\*\*Text Meaning\*\*:\s*(.*?)(?=\*\*|$)',
                'key_assumptions': r'\*\*Key Assumptions\*\*:\s*(.*?)(?=\*\*|$)',
                'exploitable_interpretations': r'\*\*Exploitable Interpretations\*\*:\s*(.*?)(?=\*\*|$)',
                'critical_loopholes': r'\*\*Critical Loopholes\*\*:\s*(.*?)(?=\*\*|$)',
                'circumvention_strategies': r'\*\*Circumvention Strategies\*\*:\s*(.*?)(?=\*\*|$)',
                # (previously had a redundant "$|$" alternation)
                'recommendations': r'\*\*Recommendations\*\*:\s*(.*?)(?=\*\*|$)',
            }
            list_keys = ('key_assumptions', 'exploitable_interpretations',
                         'critical_loopholes', 'circumvention_strategies', 'recommendations')
            for key, pattern in patterns.items():
                matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE)
                if matches:
                    content = matches[0].strip()
                    if key in list_keys:
                        # Split on numbered ("1."), dash, or bullet markers.
                        items = re.findall(r'(?:\d+\.|-|•)\s*(.*?)(?=(?:\d+\.|-|•)|$)',
                                           content, re.DOTALL)
                        structured[key] = [item.strip() for item in items if item.strip()]
                    else:
                        structured[key] = content
            # Confidence: 20 points for each populated core section.
            completeness_score = 0
            if structured['text_meaning']:
                completeness_score += 20
            for key in ['key_assumptions', 'exploitable_interpretations',
                        'critical_loopholes', 'circumvention_strategies']:
                if structured[key]:
                    completeness_score += 20
            structured['confidence_score'] = min(100, completeness_score)
            if structured['confidence_score'] >= 80:
                structured['analysis_quality'] = 'high'
            elif structured['confidence_score'] >= 60:
                structured['analysis_quality'] = 'medium'
            else:
                structured['analysis_quality'] = 'low'
        except Exception as e:
            print(f"Error parsing response: {e}")
            structured['error'] = str(e)
        return structured

    def batch_analyze_chunks(self, chunks: List[str], analysis_type: str = 'standard',
                             cache_manager = None, progress_callback = None) -> List[Dict[str, Any]]:
        """
        Analyze multiple chunks sequentially.

        Args:
            chunks: List of text chunks to analyze.
            analysis_type: Analysis preset name passed to ``analyze_chunk``.
            cache_manager: Optional cache manager instance.
            progress_callback: Optional ``callback(progress, message)`` with
                progress in (0, 1].

        Returns:
            List of per-chunk analysis results, in input order.
        """
        results = []
        total_chunks = len(chunks)
        for i, chunk in enumerate(chunks):
            if progress_callback:
                progress = (i + 1) / total_chunks
                progress_callback(progress, f"Analyzing chunk {i + 1}/{total_chunks}")
            result = self.analyze_chunk(chunk, analysis_type, cache_manager)
            results.append(result)
        return results

    def get_model_info(self) -> Dict[str, Any]:
        """Get information about the loaded model (status dict; never raises)."""
        if not self.is_loaded:
            return {'status': 'not_loaded'}
        try:
            return {
                'status': 'loaded',
                'config': self.model_config,
                'model_type': type(self.model).__name__,
                'context_length': self.model_config.get('context_length', 'unknown'),
                'vocab_size': getattr(self.model, 'vocab_size', 'unknown')
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }

    def validate_model_config(self) -> Dict[str, Any]:
        """Validate the current model configuration.

        Returns:
            Dict with 'is_valid' (bool), 'issues' (hard failures) and
            'warnings' (non-fatal, e.g. missing local model file).
        """
        validation = {
            'is_valid': True,
            'issues': [],
            'warnings': []
        }
        # Required parameters must be present.
        required_params = ['context_length', 'max_tokens']
        for param in required_params:
            if param not in self.model_config:
                validation['issues'].append(f"Missing required parameter: {param}")
                validation['is_valid'] = False
        # Range checks for parameters that are present.
        if 'context_length' in self.model_config:
            if self.model_config['context_length'] < 1024:
                validation['issues'].append("Context length too small (minimum: 1024)")
                validation['is_valid'] = False
        if 'max_tokens' in self.model_config:
            if self.model_config['max_tokens'] < 64:
                validation['issues'].append("Max tokens too small (minimum: 64)")
                validation['is_valid'] = False
        if 'temperature' in self.model_config:
            temp = self.model_config['temperature']
            if not (0 <= temp <= 2):
                validation['issues'].append("Temperature out of valid range (0-2)")
                validation['is_valid'] = False
        # A missing local file is only a warning: 'repo_id' may still be used.
        if 'path' in self.model_config and self.model_config['path']:
            if not os.path.exists(self.model_config['path']):
                validation['warnings'].append(f"Model file not found: {self.model_config['path']}")
        return validation