#!/usr/bin/env python3
"""
Hugging Face Streamlit App for LLM Field Analyzer
Upload a JSON file and analyze important fields with pattern generation.
"""
import streamlit as st
import json
from typing import Dict, Any

# Page configuration (MUST be the first Streamlit command)
st.set_page_config(
    page_title="Field Correlation Analyzer",
    page_icon="🤖",
    layout="wide"
)

# Import analysis helpers silently
from structure_analysis import (
    detect_summary_fields,
    classify_data_structure,
    get_hierarchy_summary
)

# Session state
if 'analysis_result' not in st.session_state:
    st.session_state.analysis_result = None


def analyze_with_llm(data: Dict[str, Any], target_field: str = "rotation_enabled") -> Dict[str, Any]:
    """
    Analyze data and generate a prompt for LLM analysis.
    Returns structured analysis without requiring Ollama.
    """
    print(f"DEBUG: Starting analysis with target_field: {target_field}")
    print(f"DEBUG: Data type: {type(data)}")
    print(f"DEBUG: Data keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}")

    # Detect summary fields
    print("DEBUG: Detecting summary fields...")
    summary_fields = detect_summary_fields(data)
    print(f"DEBUG: Found summary fields: {summary_fields}")

    print("DEBUG: Classifying data structure...")
    classification = classify_data_structure(data)
    print(f"DEBUG: Classification result: {classification}")

    print("DEBUG: Getting hierarchy summary...")
    hierarchy_summary = get_hierarchy_summary(data)
    print(f"DEBUG: Hierarchy summary: {hierarchy_summary}")

    # Extract a representative sample object from the first non-empty list
    print("DEBUG: Extracting samples...")
    sample_object = {}
    if 'results' in data:
        print("DEBUG: Found 'results' key in data")
        for section_name, section in data['results'].items():
            print(f"DEBUG: Processing section '{section_name}': {type(section)}")
            if isinstance(section, list) and len(section) > 0:
                sample_object = section[0]
                print(f"DEBUG: Found sample object from list: {sample_object}")
                break
            elif isinstance(section, dict):
                for key, value in section.items():
                    if isinstance(value, list) and len(value) > 0:
                        sample_object = value[0] if isinstance(value[0], dict) else {}
                        print(f"DEBUG: Found sample object from dict list: {sample_object}")
                        break
                if sample_object:
                    # Stop scanning sections once a sample is found; the inner
                    # break alone would not exit this outer loop.
                    break
    else:
        print("DEBUG: No 'results' key found in data")

    summary_sample = data.get('results', {}).get('summary', {}) or data.get('summary', {})
    print(f"DEBUG: Summary sample: {summary_sample}")

    # Count objects with target field (recursive over dicts and lists)
    def count_objects_with_field(obj, field_name):
        count = 0
        if isinstance(obj, dict):
            if field_name in obj:
                count += 1
            for v in obj.values():
                count += count_objects_with_field(v, field_name)
        elif isinstance(obj, list):
            for item in obj:
                count += count_objects_with_field(item, field_name)
        return count

    print("DEBUG: Counting objects with target field...")
    total_objects = count_objects_with_field(data, target_field)
    print(f"DEBUG: Total objects with '{target_field}': {total_objects}")

    # Generate analysis
    print("DEBUG: Generating analysis...")
    analysis = {
        "summary_fields_detected": summary_fields[:10],
        "classification": classification,
        "hierarchy_summary": hierarchy_summary,
        "total_objects": total_objects,
        "sample_object": sample_object,
        "summary_sample": summary_sample,
        "recommended_fields": []
    }
    print(f"DEBUG: Initial analysis: {analysis}")

    # Recommend fields based on priority
    print("DEBUG: Generating field recommendations...")
    if summary_fields:
        analysis["recommended_fields"].extend(summary_fields[:3])
        print(f"DEBUG: Added summary fields: {summary_fields[:3]}")

    if classification.get('config_fields'):
        analysis["recommended_fields"].extend(classification['config_fields'][:2])
        print(f"DEBUG: Added config fields: {classification['config_fields'][:2]}")

    if sample_object:
        target_related = [k for k in sample_object.keys() if target_field in k.lower()]
        analysis["recommended_fields"].extend(target_related)
        print(f"DEBUG: Added target-related fields: {target_related}")

    print(f"DEBUG: Final recommended fields: {analysis['recommended_fields']}")
    print("DEBUG: Analysis completed successfully")

    return analysis
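# A minimal sketch of the input shape analyze_with_llm assumes (hypothetical
# example inferred from the code above; real reports may nest differently):
# a top-level "results" mapping whose sections are lists of objects, plus an
# optional "summary" block.
#
#   {
#     "results": {
#       "api_keys": [
#         {"key_id": "k-1", "rotation_enabled": true},
#         {"key_id": "k-2", "rotation_enabled": false}
#       ],
#       "summary": {"total_keys": 2, "rotation_enabled": false}
#     }
#   }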
analysis["recommended_fields"].extend(classification['config_fields'][:2]) print(f"DEBUG: Added config fields: {classification['config_fields'][:2]}") if sample_object: target_related = [k for k in sample_object.keys() if target_field in k.lower()] analysis["recommended_fields"].extend(target_related) print(f"DEBUG: Added target-related fields: {target_related}") print(f"DEBUG: Final recommended fields: {analysis['recommended_fields']}") print("DEBUG: Analysis completed successfully") return analysis def generate_regex_patterns(field_names: list, data_sample: dict, summary_sample: dict) -> list: """Generate regex patterns for given fields.""" patterns = [] for field in field_names: # Try to find the field value type field_lower = field.lower() # Check in summary first if 'summary' in str(field): field_name = field.split('.')[-1] # Boolean pattern if field_name in summary_sample and isinstance(summary_sample.get(field_name), bool): patterns.append(f'"summary.{field_name}"\\s*:\\s*(true|false)') # Number pattern elif isinstance(summary_sample.get(field_name), (int, float)): patterns.append(f'"summary.{field_name}"\\s*:\\s*(\\d+)') # Check in object elif field in data_sample: value = data_sample[field] if isinstance(value, bool): patterns.append(f'"{field}"\\s*:\\s*(true|false)') elif isinstance(value, (int, float)): patterns.append(f'"{field}"\\s*:\\s*(\\d+)') elif isinstance(value, str): patterns.append(f'"{field}"\\s*:\\s*"([^"]*)"') else: # Generic pattern based on field name if 'percentage' in field_lower or 'count' in field_lower or 'total' in field_lower: patterns.append(f'"{field}"\\s*:\\s*(\\d+)') elif 'enabled' in field_lower or 'enforced' in field_lower: patterns.append(f'"{field}"\\s*:\\s*(true|false)') else: patterns.append(f'"{field}"\\s*:\\s*"([^"]*)"') return patterns def main(): """Main application.""" st.title("Field Analyzer") # Upload method selection upload_method = st.radio( "", ["File Upload", "Text Paste"], horizontal=True, key="upload_method" ) uploaded_file = None pasted_content = None if upload_method == "File Upload": uploaded_file = st.file_uploader( "Upload JSON file", type=['json'], key="json_file_uploader" ) else: pasted_content = st.text_area( "Paste JSON", height=150, key="pasted_json" ) # Process either uploaded file or pasted content content_str = None file_name = None if upload_method == "Text Paste" and pasted_content: content_str = pasted_content file_name = "pasted_content.json" elif uploaded_file is not None: file_name = uploaded_file.name if content_str or uploaded_file is not None: try: if not content_str: # Read from uploaded file uploaded_file.seek(0) content = uploaded_file.read() uploaded_file.seek(0) if len(content) == 0: st.error("File is empty") return try: content_str = content.decode('utf-8') except UnicodeDecodeError: st.error("File encoding error") return data = json.loads(content_str) st.success(f"Loaded: {file_name}") with st.sidebar: target_field = st.text_input("Target Field", value="rotation_enabled") if st.button("Analyze", type="primary"): with st.spinner("Analyzing..."): try: analysis_result = analyze_with_llm(data, target_field) st.session_state.analysis_result = analysis_result st.session_state.data = data except Exception as e: st.error(f"Analysis failed: {e}") # Display results if available if st.session_state.analysis_result: analysis = st.session_state.analysis_result # Summary metrics col1, col2, col3, col4 = st.columns(4) with col1: st.metric("Summary Fields", len(analysis['summary_fields_detected'])) with col2: 
st.metric("Total Objects", analysis['total_objects']) with col3: st.metric("Has Summary", "Yes" if analysis['hierarchy_summary']['has_summary'] else "No") with col4: st.metric("Config Fields", len(analysis['classification'].get('config_fields', []))) st.markdown("---") tab1, tab2, tab3, tab4, tab5 = st.tabs([ "Analysis", "Fields", "Patterns", "Data", "Debug" ]) with tab1: if analysis['summary_fields_detected']: st.write("**Summary Fields**") for field in analysis['summary_fields_detected'][:10]: st.write(f"`{field}`") config_fields = analysis['classification'].get('config_fields', []) if config_fields: st.write("**Config Fields**") for field in config_fields[:10]: st.write(f"`{field}`") object_arrays = analysis['classification'].get('object_arrays', []) if object_arrays: st.write("**Object Arrays**") for field in object_arrays[:5]: st.write(f"`{field}`") with st.expander("Summary Sample"): st.json(analysis['summary_sample']) with st.expander("Object Sample"): st.json(analysis['sample_object']) with tab2: if analysis['recommended_fields']: selected_fields = st.multiselect( "Select fields:", analysis['recommended_fields'], default=analysis['recommended_fields'][:3] ) if selected_fields and st.button("Generate"): patterns = generate_regex_patterns( selected_fields, analysis['sample_object'], analysis['summary_sample'] ) st.session_state.generated_patterns = { 'fields': selected_fields, 'patterns': patterns } with tab3: if 'generated_patterns' in st.session_state: patterns_data = st.session_state.generated_patterns for field, pattern in zip(patterns_data['fields'], patterns_data['patterns']): st.write(f"**{field}**") st.code(pattern) st.write("") all_patterns = "\n".join(patterns_data['patterns']) st.text_area("All Patterns:", all_patterns, height=100) export_data = { "fields": patterns_data['fields'], "patterns": patterns_data['patterns'] } st.download_button( "Download JSON", data=json.dumps(export_data, indent=2), file_name="analysis.json", mime="application/json" ) with tab4: st.json(data) st.download_button( "Download Raw", data=json.dumps(data, indent=2), file_name="raw.json", mime="application/json" ) with tab5: col1, col2 = st.columns(2) with col1: st.write("**Upload**") st.text(f"File: {uploaded_file.name if uploaded_file else 'N/A'}") st.text(f"Size: {uploaded_file.size if uploaded_file else 0} bytes") st.text(f"Streamlit: {st.__version__}") with col2: st.write("**Analysis**") if st.session_state.get('analysis_result'): a = st.session_state.analysis_result st.text(f"Fields: {len(a.get('summary_fields_detected', []))}") st.text(f"Objects: {a.get('total_objects', 0)}") except json.JSONDecodeError as e: st.error(f"Invalid JSON: {e}") except Exception as e: st.error(f"Error: {e}") if __name__ == "__main__": main()