#!/usr/bin/env python3
"""
NZ Legislation Loophole Analysis Streamlit App

A modern web interface for analyzing New Zealand legislation text to identify
potential loopholes, ambiguities, and unintended consequences using AI.

Features:
- Advanced UI with multi-page layout
- Context memory cache system for improved performance
- Real-time progress monitoring
- Interactive results visualization
- Batch processing capabilities
- Comprehensive configuration management
"""

import streamlit as st
import sys
import os
from pathlib import Path

# Add the current directory to Python path so `core` / `utils` resolve when the
# app is launched from another working directory.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Import core modules
from core.cache_manager import CacheManager
from core.text_processor import TextProcessor
from core.llm_analyzer import LLMAnalyzer
from core.dataset_builder import DatasetBuilder
from utils.config import ConfigManager
from utils.ui_helpers import UIHelpers
from utils.performance import PerformanceMonitor

# Configure page settings (must be the first Streamlit call in the script).
st.set_page_config(
    page_title="NZ Legislation Loophole Analyzer",
    page_icon="⚖️",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://github.com/your-repo',
        'Report a bug': 'https://github.com/your-repo/issues',
        'About': '''
        ## NZ Legislation Loophole Analyzer
        A powerful AI tool for analyzing New Zealand legislation to identify
        potential loopholes, ambiguities, and unintended consequences.

        **Version:** 1.0.0
        **Built with:** Streamlit, Llama.cpp, and advanced caching
        ''',
    },
)


def initialize_session_state():
    """Initialize all session state variables (idempotent across reruns)."""
    if 'cache_manager' not in st.session_state:
        st.session_state.cache_manager = CacheManager()
    if 'config_manager' not in st.session_state:
        st.session_state.config_manager = ConfigManager()
    if 'performance_monitor' not in st.session_state:
        st.session_state.performance_monitor = PerformanceMonitor()
    if 'current_analysis' not in st.session_state:
        st.session_state.current_analysis = None
    if 'analysis_results' not in st.session_state:
        st.session_state.analysis_results = []
    if 'processing_status' not in st.session_state:
        st.session_state.processing_status = {
            'is_running': False,
            'progress': 0,
            'current_task': '',
            'total_chunks': 0,
            'processed_chunks': 0,
        }


def main():
    """Main application function: sidebar navigation plus page routing."""
    initialize_session_state()

    # Sidebar: navigation, cache status, performance, processing status.
    with st.sidebar:
        st.title("⚖️ NZ Legislation Analyzer")
        st.markdown("---")

        # Navigation
        pages = {
            "🏠 Home": "home",
            "📤 Upload & Process": "upload",
            "📊 Analysis Results": "results",
            "⚙️ Settings": "settings",
            "📈 Performance": "performance",
        }
        selected_page = st.selectbox(
            "Navigate to:",
            list(pages.keys()),
            key="nav_select",
        )
        st.markdown("---")

        # Cache status
        with st.expander("🧠 Cache Status", expanded=True):
            cache_stats = st.session_state.cache_manager.get_stats()
            st.metric("Cache Hits", cache_stats['hits'])
            st.metric("Cache Misses", cache_stats['misses'])
            # FIX: original passed the bare format spec ".1f" as the value.
            st.metric("Hit Rate", f"{cache_stats['hit_rate']:.1f}%")
            st.metric("Cached Chunks", cache_stats['entries'])
            if st.button("Clear Cache", type="secondary"):
                st.session_state.cache_manager.clear_cache()
                st.rerun()

        # Performance metrics
        with st.expander("📊 Performance", expanded=True):
            perf_stats = st.session_state.performance_monitor.get_stats()
            # FIX: original passed bare format specs; 'memory_usage_mb' key is
            # the one used elsewhere in this file for memory checks.
            st.metric("Memory Usage", f"{perf_stats.get('memory_usage_mb', 0):.1f} MB")
            # NOTE(review): key name assumed — confirm against
            # PerformanceMonitor.get_stats().
            st.metric("Avg Processing Time", f"{perf_stats.get('avg_processing_time', 0):.2f}s")

        # Processing status (only while a run is active)
        if st.session_state.processing_status['is_running']:
            with st.expander("🔄 Processing Status", expanded=True):
                status = st.session_state.processing_status
                st.progress(status['progress'])
                st.text(status['current_task'])
                st.text(f"Chunk {status['processed_chunks']}/"
                        f"{status['total_chunks']}")

    # Main content area: route to the selected page.
    page = pages[selected_page]
    if page == "home":
        show_home_page()
    elif page == "upload":
        show_upload_page()
    elif page == "results":
        show_results_page()
    elif page == "settings":
        show_settings_page()
    elif page == "performance":
        show_performance_page()

    # Footer
    st.markdown("---")
    st.markdown(
        """
        NZ Legislation Loophole Analyzer v1.0.0 | Built with Streamlit & Llama.cpp
        """,
        unsafe_allow_html=True,
    )


def show_home_page():
    """Display the home page with overview and quick start."""
    st.title("🏠 NZ Legislation Loophole Analyzer")
    st.markdown("### AI-Powered Legal Analysis Tool")

    col1, col2 = st.columns([2, 1])

    with col1:
        st.markdown("""
        This advanced tool analyzes New Zealand legislation to identify:

        🔍 **Potential Loopholes** - Legal ambiguities that could be exploited

        📋 **Unintended Consequences** - Hidden implications in legislative language

        ⚖️ **Ambiguities** - Vague or unclear legal provisions

        🎯 **Circumvention Strategies** - Ways legislation might be bypassed

        **Key Features:**
        - **Smart Caching**: Avoid re-processing identical content
        - **Advanced UI**: Modern interface with real-time progress
        - **Batch Processing**: Handle multiple legislation files
        - **Performance Monitoring**: Track memory usage and processing speed
        - **Export Options**: Multiple formats for analysis results
        """)

        st.markdown("### Quick Start")
        st.markdown("""
        1. **Upload** your NZ legislation files (JSON lines or raw text)
        2. **Configure** analysis parameters and model settings
        3. **Process** the legislation with AI-powered analysis
        4. **Review** results with interactive visualizations
        5. **Export** findings in multiple formats
        """)

    with col2:
        st.markdown("### Current Configuration")
        config = st.session_state.config_manager.get_config()

        # Model settings
        st.subheader("🤖 Model Settings")
        st.info(f"**Model:** {config['model']['path']}")
        st.info(f"**Context Length:** {config['model']['context_length']}")
        st.info(f"**Max Tokens:** {config['model']['max_tokens']}")

        # Processing settings
        st.subheader("⚙️ Processing")
        st.info(f"**Chunk Size:** {config['processing']['chunk_size']}")
        st.info(f"**Overlap:** {config['processing']['chunk_overlap']}")
        st.info(f"**Batch Size:** {config['processing']['batch_size']}")

        # Cache settings
        st.subheader("🧠 Cache")
        cache_stats = st.session_state.cache_manager.get_stats()
        st.info(f"**Status:** {'Active' if cache_stats['enabled'] else 'Disabled'}")
        st.info(f"**Hit Rate:** {cache_stats['hit_rate']:.1f}%")

    if st.button("🚀 Start Analysis", type="primary", use_container_width=True):
        st.switch_page("pages/1_upload.py")


def show_upload_page():
    """Display the upload and processing page."""
    st.title("📤 Upload & Process Legislation")

    # File upload section
    st.subheader("📁 Upload Legislation Files")
    col1, col2 = st.columns([1, 1])

    with col1:
        uploaded_files = st.file_uploader(
            "Select NZ legislation files",
            accept_multiple_files=True,
            type=['json', 'txt', 'jsonl'],
            help="Upload JSON lines format (.jsonl), JSON arrays (.json), or raw text (.txt) files",
        )

        if uploaded_files:
            st.success(f"📄 {len(uploaded_files)} file(s) selected")

            # Show file details with a short content preview.
            for file in uploaded_files:
                with st.expander(f"📋 {file.name}"):
                    st.write(f"**Size:** {file.size:,} bytes")
                    st.write(f"**Type:** {file.type}")
                    if file.type in ['text/plain', 'application/json']:
                        content = file.read().decode('utf-8')
                        st.text_area(
                            "Preview",
                            content[:500] + "..." if len(content) > 500 else content,
                            height=100,
                            disabled=True,
                        )
                        file.seek(0)  # Reset file pointer so processing re-reads from the start

    with col2:
        # Processing configuration
        st.subheader("⚙️ Processing Configuration")
        config = st.session_state.config_manager.get_config()

        # Model settings
        with st.expander("🤖 Model Configuration", expanded=True):
            model_path = st.text_input(
                "Model Path",
                value=config['model']['path'],
                help="Path to your GGUF model file",
            )
            context_length = st.slider(
                "Context Length",
                min_value=1024,
                max_value=65536,
                value=config['model']['context_length'],
                step=1024,
                help="Maximum context length for the model",
            )
            max_tokens = st.slider(
                "Max Response Tokens",
                min_value=256,
                max_value=4096,
                value=config['model']['max_tokens'],
                step=64,
                help="Maximum tokens in model response",
            )

        # Text processing settings
        with st.expander("📝 Text Processing", expanded=True):
            chunk_size = st.slider(
                "Chunk Size",
                min_value=512,
                max_value=8192,
                value=config['processing']['chunk_size'],
                step=256,
                help="Size of text chunks for processing",
            )
            chunk_overlap = st.slider(
                "Chunk Overlap",
                min_value=64,
                max_value=1024,
                value=config['processing']['chunk_overlap'],
                step=32,
                help="Overlap between chunks for context preservation",
            )

        # Analysis settings
        with st.expander("🔍 Analysis Settings", expanded=True):
            analysis_depth = st.select_slider(
                "Analysis Depth",
                options=["Basic", "Standard", "Detailed", "Comprehensive"],
                value=config['analysis']['depth'],
                help="Level of detail in legal analysis",
            )
            include_recommendations = st.checkbox(
                "Include Recommendations",
                value=config['analysis']['include_recommendations'],
                help="Generate specific recommendations for addressing identified issues",
            )

    # Process button and status
    col1, col2, col3 = st.columns([1, 1, 1])

    with col1:
        if st.button("🔄 Start Processing", type="primary", use_container_width=True):
            if not uploaded_files:
                st.error("Please upload at least one legislation file")
            else:
                start_processing(uploaded_files, {
                    'model': {
                        'path': model_path,
                        'context_length': context_length,
                        'max_tokens': max_tokens,
                    },
                    'processing': {
                        'chunk_size': chunk_size,
                        'chunk_overlap': chunk_overlap,
                    },
                    'analysis': {
                        'depth': analysis_depth,
                        'include_recommendations': include_recommendations,
                    },
                })

    with col2:
        # NOTE(review): original stop-button glyph was mojibake-corrupted and
        # unrecoverable; ⏹️ chosen as the conventional stop icon — confirm.
        if st.button("⏹️ Stop Processing", use_container_width=True):
            stop_processing()

    with col3:
        if st.button("📊 View Results", use_container_width=True):
            st.switch_page("pages/2_analysis.py")


def start_processing(files, config):
    """Start the processing workflow for the uploaded *files* with *config*."""
    st.session_state.processing_status = {
        'is_running': True,
        'progress': 0,
        'current_task': 'Initializing...',
        'total_chunks': 0,
        'processed_chunks': 0,
    }

    # Update configuration
    st.session_state.config_manager.update_config(config)

    # TODO: Implement actual processing logic
    st.rerun()


def stop_processing():
    """Stop the current processing run."""
    st.session_state.processing_status['is_running'] = False
    st.session_state.processing_status['current_task'] = 'Stopped by user'


def show_results_page():
    """Display analysis results page."""
    st.title("📊 Analysis Results")

    if not st.session_state.analysis_results:
        st.info("No analysis results available. Please upload and process legislation files first.")
        return

    # Results overview
    st.subheader("📈 Results Overview")
    col1, col2, col3, col4 = st.columns(4)

    total_results = len(st.session_state.analysis_results)
    total_loopholes = sum(
        len(result.get('loopholes', []))
        for result in st.session_state.analysis_results
    )
    # max(total_results, 1) guards against division by zero (defensive; the
    # early return above already handles the empty case).
    avg_confidence = sum(
        result.get('confidence', 0)
        for result in st.session_state.analysis_results
    ) / max(total_results, 1)

    with col1:
        st.metric("Total Analyses", total_results)
    with col2:
        st.metric("Loopholes Found", total_loopholes)
    with col3:
        # FIX: original passed the bare format spec ".2f" as the value.
        st.metric("Avg Confidence", f"{avg_confidence:.2f}")
    with col4:
        cache_stats = st.session_state.cache_manager.get_stats()
        st.metric("Cache Hit Rate", f"{cache_stats['hit_rate']:.1f}%")

    # Results display
    st.subheader("🔍 Detailed Results")
    for i, result in enumerate(st.session_state.analysis_results):
        with st.expander(f"📋 Analysis {i+1}: {result.get('title', 'Unknown Title')}",
                         expanded=i == 0):
            col1, col2 = st.columns([2, 1])

            with col1:
                st.markdown("**Summary:**")
                st.write(result.get('summary', 'No summary available'))

                st.markdown("**Key Findings:**")
                for finding in result.get('loopholes', []):
                    st.markdown(f"- {finding}")

            with col2:
                st.metric("Confidence", f"{result.get('confidence', 0):.2f}")
                # NOTE(review): 'processing_time' key assumed — confirm against
                # the result dict produced by the analyzer.
                st.metric("Processing Time", f"{result.get('processing_time', 0):.2f}s")
                st.metric("Chunks Processed", result.get('chunks_processed', 0))

    # Export options
    st.subheader("💾 Export Results")
    col1, col2, col3 = st.columns(3)

    with col1:
        if st.button("📄 Export as JSON", use_container_width=True):
            export_results('json')
    with col2:
        if st.button("📊 Export as CSV", use_container_width=True):
            export_results('csv')
    with col3:
        if st.button("📋 Export as Excel", use_container_width=True):
            export_results('excel')


def export_results(format_type):
    """Export analysis results in the specified format ('json'/'csv'/'excel')."""
    # TODO: Implement export functionality
    st.success(f"Results exported as {format_type.upper()}")


def show_settings_page():
    """Display settings page with tabbed configuration sections."""
    st.title("⚙️ Settings & Configuration")

    tabs = st.tabs(["🤖 Model Settings", "📝 Processing", "🧠 Cache", "🎨 UI", "🔧 Advanced"])

    with tabs[0]:
        st.subheader("🤖 Model Configuration")
        config = st.session_state.config_manager.get_config()

        model_path = st.text_input(
            "Model Path",
            value=config['model']['path'],
            help="Path to your GGUF model file",
        )
        repo_id = st.text_input(
            "HuggingFace Repo ID",
            value=config['model']['repo_id'],
            help="HuggingFace repository ID for model download",
        )
        filename = st.text_input(
            "Model Filename",
            value=config['model']['filename'],
            help="Specific model filename in the repository",
        )
        context_length = st.slider(
            "Context Length",
            min_value=1024,
            max_value=131072,
            value=config['model']['context_length'],
            step=1024,
        )
        max_tokens = st.slider(
            "Max Response Tokens",
            min_value=256,
            max_value=8192,
            value=config['model']['max_tokens'],
            step=64,
        )
        temperature = st.slider(
            "Temperature",
            min_value=0.0,
            max_value=2.0,
            value=config['model']['temperature'],
            step=0.1,
            help="Controls randomness in model output",
        )

    with tabs[1]:
        st.subheader("📝 Text Processing")
        chunk_size = st.slider(
            "Chunk Size",
            min_value=256,
            max_value=16384,
            value=config['processing']['chunk_size'],
            step=256,
        )
        chunk_overlap = st.slider(
            "Chunk Overlap",
            min_value=32,
            max_value=2048,
            value=config['processing']['chunk_overlap'],
            step=32,
        )
        batch_size = st.slider(
            "Batch Size",
            min_value=1,
            max_value=32,
            value=config['processing']['batch_size'],
            step=1,
        )
        clean_text = st.checkbox(
            "Clean Text",
            value=config['processing']['clean_text'],
            help="Apply text cleaning and normalization",
        )

    with tabs[2]:
        st.subheader("🧠 Cache Configuration")
        enable_cache = st.checkbox(
            "Enable Caching",
            value=config['cache']['enabled'],
            help="Use cache to avoid re-processing identical chunks",
        )
        max_cache_size = st.slider(
            "Max Cache Size (MB)",
            min_value=100,
            max_value=8192,
            value=config['cache']['max_size_mb'],
            step=100,
        )
        cache_ttl = st.slider(
            "Cache TTL (hours)",
            min_value=1,
            max_value=168,
            value=config['cache']['ttl_hours'],
            step=1,
            help="Time-to-live for cached entries",
        )
        persistent_cache = st.checkbox(
            "Persistent Cache",
            value=config['cache']['persistent'],
            help="Save cache to disk for persistence across sessions",
        )

    with tabs[3]:
        st.subheader("🎨 UI Configuration")
        theme = st.selectbox(
            "Theme",
            options=["Auto", "Light", "Dark"],
            index=["Auto", "Light", "Dark"].index(config['ui']['theme']),
        )
        show_progress = st.checkbox(
            "Show Progress Bars",
            value=config['ui']['show_progress'],
            help="Display progress indicators during processing",
        )
        auto_refresh = st.checkbox(
            "Auto-refresh Results",
            value=config['ui']['auto_refresh'],
            help="Automatically refresh results view",
        )

    with tabs[4]:
        st.subheader("🔧 Advanced Settings")
        debug_mode = st.checkbox(
            "Debug Mode",
            value=config['advanced']['debug_mode'],
            help="Enable detailed logging and debugging information",
        )
        log_level = st.selectbox(
            "Log Level",
            options=["DEBUG", "INFO", "WARNING", "ERROR"],
            index=["DEBUG", "INFO", "WARNING", "ERROR"].index(config['advanced']['log_level']),
        )
        memory_limit = st.slider(
            "Memory Limit (MB)",
            min_value=512,
            max_value=32768,
            value=config['advanced']['memory_limit_mb'],
            step=512,
        )

    # Save settings
    col1, col2 = st.columns([1, 1])

    with col1:
        if st.button("💾 Save Settings", type="primary", use_container_width=True):
            new_config = {
                'model': {
                    'path': model_path,
                    'repo_id': repo_id,
                    'filename': filename,
                    'context_length': context_length,
                    'max_tokens': max_tokens,
                    'temperature': temperature,
                },
                'processing': {
                    'chunk_size': chunk_size,
                    'chunk_overlap': chunk_overlap,
                    'batch_size': batch_size,
                    'clean_text': clean_text,
                },
                'cache': {
                    'enabled': enable_cache,
                    'max_size_mb': max_cache_size,
                    'ttl_hours': cache_ttl,
                    'persistent': persistent_cache,
                },
                'ui': {
                    'theme': theme,
                    'show_progress': show_progress,
                    'auto_refresh': auto_refresh,
                },
                'advanced': {
                    'debug_mode': debug_mode,
                    'log_level': log_level,
                    'memory_limit_mb': memory_limit,
                },
            }
            st.session_state.config_manager.update_config(new_config)
            st.success("Settings saved successfully!")

    with col2:
        if st.button("🔄 Reset to Defaults", use_container_width=True):
            st.session_state.config_manager.reset_to_defaults()
            st.success("Settings reset to defaults!")
            st.rerun()


def show_performance_page():
    """Display performance monitoring page."""
    st.title("📈 Performance Dashboard")

    # Real-time metrics
    st.subheader("📊 Real-time Metrics")
    col1, col2, col3, col4 = st.columns(4)
    perf_stats = st.session_state.performance_monitor.get_stats()

    with col1:
        # FIX: original passed the bare format spec ".1f" as the value and the
        # unit as the metric delta.
        st.metric("Memory Usage", f"{perf_stats.get('memory_usage_mb', 0):.1f} MB")
    with col2:
        # NOTE(review): key name assumed — confirm against
        # PerformanceMonitor.get_stats().
        st.metric("CPU Usage", f"{perf_stats.get('cpu_percent', 0):.1f}%")
    with col3:
        st.metric("Active Threads", perf_stats.get('active_threads', 0))
    with col4:
        cache_stats = st.session_state.cache_manager.get_stats()
        st.metric("Cache Hit Rate", f"{cache_stats['hit_rate']:.1f}%")

    # Performance charts
    st.subheader("📈 Performance History")
    # TODO: Add interactive charts for performance metrics

    # System information
    st.subheader("💻 System Information")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("**Hardware:**")
        # TODO: Add system information display
    with col2:
        st.markdown("**Software:**")
        # TODO: Add software information display

    # Cache performance
    st.subheader("🧠 Cache Performance")
    cache_stats = st.session_state.cache_manager.get_stats()
    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric("Total Requests", cache_stats['hits'] + cache_stats['misses'])
    with col2:
        st.metric("Cache Hits", cache_stats['hits'])
    with col3:
        st.metric("Cache Misses", cache_stats['misses'])
    with col4:
        st.metric("Hit Rate", f"{cache_stats['hit_rate']:.1f}%")

    # Performance recommendations
    st.subheader("💡 Performance Recommendations")
    recommendations = []
    if cache_stats['hit_rate'] < 50:
        recommendations.append(
            "Consider increasing cache size or adjusting chunk sizes to improve hit rate"
        )
    if perf_stats.get('memory_usage_mb', 0) > 8000:
        recommendations.append(
            "High memory usage detected. Consider reducing batch size or chunk size"
        )
    if not recommendations:
        recommendations.append("Performance is optimal!")
    for rec in recommendations:
        st.info(rec)


if __name__ == "__main__":
    main()