Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Hugging Face Streamlit App for LLM Field Analyzer | |
| Upload a JSON file and analyze important fields with pattern generation. | |
| """ | |
| import streamlit as st | |
| import json | |
| from pathlib import Path | |
| from typing import Dict, Any | |
| import io | |
| import sys | |
# Streamlit page setup; this has to run before any other Streamlit command.
st.set_page_config(
    layout="wide",
    page_icon="🤖",
    page_title="Field Correlation Analyzer",
)
| # Import modules silently | |
| from structure_analysis import ( | |
| detect_summary_fields, | |
| classify_data_structure, | |
| get_hierarchy_summary | |
| ) | |
# Seed the per-session slot that main() reads the latest analysis from.
if 'analysis_result' not in st.session_state:
    st.session_state['analysis_result'] = None
def analyze_with_llm(data: Dict[str, Any], target_field: str = "rotation_enabled") -> Dict[str, Any]:
    """
    Summarise the structure of a parsed JSON document and recommend fields.

    Despite the name, no LLM (Ollama) is contacted: the result is assembled
    from the ``structure_analysis`` helpers plus a few local heuristics.

    Args:
        data: Parsed JSON document. A dict is expected; any other JSON value
            is still handed to the ``structure_analysis`` helpers, but yields
            empty samples here instead of raising.
        target_field: Key to count across the whole document. It is also used
            as a case-insensitive substring filter over the sample object's
            keys when building the recommendations.

    Returns:
        A dict with the keys ``summary_fields_detected`` (at most 10 entries),
        ``classification``, ``hierarchy_summary``, ``total_objects`` (number
        of dicts, at any depth, that contain ``target_field``),
        ``sample_object``, ``summary_sample`` and ``recommended_fields``
        (ordered, without duplicates).
    """
    # Local import: the module does not import ``logging`` at file level.
    import logging

    log = logging.getLogger(__name__)

    def find_sample_object(results):
        """Return the first dict found at the head of a non-empty list in ``results``."""
        for section_name, section in results.items():
            if isinstance(section, list):
                candidates = [section]
            elif isinstance(section, dict):
                candidates = list(section.values())
            else:
                candidates = []
            for candidate in candidates:
                # Only a dict is usable as a sample: its keys are inspected later.
                if isinstance(candidate, list) and candidate and isinstance(candidate[0], dict):
                    log.debug("Sample object taken from section %r", section_name)
                    # Returning (rather than breaking one loop) stops later
                    # sections from overwriting the first sample found.
                    return candidate[0]
        return {}

    def count_objects_with_field(obj, field_name):
        """Count the dicts nested anywhere inside ``obj`` that have ``field_name`` as a key."""
        if isinstance(obj, dict):
            own = 1 if field_name in obj else 0
            return own + sum(count_objects_with_field(v, field_name) for v in obj.values())
        if isinstance(obj, list):
            return sum(count_objects_with_field(item, field_name) for item in obj)
        return 0

    log.debug("Starting analysis with target_field=%r on %s", target_field, type(data).__name__)

    # Structure detection is delegated to the structure_analysis helpers.
    summary_fields = detect_summary_fields(data) or []
    classification = classify_data_structure(data)
    hierarchy_summary = get_hierarchy_summary(data)

    # Valid JSON can be a list or scalar at top level, and 'results' can be
    # any JSON value, so both are normalised before being used as dicts.
    root = data if isinstance(data, dict) else {}
    results = root.get('results')
    if not isinstance(results, dict):
        results = {}

    sample_object = find_sample_object(results)

    # Prefer the summary nested under 'results', then a top-level one.
    summary_sample = results.get('summary') or root.get('summary') or {}
    if not isinstance(summary_sample, dict):
        summary_sample = {}

    total_objects = count_objects_with_field(data, target_field)
    log.debug("Objects containing %r: %d", target_field, total_objects)

    # Recommendation priority: summary fields, then config fields, then
    # sample keys that mention the target field.
    candidates = list(summary_fields[:3])
    config_fields = classification.get('config_fields')
    if config_fields:
        candidates.extend(config_fields[:2])
    if sample_object:
        # Lowercase both sides so a mixed-case target can still match.
        needle = target_field.lower()
        candidates.extend(k for k in sample_object if needle in k.lower())

    # Ordered de-duplication; a list membership test also tolerates
    # unhashable entries coming back from the helpers.
    recommended_fields = []
    for name in candidates:
        if name not in recommended_fields:
            recommended_fields.append(name)
    log.debug("Recommended fields: %s", recommended_fields)

    return {
        "summary_fields_detected": summary_fields[:10],
        "classification": classification,
        "hierarchy_summary": hierarchy_summary,
        "total_objects": total_objects,
        "sample_object": sample_object,
        "summary_sample": summary_sample,
        "recommended_fields": recommended_fields,
    }
def generate_regex_patterns(field_names: list, data_sample: dict, summary_sample: dict) -> list:
    """
    Generate one regex pattern per field, in the same order as ``field_names``.

    Each pattern matches a JSON-style ``"key": value`` pair and captures the
    value in group 1. The capture is chosen from the type of the sample value
    when one is available, otherwise from hints in the field name.

    Args:
        field_names: Field names to build patterns for. A name containing
            ``summary`` is looked up in ``summary_sample`` by its last
            dot-separated component and keyed as ``summary.<component>``.
        data_sample: Sample object used to infer value types of other fields.
            Anything that is not a dict is treated as empty.
        summary_sample: Sample summary used to infer value types of summary
            fields. Anything that is not a dict is treated as empty.

    Returns:
        A list of pattern strings with exactly ``len(field_names)`` entries,
        so callers can safely ``zip`` it with ``field_names``.
    """
    # Local import: the module does not import ``re`` at file level.
    import re

    boolean_capture = '(true|false)'
    # Covers negative numbers, fractions and exponents, not just digit runs.
    number_capture = '(-?\\d+(?:\\.\\d+)?(?:[eE][+-]?\\d+)?)'
    string_capture = '"([^"]*)"'

    def capture_for(value, name_hint):
        """Pick the capture group from the sample value, else from the name."""
        # bool is tested first because bool is a subclass of int.
        if isinstance(value, bool):
            return boolean_capture
        if isinstance(value, (int, float)):
            return number_capture
        if isinstance(value, str):
            return string_capture
        # No usable sample value (missing, null, list, dict): use name hints.
        if 'percentage' in name_hint or 'count' in name_hint or 'total' in name_hint:
            return number_capture
        if 'enabled' in name_hint or 'enforced' in name_hint:
            return boolean_capture
        return string_capture

    if not isinstance(data_sample, dict):
        data_sample = {}
    if not isinstance(summary_sample, dict):
        summary_sample = {}

    patterns = []
    for field in field_names:
        field = str(field)
        if 'summary' in field:
            leaf = field.split('.')[-1]
            key = f'summary.{leaf}'
            value = summary_sample.get(leaf)
        else:
            key = field
            value = data_sample.get(field)
        capture = capture_for(value, field.lower())
        # Escape the key so characters such as '.' only match themselves.
        patterns.append(f'"{re.escape(key)}"\\s*:\\s*{capture}')
    return patterns
def _render_results(analysis, data, uploaded_file):
    """Render the metric row and the five result tabs for one analysis result."""
    # Summary metrics
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Summary Fields", len(analysis['summary_fields_detected']))
    with col2:
        st.metric("Total Objects", analysis['total_objects'])
    with col3:
        st.metric("Has Summary", "Yes" if analysis['hierarchy_summary']['has_summary'] else "No")
    with col4:
        st.metric("Config Fields", len(analysis['classification'].get('config_fields', [])))

    st.markdown("---")
    tab1, tab2, tab3, tab4, tab5 = st.tabs([
        "Analysis",
        "Fields",
        "Patterns",
        "Data",
        "Debug",
    ])

    with tab1:
        if analysis['summary_fields_detected']:
            st.write("**Summary Fields**")
            for field in analysis['summary_fields_detected'][:10]:
                st.write(f"`{field}`")
        config_fields = analysis['classification'].get('config_fields', [])
        if config_fields:
            st.write("**Config Fields**")
            for field in config_fields[:10]:
                st.write(f"`{field}`")
        object_arrays = analysis['classification'].get('object_arrays', [])
        if object_arrays:
            st.write("**Object Arrays**")
            for field in object_arrays[:5]:
                st.write(f"`{field}`")
        with st.expander("Summary Sample"):
            st.json(analysis['summary_sample'])
        with st.expander("Object Sample"):
            st.json(analysis['sample_object'])

    with tab2:
        # De-duplicate while keeping order, so a field recommended by more
        # than one heuristic appears only once among the options.
        options = []
        for name in analysis['recommended_fields']:
            if name not in options:
                options.append(name)
        if options:
            selected_fields = st.multiselect(
                "Select fields:",
                options,
                default=options[:3],
            )
            if selected_fields and st.button("Generate"):
                patterns = generate_regex_patterns(
                    selected_fields,
                    analysis['sample_object'],
                    analysis['summary_sample'],
                )
                st.session_state.generated_patterns = {
                    'fields': selected_fields,
                    'patterns': patterns,
                }

    with tab3:
        if 'generated_patterns' in st.session_state:
            patterns_data = st.session_state.generated_patterns
            for field, pattern in zip(patterns_data['fields'], patterns_data['patterns']):
                st.write(f"**{field}**")
                st.code(pattern)
                st.write("")
            all_patterns = "\n".join(patterns_data['patterns'])
            st.text_area("All Patterns:", all_patterns, height=100)
            export_data = {
                "fields": patterns_data['fields'],
                "patterns": patterns_data['patterns'],
            }
            st.download_button(
                "Download JSON",
                data=json.dumps(export_data, indent=2),
                file_name="analysis.json",
                mime="application/json",
            )

    with tab4:
        st.json(data)
        st.download_button(
            "Download Raw",
            data=json.dumps(data, indent=2),
            file_name="raw.json",
            mime="application/json",
        )

    with tab5:
        col1, col2 = st.columns(2)
        with col1:
            st.write("**Upload**")
            st.text(f"File: {uploaded_file.name if uploaded_file else 'N/A'}")
            st.text(f"Size: {uploaded_file.size if uploaded_file else 0} bytes")
            st.text(f"Streamlit: {st.__version__}")
        with col2:
            st.write("**Analysis**")
            if st.session_state.get('analysis_result'):
                a = st.session_state.analysis_result
                st.text(f"Fields: {len(a.get('summary_fields_detected', []))}")
                st.text(f"Objects: {a.get('total_objects', 0)}")


def main():
    """
    Main application.

    Reads JSON from an uploaded file or a pasted text area, runs
    ``analyze_with_llm`` when the Analyze button is pressed, keeps the result
    in ``st.session_state`` and renders it. Input, parsing and rendering
    problems are reported in the page through ``st.error`` instead of raising.
    """
    st.title("Field Analyzer")

    # Upload method selection. The label is hidden rather than left empty.
    upload_method = st.radio(
        "Input method",
        ["File Upload", "Text Paste"],
        horizontal=True,
        key="upload_method",
        label_visibility="collapsed",
    )

    uploaded_file = None
    pasted_content = None
    if upload_method == "File Upload":
        uploaded_file = st.file_uploader(
            "Upload JSON file",
            type=['json'],
            key="json_file_uploader",
        )
    else:
        pasted_content = st.text_area(
            "Paste JSON",
            height=150,
            key="pasted_json",
        )

    # Process either uploaded file or pasted content
    content_str = None
    file_name = None
    if upload_method == "Text Paste" and pasted_content:
        content_str = pasted_content
        file_name = "pasted_content.json"
    elif uploaded_file is not None:
        file_name = uploaded_file.name

    if not content_str and uploaded_file is None:
        # Nothing has been supplied yet.
        return

    try:
        if not content_str:
            # Read from uploaded file, leaving its cursor where it started.
            uploaded_file.seek(0)
            content = uploaded_file.read()
            uploaded_file.seek(0)
            if len(content) == 0:
                st.error("File is empty")
                return
            try:
                # utf-8-sig also accepts plain UTF-8 and strips a leading
                # BOM, which json.loads would otherwise reject.
                content_str = content.decode('utf-8-sig')
            except UnicodeDecodeError:
                st.error("File encoding error")
                return

        data = json.loads(content_str)
        st.success(f"Loaded: {file_name}")

        # Session state outlives a single upload: when the input changes,
        # drop results that were computed from the previous input so they
        # are not displayed next to the new data.
        fingerprint = (file_name, hash(content_str))
        if st.session_state.get('input_fingerprint') != fingerprint:
            st.session_state.input_fingerprint = fingerprint
            st.session_state.analysis_result = None
            if 'generated_patterns' in st.session_state:
                del st.session_state['generated_patterns']

        with st.sidebar:
            target_field = st.text_input("Target Field", value="rotation_enabled")
            if st.button("Analyze", type="primary"):
                with st.spinner("Analyzing..."):
                    try:
                        analysis_result = analyze_with_llm(data, target_field)
                        st.session_state.analysis_result = analysis_result
                        st.session_state.data = data
                        # Patterns generated for an earlier analysis no
                        # longer correspond to the new recommendations.
                        if 'generated_patterns' in st.session_state:
                            del st.session_state['generated_patterns']
                    except Exception as e:
                        st.error(f"Analysis failed: {e}")

        # Display results if available
        if st.session_state.analysis_result:
            _render_results(st.session_state.analysis_result, data, uploaded_file)
    except json.JSONDecodeError as e:
        st.error(f"Invalid JSON: {e}")
    except Exception as e:
        # Top-level UI boundary: show the problem instead of a traceback.
        st.error(f"Error: {e}")


if __name__ == "__main__":
    main()