|
|
"""
|
|
|
Test to verify JSON structure is correct after fixes
|
|
|
"""
|
|
|
import json
|
|
|
|
|
|
# Phase keys expected at the top level of the report, in display order.
_PHASES = ("r", "y", "b")
# aiVerdict fields that must sit next to each other, in this order.
_CORE_VERDICT_FIELDS = (
    "confidence",
    "faultLabel",
    "severity",
    "severityReason",
    "rulEstimate",
    "uncertainty",
)
# Keys required on the first entry of aiVerdict["maintenanceActions"].
_ACTION_KEYS = ("actions", "priority", "color")
# Keys required on the first entry of aiVerdict["futureFaultsPdf"].
_FAULT_KEYS = ("id", "fault", "probability", "timeline", "evidence", "color")
# Coil entries looked up under waveformAnalysis.coilAnalysis.
_COIL_NAMES = ("closeCoil", "tripCoil1", "tripCoil2")
_RULE = "=" * 60


def _is_number(value):
    """Return True for int/float values, excluding bool (a subclass of int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _check_top_level(data, issues):
    """Check the overall healthScore and the forbidden top-level fields."""
    if "healthScore" in data:
        overall = data["healthScore"]
        print(f"✅ Overall healthScore found: {overall}")

        if _is_number(overall):
            # Key the scores by phase so a missing phase cannot shift the
            # R/Y/B labels in the message below.
            phase_scores = {}
            for phase in _PHASES:
                phase_data = data.get(phase)
                if isinstance(phase_data, dict) and "healthScore" in phase_data:
                    score = phase_data["healthScore"]
                    if _is_number(score):
                        phase_scores[phase] = score
                    else:
                        issues.append(
                            f"❌ [{phase.upper()}] healthScore ({score!r}) is not a number"
                        )

            if phase_scores:
                calculated_avg = round(
                    sum(phase_scores.values()) / len(phase_scores), 1
                )
                # Tolerance absorbs rounding differences in the stored average.
                if abs(overall - calculated_avg) < 0.2:
                    r_score = phase_scores.get("r", "N/A")
                    y_score = phase_scores.get("y", "N/A")
                    b_score = phase_scores.get("b", "N/A")
                    print(
                        f"✅ healthScore ({overall}) is correct average of "
                        f"R={r_score}, Y={y_score}, B={b_score}"
                    )
                else:
                    issues.append(
                        f"⚠️ healthScore ({overall}) doesn't match "
                        f"calculated average ({calculated_avg})"
                    )
        else:
            issues.append(f"❌ healthScore ({overall!r}) is not a number")
    else:
        issues.append("❌ Overall healthScore NOT found at top level")

    for field in ("breakerId", "operator"):
        if field in data:
            issues.append(f"❌ {field} found at top level (should be removed)")
        else:
            print(f"✅ No {field} field at top level")


def _check_ai_verdict(ai_verdict, tag, issues):
    """Check placement, ordering and entry format inside one aiVerdict."""
    if not isinstance(ai_verdict, dict):
        issues.append(f"❌ [{tag}] aiVerdict is not an object")
        return

    for field in ("maintenanceActions", "futureFaultsPdf"):
        if field in ai_verdict:
            print(f"✅ [{tag}] {field} found in aiVerdict")
        else:
            issues.append(f"❌ [{tag}] {field} NOT in aiVerdict")

    actual_keys = list(ai_verdict)
    print(f" Field order: {actual_keys[:10]}")

    # Positions of the core fields, listed in their expected order; they pass
    # only when those positions form one ascending run without gaps.
    core_indices = [
        actual_keys.index(field)
        for field in _CORE_VERDICT_FIELDS
        if field in actual_keys
    ]
    if core_indices:
        start = min(core_indices)
        if core_indices == list(range(start, start + len(core_indices))):
            print(
                f"✅ [{tag}] Core verdict fields are CONSECUTIVE "
                f"(indices: {core_indices})"
            )
        else:
            issues.append(
                f"⚠️ [{tag}] Core verdict fields are NOT consecutive "
                f"(indices: {core_indices})"
            )

    # Only the first entry of each list is inspected; empty values are skipped.
    actions = ai_verdict.get("maintenanceActions")
    if actions:
        first_action = actions[0] if isinstance(actions, list) else None
        if isinstance(first_action, dict) and all(
            key in first_action for key in _ACTION_KEYS
        ):
            print(f"✅ [{tag}] maintenanceActions format is CORRECT")
        else:
            issues.append(f"❌ [{tag}] maintenanceActions has wrong format")

    faults = ai_verdict.get("futureFaultsPdf")
    if faults:
        first_fault = faults[0] if isinstance(faults, list) else None
        if isinstance(first_fault, dict):
            missing = [key for key in _FAULT_KEYS if key not in first_fault]
        else:
            # A non-object entry has none of the required keys.
            missing = list(_FAULT_KEYS)
        if not missing:
            print(f"✅ [{tag}] futureFaultsPdf format is CORRECT")
        else:
            issues.append(f"❌ [{tag}] futureFaultsPdf missing keys: {missing}")


def _has_coil_description(phases_list):
    """Return True if any listed entry has a coil with a non-empty description."""
    if not isinstance(phases_list, list):
        return False
    for entry in phases_list:
        if not isinstance(entry, dict):
            continue
        waveform = entry.get("waveformAnalysis")
        coils = waveform.get("coilAnalysis") if isinstance(waveform, dict) else None
        if not isinstance(coils, dict):
            continue
        for coil_name in _COIL_NAMES:
            coil = coils.get(coil_name)
            if isinstance(coil, dict) and coil.get("description"):
                return True
    return False


def _check_phase(phase, phase_data, issues):
    """Run every per-phase check for one of the r/y/b sections."""
    tag = phase.upper()
    print(f"\n{_RULE}")
    print(f"Checking Phase: {tag}")
    print(_RULE)

    if not isinstance(phase_data, dict):
        issues.append(f"❌ [{tag}] phase data is not an object")
        print()
        return

    for field in ("breakerId", "operator"):
        if field in phase_data:
            issues.append(f"❌ [{tag}] {field} found (should be removed)")
        else:
            print(f"✅ [{tag}] No {field} field")

    # These fields belong inside aiVerdict only, so a phase-level copy is an
    # error whether or not aiVerdict exists.
    for field in ("maintenanceActions", "futureFaultsPdf"):
        if field in phase_data:
            issues.append(
                f"❌ [{tag}] {field} at top level (should be ONLY in aiVerdict)"
            )

    if "aiVerdict" in phase_data:
        _check_ai_verdict(phase_data["aiVerdict"], tag, issues)

    if "phaseWiseAnalysis" in phase_data:
        if _has_coil_description(phase_data["phaseWiseAnalysis"]):
            print(f"✅ [{tag}] Coil descriptions are populated")
        else:
            issues.append(f"⚠️ [{tag}] Coil descriptions appear to be empty")

    print()


def _print_summary(issues):
    """Print the final summary banner listing issues or the all-clear lines."""
    print(f"\n{_RULE}")
    print("FINAL SUMMARY")
    print(_RULE)
    if issues:
        print(f"❌ {len(issues)} ISSUE(S) FOUND:")
        for issue in issues:
            print(f" {issue}")
    else:
        print("✅ ALL CHECKS PASSED!")
        print("✅ Overall healthScore present and correct")
        print("✅ No duplicates found")
        print("✅ Correct field placement")
        print("✅ Core verdict fields are consecutive")
        print("✅ No breakerId or operator fields")
        print("✅ Coil descriptions populated")
    print(_RULE)


def check_json_structure(json_file_path):
    """
    Verify that a JSON report file matches the structural requirements.

    Checks performed:
    1. No duplicate maintenanceActions or futureFaultsPdf
    2. They should be INSIDE aiVerdict only
    3. No breakerId or operator fields
    4. Proper aiVerdict field ordering (related fields consecutive)
    5. Coil descriptions populated
    6. Overall healthScore averaging R, Y, B phases

    Progress and a final summary are printed to stdout. Values of an
    unexpected type are reported as issues rather than raising.

    Args:
        json_file_path: Path of the UTF-8 encoded JSON file to check.

    Returns:
        True when no issues were found, False otherwise.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(json_file_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    issues = []

    print(f"\n{_RULE}")
    print("TOP LEVEL STRUCTURE CHECK")
    print(_RULE)

    if isinstance(data, dict):
        _check_top_level(data, issues)
        for phase in _PHASES:
            if phase in data:
                _check_phase(phase, data[phase], issues)
    else:
        issues.append("❌ Top-level JSON value is not an object")

    _print_summary(issues)
    return not issues
|
|
|
|
|
|
# Script entry point: check the file named on the command line, or the
# built-in sample path when no argument is given.
if __name__ == "__main__":
    import sys

    # Fallback path, used only when no path is passed as the first argument.
    json_path = "c:/Users/rkhanke/Downloads/dcrm_three_phase_1764926929196.json"

    if len(sys.argv) > 1:
        json_path = sys.argv[1]

    print(f"Checking: {json_path}\n")

    # Use the verdict as the exit status (0 = all checks passed, 1 = issues
    # found) so callers can detect a failed check.
    sys.exit(0 if check_json_structure(json_path) else 1)
|
|
|
|
|
|
|