Spaces:
Runtime error
Runtime error
| """Evaluate the rule engine + LLM pipeline against the full DCASE 2025 dataset. | |
| Loads cached features (1250 clips), reconstructs AudioFeatures, runs | |
| rank_candidates, and measures diagnostic accuracy per machine type. | |
| """ | |
| import joblib | |
| import collections | |
| from audio_analyzer import AudioFeatures | |
| from fault_rules import rank_candidates, RULES | |
| FEATURE_NAMES = [ | |
| "duration_s", "rms_db", "rms_variance", "zero_crossing_rate", | |
| "spectral_centroid_hz", "spectral_bandwidth_hz", "spectral_rolloff_hz", | |
| "dominant_frequency_hz", "harmonic_ratio", "onset_rate_per_sec", | |
| "has_regular_pattern", "pattern_interval_ms", "peak_db", "anomaly_score", | |
| ] | |
| MACHINE_TO_APPLIANCE = { | |
| "fan": "Electric fan", "pump": "Dishwasher", "slider": "Power drill", | |
| "ToyCar": "Car engine", "ToyTrain": "Tumble dryer", | |
| "gearbox": "Electric motor (generic)", "bearing": "Electric motor (generic)", | |
| "valve": "Refrigerator/Freezer", | |
| } | |
| def load_dcase(): | |
| cache = joblib.load("features_cache_r1.5_s42.joblib") | |
| X, y, machines = cache["X"], cache["y"], cache["machines"] | |
| clips = [] | |
| for i in range(len(X)): | |
| feat_dict = {k: float(X[i][j]) for j, k in enumerate(FEATURE_NAMES)} | |
| feat_dict["has_regular_pattern"] = bool(round(feat_dict["has_regular_pattern"])) | |
| features = AudioFeatures(**feat_dict, signal_present=True) | |
| appliance = MACHINE_TO_APPLIANCE.get(machines[i], "Electric motor (generic)") | |
| clips.append({ | |
| "machine": machines[i], | |
| "label": int(y[i]), # 0=normal, 1=anomaly | |
| "appliance": appliance, | |
| "features": features, | |
| }) | |
| return clips | |
| def evaluate(clips): | |
| total = len(clips) | |
| fired_count = 0 | |
| inconclusive_count = 0 | |
| machine_stats = collections.defaultdict(lambda: {"total": 0, "fired": 0, "anomaly_fired": 0, "normal_inconclusive": 0}) | |
| fault_names = collections.Counter() | |
| for clip in clips: | |
| cands = rank_candidates(clip["features"], clip["appliance"]) | |
| top = cands[0] | |
| is_anomaly = clip["label"] == 1 | |
| machine = clip["machine"] | |
| machine_stats[machine]["total"] += 1 | |
| if top.name != "Inconclusive": | |
| fired_count += 1 | |
| machine_stats[machine]["fired"] += 1 | |
| if is_anomaly: | |
| machine_stats[machine]["anomaly_fired"] += 1 | |
| fault_names[top.name] += 1 | |
| else: | |
| inconclusive_count += 1 | |
| if not is_anomaly: | |
| machine_stats[machine]["normal_inconclusive"] += 1 | |
| print(f"Total clips: {total}") | |
| print(f"Rule engine fired: {fired_count} ({fired_count/total*100:.1f}%)") | |
| print(f"Inconclusive: {inconclusive_count} ({inconclusive_count/total*100:.1f}%)") | |
| print() | |
| print("=== Per-machine breakdown ===") | |
| print(f"{'Machine':<12} {'N':>4} {'Fired':>6} {'Hit%':>6} {'Anom Fired':>11} {'Normal Inconcl':>14}") | |
| for machine in sorted(machine_stats.keys()): | |
| s = machine_stats[machine] | |
| n_anom = sum(1 for c in clips if c["machine"] == machine and c["label"] == 1) | |
| n_norm = sum(1 for c in clips if c["machine"] == machine and c["label"] == 0) | |
| anom_fired = s["anomaly_fired"] | |
| print(f"{machine:<12} {s['total']:>4} {s['fired']:>6} {s['fired']/s['total']*100:>5.1f}% {anom_fired:>5}/{n_anom:<5} {s['normal_inconclusive']:>5}/{n_norm:<5}") | |
| print() | |
| print("=== Top fault names ===") | |
| for name, count in fault_names.most_common(15): | |
| print(f" {name:<40} {count:>4}") | |
| # Check: for anomalous clips, what % get a specific fault (not Inconclusive)? | |
| anom_clips = [c for c in clips if c["label"] == 1] | |
| anom_fired = sum(1 for c in anom_clips if rank_candidates(c["features"], c["appliance"])[0].name != "Inconclusive") | |
| print(f"\nAnomalous clips: {len(anom_clips)}") | |
| print(f" Rule engine fires: {anom_fired} ({anom_fired/len(anom_clips)*100:.1f}%)") | |
| print(f" Missed (Inconclusive): {len(anom_clips) - anom_fired} ({(len(anom_clips)-anom_fired)/len(anom_clips)*100:.1f}%)") | |
| # Check: for normal clips, what % are correctly Inconclusive? | |
| norm_clips = [c for c in clips if c["label"] == 0] | |
| norm_inconcl = sum(1 for c in norm_clips if rank_candidates(c["features"], c["appliance"])[0].name == "Inconclusive") | |
| print(f"\nNormal clips: {len(norm_clips)}") | |
| print(f" Correctly Inconclusive: {norm_inconcl} ({norm_inconcl/len(norm_clips)*100:.1f}%)") | |
| print(f" False positive (fired): {len(norm_clips) - norm_inconcl} ({(len(norm_clips)-norm_inconcl)/len(norm_clips)*100:.1f}%)") | |
| # Show some false positive examples | |
| print("\n=== False positive examples (normal clips where rules fire) ===") | |
| shown = 0 | |
| for clip in clips: | |
| if clip["label"] == 0 and shown < 5: | |
| cands = rank_candidates(clip["features"], clip["appliance"]) | |
| if cands[0].name != "Inconclusive": | |
| f = clip["features"] | |
| print(f" [{clip['machine']}] {cands[0].name} (w={cands[0].weight:.2f})") | |
| print(f" centroid={f.spectral_centroid_hz:.0f}Hz zcr={f.zero_crossing_rate:.3f} " | |
| f"onset={f.onset_rate_per_sec:.1f}/s pattern={f.has_regular_pattern} " | |
| f"dom_freq={f.dominant_frequency_hz:.0f}Hz rms_var={f.rms_variance:.4f}") | |
| shown += 1 | |
| # Show some missed anomaly examples | |
| print("\n=== Missed anomalies (anomalous clips with Inconclusive) ===") | |
| shown = 0 | |
| for clip in clips: | |
| if clip["label"] == 1 and shown < 5: | |
| cands = rank_candidates(clip["features"], clip["appliance"]) | |
| if cands[0].name == "Inconclusive": | |
| f = clip["features"] | |
| print(f" [{clip['machine']}] centroid={f.spectral_centroid_hz:.0f}Hz " | |
| f"zcr={f.zero_crossing_rate:.3f} onset={f.onset_rate_per_sec:.1f}/s " | |
| f"pattern={f.has_regular_pattern} dom_freq={f.dominant_frequency_hz:.0f}Hz " | |
| f"rms_var={f.rms_variance:.4f} anomaly_score={f.anomaly_score:.3f}") | |
| shown += 1 | |
| if __name__ == "__main__": | |
| clips = load_dcase() | |
| evaluate(clips) | |