Spaces:
Building
Building
| #!/usr/bin/env python3 | |
| """ | |
| File Upload Analyzer - Streamlit Frontend | |
| This is a copy of file_upload_app.py for Hugging Face Spaces deployment. | |
| """ | |
| import streamlit as st | |
| import json | |
| import sys | |
| import os | |
| from pathlib import Path | |
| from typing import Dict, Any | |
| import io | |
| import requests | |
# Use the shared structure_analysis helpers when that module is importable;
# otherwise define local stand-ins with the same names and signatures.
try:
    from structure_analysis import (
        detect_summary_fields,
        classify_data_structure,
        get_hierarchy_summary
    )
except ImportError:
    def detect_summary_fields(data: Any, path: str = "") -> list:
        """Return dotted paths of dict keys whose name looks like a summary field.

        A key matches when its lower-cased name contains one of the indicator
        substrings below. Nested dicts are walked recursively; for a list only
        its first element is inspected, and it shares the list's own path.
        ``path`` is the prefix prepended to every reported path.
        """
        indicator_words = ('total', 'count', 'percentage', 'summary', 'aggregate', 'statistics', 'percent')

        def walk(node, prefix):
            # Lists contribute only their first element, under the same prefix.
            if isinstance(node, list):
                if node:
                    yield from walk(node[0], prefix)
                return
            if not isinstance(node, dict):
                return
            for name, child in node.items():
                dotted = f"{prefix}.{name}" if prefix else name
                lowered = name.lower()
                if any(word in lowered for word in indicator_words):
                    yield dotted
                if isinstance(child, (dict, list)):
                    yield from walk(child, dotted)

        return list(walk(data, path))

    def classify_data_structure(data: Any) -> dict:
        """Fallback classifier: ignores ``data`` and reports every category as empty."""
        categories = ('summary_fields', 'config_fields', 'object_arrays', 'object_fields')
        return {category: [] for category in categories}

    def get_hierarchy_summary(data: Any) -> dict:
        """Fallback hierarchy summary: ignores ``data`` and reports that nothing was found."""
        summary = {'has_summary': False, 'has_config': False}
        for list_key in ('summary_fields', 'config_fields', 'levels_present'):
            summary[list_key] = []
        return summary
# Hosting detection: a platform counts as present when its marker environment
# variable is defined at all (its value is not inspected).
IS_STREAMLIT_CLOUD = "STREAMLIT_SHARING_BASE_URL" in os.environ
IS_HUGGINGFACE = "SPACE_ID" in os.environ
# "Online" means either hosted platform was detected.
IS_ONLINE = IS_HUGGINGFACE or IS_STREAMLIT_CLOUD
# Page config - must be first
_PAGE_SETTINGS = {
    "page_title": "JSON Field Analyzer",
    "page_icon": "π",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# Custom CSS, injected as raw HTML. The .highlight class is used for the
# reasoning box rendered by main().
_CUSTOM_CSS = """
<style>
.main > div {
padding-top: 1rem;
}
.stButton>button {
width: 100%;
}
h1 {
font-size: 2rem;
}
h2 {
font-size: 1.3rem;
border-bottom: 2px solid #0e1117;
padding-bottom: 0.3rem;
}
.highlight {
background-color: #f0f2f6;
color: #262730;
padding: 1rem;
border-radius: 5px;
border-left: 4px solid #1f77b4;
margin: 1rem 0;
}
.highlight p {
color: #262730;
margin: 0;
}
.result-box {
background-color: #f0f2f6;
padding: 1.5rem;
border-radius: 10px;
margin: 1rem 0;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
class FileAnalyzer:
    """Analyzer for uploaded JSON files.

    Collects simple statistics about a target field in the parsed JSON,
    builds a prompt from them, sends it to the selected LLM provider and
    parses the provider's JSON answer.
    """

    OLLAMA_API_URL = "http://localhost:11434/api/generate"
    MODEL_NAME = "llama3.2:3b"
    # Model queried through the Hugging Face Inference API. This must never be
    # derived from the API token: the token belongs in the Authorization header.
    HF_MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.3"
    HF_API_BASE_URL = "https://api-inference.huggingface.co/models"

    def __init__(self, data: Dict[str, Any], llm_provider="ollama", api_key=None):
        """Store the parsed JSON and the provider settings.

        Args:
            data: Parsed JSON document (a dict, or a list at the root).
            llm_provider: One of "ollama", "openai", "anthropic", "huggingface".
            api_key: Credential for the chosen provider, or None.
        """
        self.data = data
        self.metadata = None  # filled in by extract_metadata()
        self.llm_provider = llm_provider
        self.api_key = api_key

    def extract_metadata(self, target_field: str) -> Dict[str, Any]:
        """Extract key metadata from the JSON data for LLM analysis.

        Counts the objects that contain ``target_field`` and how many of them
        hold the literal value ``True``, and attaches the output of the
        structure helpers. The result is cached on ``self.metadata``.

        Returns:
            The metadata dict that was stored on the instance.
        """
        summary_fields = detect_summary_fields(self.data)
        classification = classify_data_structure(self.data)
        hierarchy_summary = get_hierarchy_summary(self.data)

        objects_with_target = self._find_objects_with_target(target_field)
        total = len(objects_with_target)
        # Only a real JSON `true` counts; truthy strings or numbers do not.
        target_true = sum(1 for obj in objects_with_target if obj.get(target_field) is True)
        percentage = (target_true / total * 100) if total > 0 else 0

        metadata = {
            "total_objects": total,
            "target_count": target_true,
            "percentage": round(percentage, 2),
            "summary_fields_detected": summary_fields[:10],
            "classification": classification,
            "hierarchy_summary": hierarchy_summary,
            "has_summary_level": hierarchy_summary['has_summary'],
            "has_config_level": hierarchy_summary['has_config']
        }
        self.metadata = metadata
        return metadata

    def _find_objects_with_target(self, target_field: str) -> list:
        """Find all objects in the data structure that contain the target field.

        Walks every dict value and every list element depth-first; matching
        dicts are returned in the order they are encountered.
        """
        found = []

        def visit(node):
            if isinstance(node, dict):
                if target_field in node:
                    found.append(node)
                for value in node.values():
                    visit(value)
            elif isinstance(node, list):
                for item in node:
                    visit(item)

        visit(self.data)
        return found

    def _summary_sample(self) -> Any:
        """Return ``results.summary`` if non-empty, else top-level ``summary``, else ``{}``.

        Tolerates a list at the JSON root and a non-dict ``results`` value,
        both of which would otherwise fail on ``.get``.
        """
        data = self.data
        if not isinstance(data, dict):
            return {}
        results = data.get('results', {})
        nested = results.get('summary', {}) if isinstance(results, dict) else {}
        return nested or data.get('summary', {})

    def generate_prompt(self, target_field: str) -> str:
        """Generate a hierarchy-aware prompt for the LLM.

        Uses ``self.metadata`` (computing it first when it is empty), one
        sample object that contains ``target_field`` (truncated to its first
        five keys) and the summary section of the data, if any.
        """
        if not self.metadata:
            self.extract_metadata(target_field)
        summary_fields = self.metadata.get('summary_fields_detected', [])
        classification = self.metadata.get('classification', {})

        # Reuse the full search so the sample agrees with the counted objects,
        # even when the first element of a list lacks the target field.
        matches = self._find_objects_with_target(target_field)
        sample = matches[0] if matches else {}
        summary_sample = self._summary_sample()

        sample_object = json.dumps({k: sample[k] for k in list(sample)[:5]}, indent=2) if sample else "{}"
        sample_summary = json.dumps(summary_sample, indent=2) if summary_sample else "{}"

        # Build hierarchy instruction
        hierarchy_text = """
DATA HIERARCHY (analyze in this priority order):
LEVEL 1 - Summary/Aggregate Fields (HIGHEST PRIORITY):
"""
        if summary_fields:
            for field in summary_fields[:5]:
                hierarchy_text += f" β {field}\n"
            if len(summary_fields) > 5:
                hierarchy_text += f" ... and {len(summary_fields) - 5} more\n"
        else:
            hierarchy_text += " No summary fields detected\n"
        hierarchy_text += """
LEVEL 2 - Configuration/Compliance Fields:
"""
        config_fields = classification.get('config_fields', [])
        if config_fields:
            for field in config_fields[:3]:
                hierarchy_text += f" β {field}\n"
        else:
            hierarchy_text += " No config fields detected\n"
        hierarchy_text += """
LEVEL 3 - Individual Objects:
β Sample object fields shown below
CRITICAL INSTRUCTION: Check summary fields FIRST! They are the most important for validation.
"""
        prompt = f"""You are analyzing JSON data to identify important fields related to "{target_field}".
{hierarchy_text}
CONTEXT:
- Total objects: {self.metadata.get('total_objects', 0)}
- Objects with "{target_field}" = true: {self.metadata.get('target_count', 0)}
- Percentage: {self.metadata.get('percentage', 0)}%
- Has summary level data: {self.metadata.get('has_summary_level', False)}
SAMPLE SUMMARY DATA (check this first):
{sample_summary}
SAMPLE OBJECT DATA:
{sample_object}
TASK:
Identify 3-4 important fields related to "{target_field}" in this priority order:
1. FIRST: Summary/aggregate fields (totals, percentages, counts)
2. SECOND: Configuration/compliance fields
3. THIRD: Individual object fields (if needed)
Generate regex patterns that match JSON format (with quotes).
VALIDATION PATTERN EXAMPLES:
- Compare two aggregate values: "field1"\\s*:\\s*(\\d+)[\\s\\S]*?"field2"\\s*:\\s*(\\d+)
- Extract percentage: "field_percentage"\\s*:\\s*(\\d+)
- Extract boolean: "field_name"\\s*:\\s*(true|false)
- Extract status: "compliance"\\s*:\\s*"([^"]*)"
Output ONLY valid JSON:
{{
"test_name": "Field Analysis: {target_field}",
"important_fields": ["field1", "field2", "field3"],
"reasoning": "Explain prioritization and why these fields matter",
"generated_regex": ["regex1", "regex2", "regex3"]
}}
"""
        return prompt

    def call_llm(self, prompt: str) -> str:
        """Call the appropriate LLM based on provider.

        Raises:
            ValueError: If ``self.llm_provider`` is not a known provider.
        """
        if self.llm_provider == "ollama":
            return self._call_ollama(prompt)
        elif self.llm_provider == "openai":
            return self._call_openai(prompt)
        elif self.llm_provider == "anthropic":
            return self._call_anthropic(prompt)
        elif self.llm_provider == "huggingface":
            return self._call_huggingface(prompt)
        else:
            raise ValueError(f"Unknown LLM provider: {self.llm_provider}")

    def _call_ollama(self, prompt: str) -> str:
        """Call the Ollama API to generate a response.

        Raises:
            ConnectionError: The Ollama server could not be reached.
            TimeoutError: No answer arrived within the 120 second timeout.
            Exception: Any other request failure.
        """
        payload = {
            "model": self.MODEL_NAME,
            "prompt": prompt,
            "stream": False,
            "format": "json"
        }
        try:
            response = requests.post(self.OLLAMA_API_URL, json=payload, timeout=120)
            response.raise_for_status()
            return response.json().get('response', '')
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError("Cannot connect to Ollama. Make sure Ollama is running.") from e
        except requests.exceptions.Timeout as e:
            raise TimeoutError("Ollama request timed out.") from e
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to call Ollama API - {e}") from e

    def _call_openai(self, prompt: str) -> str:
        """Call the OpenAI API to generate a response.

        Raises:
            ImportError: The ``openai`` package is not installed.
            Exception: The API call failed.
        """
        try:
            from openai import OpenAI
        except ImportError as e:
            raise ImportError("OpenAI library not installed. Install with: pip install openai") from e
        try:
            client = OpenAI(api_key=self.api_key)
            response = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": "You are a JSON data analysis assistant. Always respond with valid JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=2000
            )
            # content can be None; normalise so the parser always receives a string.
            return response.choices[0].message.content or ""
        except Exception as e:
            raise Exception(f"Failed to call OpenAI API - {e}") from e

    def _call_anthropic(self, prompt: str) -> str:
        """Call the Anthropic API to generate a response.

        Raises:
            ImportError: The ``anthropic`` package is not installed.
            Exception: The API call failed.
        """
        try:
            from anthropic import Anthropic
        except ImportError as e:
            raise ImportError("Anthropic library not installed. Install with: pip install anthropic") from e
        try:
            client = Anthropic(api_key=self.api_key)
            response = client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2000,
                temperature=0.3,
                system="You are a JSON data analysis assistant. Always respond with valid JSON.",
                messages=[
                    {"role": "user", "content": prompt}
                ]
            )
            return response.content[0].text
        except Exception as e:
            raise Exception(f"Failed to call Anthropic API - {e}") from e

    def _build_huggingface_request(self, prompt: str) -> tuple:
        """Build the ``(url, headers, payload)`` triple for the Hugging Face call.

        The URL always targets ``HF_MODEL_NAME``. The API token, when one is
        set, is sent only as a Bearer token in the Authorization header.
        """
        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"
        full_prompt = f"""<s>[INST]You are a JSON data analysis assistant. Always respond with valid JSON only, no explanations.
{prompt}[/INST]"""
        payload = {
            "inputs": full_prompt,
            "parameters": {
                "max_new_tokens": 1000,
                "temperature": 0.3,
                "return_full_text": False
            }
        }
        url = f"{self.HF_API_BASE_URL}/{self.HF_MODEL_NAME}"
        return url, headers, payload

    def _call_huggingface(self, prompt: str) -> str:
        """Call the Hugging Face Inference API (FREE) to generate a response.

        Raises:
            ConnectionError: The API could not be reached.
            TimeoutError: No answer arrived within the 60 second timeout.
            Exception: Any other failure, including HTTP 503 (model loading).
        """
        api_url, headers, payload = self._build_huggingface_request(prompt)
        try:
            response = requests.post(api_url, json=payload, headers=headers, timeout=60)
            if response.status_code == 503:
                raise Exception("Model is loading. Please wait a moment and try again.")
            response.raise_for_status()
            result = response.json()
            # Handle different response formats
            if isinstance(result, list) and len(result) > 0:
                return result[0].get('generated_text', '')
            elif isinstance(result, dict):
                return result.get('generated_text', '')
            else:
                return str(result)
        # Map network failures the same way _call_ollama does so callers can
        # handle ConnectionError / TimeoutError uniformly across providers.
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError("Cannot connect to the Hugging Face Inference API.") from e
        except requests.exceptions.Timeout as e:
            raise TimeoutError("Hugging Face request timed out.") from e
        except Exception as e:
            raise Exception(f"Failed to call Hugging Face API - {e}") from e

    def parse_llm_output(self, output: str) -> Dict[str, Any]:
        """Parse and validate the LLM JSON output.

        Strips a surrounding Markdown code fence. If the remaining text is not
        valid JSON, retries with the span from the first ``{`` to the last
        ``}`` so that prose around the object is tolerated.

        Raises:
            ValueError: The output is not a string, contains no valid JSON,
                or the JSON is not an object.
        """
        if not isinstance(output, str):
            raise ValueError("LLM output is not valid JSON - expected text output")
        text = output.strip()
        if text.startswith("```json"):
            text = text[7:]
        if text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()
        try:
            result = json.loads(text)
        except json.JSONDecodeError as e:
            start, end = text.find("{"), text.rfind("}")
            if start == -1 or end <= start:
                raise ValueError(f"LLM output is not valid JSON - {e}") from e
            try:
                result = json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                # Report the error from the full text; it is the more useful one.
                raise ValueError(f"LLM output is not valid JSON - {e}") from e
        if not isinstance(result, dict):
            raise ValueError("LLM output is not valid JSON - expected a JSON object")
        return result

    def analyze(self, target_field: str = "rotation_enabled") -> Dict[str, Any]:
        """Main analysis function.

        Refreshes the metadata for ``target_field``, builds the prompt, calls
        the configured provider and returns the parsed answer.
        """
        self.extract_metadata(target_field)
        prompt = self.generate_prompt(target_field)
        llm_output = self.call_llm(prompt)
        return self.parse_llm_output(llm_output)
def main():
    """Main Streamlit application.

    Renders the sidebar (provider, API key and target-field inputs), the JSON
    uploader and, after the analyze button is pressed, the analysis results.
    Text produced by the LLM is HTML-escaped before it is placed into markup
    rendered with ``unsafe_allow_html=True``.
    """
    # Local import: only this function needs it, for escaping LLM output.
    import html

    st.title("π JSON Field Analyzer")
    if IS_HUGGINGFACE:
        st.info("π Running on Hugging Face - FREE Hugging Face AI model available! No API key needed.")
    st.markdown("**Upload a JSON file and analyze important fields using LLM**")

    # Sidebar for configuration
    with st.sidebar:
        st.header("βοΈ Configuration")
        # Show environment info
        if IS_ONLINE and not IS_HUGGINGFACE:
            st.info("π Running online - Cloud LLM required")

        # LLM Provider Selection
        # Default to Hugging Face (free) if online, Ollama on local
        if IS_ONLINE:
            default_index = 3  # Hugging Face (Free)
        else:
            default_index = 0  # Ollama
        llm_provider = st.selectbox(
            "π€ LLM Provider",
            ["Ollama (Local)", "OpenAI (Cloud)", "Anthropic Claude (Cloud)", "Hugging Face (Free π)"],
            index=default_index,
            help="Choose your LLM provider - Hugging Face is FREE and no API key needed!"
        )

        # Map the selected label to the provider id and collect its API key.
        if llm_provider == "Ollama (Local)":
            provider_name = "ollama"
            api_key = None
            if IS_ONLINE:
                st.error("β Ollama not available on this platform")
                st.markdown("**Please select a cloud LLM provider:**")
                st.markdown("- OpenAI (Cloud) - GPT-4o Mini")
                st.markdown("- Anthropic Claude (Cloud) - Recommended")
            else:
                st.info("π Using local Ollama")
        elif llm_provider == "OpenAI (Cloud)":
            provider_name = "openai"
            # The environment variable wins; the input is shown only without it.
            api_key = os.getenv("OPENAI_API_KEY") or st.text_input(
                "OpenAI API Key",
                type="password",
                help="Enter your OpenAI API key (or set OPENAI_API_KEY env var)"
            )
            if not api_key:
                st.warning("β οΈ Please enter your OpenAI API key")
                st.info("π‘ Get key: https://platform.openai.com/api-keys")
        elif llm_provider == "Anthropic Claude (Cloud)":
            provider_name = "anthropic"
            api_key = os.getenv("ANTHROPIC_API_KEY") or st.text_input(
                "Anthropic API Key",
                type="password",
                help="Enter your Anthropic API key (or set ANTHROPIC_API_KEY env var)"
            )
            if not api_key:
                st.warning("β οΈ Please enter your Anthropic API key")
                st.info("π‘ Get key: https://console.anthropic.com")
        else:  # Hugging Face (Free)
            provider_name = "huggingface"
            api_key = os.getenv("HUGGINGFACE_API_KEY") or st.text_input(
                "Hugging Face API Key (Optional)",
                type="password",
                help="Optional: Enter your HF token for faster inference (or set HUGGINGFACE_API_KEY env var)"
            )
            if not api_key:
                st.info("β¨ Using free Hugging Face Inference API - no key needed!")
                st.info("π‘ Optional: Add your token in Settings > Secrets for better performance")

        st.markdown("---")
        target_field = st.text_input(
            "Target Field",
            value="rotation_enabled",
            help="The field you want to analyze (e.g., rotation_enabled, ssl_enforced)"
        )
        st.markdown("---")
        st.markdown("### π Setup Guides")
        with st.expander("π§ Local Ollama Setup"):
            st.code("""
brew install ollama
ollama serve
ollama pull llama3.2:3b
""", language="bash")
        with st.expander("βοΈ Cloud API Setup"):
            st.markdown("""
**OpenAI:**
- Get key: https://platform.openai.com/api-keys
- Model: GPT-4o Mini
**Anthropic:**
- Get key: https://console.anthropic.com
- Model: Claude 3.5 Sonnet
""")

    # File upload section
    st.markdown("---")
    st.header("π€ Upload JSON File")
    uploaded_file = st.file_uploader(
        "Choose a JSON file",
        type=['json'],
        help="Upload a JSON file to analyze"
    )

    # Display file info if uploaded
    if uploaded_file is not None:
        try:
            # Read file contents
            content = uploaded_file.read()
            data = json.loads(content)
            st.success("β File uploaded successfully!")

            # Show file info
            col1, col2 = st.columns(2)
            with col1:
                st.metric("File Size", f"{len(content) / 1024:.2f} KB")
            with col2:
                st.metric("JSON Structure", "Valid" if isinstance(data, (dict, list)) else "Invalid")

            # Analyze button
            st.markdown("---")
            col1, col2, col3 = st.columns([1, 2, 1])
            with col2:
                analyze_button = st.button("π Analyze with LLM", type="primary", use_container_width=True)

            # Run analysis
            if analyze_button:
                # Prevent Ollama usage on online platforms
                if provider_name == "ollama" and IS_ONLINE:
                    st.error("β Ollama is not available on this platform")
                    st.info("π‘ Please select 'Anthropic Claude (Cloud)' or 'OpenAI (Cloud)' from the sidebar")
                # Validate API key for cloud providers (except Hugging Face which is optional)
                elif provider_name in ["openai", "anthropic"] and not api_key:
                    st.error("β Please enter an API key for the selected cloud provider")
                else:
                    try:
                        with st.spinner(f"Analyzing with {llm_provider}... This may take a moment."):
                            analyzer = FileAnalyzer(data, llm_provider=provider_name, api_key=api_key)
                            result = analyzer.analyze(target_field=target_field)

                        # Display results
                        st.markdown("---")
                        st.header("π Analysis Results")

                        # Main results in columns
                        col1, col2 = st.columns(2)
                        with col1:
                            st.subheader("π€ Important Fields")
                            for i, field in enumerate(result.get('important_fields', []), 1):
                                st.markdown(f"**{i}. {field}**")
                        with col2:
                            st.subheader("π‘ Reasoning")
                            # The reasoning is model-generated text and is rendered
                            # as raw HTML, so escape it to prevent markup injection.
                            reasoning_html = html.escape(str(result.get("reasoning", "N/A")))
                            st.markdown(f'<div class="highlight">{reasoning_html}</div>',
                                        unsafe_allow_html=True)

                        # Regex patterns
                        st.markdown("---")
                        st.subheader("π§ Generated Regex Patterns")
                        regex_patterns = result.get('generated_regex', [])
                        for i, pattern in enumerate(regex_patterns, 1):
                            st.markdown(f"**Pattern {i}:**")
                            st.code(pattern, language="regex")

                        # Raw JSON output
                        with st.expander("π View Raw JSON Output"):
                            st.json(result)

                        # Download results
                        st.markdown("---")
                        result_json = json.dumps(result, indent=2)
                        st.download_button(
                            label="β¬οΈ Download Results",
                            data=result_json,
                            file_name=f"analysis_{target_field}.json",
                            mime="application/json"
                        )
                    except ConnectionError as e:
                        st.error(f"β {e}")
                        if provider_name == "ollama":
                            st.info("π‘ Start Ollama with: `ollama serve`")
                        else:
                            st.info("π‘ Check your internet connection and API key")
                    except TimeoutError as e:
                        st.error(f"β {e}")
                        st.info("π‘ The analysis took too long. Try again or use a larger timeout.")
                    except Exception as e:
                        st.error(f"β Error during analysis: {e}")
                        st.exception(e)
        except json.JSONDecodeError:
            st.error("β Invalid JSON file. Please upload a valid JSON file.")
        except Exception as e:
            st.error(f"β Error reading file: {e}")
            st.exception(e)
    else:
        # Show example when no file is uploaded
        st.info("π Please upload a JSON file to get started")
        with st.expander("π How it works"):
            st.markdown("""
### Workflow:
1. **Upload**: Upload your JSON file using the file uploader above
2. **Configure**: Set the target field name in the sidebar (default: `rotation_enabled`)
3. **Analyze**: Click the "Analyze with LLM" button
4. **Review**: View the important fields, reasoning, and regex patterns
5. **Download**: Save the results as JSON
### What it does:
- Analyzes your JSON structure to detect summary fields, configurations, and objects
- Uses LLM to identify important fields related to your target
- Generates regex patterns for data extraction and validation
- Provides reasoning for why each field is important
### Use cases:
- AWS compliance validation (KMS rotation, SSL enforcement, etc.)
- Data quality checks
- Automated validation pattern generation
- Field correlation analysis
""")


# Call main function - Streamlit will handle errors
main()