File size: 6,155 Bytes
edb671a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
"""Evaluate the rule engine + LLM pipeline against the full DCASE 2025 dataset.

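Loads cached features (1250 clips), reconstructs AudioFeatures, runs
rank_candidates on every clip, and reports how often the top candidate is a
specific fault rather than "Inconclusive": overall, per machine type, and
separately for normal and anomalous clips.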
"""
import joblib
import collections
from audio_analyzer import AudioFeatures
from fault_rules import rank_candidates, RULES

# Column order of the cached feature matrix: load_dcase() reads column j of
# each row of cache["X"] and passes it to AudioFeatures under the keyword
# FEATURE_NAMES[j], so the order here must match the order the cache was
# written in.
FEATURE_NAMES = [
    "duration_s", "rms_db", "rms_variance", "zero_crossing_rate",
    "spectral_centroid_hz", "spectral_bandwidth_hz", "spectral_rolloff_hz",
    "dominant_frequency_hz", "harmonic_ratio", "onset_rate_per_sec",
    "has_regular_pattern", "pattern_interval_ms", "peak_db", "anomaly_score",
]

# Maps a machine name from cache["machines"] to the appliance string that is
# handed to rank_candidates(). Machine names missing from this table fall back
# to "Electric motor (generic)" in load_dcase().
MACHINE_TO_APPLIANCE = {
    "fan": "Electric fan", "pump": "Dishwasher", "slider": "Power drill",
    "ToyCar": "Car engine", "ToyTrain": "Tumble dryer",
    "gearbox": "Electric motor (generic)", "bearing": "Electric motor (generic)",
    "valve": "Refrigerator/Freezer",
}


def load_dcase(cache_path="features_cache_r1.5_s42.joblib"):
    """Load the cached feature matrix and rebuild one record per clip.

    Args:
        cache_path: Path of the joblib cache to read. Defaults to the file
            name this script has always used, so calling ``load_dcase()``
            with no arguments behaves as before.

    Returns:
        A list of dicts, one per row of the cached ``X`` matrix, with keys
        ``"machine"``, ``"label"`` (0=normal, 1=anomaly), ``"appliance"``
        and ``"features"`` (an ``AudioFeatures`` instance).

    Raises:
        ValueError: If the cached ``X``, ``y`` and ``machines`` sequences do
            not all have the same length.
    """
    # NOTE(review): joblib.load unpickles the file, and unpickling can execute
    # arbitrary code. Only point this at cache files you produced yourself.
    cache = joblib.load(cache_path)
    X, y, machines = cache["X"], cache["y"], cache["machines"]

    # The three sequences are parallel; fail loudly on a corrupt cache
    # instead of mislabelling clips or dying later with an IndexError.
    if not (len(X) == len(y) == len(machines)):
        raise ValueError(
            f"Inconsistent cache {cache_path!r}: "
            f"len(X)={len(X)}, len(y)={len(y)}, len(machines)={len(machines)}"
        )

    clips = []
    for row, label, machine in zip(X, y, machines):
        feat_dict = {name: float(row[col]) for col, name in enumerate(FEATURE_NAMES)}
        # The flag is stored as a number in the matrix; hand AudioFeatures a
        # real bool rather than a float.
        feat_dict["has_regular_pattern"] = bool(round(feat_dict["has_regular_pattern"]))
        features = AudioFeatures(**feat_dict, signal_present=True)
        clips.append({
            "machine": machine,
            "label": int(label),  # 0=normal, 1=anomaly
            "appliance": MACHINE_TO_APPLIANCE.get(machine, "Electric motor (generic)"),
            "features": features,
        })
    return clips


def evaluate(clips):
    total = len(clips)
    fired_count = 0
    inconclusive_count = 0
    machine_stats = collections.defaultdict(lambda: {"total": 0, "fired": 0, "anomaly_fired": 0, "normal_inconclusive": 0})
    fault_names = collections.Counter()

    for clip in clips:
        cands = rank_candidates(clip["features"], clip["appliance"])
        top = cands[0]
        is_anomaly = clip["label"] == 1
        machine = clip["machine"]

        machine_stats[machine]["total"] += 1
        if top.name != "Inconclusive":
            fired_count += 1
            machine_stats[machine]["fired"] += 1
            if is_anomaly:
                machine_stats[machine]["anomaly_fired"] += 1
            fault_names[top.name] += 1
        else:
            inconclusive_count += 1
            if not is_anomaly:
                machine_stats[machine]["normal_inconclusive"] += 1

    print(f"Total clips: {total}")
    print(f"Rule engine fired: {fired_count} ({fired_count/total*100:.1f}%)")
    print(f"Inconclusive: {inconclusive_count} ({inconclusive_count/total*100:.1f}%)")
    print()

    print("=== Per-machine breakdown ===")
    print(f"{'Machine':<12} {'N':>4} {'Fired':>6} {'Hit%':>6} {'Anom Fired':>11} {'Normal Inconcl':>14}")
    for machine in sorted(machine_stats.keys()):
        s = machine_stats[machine]
        n_anom = sum(1 for c in clips if c["machine"] == machine and c["label"] == 1)
        n_norm = sum(1 for c in clips if c["machine"] == machine and c["label"] == 0)
        anom_fired = s["anomaly_fired"]
        print(f"{machine:<12} {s['total']:>4} {s['fired']:>6} {s['fired']/s['total']*100:>5.1f}% {anom_fired:>5}/{n_anom:<5} {s['normal_inconclusive']:>5}/{n_norm:<5}")

    print()
    print("=== Top fault names ===")
    for name, count in fault_names.most_common(15):
        print(f"  {name:<40} {count:>4}")

    # Check: for anomalous clips, what % get a specific fault (not Inconclusive)?
    anom_clips = [c for c in clips if c["label"] == 1]
    anom_fired = sum(1 for c in anom_clips if rank_candidates(c["features"], c["appliance"])[0].name != "Inconclusive")
    print(f"\nAnomalous clips: {len(anom_clips)}")
    print(f"  Rule engine fires: {anom_fired} ({anom_fired/len(anom_clips)*100:.1f}%)")
    print(f"  Missed (Inconclusive): {len(anom_clips) - anom_fired} ({(len(anom_clips)-anom_fired)/len(anom_clips)*100:.1f}%)")

    # Check: for normal clips, what % are correctly Inconclusive?
    norm_clips = [c for c in clips if c["label"] == 0]
    norm_inconcl = sum(1 for c in norm_clips if rank_candidates(c["features"], c["appliance"])[0].name == "Inconclusive")
    print(f"\nNormal clips: {len(norm_clips)}")
    print(f"  Correctly Inconclusive: {norm_inconcl} ({norm_inconcl/len(norm_clips)*100:.1f}%)")
    print(f"  False positive (fired): {len(norm_clips) - norm_inconcl} ({(len(norm_clips)-norm_inconcl)/len(norm_clips)*100:.1f}%)")

    # Show some false positive examples
    print("\n=== False positive examples (normal clips where rules fire) ===")
    shown = 0
    for clip in clips:
        if clip["label"] == 0 and shown < 5:
            cands = rank_candidates(clip["features"], clip["appliance"])
            if cands[0].name != "Inconclusive":
                f = clip["features"]
                print(f"  [{clip['machine']}] {cands[0].name} (w={cands[0].weight:.2f})")
                print(f"    centroid={f.spectral_centroid_hz:.0f}Hz  zcr={f.zero_crossing_rate:.3f}  "
                      f"onset={f.onset_rate_per_sec:.1f}/s  pattern={f.has_regular_pattern}  "
                      f"dom_freq={f.dominant_frequency_hz:.0f}Hz  rms_var={f.rms_variance:.4f}")
                shown += 1

    # Show some missed anomaly examples
    print("\n=== Missed anomalies (anomalous clips with Inconclusive) ===")
    shown = 0
    for clip in clips:
        if clip["label"] == 1 and shown < 5:
            cands = rank_candidates(clip["features"], clip["appliance"])
            if cands[0].name == "Inconclusive":
                f = clip["features"]
                print(f"  [{clip['machine']}] centroid={f.spectral_centroid_hz:.0f}Hz  "
                      f"zcr={f.zero_crossing_rate:.3f}  onset={f.onset_rate_per_sec:.1f}/s  "
                      f"pattern={f.has_regular_pattern}  dom_freq={f.dominant_frequency_hz:.0f}Hz  "
                      f"rms_var={f.rms_variance:.4f}  anomaly_score={f.anomaly_score:.3f}")
                shown += 1


if __name__ == "__main__":
    # Script entry point: rebuild the clip records from the feature cache
    # and print the evaluation report for them.
    evaluate(load_dcase())