"""Evaluate the rule engine + LLM pipeline against the full DCASE 2025 dataset. Loads cached features (1250 clips), reconstructs AudioFeatures, runs rank_candidates, and measures diagnostic accuracy per machine type. """ import joblib import collections from audio_analyzer import AudioFeatures from fault_rules import rank_candidates, RULES FEATURE_NAMES = [ "duration_s", "rms_db", "rms_variance", "zero_crossing_rate", "spectral_centroid_hz", "spectral_bandwidth_hz", "spectral_rolloff_hz", "dominant_frequency_hz", "harmonic_ratio", "onset_rate_per_sec", "has_regular_pattern", "pattern_interval_ms", "peak_db", "anomaly_score", ] MACHINE_TO_APPLIANCE = { "fan": "Electric fan", "pump": "Dishwasher", "slider": "Power drill", "ToyCar": "Car engine", "ToyTrain": "Tumble dryer", "gearbox": "Electric motor (generic)", "bearing": "Electric motor (generic)", "valve": "Refrigerator/Freezer", } def load_dcase(): cache = joblib.load("features_cache_r1.5_s42.joblib") X, y, machines = cache["X"], cache["y"], cache["machines"] clips = [] for i in range(len(X)): feat_dict = {k: float(X[i][j]) for j, k in enumerate(FEATURE_NAMES)} feat_dict["has_regular_pattern"] = bool(round(feat_dict["has_regular_pattern"])) features = AudioFeatures(**feat_dict, signal_present=True) appliance = MACHINE_TO_APPLIANCE.get(machines[i], "Electric motor (generic)") clips.append({ "machine": machines[i], "label": int(y[i]), # 0=normal, 1=anomaly "appliance": appliance, "features": features, }) return clips def evaluate(clips): total = len(clips) fired_count = 0 inconclusive_count = 0 machine_stats = collections.defaultdict(lambda: {"total": 0, "fired": 0, "anomaly_fired": 0, "normal_inconclusive": 0}) fault_names = collections.Counter() for clip in clips: cands = rank_candidates(clip["features"], clip["appliance"]) top = cands[0] is_anomaly = clip["label"] == 1 machine = clip["machine"] machine_stats[machine]["total"] += 1 if top.name != "Inconclusive": fired_count += 1 machine_stats[machine]["fired"] += 1 if is_anomaly: machine_stats[machine]["anomaly_fired"] += 1 fault_names[top.name] += 1 else: inconclusive_count += 1 if not is_anomaly: machine_stats[machine]["normal_inconclusive"] += 1 print(f"Total clips: {total}") print(f"Rule engine fired: {fired_count} ({fired_count/total*100:.1f}%)") print(f"Inconclusive: {inconclusive_count} ({inconclusive_count/total*100:.1f}%)") print() print("=== Per-machine breakdown ===") print(f"{'Machine':<12} {'N':>4} {'Fired':>6} {'Hit%':>6} {'Anom Fired':>11} {'Normal Inconcl':>14}") for machine in sorted(machine_stats.keys()): s = machine_stats[machine] n_anom = sum(1 for c in clips if c["machine"] == machine and c["label"] == 1) n_norm = sum(1 for c in clips if c["machine"] == machine and c["label"] == 0) anom_fired = s["anomaly_fired"] print(f"{machine:<12} {s['total']:>4} {s['fired']:>6} {s['fired']/s['total']*100:>5.1f}% {anom_fired:>5}/{n_anom:<5} {s['normal_inconclusive']:>5}/{n_norm:<5}") print() print("=== Top fault names ===") for name, count in fault_names.most_common(15): print(f" {name:<40} {count:>4}") # Check: for anomalous clips, what % get a specific fault (not Inconclusive)? anom_clips = [c for c in clips if c["label"] == 1] anom_fired = sum(1 for c in anom_clips if rank_candidates(c["features"], c["appliance"])[0].name != "Inconclusive") print(f"\nAnomalous clips: {len(anom_clips)}") print(f" Rule engine fires: {anom_fired} ({anom_fired/len(anom_clips)*100:.1f}%)") print(f" Missed (Inconclusive): {len(anom_clips) - anom_fired} ({(len(anom_clips)-anom_fired)/len(anom_clips)*100:.1f}%)") # Check: for normal clips, what % are correctly Inconclusive? norm_clips = [c for c in clips if c["label"] == 0] norm_inconcl = sum(1 for c in norm_clips if rank_candidates(c["features"], c["appliance"])[0].name == "Inconclusive") print(f"\nNormal clips: {len(norm_clips)}") print(f" Correctly Inconclusive: {norm_inconcl} ({norm_inconcl/len(norm_clips)*100:.1f}%)") print(f" False positive (fired): {len(norm_clips) - norm_inconcl} ({(len(norm_clips)-norm_inconcl)/len(norm_clips)*100:.1f}%)") # Show some false positive examples print("\n=== False positive examples (normal clips where rules fire) ===") shown = 0 for clip in clips: if clip["label"] == 0 and shown < 5: cands = rank_candidates(clip["features"], clip["appliance"]) if cands[0].name != "Inconclusive": f = clip["features"] print(f" [{clip['machine']}] {cands[0].name} (w={cands[0].weight:.2f})") print(f" centroid={f.spectral_centroid_hz:.0f}Hz zcr={f.zero_crossing_rate:.3f} " f"onset={f.onset_rate_per_sec:.1f}/s pattern={f.has_regular_pattern} " f"dom_freq={f.dominant_frequency_hz:.0f}Hz rms_var={f.rms_variance:.4f}") shown += 1 # Show some missed anomaly examples print("\n=== Missed anomalies (anomalous clips with Inconclusive) ===") shown = 0 for clip in clips: if clip["label"] == 1 and shown < 5: cands = rank_candidates(clip["features"], clip["appliance"]) if cands[0].name == "Inconclusive": f = clip["features"] print(f" [{clip['machine']}] centroid={f.spectral_centroid_hz:.0f}Hz " f"zcr={f.zero_crossing_rate:.3f} onset={f.onset_rate_per_sec:.1f}/s " f"pattern={f.has_regular_pattern} dom_freq={f.dominant_frequency_hz:.0f}Hz " f"rms_var={f.rms_variance:.4f} anomaly_score={f.anomaly_score:.3f}") shown += 1 if __name__ == "__main__": clips = load_dcase() evaluate(clips)