#!/usr/bin/env python3
"""
Hugging Face Streamlit App for LLM Field Analyzer

Upload a JSON file and analyze important fields with pattern generation.
"""

import streamlit as st
import json
from pathlib import Path
from typing import Dict, Any
import io

# Page configuration
st.set_page_config(
    page_title="Field Correlation Analyzer",
    page_icon="🤖",
    layout="wide"
)

# Import our modules
try:
    from structure_analysis import (
        detect_summary_fields,
        classify_data_structure,
        get_hierarchy_summary
    )
except ImportError:
    st.error("⚠️ structure_analysis.py not found. Make sure all files are uploaded.")
    st.stop()

# Session state
if 'analysis_result' not in st.session_state:
    st.session_state.analysis_result = None


# Regex capture groups shared by every generated pattern.
_BOOL_CAPTURE = r'(true|false)'
_NUMBER_CAPTURE = r'(\d+)'
_STRING_CAPTURE = r'"([^"]*)"'

# Help text shown when no file has been uploaded yet.
_HOW_TO_USE_MARKDOWN = """
**Steps:**
1. Upload a JSON file with structured data
2. Set the target field you want to analyze (e.g., `rotation_enabled`)
3. Click "Analyze" to process the data
4. Review the structure analysis and field recommendations
5. Select fields and generate regex patterns
6. Download the results as JSON

**What this tool does:**
- Detects summary/aggregate fields automatically
- Classifies data structure by hierarchy levels
- Recommends important fields for validation
- Generates regex patterns for field extraction
"""

# Example document shown when no file has been uploaded yet.
_EXAMPLE_JSON = {
    "results": {
        "summary": {
            "total_keys": 13,
            "rotated_keys": 6,
            "rotation_percentage": 46
        },
        "kms_keys": {
            "object": [
                {
                    "key_id": "12345",
                    "rotation_enabled": True,
                    "key_state": "Enabled"
                }
            ]
        }
    }
}


def _count_objects_with_field(obj: Any, field_name: str) -> int:
    """Count the dicts, at any nesting depth of *obj*, that have *field_name* as a key."""
    count = 0
    if isinstance(obj, dict):
        if field_name in obj:
            count += 1
        for value in obj.values():
            count += _count_objects_with_field(value, field_name)
    elif isinstance(obj, list):
        for item in obj:
            count += _count_objects_with_field(item, field_name)
    return count


def _first_dict_item(value: Any) -> Any:
    """Return ``value[0]`` when *value* is a non-empty list starting with a dict, else None."""
    if isinstance(value, list) and value and isinstance(value[0], dict):
        return value[0]
    return None


def _extract_sample_object(data: Any) -> Dict[str, Any]:
    """Return the first object found under ``data['results']``, or ``{}``.

    Sections are scanned in order. A section that is a list contributes its
    first element; a section that is a dict contributes the first element of
    the first list among its values. Only dict elements qualify, because the
    callers treat the sample as a mapping. The search stops at the first hit
    so later sections cannot overwrite it.
    """
    results = data.get('results') if isinstance(data, dict) else None
    if not isinstance(results, dict):
        return {}

    for section in results.values():
        sample = _first_dict_item(section)
        if sample is not None:
            return sample
        if isinstance(section, dict):
            for value in section.values():
                sample = _first_dict_item(value)
                if sample is not None:
                    return sample
    return {}


def _extract_summary_sample(data: Any) -> Dict[str, Any]:
    """Return ``data['results']['summary']``, else ``data['summary']``, else ``{}``.

    Anything that is not a non-empty dict is treated as missing, so the
    result can always be used as a mapping.
    """
    if not isinstance(data, dict):
        return {}

    results = data.get('results')
    nested = results.get('summary') if isinstance(results, dict) else None
    if isinstance(nested, dict) and nested:
        return nested

    top_level = data.get('summary')
    return top_level if isinstance(top_level, dict) else {}


def analyze_with_llm(data: Dict[str, Any], target_field: str = "rotation_enabled") -> Dict[str, Any]:
    """
    Analyze data and build the structured result displayed by the app.

    Args:
        data: Parsed JSON document.
        target_field: Field name to count and to look for among the sample
            object's keys (matched case-insensitively as a substring).

    Returns:
        A dict with the keys ``summary_fields_detected`` (first 10 detected
        summary fields), ``classification``, ``hierarchy_summary``,
        ``total_objects`` (number of dicts containing ``target_field``),
        ``sample_object``, ``summary_sample`` and ``recommended_fields``
        (ordered, without duplicates).
    """
    # Structure detection is delegated to the structure_analysis module.
    summary_fields = detect_summary_fields(data)
    classification = classify_data_structure(data)
    hierarchy_summary = get_hierarchy_summary(data)

    sample_object = _extract_sample_object(data)
    summary_sample = _extract_summary_sample(data)
    total_objects = _count_objects_with_field(data, target_field)

    # Recommend fields based on priority: summary fields, then config
    # fields, then sample-object keys that mention the target field.
    candidates = []
    if summary_fields:
        candidates.extend(summary_fields[:3])
    if classification.get('config_fields'):
        candidates.extend(classification['config_fields'][:2])
    if sample_object:
        # Both sides are lowercased so a mixed-case target still matches.
        target_lower = target_field.lower()
        candidates.extend(k for k in sample_object if target_lower in k.lower())

    # Drop duplicates but keep priority order; the list feeds a multiselect
    # whose options should be unique.
    recommended_fields = []
    for candidate in candidates:
        if candidate not in recommended_fields:
            recommended_fields.append(candidate)

    return {
        "summary_fields_detected": summary_fields[:10],
        "classification": classification,
        "hierarchy_summary": hierarchy_summary,
        "total_objects": total_objects,
        "sample_object": sample_object,
        "summary_sample": summary_sample,
        "recommended_fields": recommended_fields
    }


def _capture_for_value(value: Any):
    """Return the capture group matching the JSON type of *value*, or None if unsupported."""
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return _BOOL_CAPTURE
    if isinstance(value, (int, float)):
        return _NUMBER_CAPTURE
    if isinstance(value, str):
        return _STRING_CAPTURE
    return None


def _capture_for_name(field_lower: str) -> str:
    """Guess a capture group from keywords in the lowercased field name."""
    if 'percentage' in field_lower or 'count' in field_lower or 'total' in field_lower:
        return _NUMBER_CAPTURE
    if 'enabled' in field_lower or 'enforced' in field_lower:
        return _BOOL_CAPTURE
    return _STRING_CAPTURE


def generate_regex_patterns(field_names: list, data_sample: dict, summary_sample: dict) -> list:
    """Generate one regex pattern per field, in the same order as *field_names*.

    The value type is looked up in ``summary_sample`` for fields whose name
    contains ``summary`` (keyed by the last dot-separated segment), otherwise
    in ``data_sample``. When no sample value of a supported type (bool,
    number, string) is available, the type is guessed from the field name.

    Args:
        field_names: Field names to build patterns for.
        data_sample: Sample object used to infer value types.
        summary_sample: Sample summary mapping used to infer value types.

    Returns:
        A list of pattern strings with exactly ``len(field_names)`` entries,
        so callers can safely pair fields and patterns with ``zip``.
    """
    data_sample = data_sample if isinstance(data_sample, dict) else {}
    summary_sample = summary_sample if isinstance(summary_sample, dict) else {}

    patterns = []
    for field in field_names:
        if 'summary' in field:
            # Summary fields are looked up and emitted under their leaf name.
            field_name = field.split('.')[-1]
            key = f'summary.{field_name}'
            capture = _capture_for_value(summary_sample.get(field_name))
        elif field in data_sample:
            key = field
            capture = _capture_for_value(data_sample[field])
        else:
            key = field
            capture = None

        # Always emit a pattern: a skipped field would shift every later
        # pattern onto the wrong field when the lists are zipped.
        if capture is None:
            capture = _capture_for_name(field.lower())

        patterns.append(rf'"{key}"\s*:\s*{capture}')

    return patterns


def _discard_generated_patterns() -> None:
    """Remove previously generated patterns from the session, if any."""
    if 'generated_patterns' in st.session_state:
        del st.session_state['generated_patterns']


def _reset_state_for_new_file(file_key: tuple) -> None:
    """Clear stored results when *file_key* differs from the last processed file."""
    previous_key = (
        st.session_state['source_file_key']
        if 'source_file_key' in st.session_state
        else None
    )
    if previous_key != file_key:
        st.session_state['source_file_key'] = file_key
        st.session_state.analysis_result = None
        _discard_generated_patterns()


def _render_sidebar(data: Any) -> None:
    """Render the settings sidebar and run the analysis when requested."""
    with st.sidebar:
        st.header("⚙️ Settings")

        # Target field input
        target_field = st.text_input(
            "Target Field",
            value="rotation_enabled",
            help="The field you want to analyze"
        )

        # Analyze button
        if st.button("🔍 Analyze", type="primary"):
            with st.spinner("Analyzing data structure..."):
                analysis_result = analyze_with_llm(data, target_field)
                st.session_state.analysis_result = analysis_result
                st.session_state.data = data
                # Patterns from an earlier analysis no longer apply.
                _discard_generated_patterns()


def _render_metrics(analysis: Dict[str, Any]) -> None:
    """Render the four headline metrics above the tabs."""
    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric("Summary Fields", len(analysis['summary_fields_detected']))

    with col2:
        st.metric("Total Objects", analysis['total_objects'])

    with col3:
        st.metric("Has Summary", "Yes" if analysis['hierarchy_summary']['has_summary'] else "No")

    with col4:
        st.metric("Config Fields", len(analysis['classification'].get('config_fields', [])))


def _render_structure_tab(analysis: Dict[str, Any]) -> None:
    """Render the hierarchy levels and the sample data expanders."""
    st.subheader("Data Hierarchy")

    # Summary fields
    if analysis['summary_fields_detected']:
        st.markdown("#### Level 1: Summary/Aggregate Fields (Highest Priority)")
        for field in analysis['summary_fields_detected'][:10]:
            st.write(f"✓ `{field}`")

    # Config fields
    config_fields = analysis['classification'].get('config_fields', [])
    if config_fields:
        st.markdown("#### Level 2: Configuration/Compliance Fields")
        for field in config_fields[:10]:
            st.write(f"✓ `{field}`")

    # Object arrays
    object_arrays = analysis['classification'].get('object_arrays', [])
    if object_arrays:
        st.markdown("#### Level 3: Object Arrays")
        for field in object_arrays[:5]:
            st.write(f"✓ `{field}`")

    # Show sample data
    with st.expander("📋 View Summary Data Sample"):
        st.json(analysis['summary_sample'])

    with st.expander("📋 View Object Data Sample"):
        st.json(analysis['sample_object'])


def _render_recommendations_tab(analysis: Dict[str, Any]) -> None:
    """Render field selection and store generated patterns in the session."""
    st.subheader("Recommended Fields for Analysis")

    if not analysis['recommended_fields']:
        st.warning("No recommended fields found.")
        return

    st.info("These fields are recommended based on the data hierarchy and target field.")

    # Let user select fields
    selected_fields = st.multiselect(
        "Select fields to generate patterns for:",
        analysis['recommended_fields'],
        default=analysis['recommended_fields'][:3]
    )

    if selected_fields and st.button("Generate Patterns"):
        patterns = generate_regex_patterns(
            selected_fields,
            analysis['sample_object'],
            analysis['summary_sample']
        )

        st.session_state.generated_patterns = {
            'fields': selected_fields,
            'patterns': patterns
        }


def _render_patterns_tab() -> None:
    """Render the generated patterns along with copy and download helpers."""
    if 'generated_patterns' not in st.session_state:
        st.info("👆 Go to 'Field Recommendations' tab to select fields and generate patterns.")
        return

    patterns_data = st.session_state.generated_patterns

    st.subheader("Generated Regex Patterns")

    # Show patterns
    for i, (field, pattern) in enumerate(zip(patterns_data['fields'], patterns_data['patterns']), 1):
        st.markdown(f"**Pattern {i}: {field}**")
        st.code(pattern, language="regex", line_numbers=False)

    st.markdown("---")

    # Copy to clipboard
    all_patterns = "\n".join(patterns_data['patterns'])
    st.text_area(
        "All Patterns (copy this):",
        all_patterns,
        height=100
    )

    # JSON export
    export_data = {
        "test_name": "Field Analysis",
        "important_fields": patterns_data['fields'],
        "reasoning": "Fields identified using hierarchical analysis prioritizing summary/aggregate fields",
        "generated_regex": patterns_data['patterns']
    }

    st.download_button(
        label="📥 Download as JSON",
        data=json.dumps(export_data, indent=2),
        file_name="analysis_result.json",
        mime="application/json"
    )


def _render_raw_data_tab(data: Any) -> None:
    """Render the uploaded document and a download button for it."""
    st.subheader("Raw Data Structure")

    # Full data viewer
    st.json(data)

    # Download raw data
    st.download_button(
        label="📥 Download Raw Data",
        data=json.dumps(data, indent=2),
        file_name="raw_data.json",
        mime="application/json"
    )


def _render_results(analysis: Dict[str, Any], data: Any) -> None:
    """Render the metrics row and the four result tabs."""
    _render_metrics(analysis)

    st.markdown("---")

    # Create tabs
    tab1, tab2, tab3, tab4 = st.tabs([
        "📊 Structure Analysis",
        "🎯 Field Recommendations",
        "📝 Generated Patterns",
        "📄 Raw Data"
    ])

    with tab1:
        _render_structure_tab(analysis)

    # The recommendations tab runs before the patterns tab so patterns
    # generated by a button click are visible in the same script run.
    with tab2:
        _render_recommendations_tab(analysis)

    with tab3:
        _render_patterns_tab()

    with tab4:
        _render_raw_data_tab(data)


def _render_landing() -> None:
    """Render the usage instructions and example shown before any upload."""
    st.info("👆 Please upload a JSON file to begin analysis")

    with st.expander("📖 How to use"):
        st.markdown(_HOW_TO_USE_MARKDOWN)

    with st.expander("📋 Example JSON Structure"):
        st.json(_EXAMPLE_JSON)


def main():
    """Main application."""
    st.title("🤖 Field Correlation Analyzer")
    st.markdown("Upload a JSON file to analyze important fields and generate regex patterns")

    # File upload
    uploaded_file = st.file_uploader(
        "Choose a JSON file",
        type=['json'],
        help="Upload a JSON file with structured data"
    )

    if uploaded_file is None:
        # Show example when no file uploaded
        _render_landing()
        return

    # Read and parse JSON
    try:
        content = uploaded_file.read()
        data = json.loads(content)

        # Results computed for a previously uploaded file must not be shown
        # next to this file's data.
        _reset_state_for_new_file((uploaded_file.name, len(content)))

        st.success("✅ File loaded successfully!")

        # Sidebar for settings
        _render_sidebar(data)

        # Display results if available
        if st.session_state.analysis_result:
            _render_results(st.session_state.analysis_result, data)

    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # json.loads raises UnicodeDecodeError for bytes it cannot decode.
        st.error(f"❌ Invalid JSON file: {e}")
    except Exception as e:
        # Top-level UI boundary: report the failure in the page.
        st.error(f"❌ Error processing file: {e}")


if __name__ == "__main__":
    main()