Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| import sys | |
| from qdrant_client import models | |
| ROOT = Path(__file__).resolve().parents[1] | |
| sys.path.insert(0, str(ROOT)) | |
| from app.config import get_settings # noqa: E402 | |
| from app.embedder import ImageEmbedder # noqa: E402 | |
| from app.manifest import Manifest # noqa: E402 | |
| from app.qdrant_store import QdrantStore, point_id # noqa: E402 | |
| def payload_base(manifest: Manifest, scenario: dict, item: dict, crop: str, memory_type: str) -> dict: | |
| camera = next(cam for cam in manifest.cameras(scenario["id"]) if cam["id"] == item["camera_id"]) | |
| return { | |
| "scenario": scenario["id"], | |
| "camera_id": item["camera_id"], | |
| "zone": item.get("zone", camera["zone"]), | |
| "timestamp": int(item.get("timestamp", 1716900000)), | |
| "frame_url": manifest.asset_url(item.get("frame", camera.get("incident_frame", camera["baseline_frame"]))), | |
| "crop_url": manifest.asset_url(crop), | |
| "bbox": item.get("bbox", [0, 0, 0, 0]), | |
| "asset_id": item["asset_id"], | |
| "memory_type": memory_type, | |
| "is_baseline": bool(item.get("is_baseline", False)), | |
| "is_incident": bool(item.get("is_incident", False)), | |
| "object_label": item.get("object_label", "region"), | |
| "track_id": item.get("track_id"), | |
| "region_id": item.get("region_id"), | |
| "region_label": item.get("region_label"), | |
| "alarm_enabled": item.get("alarm_enabled"), | |
| } | |
| def region_points(manifest: Manifest, embedder: ImageEmbedder, scenario: dict): | |
| points = [] | |
| for region in manifest.regions(scenario["id"]): | |
| camera = next(cam for cam in manifest.cameras(scenario["id"]) if cam["id"] == region["camera_id"]) | |
| for state, crop, baseline in [ | |
| ("baseline", region["baseline_crop"], True), | |
| ("incident", region["incident_crop"], False), | |
| ]: | |
| asset_id = f"{region['region_id']}_{state}" | |
| payload = payload_base( | |
| manifest, | |
| scenario, | |
| { | |
| **region, | |
| "asset_id": asset_id, | |
| "frame": camera[f"{state}_frame"], | |
| "is_baseline": baseline, | |
| "is_incident": not baseline, | |
| "object_label": region.get(f"{state}_label", region["region_id"]), | |
| }, | |
| crop, | |
| f"region_{state}", | |
| ) | |
| vector = embedder.embed_path(manifest.asset_path(crop)) | |
| points.append(models.PointStruct(id=point_id(f"{scenario['id']}:{asset_id}"), vector=vector, payload=payload)) | |
| if baseline: | |
| for index, variant_crop in enumerate(baseline_variants(manifest, crop, region.get("variant_prefix")), start=1): | |
| variant_id = f"{asset_id}_v{index}" | |
| variant_payload = { | |
| **payload, | |
| "asset_id": variant_id, | |
| "crop_url": manifest.asset_url(variant_crop), | |
| "memory_variant": index, | |
| } | |
| vector = embedder.embed_path(manifest.asset_path(variant_crop)) | |
| points.append(models.PointStruct(id=point_id(f"{scenario['id']}:{variant_id}"), vector=vector, payload=variant_payload)) | |
| return points | |
| def baseline_variants(manifest: Manifest, crop: str, variant_prefix: str | None = None) -> list[str]: | |
| path = Path(crop) | |
| stem = variant_prefix or path.stem | |
| variant_dir = manifest.settings.resolved_asset_root / path.parent / "variants" | |
| if not variant_dir.exists(): | |
| return [] | |
| return [ | |
| f"{path.parent.as_posix()}/variants/{variant.name}" | |
| for variant in sorted(variant_dir.glob(f"{stem}_v*.jpg")) | |
| ] | |
| def object_points(manifest: Manifest, embedder: ImageEmbedder, scenario: dict): | |
| points = [] | |
| for item in manifest.objects(scenario["id"]): | |
| payload = payload_base(manifest, scenario, item, item["crop"], item["memory_type"]) | |
| vector = embedder.embed_path(manifest.asset_path(item["crop"])) | |
| points.append(models.PointStruct(id=point_id(f"{scenario['id']}:{item['asset_id']}"), vector=vector, payload=payload)) | |
| return points | |
| def main() -> int: | |
| settings = get_settings() | |
| manifest = Manifest(settings) | |
| embedder = ImageEmbedder(settings) | |
| store = QdrantStore(settings) | |
| store.ensure_collection(recreate=True) | |
| points = [] | |
| for scenario in manifest.scenarios: | |
| points.extend(region_points(manifest, embedder, scenario)) | |
| points.extend(object_points(manifest, embedder, scenario)) | |
| store.upsert(points) | |
| print(f"Seeded {len(points)} points into {settings.collection_name} using {embedder.mode}.") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |