#!/usr/bin/env python3
"""
File Upload Analyzer - Streamlit Frontend

This is a copy of file_upload_app.py for Hugging Face Spaces deployment.
"""

import io
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, Tuple

import requests
import streamlit as st

# Try to import structure_analysis, fallback to inline if not available
try:
    from structure_analysis import (
        detect_summary_fields,
        classify_data_structure,
        get_hierarchy_summary,
    )
except ImportError:
    # Inline fallback implementations

    def detect_summary_fields(data: Any, path: str = "") -> list:
        """Return dotted paths of keys whose name looks like a summary field.

        A key matches when its lower-cased name contains one of the indicator
        substrings below. Only the first element of each list is inspected.

        Args:
            data: Parsed JSON value to scan.
            path: Dotted prefix prepended to every reported path.

        Returns:
            List of dotted key paths, in traversal order.
        """
        fields = []
        summary_indicators = (
            'total', 'count', 'percentage', 'summary',
            'aggregate', 'statistics', 'percent',
        )

        def traverse(obj, current_path=""):
            if isinstance(obj, dict):
                for key, value in obj.items():
                    field_path = f"{current_path}.{key}" if current_path else key
                    if any(ind in key.lower() for ind in summary_indicators):
                        fields.append(field_path)
                    if isinstance(value, (dict, list)):
                        traverse(value, field_path)
            elif isinstance(obj, list) and len(obj) > 0:
                # Lists are sampled through their first element only.
                traverse(obj[0], current_path)

        traverse(data, path)
        return fields

    def classify_data_structure(data: Any) -> dict:
        """Return an empty classification (stub used when the real module is missing)."""
        return {
            'summary_fields': [],
            'config_fields': [],
            'object_arrays': [],
            'object_fields': [],
        }

    def get_hierarchy_summary(data: Any) -> dict:
        """Return an empty hierarchy summary (stub used when the real module is missing)."""
        return {
            'has_summary': False,
            'has_config': False,
            'summary_fields': [],
            'config_fields': [],
            'levels_present': [],
        }


# Detect if running on Streamlit Cloud or Hugging Face
IS_STREAMLIT_CLOUD = os.getenv("STREAMLIT_SHARING_BASE_URL") is not None
IS_HUGGINGFACE = os.getenv("SPACE_ID") is not None
IS_ONLINE = IS_STREAMLIT_CLOUD or IS_HUGGINGFACE

# Page config - must be first
st.set_page_config(
    page_title="JSON Field Analyzer",
    page_icon="📊",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS (the style payload is empty in this copy of the file)
st.markdown(""" """, unsafe_allow_html=True)


class FileAnalyzer:
    """Analyzer for uploaded JSON files.

    Builds a hierarchy-aware prompt from the uploaded JSON, sends it to the
    selected LLM provider and parses the JSON answer.
    """

    OLLAMA_API_URL = "http://localhost:11434/api/generate"
    MODEL_NAME = "llama3.2:3b"
    OPENAI_MODEL = "gpt-4o-mini"
    ANTHROPIC_MODEL = "claude-3-5-sonnet-20241022"
    HUGGINGFACE_MODEL = "mistralai/Mistral-7B-Instruct-v0.3"  # Default free model
    HUGGINGFACE_API_URL = "https://api-inference.huggingface.co/models/{model}"
    SYSTEM_PROMPT = "You are a JSON data analysis assistant. Always respond with valid JSON."

    def __init__(self, data: Any, llm_provider="ollama", api_key=None):
        """Store the parsed JSON and the provider settings.

        Args:
            data: Parsed JSON document (a dict or a list).
            llm_provider: One of "ollama", "openai", "anthropic", "huggingface".
            api_key: Credential for the cloud provider; optional for
                Hugging Face and unused for Ollama.
        """
        self.data = data
        self.metadata = None
        self.llm_provider = llm_provider
        self.api_key = api_key

    def extract_metadata(self, target_field: str) -> Dict[str, Any]:
        """Extract key metadata from the JSON data for LLM analysis.

        Args:
            target_field: Name of the field whose occurrences are counted.

        Returns:
            The metadata dict, which is also cached on ``self.metadata``.
        """
        # Enhanced: Detect summary fields and classify structure
        summary_fields = detect_summary_fields(self.data)
        classification = classify_data_structure(self.data)
        hierarchy_summary = get_hierarchy_summary(self.data)

        # Try to find objects in the data structure
        objects_with_target = self._find_objects_with_target(target_field)
        total = len(objects_with_target)
        # Only a real JSON `true` counts; truthy strings or numbers do not.
        target_true = sum(
            1 for obj in objects_with_target if obj.get(target_field) is True
        )
        percentage = (target_true / total * 100) if total > 0 else 0

        metadata = {
            "total_objects": total,
            "target_count": target_true,
            "percentage": round(percentage, 2),
            "summary_fields_detected": summary_fields[:10],
            "classification": classification,
            "hierarchy_summary": hierarchy_summary,
            "has_summary_level": hierarchy_summary.get('has_summary', False),
            "has_config_level": hierarchy_summary.get('has_config', False),
        }

        self.metadata = metadata
        return metadata

    def _iter_objects_with_target(self, target_field: str) -> Iterator[dict]:
        """Yield, depth-first, every dict in the data that has ``target_field`` as a key."""

        def walk(node):
            if isinstance(node, dict):
                if target_field in node:
                    yield node
                for value in node.values():
                    yield from walk(value)
            elif isinstance(node, list):
                for item in node:
                    yield from walk(item)

        return walk(self.data)

    def _find_objects_with_target(self, target_field: str) -> list:
        """Find all objects in the data structure that contain the target field."""
        return list(self._iter_objects_with_target(target_field))

    def _summary_sample(self) -> Any:
        """Return ``results.summary`` or, failing that, top-level ``summary``; else ``{}``.

        Non-dict documents (e.g. a top-level list) have no summary section.
        """
        if not isinstance(self.data, dict):
            return {}
        results = self.data.get('results')
        nested = results.get('summary') if isinstance(results, dict) else None
        return nested or self.data.get('summary') or {}

    def _hierarchy_text(self, summary_fields: list, config_fields: list) -> str:
        """Build the "DATA HIERARCHY" section of the prompt."""
        text = """
DATA HIERARCHY (analyze in this priority order):

LEVEL 1 - Summary/Aggregate Fields (HIGHEST PRIORITY):
"""
        if summary_fields:
            for field in summary_fields[:5]:
                text += f" ✓ {field}\n"
            if len(summary_fields) > 5:
                text += f" ... and {len(summary_fields) - 5} more\n"
        else:
            text += " No summary fields detected\n"

        text += """
LEVEL 2 - Configuration/Compliance Fields:
"""
        if config_fields:
            for field in config_fields[:3]:
                text += f" ✓ {field}\n"
        else:
            text += " No config fields detected\n"

        text += """
LEVEL 3 - Individual Objects:
 ✓ Sample object fields shown below

CRITICAL INSTRUCTION: Check summary fields FIRST! They are the most important for validation.
"""
        return text

    def generate_prompt(self, target_field: str) -> str:
        """Generate a hierarchy-aware prompt for the LLM.

        Args:
            target_field: Field the analysis is centred on.

        Returns:
            The complete prompt text.
        """
        if not self.metadata:
            self.extract_metadata(target_field)

        summary_fields = self.metadata.get('summary_fields_detected', [])
        classification = self.metadata.get('classification', {})

        # First object (depth-first) that carries the target field. This uses
        # the same traversal as the counting code, so the sample is non-empty
        # whenever total_objects > 0.
        sample = next(self._iter_objects_with_target(target_field), {})
        summary_sample = self._summary_sample()

        # Create samples (object sample is limited to its first five keys)
        sample_object = (
            json.dumps(dict(list(sample.items())[:5]), indent=2) if sample else "{}"
        )
        sample_summary = json.dumps(summary_sample, indent=2) if summary_sample else "{}"

        hierarchy_text = self._hierarchy_text(
            summary_fields, classification.get('config_fields', [])
        )

        prompt = f"""You are analyzing JSON data to identify important fields related to "{target_field}".

{hierarchy_text}

CONTEXT:
- Total objects: {self.metadata.get('total_objects', 0)}
- Objects with "{target_field}" = true: {self.metadata.get('target_count', 0)}
- Percentage: {self.metadata.get('percentage', 0)}%
- Has summary level data: {self.metadata.get('has_summary_level', False)}

SAMPLE SUMMARY DATA (check this first):
{sample_summary}

SAMPLE OBJECT DATA:
{sample_object}

TASK:
Identify 3-4 important fields related to "{target_field}" in this priority order:
1. FIRST: Summary/aggregate fields (totals, percentages, counts)
2. SECOND: Configuration/compliance fields
3. THIRD: Individual object fields (if needed)

Generate regex patterns that match JSON format (with quotes).

VALIDATION PATTERN EXAMPLES:
- Compare two aggregate values: "field1"\\s*:\\s*(\\d+)[\\s\\S]*?"field2"\\s*:\\s*(\\d+)
- Extract percentage: "field_percentage"\\s*:\\s*(\\d+)
- Extract boolean: "field_name"\\s*:\\s*(true|false)
- Extract status: "compliance"\\s*:\\s*"([^"]*)"

Output ONLY valid JSON:
{{
  "test_name": "Field Analysis: {target_field}",
  "important_fields": ["field1", "field2", "field3"],
  "reasoning": "Explain prioritization and why these fields matter",
  "generated_regex": ["regex1", "regex2", "regex3"]
}}
"""
        return prompt

    def call_llm(self, prompt: str) -> str:
        """Call the appropriate LLM based on provider.

        Raises:
            ValueError: If ``self.llm_provider`` is not a known provider.
        """
        dispatch = {
            "ollama": self._call_ollama,
            "openai": self._call_openai,
            "anthropic": self._call_anthropic,
            "huggingface": self._call_huggingface,
        }
        handler = dispatch.get(self.llm_provider)
        if handler is None:
            raise ValueError(f"Unknown LLM provider: {self.llm_provider}")
        return handler(prompt)

    def _call_ollama(self, prompt: str) -> str:
        """Call the Ollama API to generate a response.

        Raises:
            ConnectionError: If the local Ollama server cannot be reached.
            TimeoutError: If the request exceeds the 120 s timeout.
            Exception: For any other HTTP/request failure.
        """
        payload = {
            "model": self.MODEL_NAME,
            "prompt": prompt,
            "stream": False,
            "format": "json",
        }
        try:
            response = requests.post(self.OLLAMA_API_URL, json=payload, timeout=120)
            response.raise_for_status()
            result = response.json()
            return result.get('response', '')
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(
                "Cannot connect to Ollama. Make sure Ollama is running."
            ) from e
        except requests.exceptions.Timeout as e:
            raise TimeoutError("Ollama request timed out.") from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to call Ollama API - {e}") from e

    def _call_openai(self, prompt: str) -> str:
        """Call the OpenAI API to generate a response.

        Raises:
            ImportError: If the ``openai`` package is not installed.
            Exception: For any failure reported by the client.
        """
        try:
            from openai import OpenAI

            client = OpenAI(api_key=self.api_key)
            response = client.chat.completions.create(
                model=self.OPENAI_MODEL,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                temperature=0.3,
                max_tokens=2000,
            )
            # `content` may be None; normalise so the declared `str` holds.
            return response.choices[0].message.content or ""
        except ImportError as e:
            raise ImportError(
                "OpenAI library not installed. Install with: pip install openai"
            ) from e
        except Exception as e:
            raise Exception(f"Failed to call OpenAI API - {e}") from e

    def _call_anthropic(self, prompt: str) -> str:
        """Call the Anthropic API to generate a response.

        Raises:
            ImportError: If the ``anthropic`` package is not installed.
            Exception: For any failure reported by the client.
        """
        try:
            from anthropic import Anthropic

            client = Anthropic(api_key=self.api_key)
            response = client.messages.create(
                model=self.ANTHROPIC_MODEL,
                max_tokens=2000,
                temperature=0.3,
                system=self.SYSTEM_PROMPT,
                messages=[
                    {"role": "user", "content": prompt}
                ],
            )
            return response.content[0].text
        except ImportError as e:
            raise ImportError(
                "Anthropic library not installed. Install with: pip install anthropic"
            ) from e
        except Exception as e:
            raise Exception(f"Failed to call Anthropic API - {e}") from e

    def _call_huggingface(self, prompt: str) -> str:
        """Call the Hugging Face Inference API (FREE) to generate a response.

        The API key, when present, is sent only as a bearer token. It is never
        used as the model name, so it cannot end up in the request URL.

        Raises:
            ConnectionError: If the API host cannot be reached.
            TimeoutError: If the request exceeds the 60 s timeout.
            Exception: For HTTP errors, a still-loading model (503) or an
                unreadable response body.
        """
        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # Create a properly formatted prompt
        full_prompt = f"""[INST]You are a JSON data analysis assistant. Always respond with valid JSON only, no explanations.

{prompt}[/INST]"""

        payload = {
            "inputs": full_prompt,
            "parameters": {
                "max_new_tokens": 1000,
                "temperature": 0.3,
                "return_full_text": False,
            },
        }
        api_url = self.HUGGINGFACE_API_URL.format(model=self.HUGGINGFACE_MODEL)

        try:
            response = requests.post(api_url, json=payload, headers=headers, timeout=60)
            if response.status_code == 503:
                raise Exception(
                    "Failed to call Hugging Face API - "
                    "Model is loading. Please wait a moment and try again."
                )
            response.raise_for_status()
            result = response.json()
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(
                "Cannot connect to the Hugging Face Inference API."
            ) from e
        except requests.exceptions.Timeout as e:
            raise TimeoutError("Hugging Face request timed out.") from e
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            raise Exception(f"Failed to call Hugging Face API - {e}") from e

        # Handle different response formats
        if isinstance(result, list) and result:
            first = result[0]
            return first.get('generated_text', '') if isinstance(first, dict) else str(first)
        if isinstance(result, dict):
            return result.get('generated_text', '')
        return str(result)

    def parse_llm_output(self, output: str) -> Dict[str, Any]:
        """Parse and validate the LLM JSON output.

        Markdown code fences are stripped. If the remaining text is not pure
        JSON, the first embedded JSON object is used instead, because chat
        models sometimes add prose around the answer.

        Raises:
            ValueError: If no JSON object can be extracted from ``output``.
        """
        text = (output or "").strip()
        if text.startswith("```json"):
            text = text[7:]
        if text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

        try:
            result = json.loads(text)
        except json.JSONDecodeError as e:
            start = text.find("{")
            if start == -1:
                raise ValueError(f"LLM output is not valid JSON - {e}") from e
            try:
                result, _ = json.JSONDecoder().raw_decode(text, start)
            except json.JSONDecodeError:
                # Report the error from the full-text parse.
                raise ValueError(f"LLM output is not valid JSON - {e}") from e

        if not isinstance(result, dict):
            raise ValueError(
                f"LLM output is not valid JSON - expected an object, "
                f"got {type(result).__name__}"
            )
        return result

    def analyze(self, target_field: str = "rotation_enabled") -> Dict[str, Any]:
        """Main analysis function: metadata -> prompt -> LLM call -> parsed result."""
        self.extract_metadata(target_field)
        prompt = self.generate_prompt(target_field)
        llm_output = self.call_llm(prompt)
        return self.parse_llm_output(llm_output)


def _render_sidebar() -> Tuple[str, str, Optional[str], str]:
    """Render the configuration sidebar.

    Returns:
        ``(provider_label, provider_name, api_key, target_field)`` where
        ``provider_label`` is the text shown in the select box and
        ``provider_name`` is the identifier understood by ``FileAnalyzer``.
    """
    with st.sidebar:
        st.header("⚙️ Configuration")

        # Show environment info
        if IS_ONLINE and not IS_HUGGINGFACE:
            st.info("🌐 Running online - Cloud LLM required")

        # LLM Provider Selection
        # Default to Hugging Face (free) if online, Ollama on local
        default_index = 3 if IS_ONLINE else 0

        llm_provider = st.selectbox(
            "🤖 LLM Provider",
            ["Ollama (Local)", "OpenAI (Cloud)", "Anthropic Claude (Cloud)", "Hugging Face (Free 🌟)"],
            index=default_index,
            help="Choose your LLM provider - Hugging Face is FREE and no API key needed!"
        )

        # Extract provider name and model
        if llm_provider == "Ollama (Local)":
            provider_name = "ollama"
            api_key = None
            if IS_ONLINE:
                st.error("❌ Ollama not available on this platform")
                st.markdown("**Please select a cloud LLM provider:**")
                st.markdown("- OpenAI (Cloud) - GPT-4o Mini")
                st.markdown("- Anthropic Claude (Cloud) - Recommended")
            else:
                st.info("📝 Using local Ollama")
        elif llm_provider == "OpenAI (Cloud)":
            provider_name = "openai"
            api_key = os.getenv("OPENAI_API_KEY") or st.text_input(
                "OpenAI API Key",
                type="password",
                help="Enter your OpenAI API key (or set OPENAI_API_KEY env var)"
            )
            if not api_key:
                st.warning("⚠️ Please enter your OpenAI API key")
                st.info("💡 Get key: https://platform.openai.com/api-keys")
        elif llm_provider == "Anthropic Claude (Cloud)":
            provider_name = "anthropic"
            api_key = os.getenv("ANTHROPIC_API_KEY") or st.text_input(
                "Anthropic API Key",
                type="password",
                help="Enter your Anthropic API key (or set ANTHROPIC_API_KEY env var)"
            )
            if not api_key:
                st.warning("⚠️ Please enter your Anthropic API key")
                st.info("💡 Get key: https://console.anthropic.com")
        else:  # Hugging Face (Free)
            provider_name = "huggingface"
            api_key = os.getenv("HUGGINGFACE_API_KEY") or st.text_input(
                "Hugging Face API Key (Optional)",
                type="password",
                help="Optional: Enter your HF token for faster inference (or set HUGGINGFACE_API_KEY env var)"
            )
            if not api_key:
                st.info("✨ Using free Hugging Face Inference API - no key needed!")
                st.info("💡 Optional: Add your token in Settings > Secrets for better performance")

        st.markdown("---")

        target_field = st.text_input(
            "Target Field",
            value="rotation_enabled",
            help="The field you want to analyze (e.g., rotation_enabled, ssl_enforced)"
        )

        st.markdown("---")
        st.markdown("### 📋 Setup Guides")

        with st.expander("🔧 Local Ollama Setup"):
            st.code("""
brew install ollama
ollama serve
ollama pull llama3.2:3b
""", language="bash")

        with st.expander("☁️ Cloud API Setup"):
            st.markdown("""
**OpenAI:**
- Get key: https://platform.openai.com/api-keys
- Model: GPT-4o Mini

**Anthropic:**
- Get key: https://console.anthropic.com
- Model: Claude 3.5 Sonnet
""")

    return llm_provider, provider_name, api_key or None, target_field


def _render_results(result: Dict[str, Any], target_field: str) -> None:
    """Display the parsed LLM result and offer it as a JSON download."""
    st.markdown("---")
    st.header("📊 Analysis Results")

    # Main results in columns
    col1, col2 = st.columns(2)

    with col1:
        st.subheader("🤖 Important Fields")
        for i, field in enumerate(result.get('important_fields') or [], 1):
            st.markdown(f"**{i}. {field}**")

    with col2:
        st.subheader("💡 Reasoning")
        # The reasoning text is model-generated, so it is rendered as ordinary
        # markdown rather than raw HTML.
        st.markdown(str(result.get("reasoning", "N/A")))

    # Regex patterns
    st.markdown("---")
    st.subheader("🔧 Generated Regex Patterns")

    for i, pattern in enumerate(result.get('generated_regex') or [], 1):
        st.markdown(f"**Pattern {i}:**")
        st.code(str(pattern), language="regex")

    # Raw JSON output
    with st.expander("📄 View Raw JSON Output"):
        st.json(result)

    # Download results
    st.markdown("---")
    result_json = json.dumps(result, indent=2)
    st.download_button(
        label="⬇️ Download Results",
        data=result_json,
        file_name=f"analysis_{target_field}.json",
        mime="application/json"
    )


def main():
    """Main Streamlit application."""
    st.title("📊 JSON Field Analyzer")
    if IS_HUGGINGFACE:
        st.info("🆓 Running on Hugging Face - FREE Hugging Face AI model available! No API key needed.")
    st.markdown("**Upload a JSON file and analyze important fields using LLM**")

    # Sidebar for configuration
    llm_provider, provider_name, api_key, target_field = _render_sidebar()
    target_field = (target_field or "").strip()

    # File upload section
    st.markdown("---")
    st.header("📤 Upload JSON File")

    uploaded_file = st.file_uploader(
        "Choose a JSON file",
        type=['json'],
        help="Upload a JSON file to analyze"
    )

    if uploaded_file is None:
        # Show example when no file is uploaded
        st.info("👆 Please upload a JSON file to get started")

        with st.expander("📖 How it works"):
            st.markdown("""
### Workflow:
1. **Upload**: Upload your JSON file using the file uploader above
2. **Configure**: Set the target field name in the sidebar (default: `rotation_enabled`)
3. **Analyze**: Click the "Analyze with LLM" button
4. **Review**: View the important fields, reasoning, and regex patterns
5. **Download**: Save the results as JSON

### What it does:
- Analyzes your JSON structure to detect summary fields, configurations, and objects
- Uses LLM to identify important fields related to your target
- Generates regex patterns for data extraction and validation
- Provides reasoning for why each field is important

### Use cases:
- AWS compliance validation (KMS rotation, SSL enforcement, etc.)
- Data quality checks
- Automated validation pattern generation
- Field correlation analysis
""")
        return

    # Read file contents
    try:
        content = uploaded_file.read()
        data = json.loads(content)
    except json.JSONDecodeError:
        st.error("❌ Invalid JSON file. Please upload a valid JSON file.")
        return
    except Exception as e:
        st.error(f"❌ Error reading file: {e}")
        st.exception(e)
        return

    st.success("✅ File uploaded successfully!")

    # Show file info
    col1, col2 = st.columns(2)
    with col1:
        st.metric("File Size", f"{len(content) / 1024:.2f} KB")
    with col2:
        st.metric("JSON Structure", "Valid" if isinstance(data, (dict, list)) else "Invalid")

    # Analyze button
    st.markdown("---")
    col1, col2, col3 = st.columns([1, 2, 1])
    with col2:
        analyze_button = st.button("🔍 Analyze with LLM", type="primary", use_container_width=True)

    if not analyze_button:
        return

    # Prevent Ollama usage on online platforms
    if provider_name == "ollama" and IS_ONLINE:
        st.error("❌ Ollama is not available on this platform")
        st.info("💡 Please select 'Anthropic Claude (Cloud)' or 'OpenAI (Cloud)' from the sidebar")
        return
    # Validate API key for cloud providers (except Hugging Face which is optional)
    if provider_name in ["openai", "anthropic"] and not api_key:
        st.error("❌ Please enter an API key for the selected cloud provider")
        return
    if not target_field:
        st.error("❌ Please enter a target field name")
        return

    try:
        with st.spinner(f"Analyzing with {llm_provider}... This may take a moment."):
            analyzer = FileAnalyzer(data, llm_provider=provider_name, api_key=api_key)
            result = analyzer.analyze(target_field=target_field)

        # Display results
        _render_results(result, target_field)
    except ConnectionError as e:
        st.error(f"❌ {e}")
        if provider_name == "ollama":
            st.info("💡 Start Ollama with: `ollama serve`")
        else:
            st.info("💡 Check your internet connection and API key")
    except TimeoutError as e:
        st.error(f"❌ {e}")
        st.info("💡 The analysis took too long. Try again or use a larger timeout.")
    except Exception as e:
        st.error(f"❌ Error during analysis: {e}")
        st.exception(e)


# Call main function - Streamlit will handle errors
main()