bluestpanda
Simplify app startup and add .gitignore
4c2ecb4
raw
history blame
25.9 kB
#!/usr/bin/env python3
"""
File Upload Analyzer - Streamlit Frontend
This is a copy of file_upload_app.py for Hugging Face Spaces deployment.
"""
import streamlit as st
import json
import sys
import os
from pathlib import Path
from typing import Dict, Any
import io
import requests
# Try to import structure_analysis, fallback to inline if not available
try:
    from structure_analysis import (
        detect_summary_fields,
        classify_data_structure,
        get_hierarchy_summary
    )
except ImportError:
    # Inline fallback implementations

    def detect_summary_fields(data: Any, path: str = "") -> list:
        """Detect summary fields.

        Walks the structure and collects dotted paths of dict keys whose
        name contains a summary/aggregate indicator substring.
        """
        indicators = ('total', 'count', 'percentage', 'summary',
                      'aggregate', 'statistics', 'percent')
        found = []

        def _walk(node, prefix=""):
            if isinstance(node, dict):
                for name, child in node.items():
                    full_path = f"{prefix}.{name}" if prefix else name
                    if any(tag in name.lower() for tag in indicators):
                        found.append(full_path)
                    if isinstance(child, (dict, list)):
                        _walk(child, full_path)
            elif isinstance(node, list) and node:
                # Lists are sampled through their first element only.
                _walk(node[0], prefix)

        _walk(data, path)
        return found

    def classify_data_structure(data: Any) -> dict:
        """Classify data structure (stub: empty classification)."""
        return dict(
            summary_fields=[],
            config_fields=[],
            object_arrays=[],
            object_fields=[],
        )

    def get_hierarchy_summary(data: Any) -> dict:
        """Get hierarchy summary (stub: nothing detected)."""
        return dict(
            has_summary=False,
            has_config=False,
            summary_fields=[],
            config_fields=[],
            levels_present=[],
        )
# Detect if running on Streamlit Cloud or Hugging Face
IS_STREAMLIT_CLOUD = "STREAMLIT_SHARING_BASE_URL" in os.environ
IS_HUGGINGFACE = "SPACE_ID" in os.environ
IS_ONLINE = IS_STREAMLIT_CLOUD or IS_HUGGINGFACE
# Page config - must be first
st.set_page_config(
    page_title="JSON Field Analyzer",
    page_icon="πŸ“Š",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Custom CSS, injected once at startup via an unsafe-HTML markdown call.
_CUSTOM_CSS = """
<style>
.main > div {
padding-top: 1rem;
}
.stButton>button {
width: 100%;
}
h1 {
font-size: 2rem;
}
h2 {
font-size: 1.3rem;
border-bottom: 2px solid #0e1117;
padding-bottom: 0.3rem;
}
.highlight {
background-color: #f0f2f6;
color: #262730;
padding: 1rem;
border-radius: 5px;
border-left: 4px solid #1f77b4;
margin: 1rem 0;
}
.highlight p {
color: #262730;
margin: 0;
}
.result-box {
background-color: #f0f2f6;
padding: 1.5rem;
border-radius: 10px;
margin: 1rem 0;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
class FileAnalyzer:
    """Analyzer for uploaded JSON files.

    Extracts metadata about a target field in the uploaded JSON, builds a
    hierarchy-aware prompt, and queries the selected LLM backend (Ollama,
    OpenAI, Anthropic, or the Hugging Face Inference API) for an analysis.
    """

    # Local Ollama REST endpoint and the model it should run.
    OLLAMA_API_URL = "http://localhost:11434/api/generate"
    MODEL_NAME = "llama3.2:3b"

    def __init__(self, data: Dict[str, Any], llm_provider="ollama", api_key=None):
        """Store the parsed JSON payload and LLM provider configuration.

        Args:
            data: Parsed JSON payload to analyze.
            llm_provider: One of "ollama", "openai", "anthropic", "huggingface".
            api_key: API key for cloud providers (optional for Hugging Face).
        """
        self.data = data
        self.metadata = None  # populated lazily by extract_metadata()
        self.llm_provider = llm_provider
        self.api_key = api_key

    def extract_metadata(self, target_field: str) -> Dict[str, Any]:
        """Extract key metadata from the JSON data for LLM analysis.

        Counts objects containing `target_field`, how many have it set to
        True, and records the detected summary/config structure.
        """
        # Enhanced: detect summary fields and classify structure
        summary_fields = detect_summary_fields(self.data)
        classification = classify_data_structure(self.data)
        hierarchy_summary = get_hierarchy_summary(self.data)
        # Find every object in the data structure that carries the target field
        objects_with_target = self._find_objects_with_target(target_field)
        total = len(objects_with_target)
        # Strict identity check: only boolean True counts, not truthy values
        target_true = sum(1 for obj in objects_with_target if obj.get(target_field) is True)
        percentage = (target_true / total * 100) if total > 0 else 0
        metadata = {
            "total_objects": total,
            "target_count": target_true,
            "percentage": round(percentage, 2),
            "summary_fields_detected": summary_fields[:10],  # cap for prompt size
            "classification": classification,
            "hierarchy_summary": hierarchy_summary,
            "has_summary_level": hierarchy_summary['has_summary'],
            "has_config_level": hierarchy_summary['has_config']
        }
        self.metadata = metadata
        return metadata

    def _find_objects_with_target(self, target_field: str) -> list:
        """Find all objects in the data structure that contain the target field."""
        found = []

        def find_fields(obj):
            if isinstance(obj, dict):
                if target_field in obj:
                    found.append(obj)
                # Keep descending: nested objects may also carry the field
                for value in obj.values():
                    find_fields(value)
            elif isinstance(obj, list):
                for item in obj:
                    find_fields(item)

        find_fields(self.data)
        return found

    def generate_prompt(self, target_field: str) -> str:
        """Generate a hierarchy-aware prompt for the LLM.

        Lazily computes metadata if needed, pulls representative summary and
        object samples from the data, and assembles the instruction text.
        """
        if not self.metadata:
            self.extract_metadata(target_field)
        summary_fields = self.metadata.get('summary_fields_detected', [])
        classification = self.metadata.get('classification', {})

        # Depth-first search for the first dict containing the target field;
        # lists are sampled through their first element only.
        def find_sample(obj):
            if isinstance(obj, dict):
                if target_field in obj:
                    return obj
                for v in obj.values():
                    result = find_sample(v)
                    if result:
                        return result
            elif isinstance(obj, list) and len(obj) > 0:
                return find_sample(obj[0])
            return {}

        sample = find_sample(self.data)

        # Get summary sample. Guard against non-dict shapes: the uploaded
        # JSON may be a top-level list, and "results" may not be an object.
        root = self.data if isinstance(self.data, dict) else {}
        results = root.get('results', {})
        if not isinstance(results, dict):
            results = {}
        summary_sample = results.get('summary', {}) or root.get('summary', {})

        # Render compact JSON samples (at most 5 keys of the sample object)
        sample_object = json.dumps({k: sample[k] for k in list(sample.keys())[:5]}, indent=2) if sample else "{}"
        sample_summary = json.dumps(summary_sample, indent=2) if summary_sample else "{}"

        # Build hierarchy instruction
        hierarchy_text = """
DATA HIERARCHY (analyze in this priority order):
LEVEL 1 - Summary/Aggregate Fields (HIGHEST PRIORITY):
"""
        if summary_fields:
            for field in summary_fields[:5]:
                hierarchy_text += f" βœ“ {field}\n"
            if len(summary_fields) > 5:
                hierarchy_text += f" ... and {len(summary_fields) - 5} more\n"
        else:
            hierarchy_text += " No summary fields detected\n"
        hierarchy_text += """
LEVEL 2 - Configuration/Compliance Fields:
"""
        config_fields = classification.get('config_fields', [])
        if config_fields:
            for field in config_fields[:3]:
                hierarchy_text += f" βœ“ {field}\n"
        else:
            hierarchy_text += " No config fields detected\n"
        hierarchy_text += """
LEVEL 3 - Individual Objects:
βœ“ Sample object fields shown below
CRITICAL INSTRUCTION: Check summary fields FIRST! They are the most important for validation.
"""
        prompt = f"""You are analyzing JSON data to identify important fields related to "{target_field}".
{hierarchy_text}
CONTEXT:
- Total objects: {self.metadata.get('total_objects', 0)}
- Objects with "{target_field}" = true: {self.metadata.get('target_count', 0)}
- Percentage: {self.metadata.get('percentage', 0)}%
- Has summary level data: {self.metadata.get('has_summary_level', False)}
SAMPLE SUMMARY DATA (check this first):
{sample_summary}
SAMPLE OBJECT DATA:
{sample_object}
TASK:
Identify 3-4 important fields related to "{target_field}" in this priority order:
1. FIRST: Summary/aggregate fields (totals, percentages, counts)
2. SECOND: Configuration/compliance fields
3. THIRD: Individual object fields (if needed)
Generate regex patterns that match JSON format (with quotes).
VALIDATION PATTERN EXAMPLES:
- Compare two aggregate values: "field1"\\s*:\\s*(\\d+)[\\s\\S]*?"field2"\\s*:\\s*(\\d+)
- Extract percentage: "field_percentage"\\s*:\\s*(\\d+)
- Extract boolean: "field_name"\\s*:\\s*(true|false)
- Extract status: "compliance"\\s*:\\s*"([^"]*)"
Output ONLY valid JSON:
{{
"test_name": "Field Analysis: {target_field}",
"important_fields": ["field1", "field2", "field3"],
"reasoning": "Explain prioritization and why these fields matter",
"generated_regex": ["regex1", "regex2", "regex3"]
}}
"""
        return prompt

    def call_llm(self, prompt: str) -> str:
        """Dispatch the prompt to the configured LLM provider.

        Raises:
            ValueError: if ``llm_provider`` is not a known backend.
        """
        if self.llm_provider == "ollama":
            return self._call_ollama(prompt)
        elif self.llm_provider == "openai":
            return self._call_openai(prompt)
        elif self.llm_provider == "anthropic":
            return self._call_anthropic(prompt)
        elif self.llm_provider == "huggingface":
            return self._call_huggingface(prompt)
        else:
            raise ValueError(f"Unknown LLM provider: {self.llm_provider}")

    def _call_ollama(self, prompt: str) -> str:
        """Call the Ollama API to generate a response."""
        try:
            payload = {
                "model": self.MODEL_NAME,
                "prompt": prompt,
                "stream": False,
                "format": "json"  # ask Ollama to constrain output to JSON
            }
            response = requests.post(self.OLLAMA_API_URL, json=payload, timeout=120)
            response.raise_for_status()
            result = response.json()
            return result.get('response', '')
        except requests.exceptions.ConnectionError:
            raise ConnectionError("Cannot connect to Ollama. Make sure Ollama is running.")
        except requests.exceptions.Timeout:
            raise TimeoutError("Ollama request timed out.")
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to call Ollama API - {e}")

    def _call_openai(self, prompt: str) -> str:
        """Call the OpenAI API to generate a response."""
        try:
            from openai import OpenAI
            client = OpenAI(api_key=self.api_key)
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are a JSON data analysis assistant. Always respond with valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=2000
            )
            return response.choices[0].message.content
        except ImportError:
            raise ImportError("OpenAI library not installed. Install with: pip install openai")
        except Exception as e:
            raise Exception(f"Failed to call OpenAI API - {e}")

    def _call_anthropic(self, prompt: str) -> str:
        """Call the Anthropic API to generate a response."""
        try:
            from anthropic import Anthropic
            client = Anthropic(api_key=self.api_key)
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2000,
                temperature=0.3,
                system="You are a JSON data analysis assistant. Always respond with valid JSON.",
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            return response.content[0].text
        except ImportError:
            raise ImportError("Anthropic library not installed. Install with: pip install anthropic")
        except Exception as e:
            raise Exception(f"Failed to call Anthropic API - {e}")

    def _call_huggingface(self, prompt: str) -> str:
        """Call the Hugging Face Inference API (FREE) to generate a response."""
        try:
            # Use a good free model for text generation.
            # Fix: the model name is a constant. Previously this read
            # `self.api_key or "mistralai/..."`, which reused the API key as
            # the model name and built an invalid model URL whenever a key
            # was supplied.
            model_name = "mistralai/Mistral-7B-Instruct-v0.3"
            headers = {
                "Authorization": f"Bearer {self.api_key}" if self.api_key else None,
                "Content-Type": "application/json"
            }
            # Remove None values (no Authorization header without a key)
            headers = {k: v for k, v in headers.items() if v is not None}
            # Create a properly formatted prompt (Mistral [INST] format)
            full_prompt = f"""<s>[INST]You are a JSON data analysis assistant. Always respond with valid JSON only, no explanations.
{prompt}[/INST]"""
            payload = {
                "inputs": full_prompt,
                "parameters": {
                    "max_new_tokens": 1000,
                    "temperature": 0.3,
                    "return_full_text": False
                }
            }
            api_url = f"https://api-inference.huggingface.co/models/{model_name}"
            response = requests.post(api_url, json=payload, headers=headers, timeout=60)
            if response.status_code == 503:
                # 503 means the model is still spinning up on HF's side
                raise Exception("Model is loading. Please wait a moment and try again.")
            response.raise_for_status()
            result = response.json()
            # Handle different response formats
            if isinstance(result, list) and len(result) > 0:
                return result[0].get('generated_text', '')
            elif isinstance(result, dict):
                return result.get('generated_text', '')
            else:
                return str(result)
        except Exception as e:
            raise Exception(f"Failed to call Hugging Face API - {e}")

    def parse_llm_output(self, output: str) -> Dict[str, Any]:
        """Parse and validate the LLM JSON output.

        Strips markdown code fences (``` / ```json) before parsing.

        Raises:
            ValueError: if the cleaned output is not valid JSON.
        """
        try:
            output = output.strip()
            if output.startswith("```json"):
                output = output[7:]
            if output.startswith("```"):
                output = output[3:]
            if output.endswith("```"):
                output = output[:-3]
            output = output.strip()
            result = json.loads(output)
            return result
        except json.JSONDecodeError as e:
            raise ValueError(f"LLM output is not valid JSON - {e}")

    def analyze(self, target_field: str = "rotation_enabled") -> Dict[str, Any]:
        """Main analysis function: metadata -> prompt -> LLM -> parsed result."""
        self.extract_metadata(target_field)
        prompt = self.generate_prompt(target_field)
        llm_output = self.call_llm(prompt)
        result = self.parse_llm_output(llm_output)
        return result
def main():
    """Main Streamlit application."""
    st.title("πŸ“Š JSON Field Analyzer")
    if IS_HUGGINGFACE:
        st.info("πŸ†“ Running on Hugging Face - FREE Hugging Face AI model available! No API key needed.")
    st.markdown("**Upload a JSON file and analyze important fields using LLM**")

    # --- Sidebar: provider selection, API key entry, target field, guides ---
    with st.sidebar:
        st.header("βš™οΈ Configuration")
        if IS_ONLINE and not IS_HUGGINGFACE:
            st.info("🌐 Running online - Cloud LLM required")

        # Default to Hugging Face (free, index 3) online; Ollama (index 0) locally
        default_index = 3 if IS_ONLINE else 0
        llm_provider = st.selectbox(
            "πŸ€– LLM Provider",
            ["Ollama (Local)", "OpenAI (Cloud)", "Anthropic Claude (Cloud)", "Hugging Face (Free 🌟)"],
            index=default_index,
            help="Choose your LLM provider - Hugging Face is FREE and no API key needed!"
        )

        # Resolve the internal provider id and the API key for the selection
        if llm_provider == "Ollama (Local)":
            provider_name = "ollama"
            api_key = None
            if IS_ONLINE:
                st.error("❌ Ollama not available on this platform")
                st.markdown("**Please select a cloud LLM provider:**")
                st.markdown("- OpenAI (Cloud) - GPT-4o Mini")
                st.markdown("- Anthropic Claude (Cloud) - Recommended")
            else:
                st.info("πŸ“ Using local Ollama")
        elif llm_provider == "OpenAI (Cloud)":
            provider_name = "openai"
            api_key = os.getenv("OPENAI_API_KEY") or st.text_input(
                "OpenAI API Key",
                type="password",
                help="Enter your OpenAI API key (or set OPENAI_API_KEY env var)"
            )
            if not api_key:
                st.warning("⚠️ Please enter your OpenAI API key")
                st.info("πŸ’‘ Get key: https://platform.openai.com/api-keys")
        elif llm_provider == "Anthropic Claude (Cloud)":
            provider_name = "anthropic"
            api_key = os.getenv("ANTHROPIC_API_KEY") or st.text_input(
                "Anthropic API Key",
                type="password",
                help="Enter your Anthropic API key (or set ANTHROPIC_API_KEY env var)"
            )
            if not api_key:
                st.warning("⚠️ Please enter your Anthropic API key")
                st.info("πŸ’‘ Get key: https://console.anthropic.com")
        else:  # Hugging Face (Free) — key is optional
            provider_name = "huggingface"
            api_key = os.getenv("HUGGINGFACE_API_KEY") or st.text_input(
                "Hugging Face API Key (Optional)",
                type="password",
                help="Optional: Enter your HF token for faster inference (or set HUGGINGFACE_API_KEY env var)"
            )
            if not api_key:
                st.info("✨ Using free Hugging Face Inference API - no key needed!")
                st.info("πŸ’‘ Optional: Add your token in Settings > Secrets for better performance")

        st.markdown("---")
        target_field = st.text_input(
            "Target Field",
            value="rotation_enabled",
            help="The field you want to analyze (e.g., rotation_enabled, ssl_enforced)"
        )
        st.markdown("---")
        st.markdown("### πŸ“‹ Setup Guides")
        with st.expander("πŸ”§ Local Ollama Setup"):
            st.code("""
brew install ollama
ollama serve
ollama pull llama3.2:3b
""", language="bash")
        with st.expander("☁️ Cloud API Setup"):
            st.markdown("""
**OpenAI:**
- Get key: https://platform.openai.com/api-keys
- Model: GPT-4o Mini
**Anthropic:**
- Get key: https://console.anthropic.com
- Model: Claude 3.5 Sonnet
""")

    # --- Main area: upload, validate, analyze ---
    st.markdown("---")
    st.header("πŸ“€ Upload JSON File")
    uploaded_file = st.file_uploader(
        "Choose a JSON file",
        type=['json'],
        help="Upload a JSON file to analyze"
    )

    if uploaded_file is None:
        # Landing view shown before any upload
        st.info("πŸ‘† Please upload a JSON file to get started")
        with st.expander("πŸ“– How it works"):
            st.markdown("""
### Workflow:
1. **Upload**: Upload your JSON file using the file uploader above
2. **Configure**: Set the target field name in the sidebar (default: `rotation_enabled`)
3. **Analyze**: Click the "Analyze with LLM" button
4. **Review**: View the important fields, reasoning, and regex patterns
5. **Download**: Save the results as JSON
### What it does:
- Analyzes your JSON structure to detect summary fields, configurations, and objects
- Uses LLM to identify important fields related to your target
- Generates regex patterns for data extraction and validation
- Provides reasoning for why each field is important
### Use cases:
- AWS compliance validation (KMS rotation, SSL enforcement, etc.)
- Data quality checks
- Automated validation pattern generation
- Field correlation analysis
""")
        return

    try:
        # Read and parse the upload; failures fall through to the handlers below
        raw_bytes = uploaded_file.read()
        data = json.loads(raw_bytes)
        st.success("βœ… File uploaded successfully!")

        size_col, shape_col = st.columns(2)
        with size_col:
            st.metric("File Size", f"{len(raw_bytes) / 1024:.2f} KB")
        with shape_col:
            st.metric("JSON Structure", "Valid" if isinstance(data, (dict, list)) else "Invalid")

        st.markdown("---")
        _left, middle, _right = st.columns([1, 2, 1])
        with middle:
            run_clicked = st.button("πŸ” Analyze with LLM", type="primary", use_container_width=True)

        if run_clicked:
            # Ollama cannot run on hosted platforms
            if provider_name == "ollama" and IS_ONLINE:
                st.error("❌ Ollama is not available on this platform")
                st.info("πŸ’‘ Please select 'Anthropic Claude (Cloud)' or 'OpenAI (Cloud)' from the sidebar")
            # Cloud providers need a key (Hugging Face's is optional)
            elif provider_name in ["openai", "anthropic"] and not api_key:
                st.error("❌ Please enter an API key for the selected cloud provider")
            else:
                try:
                    with st.spinner(f"Analyzing with {llm_provider}... This may take a moment."):
                        analyzer = FileAnalyzer(data, llm_provider=provider_name, api_key=api_key)
                        result = analyzer.analyze(target_field=target_field)

                    # Render the analysis results
                    st.markdown("---")
                    st.header("πŸ“Š Analysis Results")
                    fields_col, reason_col = st.columns(2)
                    with fields_col:
                        st.subheader("πŸ€– Important Fields")
                        for i, field in enumerate(result.get('important_fields', []), 1):
                            st.markdown(f"**{i}. {field}**")
                    with reason_col:
                        st.subheader("πŸ’‘ Reasoning")
                        st.markdown(f'<div class="highlight">{result.get("reasoning", "N/A")}</div>',
                                    unsafe_allow_html=True)

                    st.markdown("---")
                    st.subheader("πŸ”§ Generated Regex Patterns")
                    for i, pattern in enumerate(result.get('generated_regex', []), 1):
                        st.markdown(f"**Pattern {i}:**")
                        st.code(pattern, language="regex")

                    with st.expander("πŸ“„ View Raw JSON Output"):
                        st.json(result)

                    st.markdown("---")
                    st.download_button(
                        label="⬇️ Download Results",
                        data=json.dumps(result, indent=2),
                        file_name=f"analysis_{target_field}.json",
                        mime="application/json"
                    )
                except ConnectionError as e:
                    st.error(f"❌ {e}")
                    if provider_name == "ollama":
                        st.info("πŸ’‘ Start Ollama with: `ollama serve`")
                    else:
                        st.info("πŸ’‘ Check your internet connection and API key")
                except TimeoutError as e:
                    st.error(f"❌ {e}")
                    st.info("πŸ’‘ The analysis took too long. Try again or use a larger timeout.")
                except Exception as e:
                    st.error(f"❌ Error during analysis: {e}")
                    st.exception(e)
    except json.JSONDecodeError:
        st.error("❌ Invalid JSON file. Please upload a valid JSON file.")
    except Exception as e:
        st.error(f"❌ Error reading file: {e}")
        st.exception(e)
# Run the app. Streamlit executes this script with __name__ == "__main__",
# so the guard still launches main() under `streamlit run` while making a
# plain `import` of this module side-effect free.
if __name__ == "__main__":
    main()