bluestpanda committed on
Commit
a9f051b
·
1 Parent(s): 9714df8
Dockerfile CHANGED
@@ -10,6 +10,7 @@ RUN apt-get update && apt-get install -y \
10
 
11
  COPY requirements.txt ./
12
  COPY src/ ./src/
 
13
 
14
  RUN pip3 install -r requirements.txt
15
 
@@ -17,4 +18,4 @@ EXPOSE 8501
17
 
18
  HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health
19
 
20
- ENTRYPOINT ["streamlit", "run", "src/streamlit_app.py", "--server.port=8501", "--server.address=0.0.0.0"]
 
10
 
11
  COPY requirements.txt ./
12
  COPY src/ ./src/
13
+ COPY structure_analysis.py ./src/
14
 
15
  RUN pip3 install -r requirements.txt
16
 
 
18
 
19
  HEALTHCHECK CMD curl --fail http://localhost:8501/_stcore/health
20
 
21
+ ENTRYPOINT ["streamlit", "run", "src/app.py", "--server.port=8501", "--server.address=0.0.0.0"]
README.md CHANGED
@@ -1,19 +1,59 @@
1
  ---
2
- title: Paramify Test
3
- emoji: 🚀
4
- colorFrom: red
5
- colorTo: red
6
  sdk: docker
7
  app_port: 8501
8
  tags:
9
  - streamlit
 
 
 
10
  pinned: false
11
- short_description: Streamlit template space
12
  ---
13
 
14
- # Welcome to Streamlit!
15
 
16
- Edit `/src/streamlit_app.py` to customize this app to your heart's desire. :heart:
17
 
18
- If you have any questions, checkout our [documentation](https://docs.streamlit.io) and [community
19
- forums](https://discuss.streamlit.io).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  ---
2
+ title: Field Correlation Analyzer
3
+ emoji: 🤖
4
+ colorFrom: blue
5
+ colorTo: purple
6
  sdk: docker
7
  app_port: 8501
8
  tags:
9
  - streamlit
10
+ - json
11
+ - analysis
12
+ - field-correlation
13
  pinned: false
14
+ short_description: Analyze JSON files and detect important fields with regex pattern generation
15
  ---
16
 
17
+ # Field Correlation Analyzer
18
 
19
+ Upload a JSON file to analyze important fields and generate regex patterns for field extraction.
20
 
21
+ ## Features
22
+
23
+ - 🔍 **Automatic Field Detection**: Detects summary/aggregate fields automatically
24
+ - 📊 **Hierarchy Analysis**: Classifies data structure by hierarchy levels
25
+ - 🎯 **Smart Recommendations**: Recommends important fields for validation
26
+ - 📝 **Regex Generation**: Generates regex patterns for field extraction
27
+ - 📥 **Export Results**: Download analysis results as JSON
28
+
29
+ ## How to Use
30
+
31
+ 1. Upload a JSON file with structured data
32
+ 2. Set the target field you want to analyze (e.g., `rotation_enabled`)
33
+ 3. Click "Analyze" to process the data
34
+ 4. Review the structure analysis and field recommendations
35
+ 5. Select fields and generate regex patterns
36
+ 6. Download the results as JSON
37
+
38
+ ## Example JSON Structure
39
+
40
+ ```json
41
+ {
42
+ "results": {
43
+ "summary": {
44
+ "total_keys": 13,
45
+ "rotated_keys": 6,
46
+ "rotation_percentage": 46
47
+ },
48
+ "kms_keys": {
49
+ "object": [
50
+ {
51
+ "key_id": "12345",
52
+ "rotation_enabled": true,
53
+ "key_state": "Enabled"
54
+ }
55
+ ]
56
+ }
57
+ }
58
+ }
59
+ ```
requirements.txt CHANGED
@@ -1,6 +1 @@
1
- requests>=2.31.0
2
  streamlit>=1.28.0
3
- pandas>=2.0.0
4
- openai>=1.0.0
5
- anthropic>=0.7.0
6
-
 
 
1
  streamlit>=1.28.0
 
 
 
 
src/streamlit_app.py CHANGED
@@ -1,40 +1,356 @@
1
- import altair as alt
2
- import numpy as np
3
- import pandas as pd
 
 
 
4
  import streamlit as st
 
 
 
 
5
 
6
- """
7
- # Welcome to Streamlit!
 
 
 
 
8
 
9
- Edit `/streamlit_app.py` to customize this app to your heart's desire :heart:.
10
- If you have any questions, checkout our [documentation](https://docs.streamlit.io) and [community
11
- forums](https://discuss.streamlit.io).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
 
13
- In the meantime, below is an example of what you can do with just a few lines of code:
14
- """
15
 
16
- num_points = st.slider("Number of points in spiral", 1, 10000, 1100)
17
- num_turns = st.slider("Number of turns in spiral", 1, 300, 31)
18
-
19
- indices = np.linspace(0, 1, num_points)
20
- theta = 2 * np.pi * num_turns * indices
21
- radius = indices
22
-
23
- x = radius * np.cos(theta)
24
- y = radius * np.sin(theta)
25
-
26
- df = pd.DataFrame({
27
- "x": x,
28
- "y": y,
29
- "idx": indices,
30
- "rand": np.random.randn(num_points),
31
- })
32
-
33
- st.altair_chart(alt.Chart(df, height=700, width=700)
34
- .mark_point(filled=True)
35
- .encode(
36
- x=alt.X("x", axis=None),
37
- y=alt.Y("y", axis=None),
38
- color=alt.Color("idx", legend=None, scale=alt.Scale()),
39
- size=alt.Size("rand", legend=None, scale=alt.Scale(range=[1, 150])),
40
- ))
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Hugging Face Streamlit App for LLM Field Analyzer
4
+ Upload a JSON file and analyze important fields with pattern generation.
5
+ """
6
+
7
  import streamlit as st
8
+ import json
9
+ from pathlib import Path
10
+ from typing import Dict, Any
11
+ import io
12
 
13
# Page configuration
# Executed at import time, ahead of every other Streamlit call in this script.
st.set_page_config(
    page_title="Field Correlation Analyzer",
    page_icon="🤖",
    layout="wide"
)

# Import our modules
# The analysis helpers live in a sibling module; if it cannot be imported the
# page shows an error and st.stop() is called instead of continuing.
# NOTE(review): any ImportError raised inside structure_analysis is reported
# with the same "not found" message — confirm that is acceptable.
try:
    from structure_analysis import (
        detect_summary_fields,
        classify_data_structure,
        get_hierarchy_summary
    )
except ImportError:
    st.error("⚠️ structure_analysis.py not found. Make sure all files are uploaded.")
    st.stop()

# Session state
# Seed the slot that main() reads so the attribute always exists.
if 'analysis_result' not in st.session_state:
    st.session_state.analysis_result = None
34
+
35
+
36
def analyze_with_llm(data: Dict[str, Any], target_field: str = "rotation_enabled") -> Dict[str, Any]:
    """
    Build a structural analysis of parsed JSON data for the given target field.

    No LLM or external service is called here; the result is assembled from
    the structure_analysis helpers plus a few local scans of ``data``.

    Args:
        data: Parsed JSON document. A ``results`` mapping and a ``summary``
            mapping (under ``results`` or at the top level) are used when
            present; anything else is tolerated and simply yields empty samples.
        target_field: Key name that is counted across the whole document and
            matched (case-insensitively, as a substring) against the keys of
            the sample object.

    Returns:
        A dict with the keys ``summary_fields_detected`` (at most 10 entries),
        ``classification``, ``hierarchy_summary``, ``total_objects``,
        ``sample_object``, ``summary_sample`` and ``recommended_fields``
        (order-preserving, without duplicates).
    """
    # Detect summary fields
    summary_fields = detect_summary_fields(data)
    classification = classify_data_structure(data)
    hierarchy_summary = get_hierarchy_summary(data)

    # Only a dict-shaped "results" can be scanned section by section.
    results = data.get('results') if isinstance(data, dict) else None
    if not isinstance(results, dict):
        results = {}

    def candidate_lists():
        # Each section itself, then (for dict sections) each of its values,
        # in document order.
        for section in results.values():
            yield section
            if isinstance(section, dict):
                yield from section.values()

    # Extract samples: the first non-empty list whose first element is a dict
    # wins, so a later section can no longer overwrite an earlier sample and
    # non-dict elements are never returned as the "sample object".
    sample_object = next(
        (
            candidate[0]
            for candidate in candidate_lists()
            if isinstance(candidate, list) and candidate and isinstance(candidate[0], dict)
        ),
        {},
    )

    # Prefer results.summary, fall back to a top-level summary.
    summary_sample = results.get('summary', {})
    if not summary_sample and isinstance(data, dict):
        summary_sample = data.get('summary', {})

    # Count objects with target field
    def count_objects_with_field(obj, field_name):
        count = 0
        if isinstance(obj, dict):
            if field_name in obj:
                count += 1
            for v in obj.values():
                count += count_objects_with_field(v, field_name)
        elif isinstance(obj, list):
            for item in obj:
                count += count_objects_with_field(item, field_name)
        return count

    total_objects = count_objects_with_field(data, target_field)

    # Recommend fields based on priority: summary fields first, then
    # config/compliance fields, then sample keys that mention the target.
    recommended = list(summary_fields[:3])
    recommended.extend((classification.get('config_fields') or [])[:2])
    # Keys are compared lower-cased, so the target must be lower-cased as well.
    needle = target_field.lower()
    recommended.extend(k for k in sample_object if needle in k.lower())

    # Generate analysis
    return {
        "summary_fields_detected": summary_fields[:10],
        "classification": classification,
        "hierarchy_summary": hierarchy_summary,
        "total_objects": total_objects,
        "sample_object": sample_object,
        "summary_sample": summary_sample,
        # dict.fromkeys drops duplicates while keeping priority order.
        "recommended_fields": list(dict.fromkeys(recommended)),
    }
96
+
97
+
98
def generate_regex_patterns(field_names: list, data_sample: dict, summary_sample: dict) -> list:
    """
    Generate one regex pattern per requested field.

    The capture group is chosen from the sample value's type when a sample
    value is available (bool, number or string); otherwise it is guessed from
    the field name. Exactly one pattern is produced for every entry of
    ``field_names``, in the same order, so callers can pair the two lists
    positionally.

    Args:
        field_names: Field names; names containing ``summary`` are looked up in
            ``summary_sample`` by their last dotted component.
        data_sample: Sample object used to infer value types for other fields.
        summary_sample: Sample summary mapping.

    Returns:
        List of regex pattern strings, ``len(result) == len(field_names)``.
    """
    # A non-dict sample (e.g. a "summary" that is a list) carries no usable
    # type information; treat it as empty instead of failing on lookups.
    if not isinstance(data_sample, dict):
        data_sample = {}
    if not isinstance(summary_sample, dict):
        summary_sample = {}

    def capture_for(value, name_hint):
        """Return the capture group for a sample value, or guess from the name."""
        # bool must be tested before int: bool is a subclass of int.
        if isinstance(value, bool):
            return '(true|false)'
        if isinstance(value, (int, float)):
            return '(\\d+)'
        if isinstance(value, str):
            return '"([^"]*)"'
        # No usable sample value (missing, null, list, dict): fall back to a
        # generic pattern based on the field name.
        if 'percentage' in name_hint or 'count' in name_hint or 'total' in name_hint:
            return '(\\d+)'
        if 'enabled' in name_hint or 'enforced' in name_hint:
            return '(true|false)'
        return '"([^"]*)"'

    patterns = []

    for field in field_names:
        field_lower = field.lower()

        # Check in summary first
        if 'summary' in str(field):
            field_name = field.split('.')[-1]
            key = f'summary.{field_name}'
            value = summary_sample.get(field_name)
        # Check in object
        elif field in data_sample:
            key = field
            value = data_sample[field]
        else:
            key = field
            value = None

        patterns.append(f'"{key}"\\s*:\\s*{capture_for(value, field_lower)}')

    return patterns
134
+
135
+
136
def main():
    """
    Render the Field Correlation Analyzer page.

    Flow: upload a JSON file, parse it, run analyze_with_llm() when the
    sidebar "Analyze" button is pressed, then show the stored analysis in
    four tabs (structure, recommendations, generated patterns, raw data).
    Without an uploaded file, usage help and an example document are shown.
    JSON parse errors and any other exception raised while processing the
    upload are reported on the page via st.error.
    """
    st.title("🤖 Field Correlation Analyzer")
    st.markdown("Upload a JSON file to analyze important fields and generate regex patterns")

    # File upload
    uploaded_file = st.file_uploader(
        "Choose a JSON file",
        type=['json'],
        help="Upload a JSON file with structured data"
    )

    if uploaded_file is not None:
        # Read and parse JSON
        try:
            content = uploaded_file.read()
            data = json.loads(content)

            st.success("✅ File loaded successfully!")

            # Sidebar for settings
            with st.sidebar:
                st.header("⚙️ Settings")

                # Target field input
                target_field = st.text_input(
                    "Target Field",
                    value="rotation_enabled",
                    help="The field you want to analyze"
                )

                # Analyze button
                if st.button("🔍 Analyze", type="primary"):
                    with st.spinner("Analyzing data structure..."):
                        analysis_result = analyze_with_llm(data, target_field)
                        # Stored in session state so the tabs below can read it.
                        st.session_state.analysis_result = analysis_result
                        st.session_state.data = data

            # Display results if available
            # NOTE(review): the stored result is only replaced when "Analyze"
            # is pressed, so after uploading a different file the previous
            # analysis presumably stays visible next to the new raw data —
            # confirm whether it should be reset on upload.
            if st.session_state.analysis_result:
                analysis = st.session_state.analysis_result

                # Summary metrics
                col1, col2, col3, col4 = st.columns(4)
                with col1:
                    st.metric("Summary Fields", len(analysis['summary_fields_detected']))
                with col2:
                    st.metric("Total Objects", analysis['total_objects'])
                with col3:
                    st.metric("Has Summary", "Yes" if analysis['hierarchy_summary']['has_summary'] else "No")
                with col4:
                    st.metric("Config Fields", len(analysis['classification'].get('config_fields', [])))

                st.markdown("---")

                # Create tabs
                tab1, tab2, tab3, tab4 = st.tabs([
                    "📊 Structure Analysis",
                    "🎯 Field Recommendations",
                    "📝 Generated Patterns",
                    "📄 Raw Data"
                ])

                with tab1:
                    st.subheader("Data Hierarchy")

                    # Summary fields
                    if analysis['summary_fields_detected']:
                        st.markdown("#### Level 1: Summary/Aggregate Fields (Highest Priority)")
                        for field in analysis['summary_fields_detected'][:10]:
                            st.write(f"✓ `{field}`")

                    # Config fields
                    config_fields = analysis['classification'].get('config_fields', [])
                    if config_fields:
                        st.markdown("#### Level 2: Configuration/Compliance Fields")
                        for field in config_fields[:10]:
                            st.write(f"✓ `{field}`")

                    # Object arrays
                    object_arrays = analysis['classification'].get('object_arrays', [])
                    if object_arrays:
                        st.markdown("#### Level 3: Object Arrays")
                        for field in object_arrays[:5]:
                            st.write(f"✓ `{field}`")

                    # Show sample data
                    with st.expander("📋 View Summary Data Sample"):
                        st.json(analysis['summary_sample'])

                    with st.expander("📋 View Object Data Sample"):
                        st.json(analysis['sample_object'])

                with tab2:
                    st.subheader("Recommended Fields for Analysis")

                    if analysis['recommended_fields']:
                        st.info("These fields are recommended based on the data hierarchy and target field.")

                        # Let user select fields
                        selected_fields = st.multiselect(
                            "Select fields to generate patterns for:",
                            analysis['recommended_fields'],
                            default=analysis['recommended_fields'][:3]
                        )

                        if selected_fields and st.button("Generate Patterns"):
                            patterns = generate_regex_patterns(
                                selected_fields,
                                analysis['sample_object'],
                                analysis['summary_sample']
                            )

                            # Kept in session state; the "Generated Patterns"
                            # tab reads it from there.
                            st.session_state.generated_patterns = {
                                'fields': selected_fields,
                                'patterns': patterns
                            }
                    else:
                        st.warning("No recommended fields found.")

                with tab3:
                    if 'generated_patterns' in st.session_state:
                        patterns_data = st.session_state.generated_patterns

                        st.subheader("Generated Regex Patterns")

                        # Show patterns
                        # zip() pairs fields and patterns by position and stops
                        # at the shorter list, so this display assumes one
                        # pattern per selected field.
                        for i, (field, pattern) in enumerate(zip(patterns_data['fields'], patterns_data['patterns']), 1):
                            st.markdown(f"**Pattern {i}: {field}**")
                            st.code(pattern, language="regex", line_numbers=False)
                            st.markdown("---")

                        # Copy to clipboard
                        # (a plain text area holding all patterns, one per line)
                        all_patterns = "\n".join(patterns_data['patterns'])
                        st.text_area(
                            "All Patterns (copy this):",
                            all_patterns,
                            height=100
                        )

                        # JSON export
                        export_data = {
                            "test_name": "Field Analysis",
                            "important_fields": patterns_data['fields'],
                            "reasoning": "Fields identified using hierarchical analysis prioritizing summary/aggregate fields",
                            "generated_regex": patterns_data['patterns']
                        }

                        st.download_button(
                            label="📥 Download as JSON",
                            data=json.dumps(export_data, indent=2),
                            file_name="analysis_result.json",
                            mime="application/json"
                        )
                    else:
                        st.info("👆 Go to 'Field Recommendations' tab to select fields and generate patterns.")

                with tab4:
                    st.subheader("Raw Data Structure")

                    # Full data viewer
                    # Shows the file parsed in this run, not st.session_state.data.
                    st.json(data)

                    # Download raw data
                    st.download_button(
                        label="📥 Download Raw Data",
                        data=json.dumps(data, indent=2),
                        file_name="raw_data.json",
                        mime="application/json"
                    )

        except json.JSONDecodeError as e:
            st.error(f"❌ Invalid JSON file: {e}")
        except Exception as e:
            # Page-level boundary: any other failure is shown to the user.
            st.error(f"❌ Error processing file: {e}")

    else:
        # Show example when no file uploaded
        st.info("👆 Please upload a JSON file to begin analysis")

        with st.expander("📖 How to use"):
            st.markdown("""
            **Steps:**
            1. Upload a JSON file with structured data
            2. Set the target field you want to analyze (e.g., `rotation_enabled`)
            3. Click "Analyze" to process the data
            4. Review the structure analysis and field recommendations
            5. Select fields and generate regex patterns
            6. Download the results as JSON

            **What this tool does:**
            - Detects summary/aggregate fields automatically
            - Classifies data structure by hierarchy levels
            - Recommends important fields for validation
            - Generates regex patterns for field extraction
            """)

        with st.expander("📋 Example JSON Structure"):
            example = {
                "results": {
                    "summary": {
                        "total_keys": 13,
                        "rotated_keys": 6,
                        "rotation_percentage": 46
                    },
                    "kms_keys": {
                        "object": [
                            {
                                "key_id": "12345",
                                "rotation_enabled": True,
                                "key_state": "Enabled"
                            }
                        ]
                    }
                }
            }
            st.json(example)
353
 
 
 
354
 
355
# Render the page only when this file is run as the main script.
if __name__ == "__main__":
    main()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/structure_analysis.py ADDED
@@ -0,0 +1,99 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Structure analysis utilities for detecting fields in JSON data.
3
+ """
4
+
5
+ from typing import Dict, Any, List
6
+
7
+
8
def detect_summary_fields(data: Dict[str, Any]) -> List[str]:
    """
    Detect summary/aggregate fields in the data structure.

    Three sources are combined, in this order:
      1. keys of ``data['results']['summary']`` (reported as ``summary.<key>``),
      2. keys of a top-level ``data['summary']`` (also ``summary.<key>``),
      3. dotted paths of numeric fields anywhere in the document whose key
         contains an aggregate marker such as ``total`` or ``count``. For
         lists only the first element is inspected.

    Args:
        data: Parsed JSON document (normally a dict; other values are walked
            for aggregate fields only).

    Returns:
        Field names/paths without duplicates, in first-seen order so the
        result is identical from run to run.
    """
    summary_fields: List[str] = []

    if isinstance(data, dict):
        # Check for 'summary' in results
        results = data.get('results')
        if isinstance(results, dict) and isinstance(results.get('summary'), dict):
            summary_fields.extend(f"summary.{key}" for key in results['summary'])

        # Check for top-level 'summary'
        if isinstance(data.get('summary'), dict):
            summary_fields.extend(f"summary.{key}" for key in data['summary'])

    aggregate_markers = ('total', 'count', 'sum', 'average', 'avg', 'percent', 'percentage')

    # Look for aggregate patterns in field names
    def find_aggregate_fields(obj, path=""):
        if isinstance(obj, dict):
            for key, value in obj.items():
                current_path = f"{path}.{key}" if path else key

                # bool is a subclass of int; a true/false flag is not an
                # aggregate value even if its name contains a marker.
                is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
                if is_number and any(marker in key.lower() for marker in aggregate_markers):
                    summary_fields.append(current_path)

                # Recurse
                find_aggregate_fields(value, current_path)
        elif isinstance(obj, list) and obj:
            # The first element stands in for the whole list.
            find_aggregate_fields(obj[0], path)

    find_aggregate_fields(data)

    # Remove duplicates while keeping first-seen order; a set would make the
    # order (and therefore callers' [:3] / [:10] slices) vary between runs.
    return list(dict.fromkeys(summary_fields))
46
+
47
+
48
def classify_data_structure(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Sort the document's keys into config/compliance fields and object arrays.

    Every dict key reachable from ``data`` is visited depth-first; for lists
    only the first element is followed. Paths are dotted key names.

    Returns:
        ``{'config_fields': [...], 'object_arrays': [...]}`` where
        ``config_fields`` holds paths whose key contains one of the config
        markers and ``object_arrays`` holds paths whose value is a non-empty
        list starting with a dict.
    """
    markers = ['config', 'compliance', 'enabled', 'enforced', 'policy']

    def walk(node, prefix):
        # Lists are transparent: descend into the first element, same prefix.
        while isinstance(node, list) and len(node) > 0:
            node = node[0]
        if not isinstance(node, dict):
            return
        for name, child in node.items():
            dotted = f"{prefix}.{name}" if prefix else name
            yield dotted, name, child
            yield from walk(child, dotted)

    config_paths = []
    array_paths = []
    for dotted, name, child in walk(data, ""):
        lowered = name.lower()
        if any(marker in lowered for marker in markers):
            config_paths.append(dotted)
        if isinstance(child, list) and len(child) > 0 and isinstance(child[0], dict):
            array_paths.append(dotted)

    return {
        'config_fields': config_paths,
        'object_arrays': array_paths
    }
79
+
80
+
81
def get_hierarchy_summary(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Report whether the document carries a summary section.

    A summary counts as present when ``data`` has a ``summary`` key or when
    ``data['results']`` is a dict with a ``summary`` key.

    Returns:
        ``{'has_summary': bool, 'levels': 2 if a summary exists else 1}``.
    """
    nested = data['results'] if 'results' in data else None
    in_results = isinstance(nested, dict) and 'summary' in nested
    at_top = 'summary' in data

    if in_results or at_top:
        return {'has_summary': True, 'levels': 2}
    return {'has_summary': False, 'levels': 1}
99
+
structure_analysis.py ADDED
@@ -0,0 +1,98 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Structure analysis utilities for detecting fields in JSON data.
3
+ """
4
+
5
+ from typing import Dict, Any, List
6
+
7
+
8
def detect_summary_fields(data: Dict[str, Any]) -> List[str]:
    """
    Detect summary/aggregate fields in the data structure.

    Three sources are combined, in this order:
      1. keys of ``data['results']['summary']`` (reported as ``summary.<key>``),
      2. keys of a top-level ``data['summary']`` (also ``summary.<key>``),
      3. dotted paths of numeric fields anywhere in the document whose key
         contains an aggregate marker such as ``total`` or ``count``. For
         lists only the first element is inspected.

    Args:
        data: Parsed JSON document (normally a dict; other values are walked
            for aggregate fields only).

    Returns:
        Field names/paths without duplicates, in first-seen order so the
        result is identical from run to run.
    """
    summary_fields: List[str] = []

    if isinstance(data, dict):
        # Check for 'summary' in results
        results = data.get('results')
        if isinstance(results, dict) and isinstance(results.get('summary'), dict):
            summary_fields.extend(f"summary.{key}" for key in results['summary'])

        # Check for top-level 'summary'
        if isinstance(data.get('summary'), dict):
            summary_fields.extend(f"summary.{key}" for key in data['summary'])

    aggregate_markers = ('total', 'count', 'sum', 'average', 'avg', 'percent', 'percentage')

    # Look for aggregate patterns in field names
    def find_aggregate_fields(obj, path=""):
        if isinstance(obj, dict):
            for key, value in obj.items():
                current_path = f"{path}.{key}" if path else key

                # bool is a subclass of int; a true/false flag is not an
                # aggregate value even if its name contains a marker.
                is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
                if is_number and any(marker in key.lower() for marker in aggregate_markers):
                    summary_fields.append(current_path)

                # Recurse
                find_aggregate_fields(value, current_path)
        elif isinstance(obj, list) and obj:
            # The first element stands in for the whole list.
            find_aggregate_fields(obj[0], path)

    find_aggregate_fields(data)

    # Remove duplicates while keeping first-seen order; a set would make the
    # order (and therefore callers' [:3] / [:10] slices) vary between runs.
    return list(dict.fromkeys(summary_fields))
46
+
47
+
48
def classify_data_structure(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Sort the document's keys into config/compliance fields and object arrays.

    Every dict key reachable from ``data`` is visited depth-first; for lists
    only the first element is followed. Paths are dotted key names.

    Returns:
        ``{'config_fields': [...], 'object_arrays': [...]}`` where
        ``config_fields`` holds paths whose key contains one of the config
        markers and ``object_arrays`` holds paths whose value is a non-empty
        list starting with a dict.
    """
    markers = ['config', 'compliance', 'enabled', 'enforced', 'policy']

    def walk(node, prefix):
        # Lists are transparent: descend into the first element, same prefix.
        while isinstance(node, list) and len(node) > 0:
            node = node[0]
        if not isinstance(node, dict):
            return
        for name, child in node.items():
            dotted = f"{prefix}.{name}" if prefix else name
            yield dotted, name, child
            yield from walk(child, dotted)

    config_paths = []
    array_paths = []
    for dotted, name, child in walk(data, ""):
        lowered = name.lower()
        if any(marker in lowered for marker in markers):
            config_paths.append(dotted)
        if isinstance(child, list) and len(child) > 0 and isinstance(child[0], dict):
            array_paths.append(dotted)

    return {
        'config_fields': config_paths,
        'object_arrays': array_paths
    }
79
+
80
+
81
def get_hierarchy_summary(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Report whether the document carries a summary section.

    A summary counts as present when ``data`` has a ``summary`` key or when
    ``data['results']`` is a dict with a ``summary`` key.

    Returns:
        ``{'has_summary': bool, 'levels': 2 if a summary exists else 1}``.
    """
    nested = data['results'] if 'results' in data else None
    in_results = isinstance(nested, dict) and 'summary' in nested
    at_top = 'summary' in data

    if in_results or at_top:
        return {'has_summary': True, 'levels': 2}
    return {'has_summary': False, 'levels': 1}