Spaces:
Runtime error
Runtime error
File size: 6,155 Bytes
"""Evaluate the rule engine + LLM pipeline against the full DCASE 2025 dataset.
Loads cached features (1250 clips), reconstructs AudioFeatures, runs
rank_candidates, and measures diagnostic accuracy per machine type.
"""
import joblib
import collections
from audio_analyzer import AudioFeatures
from fault_rules import rank_candidates, RULES
# Names given, in order, to the leading columns of each cached feature row.
# load_dcase() pairs FEATURE_NAMES[j] with X[i][j] and passes the resulting
# mapping to AudioFeatures as keyword arguments, so the order here matters.
# NOTE(review): the order presumably mirrors how the cache was written —
# confirm against the script that produced the joblib file.
FEATURE_NAMES = [
"duration_s", "rms_db", "rms_variance", "zero_crossing_rate",
"spectral_centroid_hz", "spectral_bandwidth_hz", "spectral_rolloff_hz",
"dominant_frequency_hz", "harmonic_ratio", "onset_rate_per_sec",
"has_regular_pattern", "pattern_interval_ms", "peak_db", "anomaly_score",
]
# Maps a machine-type string from the cache to the appliance name that is
# handed to rank_candidates(). load_dcase() falls back to
# "Electric motor (generic)" for any machine type that is missing here.
# NOTE(review): pairings such as pump -> Dishwasher look like nearest-match
# approximations rather than exact equivalents — confirm they are intended.
MACHINE_TO_APPLIANCE = {
"fan": "Electric fan", "pump": "Dishwasher", "slider": "Power drill",
"ToyCar": "Car engine", "ToyTrain": "Tumble dryer",
"gearbox": "Electric motor (generic)", "bearing": "Electric motor (generic)",
"valve": "Refrigerator/Freezer",
}
def load_dcase(cache_path="features_cache_r1.5_s42.joblib"):
    """Load the cached feature matrix and rebuild one record per clip.

    Args:
        cache_path: Path of the joblib cache to read. The file must hold a
            mapping with the keys "X" (feature rows), "y" (labels) and
            "machines" (machine-type strings). Defaults to the file name the
            script has always used, so existing callers are unaffected.

    Returns:
        A list with one dict per row of "X". Each dict has the keys
        "machine", "label" (int), "appliance" and "features" (an
        AudioFeatures instance built with signal_present=True).
    """
    # SECURITY: joblib.load unpickles the file, which can execute arbitrary
    # code. Only point this at caches you produced yourself.
    cache = joblib.load(cache_path)
    X, y, machines = cache["X"], cache["y"], cache["machines"]

    clips = []
    for i, row in enumerate(X):
        feat_dict = {name: float(row[j]) for j, name in enumerate(FEATURE_NAMES)}
        # Every column was just coerced to float; turn the flag back into a bool.
        feat_dict["has_regular_pattern"] = bool(round(feat_dict["has_regular_pattern"]))
        features = AudioFeatures(**feat_dict, signal_present=True)

        machine = machines[i]
        clips.append({
            "machine": machine,
            "label": int(y[i]),  # 0=normal, 1=anomaly
            # Machine types without an explicit mapping use the generic motor.
            "appliance": MACHINE_TO_APPLIANCE.get(machine, "Electric motor (generic)"),
            "features": features,
        })
    return clips
def _pct(part, whole):
    """Return part/whole as a percentage, or 0.0 when whole is zero."""
    return part / whole * 100 if whole else 0.0


def evaluate(clips):
    """Run the rule engine over every clip and print a diagnostic report.

    A clip counts as "fired" when the first candidate returned by
    rank_candidates() has a name other than "Inconclusive".

    The report printed to stdout contains, in order: overall fired and
    inconclusive totals, a per-machine breakdown, the 15 most frequent
    top-candidate names, summaries for anomalous (label 1) and normal
    (label 0) clips, and up to five examples each of false positives and
    missed anomalies.

    Args:
        clips: Sequence of dicts with the keys "machine", "label",
            "appliance" and "features", as produced by load_dcase().

    Returns:
        None. An empty input, or an input lacking one of the two labels,
        prints 0.0% for the affected ratios instead of raising
        ZeroDivisionError.
    """
    # Rank each clip exactly once and reuse the top candidate in every
    # section of the report, so no clip is re-ranked per section.
    # NOTE(review): this assumes rank_candidates() is deterministic.
    ranked = [
        (clip, rank_candidates(clip["features"], clip["appliance"])[0])
        for clip in clips
    ]

    total = len(clips)
    fired_count = 0
    inconclusive_count = 0
    machine_stats = collections.defaultdict(
        lambda: {"total": 0, "fired": 0, "anomaly_fired": 0, "normal_inconclusive": 0}
    )
    # (machine, label) -> number of clips; replaces a rescan of all clips
    # for every machine when printing the breakdown.
    label_counts = collections.Counter()
    fault_names = collections.Counter()

    for clip, top in ranked:
        machine = clip["machine"]
        is_anomaly = clip["label"] == 1
        stats = machine_stats[machine]
        stats["total"] += 1
        label_counts[machine, clip["label"]] += 1
        if top.name != "Inconclusive":
            fired_count += 1
            stats["fired"] += 1
            if is_anomaly:
                stats["anomaly_fired"] += 1
            fault_names[top.name] += 1
        else:
            inconclusive_count += 1
            if not is_anomaly:
                stats["normal_inconclusive"] += 1

    print(f"Total clips: {total}")
    print(f"Rule engine fired: {fired_count} ({_pct(fired_count, total):.1f}%)")
    print(f"Inconclusive: {inconclusive_count} ({_pct(inconclusive_count, total):.1f}%)")
    print()
    print("=== Per-machine breakdown ===")
    print(f"{'Machine':<12} {'N':>4} {'Fired':>6} {'Hit%':>6} {'Anom Fired':>11} {'Normal Inconcl':>14}")
    for machine in sorted(machine_stats):
        stats = machine_stats[machine]
        n_anom = label_counts[machine, 1]
        n_norm = label_counts[machine, 0]
        print(
            f"{machine:<12} {stats['total']:>4} {stats['fired']:>6} "
            f"{_pct(stats['fired'], stats['total']):>5.1f}% "
            f"{stats['anomaly_fired']:>5}/{n_anom:<5} "
            f"{stats['normal_inconclusive']:>5}/{n_norm:<5}"
        )
    print()
    print("=== Top fault names ===")
    for name, count in fault_names.most_common(15):
        print(f" {name:<40} {count:>4}")

    # For anomalous clips, what % get a specific fault (not Inconclusive)?
    anom_tops = [top for clip, top in ranked if clip["label"] == 1]
    n_anom_total = len(anom_tops)
    anom_fired = sum(1 for top in anom_tops if top.name != "Inconclusive")
    anom_missed = n_anom_total - anom_fired
    print(f"\nAnomalous clips: {n_anom_total}")
    print(f" Rule engine fires: {anom_fired} ({_pct(anom_fired, n_anom_total):.1f}%)")
    print(f" Missed (Inconclusive): {anom_missed} ({_pct(anom_missed, n_anom_total):.1f}%)")

    # For normal clips, what % are correctly Inconclusive?
    norm_tops = [top for clip, top in ranked if clip["label"] == 0]
    n_norm_total = len(norm_tops)
    norm_inconcl = sum(1 for top in norm_tops if top.name == "Inconclusive")
    norm_fired = n_norm_total - norm_inconcl
    print(f"\nNormal clips: {n_norm_total}")
    print(f" Correctly Inconclusive: {norm_inconcl} ({_pct(norm_inconcl, n_norm_total):.1f}%)")
    print(f" False positive (fired): {norm_fired} ({_pct(norm_fired, n_norm_total):.1f}%)")

    # Show up to five normal clips on which a rule fired.
    print("\n=== False positive examples (normal clips where rules fire) ===")
    shown = 0
    for clip, top in ranked:
        if clip["label"] != 0 or top.name == "Inconclusive":
            continue
        f = clip["features"]
        print(f" [{clip['machine']}] {top.name} (w={top.weight:.2f})")
        print(f" centroid={f.spectral_centroid_hz:.0f}Hz zcr={f.zero_crossing_rate:.3f} "
              f"onset={f.onset_rate_per_sec:.1f}/s pattern={f.has_regular_pattern} "
              f"dom_freq={f.dominant_frequency_hz:.0f}Hz rms_var={f.rms_variance:.4f}")
        shown += 1
        if shown >= 5:
            break

    # Show up to five anomalous clips that no rule matched.
    print("\n=== Missed anomalies (anomalous clips with Inconclusive) ===")
    shown = 0
    for clip, top in ranked:
        if clip["label"] != 1 or top.name != "Inconclusive":
            continue
        f = clip["features"]
        print(f" [{clip['machine']}] centroid={f.spectral_centroid_hz:.0f}Hz "
              f"zcr={f.zero_crossing_rate:.3f} onset={f.onset_rate_per_sec:.1f}/s "
              f"pattern={f.has_regular_pattern} dom_freq={f.dominant_frequency_hz:.0f}Hz "
              f"rms_var={f.rms_variance:.4f} anomaly_score={f.anomaly_score:.3f}")
        shown += 1
        if shown >= 5:
            break
if __name__ == "__main__":
    # Script entry point: load the cached clips and print the full report.
    evaluate(load_dcase())
|