# Source: Paramify-test / src/streamlit_app.py
# Author: bluestpanda — commit 6269828 ("Simplify UI design - minimal clean interface")
#!/usr/bin/env python3
"""
Hugging Face Streamlit App for LLM Field Analyzer
Upload a JSON file and analyze important fields with pattern generation.
"""
import io
import json
import logging
import re
import sys
from pathlib import Path
from typing import Dict, Any

import streamlit as st
# Page configuration (MUST be first Streamlit command)
st.set_page_config(
    page_title="Field Correlation Analyzer",
    page_icon="🤖",
    layout="wide"
)

# Import modules silently
# NOTE(review): project-local module; assumed to sit next to this file — confirm.
from structure_analysis import (
    detect_summary_fields,
    classify_data_structure,
    get_hierarchy_summary
)

# Session state
# Seed the slot so later `st.session_state.analysis_result` reads never
# raise on a fresh session; results survive Streamlit reruns.
if 'analysis_result' not in st.session_state:
    st.session_state.analysis_result = None
def analyze_with_llm(data: Dict[str, Any], target_field: str = "rotation_enabled") -> Dict[str, Any]:
    """
    Analyze data and generate a prompt for LLM analysis.
    Returns structured analysis without requiring Ollama.

    Args:
        data: Parsed JSON document to inspect (expected to be a dict).
        target_field: Field whose occurrences are counted and whose
            related keys in the sample object are recommended.

    Returns:
        Dict with detected summary fields, structure classification,
        hierarchy summary, object count, samples, and recommended fields.
    """
    # Use logging instead of print() so debug output does not pollute
    # the Streamlit server's stdout in production.
    logger = logging.getLogger(__name__)
    logger.debug("Starting analysis: target_field=%s, data type=%s", target_field, type(data))

    # Structure detection (delegated to structure_analysis helpers).
    summary_fields = detect_summary_fields(data)
    classification = classify_data_structure(data)
    hierarchy_summary = get_hierarchy_summary(data)

    # Extract the first record-like dict from any section under 'results'.
    # Only dicts are accepted as samples so the .keys() access below is safe
    # (the original could capture a bare string/number as the sample).
    sample_object: Dict[str, Any] = {}
    results = data.get('results') if isinstance(data, dict) else None
    if isinstance(results, dict):
        found = False
        for section in results.values():
            if isinstance(section, list) and section:
                if isinstance(section[0], dict):
                    sample_object = section[0]
                found = True
            elif isinstance(section, dict):
                for value in section.values():
                    if isinstance(value, list) and value:
                        if isinstance(value[0], dict):
                            sample_object = value[0]
                        found = True
                        break
            if found:
                # Stop at the first section that yielded a candidate.
                # The original's inner 'break' failed to exit this outer
                # loop, so a later list section could clobber the sample.
                break

    # Prefer results.summary, fall back to a top-level summary block.
    # Guarded so a non-dict 'results' no longer raises AttributeError.
    summary_node = results.get('summary', {}) if isinstance(results, dict) else {}
    summary_sample = summary_node or (data.get('summary', {}) if isinstance(data, dict) else {})

    def count_objects_with_field(obj: Any, field_name: str) -> int:
        """Recursively count dicts (at any depth) containing field_name."""
        count = 0
        if isinstance(obj, dict):
            if field_name in obj:
                count += 1
            for v in obj.values():
                count += count_objects_with_field(v, field_name)
        elif isinstance(obj, list):
            for item in obj:
                count += count_objects_with_field(item, field_name)
        return count

    total_objects = count_objects_with_field(data, target_field)

    # Recommend fields by priority: summary fields first, then config
    # fields, then sample-object keys mentioning the target field.
    recommended: list = []
    recommended.extend(summary_fields[:3])
    recommended.extend((classification.get('config_fields') or [])[:2])
    recommended.extend(k for k in sample_object if target_field in k.lower())
    # De-duplicate while preserving first-seen order (the original could
    # recommend the same field twice).
    seen = set()
    recommended = [f for f in recommended if not (f in seen or seen.add(f))]

    analysis = {
        "summary_fields_detected": summary_fields[:10],
        "classification": classification,
        "hierarchy_summary": hierarchy_summary,
        "total_objects": total_objects,
        "sample_object": sample_object,
        "summary_sample": summary_sample,
        "recommended_fields": recommended,
    }
    logger.debug("Analysis complete: %d objects, %d recommended fields",
                 total_objects, len(recommended))
    return analysis
def generate_regex_patterns(field_names: list, data_sample: dict, summary_sample: dict) -> list:
    """Generate regex patterns for given fields.

    Each pattern matches a `"key": value` pair in serialized JSON. The
    value pattern is derived from the field's sample value when one is
    available, otherwise guessed from the field name.

    Args:
        field_names: Fields to build patterns for. A name containing
            'summary' is resolved against summary_sample using its last
            dotted component.
        data_sample: Sample record used to infer value types.
        summary_sample: Sample summary block for 'summary.*' fields.

    Returns:
        List of regex strings; may be shorter than field_names when no
        pattern applies to a field.
    """
    patterns = []
    for field in field_names:
        field_lower = field.lower()

        if 'summary' in field:
            leaf = field.split('.')[-1]
            value = summary_sample.get(leaf)
            # Escape the key: the dot in 'summary.<leaf>' is a literal
            # character, not a regex wildcard (original emitted it raw).
            key = re.escape(f"summary.{leaf}")
            if isinstance(value, bool):
                patterns.append(f'"{key}"\\s*:\\s*(true|false)')
            elif isinstance(value, float):
                # Optional fractional part: plain (\d+) cannot match 3.5.
                patterns.append(f'"{key}"\\s*:\\s*(\\d+(?:\\.\\d+)?)')
            elif isinstance(value, int):
                patterns.append(f'"{key}"\\s*:\\s*(\\d+)')
            elif isinstance(value, str):
                # Fix: the original produced no pattern for string-valued
                # summary fields.
                patterns.append(f'"{key}"\\s*:\\s*"([^"]*)"')
        elif field in data_sample:
            value = data_sample[field]
            key = re.escape(field)
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(value, bool):
                patterns.append(f'"{key}"\\s*:\\s*(true|false)')
            elif isinstance(value, float):
                patterns.append(f'"{key}"\\s*:\\s*(\\d+(?:\\.\\d+)?)')
            elif isinstance(value, int):
                patterns.append(f'"{key}"\\s*:\\s*(\\d+)')
            elif isinstance(value, str):
                patterns.append(f'"{key}"\\s*:\\s*"([^"]*)"')
        else:
            # No sample value: guess the value type from the field name.
            key = re.escape(field)
            if 'percentage' in field_lower or 'count' in field_lower or 'total' in field_lower:
                patterns.append(f'"{key}"\\s*:\\s*(\\d+)')
            elif 'enabled' in field_lower or 'enforced' in field_lower:
                patterns.append(f'"{key}"\\s*:\\s*(true|false)')
            else:
                patterns.append(f'"{key}"\\s*:\\s*"([^"]*)"')
    return patterns
def main():
    """Main application.

    Renders the Streamlit UI: accepts a JSON document (file upload or
    pasted text), runs the field analysis on demand, and displays the
    results as metric cards and tabs. Analysis output is kept in
    st.session_state so it survives Streamlit reruns.
    """
    st.title("Field Analyzer")

    # Upload method selection (radio toggles which input widget renders)
    upload_method = st.radio(
        "",
        ["File Upload", "Text Paste"],
        horizontal=True,
        key="upload_method"
    )

    uploaded_file = None
    pasted_content = None

    if upload_method == "File Upload":
        uploaded_file = st.file_uploader(
            "Upload JSON file",
            type=['json'],
            key="json_file_uploader"
        )
    else:
        pasted_content = st.text_area(
            "Paste JSON",
            height=150,
            key="pasted_json"
        )

    # Process either uploaded file or pasted content
    content_str = None
    file_name = None

    if upload_method == "Text Paste" and pasted_content:
        content_str = pasted_content
        file_name = "pasted_content.json"
    elif uploaded_file is not None:
        file_name = uploaded_file.name

    if content_str or uploaded_file is not None:
        try:
            if not content_str:
                # Read from uploaded file; rewind before and after so a
                # Streamlit rerun can read the same buffer again.
                uploaded_file.seek(0)
                content = uploaded_file.read()
                uploaded_file.seek(0)

                if len(content) == 0:
                    st.error("File is empty")
                    return

                try:
                    content_str = content.decode('utf-8')
                except UnicodeDecodeError:
                    st.error("File encoding error")
                    return

            data = json.loads(content_str)
            st.success(f"Loaded: {file_name}")

            # Sidebar controls: analysis runs only when the button fires;
            # its output is persisted to session state for later reruns.
            with st.sidebar:
                target_field = st.text_input("Target Field", value="rotation_enabled")

                if st.button("Analyze", type="primary"):
                    with st.spinner("Analyzing..."):
                        try:
                            analysis_result = analyze_with_llm(data, target_field)
                            st.session_state.analysis_result = analysis_result
                            st.session_state.data = data
                        except Exception as e:
                            st.error(f"Analysis failed: {e}")

            # Display results if available
            if st.session_state.analysis_result:
                analysis = st.session_state.analysis_result

                # Summary metrics
                col1, col2, col3, col4 = st.columns(4)
                with col1:
                    st.metric("Summary Fields", len(analysis['summary_fields_detected']))
                with col2:
                    st.metric("Total Objects", analysis['total_objects'])
                with col3:
                    st.metric("Has Summary", "Yes" if analysis['hierarchy_summary']['has_summary'] else "No")
                with col4:
                    st.metric("Config Fields", len(analysis['classification'].get('config_fields', [])))

                st.markdown("---")

                tab1, tab2, tab3, tab4, tab5 = st.tabs([
                    "Analysis",
                    "Fields",
                    "Patterns",
                    "Data",
                    "Debug"
                ])

                with tab1:
                    # Detected field groups, truncated for readability.
                    if analysis['summary_fields_detected']:
                        st.write("**Summary Fields**")
                        for field in analysis['summary_fields_detected'][:10]:
                            st.write(f"`{field}`")

                    config_fields = analysis['classification'].get('config_fields', [])
                    if config_fields:
                        st.write("**Config Fields**")
                        for field in config_fields[:10]:
                            st.write(f"`{field}`")

                    object_arrays = analysis['classification'].get('object_arrays', [])
                    if object_arrays:
                        st.write("**Object Arrays**")
                        for field in object_arrays[:5]:
                            st.write(f"`{field}`")

                    with st.expander("Summary Sample"):
                        st.json(analysis['summary_sample'])
                    with st.expander("Object Sample"):
                        st.json(analysis['sample_object'])

                with tab2:
                    # Field selection feeding the pattern generator.
                    if analysis['recommended_fields']:
                        selected_fields = st.multiselect(
                            "Select fields:",
                            analysis['recommended_fields'],
                            default=analysis['recommended_fields'][:3]
                        )

                        if selected_fields and st.button("Generate"):
                            patterns = generate_regex_patterns(
                                selected_fields,
                                analysis['sample_object'],
                                analysis['summary_sample']
                            )
                            # Persist so the Patterns tab can render them
                            # on this and subsequent reruns.
                            st.session_state.generated_patterns = {
                                'fields': selected_fields,
                                'patterns': patterns
                            }

                with tab3:
                    # Rendered only after "Generate" has run at least once.
                    if 'generated_patterns' in st.session_state:
                        patterns_data = st.session_state.generated_patterns

                        for field, pattern in zip(patterns_data['fields'], patterns_data['patterns']):
                            st.write(f"**{field}**")
                            st.code(pattern)
                            st.write("")

                        all_patterns = "\n".join(patterns_data['patterns'])
                        st.text_area("All Patterns:", all_patterns, height=100)

                        export_data = {
                            "fields": patterns_data['fields'],
                            "patterns": patterns_data['patterns']
                        }
                        st.download_button(
                            "Download JSON",
                            data=json.dumps(export_data, indent=2),
                            file_name="analysis.json",
                            mime="application/json"
                        )

                with tab4:
                    # Raw input document: view and re-download.
                    st.json(data)
                    st.download_button(
                        "Download Raw",
                        data=json.dumps(data, indent=2),
                        file_name="raw.json",
                        mime="application/json"
                    )

                with tab5:
                    # Diagnostics about the upload and the last analysis.
                    col1, col2 = st.columns(2)
                    with col1:
                        st.write("**Upload**")
                        st.text(f"File: {uploaded_file.name if uploaded_file else 'N/A'}")
                        st.text(f"Size: {uploaded_file.size if uploaded_file else 0} bytes")
                        st.text(f"Streamlit: {st.__version__}")
                    with col2:
                        st.write("**Analysis**")
                        if st.session_state.get('analysis_result'):
                            a = st.session_state.analysis_result
                            st.text(f"Fields: {len(a.get('summary_fields_detected', []))}")
                            st.text(f"Objects: {a.get('total_objects', 0)}")

        except json.JSONDecodeError as e:
            st.error(f"Invalid JSON: {e}")
        except Exception as e:
            st.error(f"Error: {e}")


if __name__ == "__main__":
    main()