Spaces:
Running
Running
| """ | |
| nlp/search.py - Semantic Search using sentence-transformers/all-MiniLM-L6-v2 + FAISS | |
| Embeds natural language queries and matches against stored surveillance metadata. | |
| """ | |
| import os | |
| import time | |
| import numpy as np | |
| import faiss | |
| import torch | |
| from typing import List, Dict, Optional | |
| from sentence_transformers import SentenceTransformer | |
| from loguru import logger | |
| from config import settings, DEVICE, FAISS_DIR | |
| class SemanticSearchEngine: | |
| """ | |
| Encodes surveillance metadata (event descriptions, attributes) into | |
| sentence embeddings stored in FAISS. Supports natural-language querying. | |
| """ | |
| INDEX_FILE = str(FAISS_DIR / "search_index.faiss") | |
| META_FILE = str(FAISS_DIR / "search_meta.npy") | |
| def __init__(self): | |
| logger.info(f"Loading semantic search model: {settings.SEMANTIC_SEARCH_MODEL}") | |
| self.model = SentenceTransformer(settings.SEMANTIC_SEARCH_MODEL, device=str(DEVICE)) | |
| self.dim = settings.SEARCH_EMBEDDING_DIM | |
| self.index = self._load_or_create_index() | |
| self.meta: List[Dict] = self._load_meta() | |
| logger.info(f"✅ SemanticSearchEngine ready. Index size: {self.index.ntotal}") | |
| def _load_or_create_index(self) -> faiss.IndexFlatIP: | |
| if os.path.exists(self.INDEX_FILE): | |
| logger.info("Loading existing FAISS search index.") | |
| return faiss.read_index(self.INDEX_FILE) | |
| return faiss.IndexFlatIP(self.dim) | |
| def _load_meta(self) -> List[Dict]: | |
| if os.path.exists(self.META_FILE): | |
| return list(np.load(self.META_FILE, allow_pickle=True)) | |
| return [] | |
| def save(self): | |
| faiss.write_index(self.index, self.INDEX_FILE) | |
| np.save(self.META_FILE, np.array(self.meta, dtype=object)) | |
| def encode(self, texts: List[str]) -> np.ndarray: | |
| """Encode texts to L2-normalized embeddings (batch).""" | |
| embeddings = self.model.encode( | |
| texts, | |
| batch_size=32, | |
| normalize_embeddings=True, | |
| convert_to_numpy=True, | |
| show_progress_bar=False, | |
| ) | |
| return embeddings.astype(np.float32) | |
| def index_event(self, text: str, metadata: Dict) -> int: | |
| """ | |
| Add a single surveillance event description to the FAISS search index. | |
| Args: | |
| text: Natural language description of the event | |
| metadata: {"event_id", "person_id", "camera_id", "timestamp", "activity_type", ...} | |
| Returns: | |
| faiss_id (row index) | |
| """ | |
| embedding = self.encode([text]) | |
| faiss_id = self.index.ntotal | |
| self.index.add(embedding) | |
| self.meta.append({**metadata, "text": text, "faiss_id": faiss_id}) | |
| self.save() | |
| return faiss_id | |
| def index_batch(self, texts: List[str], metadatas: List[Dict]): | |
| """Batch indexing for bulk ingestion.""" | |
| embeddings = self.encode(texts) | |
| base_id = self.index.ntotal | |
| self.index.add(embeddings) | |
| for i, (text, meta) in enumerate(zip(texts, metadatas)): | |
| self.meta.append({**meta, "text": text, "faiss_id": base_id + i}) | |
| self.save() | |
| logger.info(f"Indexed {len(texts)} events into search index.") | |
| def search(self, query: str, top_k: int = 10, score_threshold: float = 0.4) -> List[Dict]: | |
| """ | |
| Search surveillance logs by natural language query. | |
| Returns: | |
| List of {"text": str, "score": float, ...metadata fields} | |
| """ | |
| if self.index.ntotal == 0: | |
| return [] | |
| t0 = time.perf_counter() | |
| query_emb = self.encode([query]) | |
| k = min(top_k, self.index.ntotal) | |
| distances, indices = self.index.search(query_emb, k) | |
| latency = (time.perf_counter() - t0) * 1000 | |
| results = [] | |
| for dist, idx in zip(distances[0], indices[0]): | |
| if idx == -1 or float(dist) < score_threshold: | |
| continue | |
| entry = dict(self.meta[idx]) | |
| entry["score"] = round(float(dist), 4) | |
| results.append(entry) | |
| logger.debug(f"Semantic search '{query[:40]}...' → {len(results)} results in {latency:.1f}ms") | |
| return sorted(results, key=lambda x: -x["score"]) | |