# Paramify-test / app.py
# (GitHub paste residue: uploaded by bluestpanda, commit 9714df8)
#!/usr/bin/env python3
"""
Hugging Face Streamlit App for LLM Field Analyzer
Upload a JSON file and analyze important fields with pattern generation.
"""
import streamlit as st
import json
from pathlib import Path
from typing import Dict, Any
import io
# Page configuration: browser-tab title/icon and full-width layout.
st.set_page_config(
    page_title="Field Correlation Analyzer",
    page_icon="πŸ€–",
    layout="wide"
)
# Import our modules (companion file deployed alongside this app).
try:
    from structure_analysis import (
        detect_summary_fields,
        classify_data_structure,
        get_hierarchy_summary
    )
except ImportError:
    # Fail fast with a visible UI error instead of a raw traceback
    # when the companion module was not uploaded with the app.
    st.error("⚠️ structure_analysis.py not found. Make sure all files are uploaded.")
    st.stop()
# Session state: persist the last analysis result across Streamlit reruns.
if 'analysis_result' not in st.session_state:
    st.session_state.analysis_result = None
def analyze_with_llm(data: Dict[str, Any], target_field: str = "rotation_enabled") -> Dict[str, Any]:
    """
    Analyze data and generate a prompt for LLM analysis.
    Returns structured analysis without requiring Ollama.

    Args:
        data: Parsed JSON document to analyze.
        target_field: Field whose occurrences are counted and whose name is
            matched (case-insensitively) against sample-object keys.

    Returns:
        Dict with keys: summary_fields_detected, classification,
        hierarchy_summary, total_objects, sample_object, summary_sample,
        recommended_fields.
    """
    # Detect summary fields and classify the overall structure.
    summary_fields = detect_summary_fields(data)
    classification = classify_data_structure(data)
    hierarchy_summary = get_hierarchy_summary(data)

    # Extract one representative object: the first dict element of the first
    # non-empty list found under 'results', either directly or nested one
    # level inside a dict section. Stop at the first match so later sections
    # cannot overwrite it. Guard against 'results' not being a dict.
    sample_object: Dict[str, Any] = {}
    results = data.get('results')
    if isinstance(results, dict):
        for section in results.values():
            if isinstance(section, list) and section:
                # Only keep it if it is actually a dict (callers iterate .keys()).
                sample_object = section[0] if isinstance(section[0], dict) else {}
                break
            if isinstance(section, dict):
                for value in section.values():
                    if isinstance(value, list) and value:
                        sample_object = value[0] if isinstance(value[0], dict) else {}
                        break
                if sample_object:
                    break

    # Summary sample: prefer results.summary, fall back to a top-level summary.
    summary_sample = {}
    if isinstance(results, dict):
        summary_sample = results.get('summary', {})
    summary_sample = summary_sample or data.get('summary', {})

    def count_objects_with_field(obj: Any, field_name: str) -> int:
        """Recursively count dicts anywhere in *obj* containing *field_name*."""
        count = 0
        if isinstance(obj, dict):
            if field_name in obj:
                count += 1
            for v in obj.values():
                count += count_objects_with_field(v, field_name)
        elif isinstance(obj, list):
            for item in obj:
                count += count_objects_with_field(item, field_name)
        return count

    total_objects = count_objects_with_field(data, target_field)

    # Recommend fields by priority: summary fields first, then config fields,
    # then sample-object keys that mention the target (case-insensitive on
    # both sides). De-duplicate while preserving that priority order.
    recommended = []
    if summary_fields:
        recommended.extend(summary_fields[:3])
    if classification.get('config_fields'):
        recommended.extend(classification['config_fields'][:2])
    if sample_object:
        needle = target_field.lower()
        recommended.extend(k for k in sample_object if needle in k.lower())
    recommended = list(dict.fromkeys(recommended))

    return {
        "summary_fields_detected": summary_fields[:10],
        "classification": classification,
        "hierarchy_summary": hierarchy_summary,
        "total_objects": total_objects,
        "sample_object": sample_object,
        "summary_sample": summary_sample,
        "recommended_fields": recommended,
    }
def generate_regex_patterns(field_names: list, data_sample: dict, summary_sample: dict) -> list:
    """Generate regex patterns for given fields.

    Produces exactly one pattern per entry in *field_names* (the original
    version silently skipped summary fields with string values and sample
    fields with non-scalar values, desynchronizing callers that zip fields
    with patterns).

    Args:
        field_names: Field names, optionally dotted (e.g. "summary.x").
        data_sample: Sample object whose values type the patterns.
        summary_sample: Summary dict used for "summary.*" fields.

    Returns:
        List of regex strings, parallel to *field_names*.
    """

    def _pattern_for_value(key: str, value: Any) -> str:
        """Type-driven pattern; falls back to a name-based guess."""
        # bool must be tested before int/float (bool is an int subclass).
        if isinstance(value, bool):
            return f'"{key}"\\s*:\\s*(true|false)'
        if isinstance(value, (int, float)):
            return f'"{key}"\\s*:\\s*(\\d+)'
        if isinstance(value, str):
            return f'"{key}"\\s*:\\s*"([^"]*)"'
        return _pattern_from_name(key)

    def _pattern_from_name(key: str) -> str:
        """Guess a pattern from the field name when no sample value helps."""
        key_lower = key.lower()
        if 'percentage' in key_lower or 'count' in key_lower or 'total' in key_lower:
            return f'"{key}"\\s*:\\s*(\\d+)'
        if 'enabled' in key_lower or 'enforced' in key_lower:
            return f'"{key}"\\s*:\\s*(true|false)'
        return f'"{key}"\\s*:\\s*"([^"]*)"'

    patterns = []
    for field in field_names:
        if 'summary' in field:
            # Dotted summary field: type the pattern from the summary sample
            # but keep the full "summary.<name>" key in the pattern text.
            leaf = field.split('.')[-1]
            patterns.append(_pattern_for_value(field, summary_sample.get(leaf)))
        elif field in data_sample:
            patterns.append(_pattern_for_value(field, data_sample[field]))
        else:
            # Field absent from both samples: guess from the name alone.
            patterns.append(_pattern_from_name(field))
    return patterns
def main():
    """Main application.

    Renders the Streamlit UI: JSON upload, sidebar-triggered structural
    analysis, and four result tabs (structure, recommendations, generated
    regex patterns, raw data). Results live in st.session_state so they
    survive Streamlit reruns.
    """
    st.title("πŸ€– Field Correlation Analyzer")
    st.markdown("Upload a JSON file to analyze important fields and generate regex patterns")
    # File upload
    uploaded_file = st.file_uploader(
        "Choose a JSON file",
        type=['json'],
        help="Upload a JSON file with structured data"
    )
    if uploaded_file is not None:
        # Read and parse JSON; all parse/processing errors surface as UI
        # errors via the except handlers at the bottom of this try block.
        try:
            content = uploaded_file.read()
            data = json.loads(content)
            st.success("βœ… File loaded successfully!")
            # Sidebar for settings
            with st.sidebar:
                st.header("βš™οΈ Settings")
                # Target field input
                target_field = st.text_input(
                    "Target Field",
                    value="rotation_enabled",
                    help="The field you want to analyze"
                )
                # Analyze button: run the analysis and stash the result in
                # session state so it persists across subsequent reruns.
                if st.button("πŸ” Analyze", type="primary"):
                    with st.spinner("Analyzing data structure..."):
                        analysis_result = analyze_with_llm(data, target_field)
                        st.session_state.analysis_result = analysis_result
                        st.session_state.data = data
            # Display results if available (i.e. Analyze was clicked at least once)
            if st.session_state.analysis_result:
                analysis = st.session_state.analysis_result
                # Summary metrics row
                col1, col2, col3, col4 = st.columns(4)
                with col1:
                    st.metric("Summary Fields", len(analysis['summary_fields_detected']))
                with col2:
                    st.metric("Total Objects", analysis['total_objects'])
                with col3:
                    st.metric("Has Summary", "Yes" if analysis['hierarchy_summary']['has_summary'] else "No")
                with col4:
                    st.metric("Config Fields", len(analysis['classification'].get('config_fields', [])))
                st.markdown("---")
                # Create tabs
                tab1, tab2, tab3, tab4 = st.tabs([
                    "πŸ“Š Structure Analysis",
                    "🎯 Field Recommendations",
                    "πŸ“ Generated Patterns",
                    "πŸ“„ Raw Data"
                ])
                with tab1:
                    st.subheader("Data Hierarchy")
                    # Level 1: summary fields (capped at 10 for display)
                    if analysis['summary_fields_detected']:
                        st.markdown("#### Level 1: Summary/Aggregate Fields (Highest Priority)")
                        for field in analysis['summary_fields_detected'][:10]:
                            st.write(f"βœ“ `{field}`")
                    # Level 2: config fields (capped at 10)
                    config_fields = analysis['classification'].get('config_fields', [])
                    if config_fields:
                        st.markdown("#### Level 2: Configuration/Compliance Fields")
                        for field in config_fields[:10]:
                            st.write(f"βœ“ `{field}`")
                    # Level 3: object arrays (capped at 5)
                    object_arrays = analysis['classification'].get('object_arrays', [])
                    if object_arrays:
                        st.markdown("#### Level 3: Object Arrays")
                        for field in object_arrays[:5]:
                            st.write(f"βœ“ `{field}`")
                    # Show sample data used for the analysis
                    with st.expander("πŸ“‹ View Summary Data Sample"):
                        st.json(analysis['summary_sample'])
                    with st.expander("πŸ“‹ View Object Data Sample"):
                        st.json(analysis['sample_object'])
                with tab2:
                    st.subheader("Recommended Fields for Analysis")
                    if analysis['recommended_fields']:
                        st.info("These fields are recommended based on the data hierarchy and target field.")
                        # Let user select fields (first 3 pre-selected)
                        selected_fields = st.multiselect(
                            "Select fields to generate patterns for:",
                            analysis['recommended_fields'],
                            default=analysis['recommended_fields'][:3]
                        )
                        if selected_fields and st.button("Generate Patterns"):
                            patterns = generate_regex_patterns(
                                selected_fields,
                                analysis['sample_object'],
                                analysis['summary_sample']
                            )
                            # Stored in session state so tab3 can render them
                            # on this and later reruns.
                            st.session_state.generated_patterns = {
                                'fields': selected_fields,
                                'patterns': patterns
                            }
                    else:
                        st.warning("No recommended fields found.")
                with tab3:
                    if 'generated_patterns' in st.session_state:
                        patterns_data = st.session_state.generated_patterns
                        st.subheader("Generated Regex Patterns")
                        # Show each field/pattern pair, numbered from 1.
                        for i, (field, pattern) in enumerate(zip(patterns_data['fields'], patterns_data['patterns']), 1):
                            st.markdown(f"**Pattern {i}: {field}**")
                            st.code(pattern, language="regex", line_numbers=False)
                            st.markdown("---")
                        # All patterns in one text area for easy copying
                        all_patterns = "\n".join(patterns_data['patterns'])
                        st.text_area(
                            "All Patterns (copy this):",
                            all_patterns,
                            height=100
                        )
                        # JSON export of fields + patterns
                        export_data = {
                            "test_name": "Field Analysis",
                            "important_fields": patterns_data['fields'],
                            "reasoning": "Fields identified using hierarchical analysis prioritizing summary/aggregate fields",
                            "generated_regex": patterns_data['patterns']
                        }
                        st.download_button(
                            label="πŸ“₯ Download as JSON",
                            data=json.dumps(export_data, indent=2),
                            file_name="analysis_result.json",
                            mime="application/json"
                        )
                    else:
                        st.info("πŸ‘† Go to 'Field Recommendations' tab to select fields and generate patterns.")
                with tab4:
                    st.subheader("Raw Data Structure")
                    # Full data viewer
                    st.json(data)
                    # Download raw data as formatted JSON
                    st.download_button(
                        label="πŸ“₯ Download Raw Data",
                        data=json.dumps(data, indent=2),
                        file_name="raw_data.json",
                        mime="application/json"
                    )
        except json.JSONDecodeError as e:
            st.error(f"❌ Invalid JSON file: {e}")
        except Exception as e:
            # Broad catch so any processing failure surfaces in the UI
            # instead of crashing the app.
            st.error(f"❌ Error processing file: {e}")
    else:
        # No file uploaded yet: show instructions and an example structure.
        st.info("πŸ‘† Please upload a JSON file to begin analysis")
        with st.expander("πŸ“– How to use"):
            st.markdown("""
            **Steps:**
            1. Upload a JSON file with structured data
            2. Set the target field you want to analyze (e.g., `rotation_enabled`)
            3. Click "Analyze" to process the data
            4. Review the structure analysis and field recommendations
            5. Select fields and generate regex patterns
            6. Download the results as JSON
            **What this tool does:**
            - Detects summary/aggregate fields automatically
            - Classifies data structure by hierarchy levels
            - Recommends important fields for validation
            - Generates regex patterns for field extraction
            """)
        with st.expander("πŸ“‹ Example JSON Structure"):
            example = {
                "results": {
                    "summary": {
                        "total_keys": 13,
                        "rotated_keys": 6,
                        "rotation_percentage": 46
                    },
                    "kms_keys": {
                        "object": [
                            {
                                "key_id": "12345",
                                "rotation_enabled": True,
                                "key_state": "Enabled"
                            }
                        ]
                    }
                }
            }
            st.json(example)
# Script entry point (not executed when imported as a module).
if __name__ == "__main__":
    main()