File size: 4,676 Bytes
850c319
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from pathlib import Path
import sys

ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

from app.config import get_settings  # noqa: E402
from app.qdrant_store import match_filter  # noqa: E402
from app.services import AfterimageService  # noqa: E402


# Scenarios this script verifies. Each key is a scenario name; each value is
# (region_id to inspect, asset_id expected to be the sole outlier reported
# for that scenario).
SCENARIOS: dict[str, tuple[str, str]] = {
    "vault": ("vault_pedestal", "vault_pedestal_incident"),
    "museum": ("museum_pedestal", "museum_pedestal_incident"),
    "bank": ("bank_curb", "bank_curb_incident"),
}


def fail(message: str) -> int:
    """Report a verification failure and hand back the failing status.

    Prints ``message`` to stdout behind a ``FAIL: `` prefix and returns 1, so
    a call site can simply write ``return fail(...)``.
    """
    print("FAIL: {}".format(message))
    return 1


def held_out_normal_score(service: AfterimageService, scenario: str, region_id: str) -> float | None:
    """Score a region's canonical baseline crop against its stored baselines.

    The manifest entry for ``region_id`` names a ``baseline_crop``; that image
    is embedded and searched against the points filtered to this scenario,
    this region and ``is_baseline=True``. The original author describes this
    as the false-positive control: a normal frame should stay above the
    region's floor.

    Returns the score of the first search hit whose score is below 0.9999, or
    ``None`` when the region is not in the manifest, has no ``baseline_crop``,
    or no hit qualifies.
    """
    region = None
    for candidate in service.manifest.regions(scenario):
        if candidate["region_id"] == region_id:
            region = candidate
            break
    if not region:
        return None
    if not region.get("baseline_crop"):
        return None

    crop_path = service.manifest.asset_path(region["baseline_crop"])
    vector = service.embedder.embed_path(crop_path)
    baseline_only = match_filter(scenario=scenario, region_id=region_id, is_baseline=True)
    hits = service.store.search(vector, query_filter=baseline_only, limit=8)

    # Skip the (near-)exact self-match at ~1.0 so the reported value is the
    # similarity to one of the *other* baselines, not to the crop itself.
    peer = None
    for hit in hits:
        if hit.score is not None and hit.score < 0.9999:
            peer = hit
            break
    if not peer:
        return None
    return float(peer.score)


def verify_scenario(service: AfterimageService, scenario: str, region_id: str, incident_id: str) -> int:
    """Verify one scenario; return 0 on success or 1 on the first failed check.

    Checks, in order:
      1. the anomaly scan for ``scenario`` contains ``region_id``;
      2. that region's normal band reports ``source == "baseline_variants"``;
      3. the region is ``anomalous`` and its score is below its threshold;
      4. the held-out normal control, when a score is available, is at or
         above the threshold (when it is not available the skip is printed
         rather than passing silently);
      5. the outlier query reports ``RecommendQuery(best_score)`` as its API
         and returns exactly ``[incident_id]``.

    Each failure is reported through ``fail`` and progress lines are printed
    to stdout.
    """
    scan = service.scan_anomalies(scenario)
    regions = {item["region_id"]: item for item in scan["regions"]}
    region = regions.get(region_id)
    if region is None:
        return fail(f"{scenario}: missing region {region_id}")

    band = region["normal_band"]
    print(
        f"{scenario}/{region_id}: score={region['score']:.4f} "
        f"floor={region['threshold']:.4f} margin={region['threshold'] - region['score']:+.4f} "
        f"status={region['status']}"
    )
    if band["source"] != "baseline_variants":
        return fail(f"{scenario}: normal band is not data-derived (source={band['source']})")
    if region["status"] != "anomalous" or region["score"] >= region["threshold"]:
        return fail(f"{scenario}: incident did not cross the learned normal band")

    # False-positive control: a held-out NORMAL frame must stay inside the band.
    # Without this, the detector could be one that simply always alarms.
    normal_top = held_out_normal_score(service, scenario, region_id)
    if normal_top is None:
        # The control could not be evaluated (no region entry, no baseline
        # crop, or no qualifying peer). Keep it best-effort, but say so: a
        # silent skip would make the run look more thoroughly verified than
        # it actually was.
        print("  control: SKIPPED — no held-out normal score available for this region")
    elif normal_top < region["threshold"]:
        return fail(
            f"{scenario}: held-out NORMAL frame scored {normal_top:.4f} < floor "
            f"{region['threshold']:.4f} — false positive"
        )
    else:
        print(
            f"  control: normal frame {normal_top:.4f} >= floor {region['threshold']:.4f} "
            f"(stays in band) | incident {region['score']:.4f} breaches"
        )

    outliers = service.outliers(scenario)
    if outliers["inspector"]["api"] != "RecommendQuery(best_score)":
        return fail(f"{scenario}: outliers not backed by RecommendQuery(best_score)")
    # The incident asset must be the one and only outlier, in a single-item list.
    outlier_ids = [item["payload"]["asset_id"] for item in outliers["results"]]
    if outlier_ids != [incident_id]:
        return fail(f"{scenario}: expected sole outlier {incident_id!r}, got {outlier_ids}")
    return 0


def main() -> int:
    """Run the whole verification and return an exit status (0 pass, 1 fail).

    First checks the service health report: the embedding mode must be
    ``fastembed``, every scenario in ``SCENARIOS`` must be present, and the
    store must hold at least 30 points. Then runs ``verify_scenario`` for each
    entry of ``SCENARIOS`` and stops at the first failure.
    """
    service = AfterimageService(get_settings())
    health = service.health()

    if health["embedding_mode"] != "fastembed":
        return fail(f"embeddings are not real CLIP (mode={health['embedding_mode']})")
    scenario_ids = {item["id"] for item in health["scenarios"]}
    # Name the scenarios that are actually absent instead of only listing
    # what was found; the set difference is empty exactly when all are present.
    missing = set(SCENARIOS) - scenario_ids
    if missing:
        return fail(
            f"missing required scenarios {sorted(missing)}, got {sorted(scenario_ids)}"
        )
    if health["point_count"] < 30:
        return fail(f"expected at least 30 points, got {health['point_count']}")

    for scenario, (region_id, incident_id) in SCENARIOS.items():
        status = verify_scenario(service, scenario, region_id, incident_id)
        if status:
            return status

    # Derive the count from SCENARIOS so the summary stays true if the table
    # changes, and do not claim a held-out control passed for a scenario
    # where verify_scenario had no control score to check.
    count = len(SCENARIOS)
    print(
        f"Verification passed: {count} incidents flagged, held-out normals "
        f"stayed in band wherever a control score was available, "
        f"{count} outliers correct, real CLIP."
    )
    return 0


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())