Add all remaining modules: index_store, query_engine, akinator, pipeline, app, requirements
fea499e verified | """ | |
| Video Intelligence Platform — Index Store | |
| SQLite for structured data (timestamps, captions, detections) + | |
| FAISS for vector similarity search on frame and caption embeddings. | |
| """ | |
| import json | |
| import sqlite3 | |
| import numpy as np | |
| import faiss | |
| from typing import List, Dict, Optional, Tuple | |
| from pathlib import Path | |
| class VideoIndex: | |
| """ | |
| Combined structured (SQLite) + vector (FAISS) index for video frames. | |
| Stores: | |
| - Frame metadata: timestamp, frame_idx, video_path | |
| - Captions: dense text descriptions per frame | |
| - Detections: objects detected per frame with attributes | |
| - Visual embeddings: SigLIP2 frame vectors (FAISS) | |
| - Caption embeddings: Gemini text-embedding-004 vectors (FAISS) | |
| """ | |
| def __init__(self, db_path: str = "video_index.db", | |
| visual_dim: int = 1152, | |
| caption_dim: int = 768): | |
| self.db_path = db_path | |
| self.visual_dim = visual_dim | |
| self.caption_dim = caption_dim | |
| # Initialize SQLite | |
| self.conn = sqlite3.connect(db_path) | |
| self.conn.row_factory = sqlite3.Row | |
| self._create_tables() | |
| # Initialize FAISS indices | |
| self.visual_index = faiss.IndexFlatIP(visual_dim) # Inner product (cosine sim for normalized vecs) | |
| self.caption_index = faiss.IndexFlatIP(caption_dim) | |
| # Mapping: FAISS row β frame_id | |
| self.visual_id_map: List[int] = [] | |
| self.caption_id_map: List[int] = [] | |
| def _create_tables(self): | |
| """Create SQLite tables for structured storage.""" | |
| self.conn.executescript(""" | |
| CREATE TABLE IF NOT EXISTS videos ( | |
| video_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| path TEXT UNIQUE NOT NULL, | |
| duration_sec REAL, | |
| fps REAL, | |
| width INTEGER, | |
| height INTEGER, | |
| indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE TABLE IF NOT EXISTS frames ( | |
| frame_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| video_id INTEGER NOT NULL, | |
| frame_idx INTEGER NOT NULL, | |
| timestamp_sec REAL NOT NULL, | |
| caption TEXT DEFAULT '', | |
| FOREIGN KEY (video_id) REFERENCES videos(video_id) | |
| ); | |
| CREATE TABLE IF NOT EXISTS detections ( | |
| detection_id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| frame_id INTEGER NOT NULL, | |
| label TEXT NOT NULL, | |
| confidence REAL NOT NULL, | |
| bbox_x0 REAL, bbox_y0 REAL, bbox_x1 REAL, bbox_y1 REAL, | |
| FOREIGN KEY (frame_id) REFERENCES frames(frame_id) | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_frames_video ON frames(video_id); | |
| CREATE INDEX IF NOT EXISTS idx_frames_timestamp ON frames(timestamp_sec); | |
| CREATE INDEX IF NOT EXISTS idx_detections_frame ON detections(frame_id); | |
| CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(label); | |
| """) | |
| self.conn.commit() | |
| # ββ Video Registration ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def register_video(self, path: str, duration_sec: float = 0, | |
| fps: float = 0, width: int = 0, height: int = 0) -> int: | |
| """Register a video and return its video_id.""" | |
| cursor = self.conn.execute( | |
| "INSERT OR IGNORE INTO videos (path, duration_sec, fps, width, height) VALUES (?, ?, ?, ?, ?)", | |
| (path, duration_sec, fps, width, height) | |
| ) | |
| self.conn.commit() | |
| if cursor.lastrowid: | |
| return cursor.lastrowid | |
| # Already exists | |
| row = self.conn.execute("SELECT video_id FROM videos WHERE path = ?", (path,)).fetchone() | |
| return row["video_id"] | |
| # ββ Frame Storage βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def add_frame(self, video_id: int, frame_idx: int, | |
| timestamp_sec: float, caption: str = "") -> int: | |
| """Add a frame record and return frame_id.""" | |
| cursor = self.conn.execute( | |
| "INSERT INTO frames (video_id, frame_idx, timestamp_sec, caption) VALUES (?, ?, ?, ?)", | |
| (video_id, frame_idx, timestamp_sec, caption) | |
| ) | |
| self.conn.commit() | |
| return cursor.lastrowid | |
| def update_caption(self, frame_id: int, caption: str): | |
| """Update the caption for an existing frame.""" | |
| self.conn.execute( | |
| "UPDATE frames SET caption = ? WHERE frame_id = ?", | |
| (caption, frame_id) | |
| ) | |
| self.conn.commit() | |
| # ββ Detection Storage βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def add_detections(self, frame_id: int, detections: List[Dict]): | |
| """Add detection results for a frame.""" | |
| for det in detections: | |
| bbox = det.get("bbox", [0, 0, 0, 0]) | |
| self.conn.execute( | |
| "INSERT INTO detections (frame_id, label, confidence, bbox_x0, bbox_y0, bbox_x1, bbox_y1) " | |
| "VALUES (?, ?, ?, ?, ?, ?, ?)", | |
| (frame_id, det["label"], det["confidence"], | |
| bbox[0], bbox[1], bbox[2], bbox[3]) | |
| ) | |
| self.conn.commit() | |
| # ββ Vector Index ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def add_visual_embedding(self, frame_id: int, embedding: np.ndarray): | |
| """Add a visual (SigLIP2) embedding for a frame.""" | |
| self.visual_index.add(embedding.reshape(1, -1).astype(np.float32)) | |
| self.visual_id_map.append(frame_id) | |
| def add_caption_embedding(self, frame_id: int, embedding: np.ndarray): | |
| """Add a caption (Gemini) embedding for a frame.""" | |
| self.caption_index.add(embedding.reshape(1, -1).astype(np.float32)) | |
| self.caption_id_map.append(frame_id) | |
| def add_visual_embeddings_batch(self, frame_ids: List[int], embeddings: np.ndarray): | |
| """Add visual embeddings in batch.""" | |
| self.visual_index.add(embeddings.astype(np.float32)) | |
| self.visual_id_map.extend(frame_ids) | |
| def add_caption_embeddings_batch(self, frame_ids: List[int], embeddings: np.ndarray): | |
| """Add caption embeddings in batch.""" | |
| self.caption_index.add(embeddings.astype(np.float32)) | |
| self.caption_id_map.extend(frame_ids) | |
| # ββ Search ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def search_visual(self, query_embedding: np.ndarray, top_k: int = 20) -> List[Tuple[int, float]]: | |
| """Search by visual similarity. Returns [(frame_id, score), ...]""" | |
| if self.visual_index.ntotal == 0: | |
| return [] | |
| scores, indices = self.visual_index.search( | |
| query_embedding.reshape(1, -1).astype(np.float32), min(top_k, self.visual_index.ntotal) | |
| ) | |
| results = [] | |
| for score, idx in zip(scores[0], indices[0]): | |
| if idx >= 0 and idx < len(self.visual_id_map): | |
| results.append((self.visual_id_map[idx], float(score))) | |
| return results | |
| def search_captions(self, query_embedding: np.ndarray, top_k: int = 20) -> List[Tuple[int, float]]: | |
| """Search by caption text similarity. Returns [(frame_id, score), ...]""" | |
| if self.caption_index.ntotal == 0: | |
| return [] | |
| scores, indices = self.caption_index.search( | |
| query_embedding.reshape(1, -1).astype(np.float32), min(top_k, self.caption_index.ntotal) | |
| ) | |
| results = [] | |
| for score, idx in zip(scores[0], indices[0]): | |
| if idx >= 0 and idx < len(self.caption_id_map): | |
| results.append((self.caption_id_map[idx], float(score))) | |
| return results | |
| def search_detections(self, label_query: str) -> List[Dict]: | |
| """Search structured detections by label (SQL LIKE match).""" | |
| rows = self.conn.execute(""" | |
| SELECT d.*, f.timestamp_sec, f.frame_idx, f.video_id, f.caption | |
| FROM detections d | |
| JOIN frames f ON d.frame_id = f.frame_id | |
| WHERE LOWER(d.label) LIKE LOWER(?) | |
| ORDER BY d.confidence DESC | |
| """, (f"%{label_query}%",)).fetchall() | |
| return [dict(row) for row in rows] | |
| # ββ Frame Retrieval βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_frame(self, frame_id: int) -> Optional[Dict]: | |
| """Get frame metadata by frame_id.""" | |
| row = self.conn.execute( | |
| "SELECT * FROM frames WHERE frame_id = ?", (frame_id,) | |
| ).fetchone() | |
| return dict(row) if row else None | |
| def get_frame_detections(self, frame_id: int) -> List[Dict]: | |
| """Get all detections for a frame.""" | |
| rows = self.conn.execute( | |
| "SELECT * FROM detections WHERE frame_id = ?", (frame_id,) | |
| ).fetchall() | |
| return [dict(row) for row in rows] | |
| def get_all_frames(self, video_id: Optional[int] = None) -> List[Dict]: | |
| """Get all frames, optionally filtered by video.""" | |
| if video_id: | |
| rows = self.conn.execute( | |
| "SELECT * FROM frames WHERE video_id = ? ORDER BY timestamp_sec", | |
| (video_id,) | |
| ).fetchall() | |
| else: | |
| rows = self.conn.execute( | |
| "SELECT * FROM frames ORDER BY timestamp_sec" | |
| ).fetchall() | |
| return [dict(row) for row in rows] | |
| # ββ Attribute Analysis (for Akinator) βββββββββββββββββββββββββββββββββββ | |
| def get_unique_attributes(self, frame_ids: List[int]) -> Dict[str, List[str]]: | |
| """ | |
| Get unique attribute values across given frames. | |
| Used by the Akinator tree to find the best splitting feature. | |
| """ | |
| if not frame_ids: | |
| return {} | |
| placeholders = ",".join("?" * len(frame_ids)) | |
| # Get unique detection labels | |
| label_rows = self.conn.execute(f""" | |
| SELECT DISTINCT label FROM detections | |
| WHERE frame_id IN ({placeholders}) | |
| """, frame_ids).fetchall() | |
| labels = [row["label"] for row in label_rows] | |
| # Extract attributes from captions | |
| caption_rows = self.conn.execute(f""" | |
| SELECT caption FROM frames | |
| WHERE frame_id IN ({placeholders}) AND caption != '' | |
| """, frame_ids).fetchall() | |
| # Simple attribute extraction from captions | |
| attributes = {"detected_objects": labels} | |
| # Analyze captions for location/time/etc | |
| locations = set() | |
| times = set() | |
| for row in caption_rows: | |
| caption = row["caption"].lower() | |
| if "indoor" in caption: | |
| locations.add("indoor") | |
| if "outdoor" in caption: | |
| locations.add("outdoor") | |
| if "night" in caption or "dark" in caption: | |
| times.add("night") | |
| if "day" in caption or "bright" in caption or "sunny" in caption: | |
| times.add("day") | |
| if locations: | |
| attributes["location"] = list(locations) | |
| if times: | |
| attributes["time_of_day"] = list(times) | |
| return attributes | |
| # ββ Persistence βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def save_faiss(self, visual_path: str = "visual_index.faiss", | |
| caption_path: str = "caption_index.faiss"): | |
| """Save FAISS indices to disk.""" | |
| if self.visual_index.ntotal > 0: | |
| faiss.write_index(self.visual_index, visual_path) | |
| if self.caption_index.ntotal > 0: | |
| faiss.write_index(self.caption_index, caption_path) | |
| # Save ID maps | |
| np.save(visual_path + ".ids.npy", np.array(self.visual_id_map)) | |
| np.save(caption_path + ".ids.npy", np.array(self.caption_id_map)) | |
| def load_faiss(self, visual_path: str = "visual_index.faiss", | |
| caption_path: str = "caption_index.faiss"): | |
| """Load FAISS indices from disk.""" | |
| if Path(visual_path).exists(): | |
| self.visual_index = faiss.read_index(visual_path) | |
| self.visual_id_map = np.load(visual_path + ".ids.npy").tolist() | |
| if Path(caption_path).exists(): | |
| self.caption_index = faiss.read_index(caption_path) | |
| self.caption_id_map = np.load(caption_path + ".ids.npy").tolist() | |
| def close(self): | |
| """Close database connection.""" | |
| self.conn.close() | |
| def stats(self) -> Dict: | |
| """Get index statistics.""" | |
| video_count = self.conn.execute("SELECT COUNT(*) FROM videos").fetchone()[0] | |
| frame_count = self.conn.execute("SELECT COUNT(*) FROM frames").fetchone()[0] | |
| detection_count = self.conn.execute("SELECT COUNT(*) FROM detections").fetchone()[0] | |
| return { | |
| "videos": video_count, | |
| "frames": frame_count, | |
| "detections": detection_count, | |
| "visual_vectors": self.visual_index.ntotal, | |
| "caption_vectors": self.caption_index.ntotal, | |
| } | |