Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| import sys | |
| ROOT = Path(__file__).resolve().parents[1] | |
| sys.path.insert(0, str(ROOT)) | |
| from app.config import get_settings # noqa: E402 | |
| from app.qdrant_store import match_filter # noqa: E402 | |
| from app.services import AfterimageService # noqa: E402 | |
# Scenarios this script verifies. Each key is a scenario id; each value is
# (region_id to inspect, asset_id expected to be the only outlier returned).
SCENARIOS: dict[str, tuple[str, str]] = {
    "vault": (
        "vault_pedestal",
        "vault_pedestal_incident",
    ),
    "museum": (
        "museum_pedestal",
        "museum_pedestal_incident",
    ),
    "bank": (
        "bank_curb",
        "bank_curb_incident",
    ),
}
def fail(message: str) -> int:
    """Report a verification failure on stdout and return exit status 1.

    The return value lets callers write ``return fail("...")`` so the failure
    status propagates straight up to the process exit code.
    """
    failure_status = 1
    print("FAIL: {}".format(message))
    return failure_status
def held_out_normal_score(service: AfterimageService, scenario: str, region_id: str) -> float | None:
    """Score the region's ``baseline_crop`` image against its baseline points.

    This is the false-positive control used by the verifier: the crop is meant
    to be a NORMAL frame, so its similarity to the region's other baselines
    should stay at or above the region's threshold.
    NOTE(review): whether the crop is truly held out from the seeded baseline
    points is not visible here — confirm against the seeding code.

    Returns the similarity score of the first usable baseline hit, or ``None``
    when the region is not in the manifest, has no ``baseline_crop``, or no
    hit qualifies.
    """
    # Locate the manifest entry for the requested region (first match wins).
    target = None
    for candidate in service.manifest.regions(scenario):
        if candidate["region_id"] == region_id:
            target = candidate
            break
    if not target:
        return None

    crop = target.get("baseline_crop")
    if not crop:
        return None

    crop_path = service.manifest.asset_path(crop)
    vector = service.embedder.embed_path(crop_path)

    # Restrict the search to this region's baseline points only.
    baseline_only = match_filter(scenario=scenario, region_id=region_id, is_baseline=True)
    hits = service.store.search(vector, query_filter=baseline_only, limit=8)

    # Skip any hit that is effectively the query itself (score ~1.0) so the
    # result reflects similarity to a *different* baseline frame.
    # Assumes hits arrive best-first, so the first survivor is the top peer —
    # TODO confirm the store's ordering.
    peer = None
    for hit in hits:
        if hit.score is not None and hit.score < 0.9999:
            peer = hit
            break
    if not peer:
        return None
    return float(peer.score)
def verify_scenario(service: AfterimageService, scenario: str, region_id: str, incident_id: str) -> int:
    """Verify one scenario end to end; return 0 on success, 1 on first failure.

    Checks, in order:
      1. the anomaly scan for ``scenario`` reports ``region_id``;
      2. the region's normal band has source ``"baseline_variants"``;
      3. the region has status ``"anomalous"`` and its score is strictly below
         its threshold;
      4. the held-out normal control scores at or above the threshold;
      5. the outlier query reports ``RecommendQuery(best_score)`` as its API
         and returns exactly ``[incident_id]``.

    Every failure is reported through ``fail`` (printed, status 1). A progress
    line is printed for the region and, on success, for the control.
    """
    scan = service.scan_anomalies(scenario)
    regions = {item["region_id"]: item for item in scan["regions"]}
    region = regions.get(region_id)
    if region is None:
        return fail(f"{scenario}: missing region {region_id}")

    band = region["normal_band"]
    score = region["score"]
    floor = region["threshold"]
    status = region["status"]
    print(
        f"{scenario}/{region_id}: score={score:.4f} "
        f"floor={floor:.4f} margin={floor - score:+.4f} "
        f"status={status}"
    )

    if band["source"] != "baseline_variants":
        return fail(f"{scenario}: normal band is not data-derived (source={band['source']})")
    if status != "anomalous" or score >= floor:
        return fail(f"{scenario}: incident did not cross the learned normal band")

    # False-positive control: a held-out NORMAL frame must stay inside the band.
    # Without this, the detector could be one that simply always alarms.
    normal_top = held_out_normal_score(service, scenario, region_id)
    if normal_top is None:
        # Previously a missing control was skipped silently, letting the
        # scenario pass without ever proving a normal frame stays in band.
        # Treat an unavailable control as a failure so a pass is never vacuous.
        return fail(
            f"{scenario}: held-out NORMAL control unavailable for {region_id} "
            "— cannot rule out false positives"
        )
    if normal_top < floor:
        return fail(
            f"{scenario}: held-out NORMAL frame scored {normal_top:.4f} < floor "
            f"{floor:.4f} — false positive"
        )
    print(
        f" control: normal frame {normal_top:.4f} >= floor {floor:.4f} "
        f"(stays in band) | incident {score:.4f} breaches"
    )

    outliers = service.outliers(scenario)
    if outliers["inspector"]["api"] != "RecommendQuery(best_score)":
        return fail(f"{scenario}: outliers not backed by RecommendQuery(best_score)")
    outlier_ids = [item["payload"]["asset_id"] for item in outliers["results"]]
    if outlier_ids != [incident_id]:
        return fail(f"{scenario}: expected sole outlier {incident_id!r}, got {outlier_ids}")
    return 0
def main() -> int:
    """Run the full verification and return a process exit status.

    Builds the service from settings, checks global preconditions from its
    health report (embedding mode, required scenarios present, minimum point
    count), then verifies each entry of ``SCENARIOS`` in order, stopping at the
    first failure. Returns 0 when everything passes, otherwise the failing
    check's non-zero status.
    """
    service = AfterimageService(get_settings())
    health = service.health()

    mode = health["embedding_mode"]
    if mode != "fastembed":
        return fail(f"embeddings are not real CLIP (mode={mode})")

    scenario_ids = {item["id"] for item in health["scenarios"]}
    # Every scenario this script verifies must be known to the service.
    if not scenario_ids.issuperset(SCENARIOS):
        return fail(f"missing required scenarios, got {sorted(scenario_ids)}")

    point_count = health["point_count"]
    if point_count < 30:
        return fail(f"expected at least 30 points, got {point_count}")

    for scenario, expected in SCENARIOS.items():
        region_id, incident_id = expected
        status = verify_scenario(service, scenario, region_id, incident_id)
        if status:
            return status

    summary = (
        "Verification passed: all three incidents flagged, all three held-out "
        "normals stayed in band, all three outliers correct, real CLIP."
    )
    print(summary)
    return 0
# Script entry point: exit the process with main()'s status (0 = verified).
if __name__ == "__main__":
    sys.exit(main())