#!/usr/bin/env python3
"""
NZ Legislation Loophole Analysis Streamlit App
A modern web interface for analyzing New Zealand legislation text to identify
potential loopholes, ambiguities, and unintended consequences using AI.
Features:
- Advanced UI with multi-page layout
- Context memory cache system for improved performance
- Real-time progress monitoring
- Interactive results visualization
- Batch processing capabilities
- Comprehensive configuration management
"""
import streamlit as st
import sys
import os
from pathlib import Path
# Add the current directory to Python path for imports
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Import core modules
from core.cache_manager import CacheManager
from core.text_processor import TextProcessor
from core.llm_analyzer import LLMAnalyzer
from core.dataset_builder import DatasetBuilder
from utils.config import ConfigManager
from utils.ui_helpers import UIHelpers
from utils.performance import PerformanceMonitor
# Configure page settings.
# NOTE: st.set_page_config must be the FIRST Streamlit command executed in the
# script, so it stays at module top level, before any other st.* call.
st.set_page_config(
    page_title="NZ Legislation Loophole Analyzer",
    page_icon="âī¸",
    layout="wide",
    initial_sidebar_state="expanded",
    # Entries for Streamlit's hamburger menu; 'About' accepts Markdown.
    # TODO: 'your-repo' is a placeholder URL — point at the real repository.
    menu_items={
        'Get Help': 'https://github.com/your-repo',
        'Report a bug': 'https://github.com/your-repo/issues',
        'About': '''
## NZ Legislation Loophole Analyzer
A powerful AI tool for analyzing New Zealand legislation to identify
potential loopholes, ambiguities, and unintended consequences.
**Version:** 1.0.0
**Built with:** Streamlit, Llama.cpp, and advanced caching
'''
    }
)
# Initialize session state
def initialize_session_state() -> None:
    """Ensure every session-state slot the app relies on exists.

    Safe to call on every rerun: each slot is created only when missing, so
    live manager instances, accumulated results, and in-flight processing
    status all survive page switches and widget interactions.
    """
    # Slots holding service objects: key -> zero-arg factory.
    factories = {
        'cache_manager': CacheManager,
        'config_manager': ConfigManager,
        'performance_monitor': PerformanceMonitor,
    }
    for key, factory in factories.items():
        if key not in st.session_state:
            st.session_state[key] = factory()

    # Plain data slots.
    if 'current_analysis' not in st.session_state:
        st.session_state['current_analysis'] = None
    if 'analysis_results' not in st.session_state:
        st.session_state['analysis_results'] = []
    if 'processing_status' not in st.session_state:
        st.session_state['processing_status'] = {
            'is_running': False,
            'progress': 0,
            'current_task': '',
            'total_chunks': 0,
            'processed_chunks': 0,
        }
def main() -> None:
    """Main application function.

    Builds the sidebar (navigation, cache/performance/status panels), routes
    to the page selected in the navigation selectbox, and renders the footer.
    """
    # Initialize session state (idempotent; runs on every rerun).
    initialize_session_state()

    # Create sidebar with navigation and status
    with st.sidebar:
        st.title("âī¸ NZ Legislation Analyzer")
        st.markdown("---")

        # Navigation: display label -> internal page id used for routing below.
        pages = {
            "đ Home": "home",
            "đ¤ Upload & Process": "upload",
            "đ Analysis Results": "results",
            "âī¸ Settings": "settings",
            "đ Performance": "performance"
        }
        selected_page = st.selectbox(
            "Navigate to:",
            list(pages.keys()),
            key="nav_select"
        )
        st.markdown("---")

        # Cache status panel.
        with st.expander("đ§ Cache Status", expanded=True):
            cache_stats = st.session_state.cache_manager.get_stats()
            st.metric("Cache Hits", cache_stats['hits'])
            st.metric("Cache Misses", cache_stats['misses'])
            # BUG FIX: the original passed the bare format spec ".1f" as the
            # metric value; format the actual hit rate instead.
            st.metric("Hit Rate", f"{cache_stats['hit_rate']:.1f}%")
            st.metric("Cached Chunks", cache_stats['entries'])
            if st.button("Clear Cache", type="secondary"):
                st.session_state.cache_manager.clear_cache()
                st.rerun()

        # Performance metrics panel.
        with st.expander("đ Performance", expanded=True):
            perf_stats = st.session_state.performance_monitor.get_stats()
            # BUG FIX: format real values instead of literal spec strings.
            # 'memory_usage_mb' is read elsewhere in this file;
            # 'avg_processing_time' is assumed — TODO confirm the key name
            # against PerformanceMonitor.get_stats().
            st.metric("Memory Usage", f"{perf_stats.get('memory_usage_mb', 0):.1f} MB")
            st.metric("Avg Processing Time", f"{perf_stats.get('avg_processing_time', 0):.2f} s")

        # Processing status panel — shown only while a run is active.
        if st.session_state.processing_status['is_running']:
            with st.expander("đ Processing Status", expanded=True):
                st.progress(st.session_state.processing_status['progress'])
                st.text(st.session_state.processing_status['current_task'])
                st.text(f"Chunk {st.session_state.processing_status['processed_chunks']}/"
                        f"{st.session_state.processing_status['total_chunks']}")

    # Main content area: dispatch to the renderer for the selected page.
    page = pages[selected_page]
    if page == "home":
        show_home_page()
    elif page == "upload":
        show_upload_page()
    elif page == "results":
        show_results_page()
    elif page == "settings":
        show_settings_page()
    elif page == "performance":
        show_performance_page()

    # Footer
    st.markdown("---")
    st.markdown(
        """
NZ Legislation Loophole Analyzer v1.0.0 | Built with Streamlit & Llama.cpp
""",
        unsafe_allow_html=True
    )
def show_home_page() -> None:
    """Render the landing page: feature overview, quick-start steps, and a
    read-only snapshot of the currently active configuration."""
    st.title("đ NZ Legislation Loophole Analyzer")
    st.markdown("### AI-Powered Legal Analysis Tool")

    overview_col, snapshot_col = st.columns([2, 1])

    with overview_col:
        st.markdown("""
This advanced tool analyzes New Zealand legislation to identify:
đ **Potential Loopholes** - Legal ambiguities that could be exploited
đ **Unintended Consequences** - Hidden implications in legislative language
âī¸ **Ambiguities** - Vague or unclear legal provisions
đ¯ **Circumvention Strategies** - Ways legislation might be bypassed
**Key Features:**
- **Smart Caching**: Avoid re-processing identical content
- **Advanced UI**: Modern interface with real-time progress
- **Batch Processing**: Handle multiple legislation files
- **Performance Monitoring**: Track memory usage and processing speed
- **Export Options**: Multiple formats for analysis results
""")
        st.markdown("### Quick Start")
        st.markdown("""
1. **Upload** your NZ legislation files (JSON lines or raw text)
2. **Configure** analysis parameters and model settings
3. **Process** the legislation with AI-powered analysis
4. **Review** results with interactive visualizations
5. **Export** findings in multiple formats
""")

    with snapshot_col:
        st.markdown("### Current Configuration")
        cfg = st.session_state.config_manager.get_config()

        # Model settings
        model_cfg = cfg['model']
        st.subheader("đ¤ Model Settings")
        st.info(f"**Model:** {model_cfg['path']}")
        st.info(f"**Context Length:** {model_cfg['context_length']}")
        st.info(f"**Max Tokens:** {model_cfg['max_tokens']}")

        # Processing settings
        proc_cfg = cfg['processing']
        st.subheader("âī¸ Processing")
        st.info(f"**Chunk Size:** {proc_cfg['chunk_size']}")
        st.info(f"**Overlap:** {proc_cfg['chunk_overlap']}")
        st.info(f"**Batch Size:** {proc_cfg['batch_size']}")

        # Cache settings
        st.subheader("đ§ Cache")
        cache_stats = st.session_state.cache_manager.get_stats()
        cache_state = 'Active' if cache_stats['enabled'] else 'Disabled'
        st.info(f"**Status:** {cache_state}")
        st.info(f"**Hit Rate:** {cache_stats['hit_rate']:.1f}%")

    # NOTE(review): st.switch_page assumes a multipage app with a
    # pages/1_upload.py file, while navigation above uses a selectbox —
    # confirm the pages/ directory exists.
    if st.button("đ Start Analysis", type="primary", use_container_width=True):
        st.switch_page("pages/1_upload.py")
def show_upload_page() -> None:
    """Display the upload and processing page.

    Left column: multi-file uploader with per-file size/type details and a
    truncated text preview. Right column: model, chunking, and analysis
    widgets whose values are bundled into a config dict when the user starts
    processing. The bottom row has start / stop / view-results actions.
    """
    st.title("đ¤ Upload & Process Legislation")
    # File upload section
    st.subheader("đ Upload Legislation Files")
    col1, col2 = st.columns([1, 1])
    with col1:
        uploaded_files = st.file_uploader(
            "Select NZ legislation files",
            accept_multiple_files=True,
            type=['json', 'txt', 'jsonl'],
            help="Upload JSON lines format (.jsonl), JSON arrays (.json), or raw text (.txt) files"
        )
        if uploaded_files:
            st.success(f"đ {len(uploaded_files)} file(s) selected")
            # Show file details
            for file in uploaded_files:
                with st.expander(f"đ {file.name}"):
                    st.write(f"**Size:** {file.size:,} bytes")
                    st.write(f"**Type:** {file.type}")
                    # Preview content — only for MIME types we expect to be
                    # text. NOTE(review): browsers may report other MIME types
                    # for .jsonl uploads, which would skip this preview; verify.
                    if file.type in ['text/plain', 'application/json']:
                        content = file.read().decode('utf-8')
                        # Truncate long files to the first 500 chars for display.
                        st.text_area("Preview", content[:500] + "..." if len(content) > 500 else content,
                                     height=100, disabled=True)
                        file.seek(0)  # Reset file pointer so processing re-reads from the start
    with col2:
        # Processing configuration
        st.subheader("âī¸ Processing Configuration")
        # Current config supplies the widgets' initial values.
        config = st.session_state.config_manager.get_config()
        # Model settings
        with st.expander("đ¤ Model Configuration", expanded=True):
            model_path = st.text_input(
                "Model Path",
                value=config['model']['path'],
                help="Path to your GGUF model file"
            )
            context_length = st.slider(
                "Context Length",
                min_value=1024,
                max_value=65536,
                value=config['model']['context_length'],
                step=1024,
                help="Maximum context length for the model"
            )
            max_tokens = st.slider(
                "Max Response Tokens",
                min_value=256,
                max_value=4096,
                value=config['model']['max_tokens'],
                step=64,
                help="Maximum tokens in model response"
            )
        # Text processing settings
        with st.expander("đ Text Processing", expanded=True):
            chunk_size = st.slider(
                "Chunk Size",
                min_value=512,
                max_value=8192,
                value=config['processing']['chunk_size'],
                step=256,
                help="Size of text chunks for processing"
            )
            chunk_overlap = st.slider(
                "Chunk Overlap",
                min_value=64,
                max_value=1024,
                value=config['processing']['chunk_overlap'],
                step=32,
                help="Overlap between chunks for context preservation"
            )
        # Analysis settings
        with st.expander("đ Analysis Settings", expanded=True):
            analysis_depth = st.select_slider(
                "Analysis Depth",
                options=["Basic", "Standard", "Detailed", "Comprehensive"],
                value=config['analysis']['depth'],
                help="Level of detail in legal analysis"
            )
            include_recommendations = st.checkbox(
                "Include Recommendations",
                value=config['analysis']['include_recommendations'],
                help="Generate specific recommendations for addressing identified issues"
            )
    # Process button and status. The widget variables assigned under the
    # `with col2:` block are function-scoped (with-blocks don't create a
    # scope), so they are available here on every rerun.
    col1, col2, col3 = st.columns([1, 1, 1])
    with col1:
        if st.button("đ Start Processing", type="primary", use_container_width=True):
            if not uploaded_files:
                st.error("Please upload at least one legislation file")
            else:
                start_processing(uploaded_files, {
                    'model': {
                        'path': model_path,
                        'context_length': context_length,
                        'max_tokens': max_tokens
                    },
                    'processing': {
                        'chunk_size': chunk_size,
                        'chunk_overlap': chunk_overlap
                    },
                    'analysis': {
                        'depth': analysis_depth,
                        'include_recommendations': include_recommendations
                    }
                })
    with col2:
        if st.button("âšī¸ Stop Processing", use_container_width=True):
            stop_processing()
    with col3:
        if st.button("đ View Results", use_container_width=True):
            # NOTE(review): assumes a multipage layout with pages/2_analysis.py
            # — confirm this file exists.
            st.switch_page("pages/2_analysis.py")
def start_processing(files, config) -> None:
    """Kick off a processing run: reset the status block, persist the
    run configuration, and trigger a rerun so the sidebar status appears.

    `files` is accepted for the (not yet implemented) processing pipeline.
    """
    fresh_status = dict(
        is_running=True,
        progress=0,
        current_task='Initializing...',
        total_chunks=0,
        processed_chunks=0,
    )
    st.session_state.processing_status = fresh_status

    # Persist the configuration assembled on the upload page.
    st.session_state.config_manager.update_config(config)

    # TODO: Implement actual processing logic
    st.rerun()
def stop_processing() -> None:
    """Flag the current run as halted so the UI stops reporting progress."""
    status = st.session_state.processing_status
    status['is_running'] = False
    status['current_task'] = 'Stopped by user'
def show_results_page() -> None:
    """Display analysis results: aggregate overview metrics, one expander per
    analysis with findings, and export buttons. Shows a hint and returns
    early when no results exist yet."""
    st.title("đ Analysis Results")
    if not st.session_state.analysis_results:
        st.info("No analysis results available. Please upload and process legislation files first.")
        return

    # Results overview
    st.subheader("đ Results Overview")
    col1, col2, col3, col4 = st.columns(4)
    results = st.session_state.analysis_results
    total_results = len(results)
    total_loopholes = sum(len(result.get('loopholes', [])) for result in results)
    # max(..., 1) guards division by zero (unreachable after the early
    # return above, but harmless to keep).
    avg_confidence = sum(result.get('confidence', 0) for result in results) / max(total_results, 1)
    with col1:
        st.metric("Total Analyses", total_results)
    with col2:
        st.metric("Loopholes Found", total_loopholes)
    with col3:
        # BUG FIX: the original passed the bare format spec ".2f" as the
        # metric value; format the computed average instead.
        st.metric("Avg Confidence", f"{avg_confidence:.2f}")
    with col4:
        cache_stats = st.session_state.cache_manager.get_stats()
        # BUG FIX: same literal-format-spec defect.
        st.metric("Cache Hit Rate", f"{cache_stats['hit_rate']:.1f}%")

    # Results display
    st.subheader("đ Detailed Results")
    for i, result in enumerate(results):
        with st.expander(f"đ Analysis {i+1}: {result.get('title', 'Unknown Title')}", expanded=i==0):
            col1, col2 = st.columns([2, 1])
            with col1:
                st.markdown("**Summary:**")
                st.write(result.get('summary', 'No summary available'))
                st.markdown("**Key Findings:**")
                for finding in result.get('loopholes', []):
                    st.markdown(f"- {finding}")
            with col2:
                # BUG FIX: show per-result values, not the spec strings.
                # 'processing_time' key is assumed — TODO confirm against the
                # analysis pipeline once implemented.
                st.metric("Confidence", f"{result.get('confidence', 0):.2f}")
                st.metric("Processing Time", f"{result.get('processing_time', 0):.2f} s")
                st.metric("Chunks Processed", result.get('chunks_processed', 0))

    # Export options
    st.subheader("đž Export Results")
    col1, col2, col3 = st.columns(3)
    with col1:
        if st.button("đ Export as JSON", use_container_width=True):
            export_results('json')
    with col2:
        if st.button("đ Export as CSV", use_container_width=True):
            export_results('csv')
    with col3:
        if st.button("đ Export as Excel", use_container_width=True):
            export_results('excel')
def export_results(format_type) -> None:
    """Export analysis results in the given format ('json', 'csv', or
    'excel'). Currently a stub that only reports success."""
    # TODO: Implement export functionality
    label = format_type.upper()
    st.success(f"Results exported as {label}")
def show_settings_page() -> None:
    """Display settings page.

    Five tabs of configuration widgets (model, processing, cache, UI,
    advanced) whose values are gathered into a single dict and persisted
    via ConfigManager when the user clicks Save. A second button resets
    the configuration to defaults and reruns the app.
    """
    st.title("âī¸ Settings & Configuration")
    tabs = st.tabs(["đ¤ Model Settings", "đ Processing", "đ§ Cache", "đ¨ UI", "đ§ Advanced"])
    with tabs[0]:
        st.subheader("đ¤ Model Configuration")
        # `with` blocks do not create a scope: `config` assigned here is
        # read by every tab below and must stay before them.
        config = st.session_state.config_manager.get_config()
        model_path = st.text_input(
            "Model Path",
            value=config['model']['path'],
            help="Path to your GGUF model file"
        )
        repo_id = st.text_input(
            "HuggingFace Repo ID",
            value=config['model']['repo_id'],
            help="HuggingFace repository ID for model download"
        )
        filename = st.text_input(
            "Model Filename",
            value=config['model']['filename'],
            help="Specific model filename in the repository"
        )
        context_length = st.slider(
            "Context Length",
            min_value=1024,
            max_value=131072,
            value=config['model']['context_length'],
            step=1024
        )
        max_tokens = st.slider(
            "Max Response Tokens",
            min_value=256,
            max_value=8192,
            value=config['model']['max_tokens'],
            step=64
        )
        temperature = st.slider(
            "Temperature",
            min_value=0.0,
            max_value=2.0,
            value=config['model']['temperature'],
            step=0.1,
            help="Controls randomness in model output"
        )
    with tabs[1]:
        st.subheader("đ Text Processing")
        chunk_size = st.slider(
            "Chunk Size",
            min_value=256,
            max_value=16384,
            value=config['processing']['chunk_size'],
            step=256
        )
        chunk_overlap = st.slider(
            "Chunk Overlap",
            min_value=32,
            max_value=2048,
            value=config['processing']['chunk_overlap'],
            step=32
        )
        batch_size = st.slider(
            "Batch Size",
            min_value=1,
            max_value=32,
            value=config['processing']['batch_size'],
            step=1
        )
        clean_text = st.checkbox(
            "Clean Text",
            value=config['processing']['clean_text'],
            help="Apply text cleaning and normalization"
        )
    with tabs[2]:
        st.subheader("đ§ Cache Configuration")
        enable_cache = st.checkbox(
            "Enable Caching",
            value=config['cache']['enabled'],
            help="Use cache to avoid re-processing identical chunks"
        )
        max_cache_size = st.slider(
            "Max Cache Size (MB)",
            min_value=100,
            max_value=8192,
            value=config['cache']['max_size_mb'],
            step=100
        )
        cache_ttl = st.slider(
            "Cache TTL (hours)",
            min_value=1,
            max_value=168,
            value=config['cache']['ttl_hours'],
            step=1,
            help="Time-to-live for cached entries"
        )
        persistent_cache = st.checkbox(
            "Persistent Cache",
            value=config['cache']['persistent'],
            help="Save cache to disk for persistence across sessions"
        )
    with tabs[3]:
        st.subheader("đ¨ UI Configuration")
        # NOTE(review): .index() raises ValueError if the stored theme is not
        # one of these options — assumes ConfigManager validates the value.
        theme = st.selectbox(
            "Theme",
            options=["Auto", "Light", "Dark"],
            index=["Auto", "Light", "Dark"].index(config['ui']['theme'])
        )
        show_progress = st.checkbox(
            "Show Progress Bars",
            value=config['ui']['show_progress'],
            help="Display progress indicators during processing"
        )
        auto_refresh = st.checkbox(
            "Auto-refresh Results",
            value=config['ui']['auto_refresh'],
            help="Automatically refresh results view"
        )
    with tabs[4]:
        st.subheader("đ§ Advanced Settings")
        debug_mode = st.checkbox(
            "Debug Mode",
            value=config['advanced']['debug_mode'],
            help="Enable detailed logging and debugging information"
        )
        # NOTE(review): same ValueError caveat as the theme selectbox above.
        log_level = st.selectbox(
            "Log Level",
            options=["DEBUG", "INFO", "WARNING", "ERROR"],
            index=["DEBUG", "INFO", "WARNING", "ERROR"].index(config['advanced']['log_level'])
        )
        memory_limit = st.slider(
            "Memory Limit (MB)",
            min_value=512,
            max_value=32768,
            value=config['advanced']['memory_limit_mb'],
            step=512
        )
    # Save settings — assembles every widget value above into one config dict.
    col1, col2 = st.columns([1, 1])
    with col1:
        if st.button("đž Save Settings", type="primary", use_container_width=True):
            new_config = {
                'model': {
                    'path': model_path,
                    'repo_id': repo_id,
                    'filename': filename,
                    'context_length': context_length,
                    'max_tokens': max_tokens,
                    'temperature': temperature
                },
                'processing': {
                    'chunk_size': chunk_size,
                    'chunk_overlap': chunk_overlap,
                    'batch_size': batch_size,
                    'clean_text': clean_text
                },
                'cache': {
                    'enabled': enable_cache,
                    'max_size_mb': max_cache_size,
                    'ttl_hours': cache_ttl,
                    'persistent': persistent_cache
                },
                'ui': {
                    'theme': theme,
                    'show_progress': show_progress,
                    'auto_refresh': auto_refresh
                },
                'advanced': {
                    'debug_mode': debug_mode,
                    'log_level': log_level,
                    'memory_limit_mb': memory_limit
                }
            }
            st.session_state.config_manager.update_config(new_config)
            st.success("Settings saved successfully!")
    with col2:
        if st.button("đ Reset to Defaults", use_container_width=True):
            st.session_state.config_manager.reset_to_defaults()
            st.success("Settings reset to defaults!")
            # Rerun so widgets re-read their initial values from the defaults.
            st.rerun()
def show_performance_page() -> None:
    """Display the performance dashboard: real-time metrics, cache
    statistics, and heuristic tuning recommendations."""
    st.title("đ Performance Dashboard")

    # Real-time metrics
    st.subheader("đ Real-time Metrics")
    col1, col2, col3, col4 = st.columns(4)
    perf_stats = st.session_state.performance_monitor.get_stats()
    with col1:
        # BUG FIX: the original passed the bare format spec ".1f" as the
        # value and the unit "MB" as st.metric's delta (rendered as a
        # change indicator). Format the value and fold the unit into it.
        st.metric("Memory Usage", f"{perf_stats.get('memory_usage_mb', 0):.1f} MB")
    with col2:
        # BUG FIX as above. 'cpu_percent' key is assumed — TODO confirm
        # against PerformanceMonitor.get_stats().
        st.metric("CPU Usage", f"{perf_stats.get('cpu_percent', 0):.1f} %")
    with col3:
        st.metric("Active Threads", perf_stats.get('active_threads', 0))
    with col4:
        cache_stats = st.session_state.cache_manager.get_stats()
        # BUG FIX as above; 'hit_rate' is already a percentage elsewhere
        # in this file.
        st.metric("Cache Hit Rate", f"{cache_stats['hit_rate']:.1f} %")

    # Performance charts
    st.subheader("đ Performance History")
    # TODO: Add interactive charts for performance metrics

    # System information
    st.subheader("đģ System Information")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("**Hardware:**")
        # TODO: Add system information display
    with col2:
        st.markdown("**Software:**")
        # TODO: Add software information display

    # Cache performance
    st.subheader("đ§ Cache Performance")
    cache_stats = st.session_state.cache_manager.get_stats()
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Requests", cache_stats['hits'] + cache_stats['misses'])
    with col2:
        st.metric("Cache Hits", cache_stats['hits'])
    with col3:
        st.metric("Cache Misses", cache_stats['misses'])
    with col4:
        # BUG FIX: format the real hit rate, not the spec string.
        st.metric("Hit Rate", f"{cache_stats['hit_rate']:.1f}%")

    # Performance recommendations — simple threshold heuristics.
    st.subheader("đĄ Performance Recommendations")
    recommendations = []
    if cache_stats['hit_rate'] < 50:
        recommendations.append("Consider increasing cache size or adjusting chunk sizes to improve hit rate")
    if perf_stats.get('memory_usage_mb', 0) > 8000:
        recommendations.append("High memory usage detected. Consider reducing batch size or chunk size")
    if not recommendations:
        recommendations.append("Performance is optimal!")
    for rec in recommendations:
        st.info(rec)
# Script entry point. Streamlit re-executes this module top-to-bottom on
# every interaction, so main() runs on each rerun.
if __name__ == "__main__":
    main()