notRaphael's picture
Add all remaining modules: index_store, query_engine, akinator, pipeline, app, requirements
fea499e verified
"""
Video Intelligence Platform — Index Store
SQLite for structured data (timestamps, captions, detections) +
FAISS for vector similarity search on frame and caption embeddings.
"""
import json
import sqlite3
import numpy as np
import faiss
from typing import List, Dict, Optional, Tuple
from pathlib import Path
class VideoIndex:
    """
    Combined structured (SQLite) + vector (FAISS) index for video frames.

    Stores:
    - Frame metadata: timestamp, frame_idx, video_id
    - Captions: dense text descriptions per frame
    - Detections: objects detected per frame with bounding boxes
    - Visual embeddings: SigLIP2 frame vectors (FAISS)
    - Caption embeddings: Gemini text-embedding-004 vectors (FAISS)

    The FAISS indices live in memory only; use ``save_faiss`` / ``load_faiss``
    to persist them. The instance can be used as a context manager, which
    closes the SQLite connection on exit.
    """

    # Number of ids bound per SQL statement in ``get_unique_attributes``.
    # Kept well below SQLite's bound-parameter limit so large id lists work.
    _SQL_PARAM_CHUNK = 500

    # (attribute name, reported value, substrings that trigger it).
    # Matching is plain substring search on the lower-cased caption.
    _CAPTION_KEYWORDS = (
        ("location", "indoor", ("indoor",)),
        ("location", "outdoor", ("outdoor",)),
        ("time_of_day", "night", ("night", "dark")),
        ("time_of_day", "day", ("day", "bright", "sunny")),
    )

    def __init__(self, db_path: str = "video_index.db",
                 visual_dim: int = 1152,
                 caption_dim: int = 768):
        """
        Open (or create) the SQLite database and create empty FAISS indices.

        Args:
            db_path: Path of the SQLite database file.
            visual_dim: Dimensionality of the visual embedding vectors.
            caption_dim: Dimensionality of the caption embedding vectors.
        """
        self.db_path = db_path
        self.visual_dim = visual_dim
        self.caption_dim = caption_dim

        # Initialize SQLite; Row factory lets rows be read by column name.
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._create_tables()

        # Initialize FAISS indices.
        # Inner product equals cosine similarity when vectors are normalized.
        self.visual_index = faiss.IndexFlatIP(visual_dim)
        self.caption_index = faiss.IndexFlatIP(caption_dim)

        # Mapping: FAISS row → frame_id
        self.visual_id_map: List[int] = []
        self.caption_id_map: List[int] = []

    def __enter__(self):
        """Return the index itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the database connection; exceptions are not suppressed."""
        self.close()

    def _create_tables(self):
        """Create SQLite tables and indices for structured storage."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS videos (
                video_id INTEGER PRIMARY KEY AUTOINCREMENT,
                path TEXT UNIQUE NOT NULL,
                duration_sec REAL,
                fps REAL,
                width INTEGER,
                height INTEGER,
                indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS frames (
                frame_id INTEGER PRIMARY KEY AUTOINCREMENT,
                video_id INTEGER NOT NULL,
                frame_idx INTEGER NOT NULL,
                timestamp_sec REAL NOT NULL,
                caption TEXT DEFAULT '',
                FOREIGN KEY (video_id) REFERENCES videos(video_id)
            );
            CREATE TABLE IF NOT EXISTS detections (
                detection_id INTEGER PRIMARY KEY AUTOINCREMENT,
                frame_id INTEGER NOT NULL,
                label TEXT NOT NULL,
                confidence REAL NOT NULL,
                bbox_x0 REAL, bbox_y0 REAL, bbox_x1 REAL, bbox_y1 REAL,
                FOREIGN KEY (frame_id) REFERENCES frames(frame_id)
            );
            CREATE INDEX IF NOT EXISTS idx_frames_video ON frames(video_id);
            CREATE INDEX IF NOT EXISTS idx_frames_timestamp ON frames(timestamp_sec);
            CREATE INDEX IF NOT EXISTS idx_detections_frame ON detections(frame_id);
            CREATE INDEX IF NOT EXISTS idx_detections_label ON detections(label);
        """)
        self.conn.commit()

    # ── Video Registration ──────────────────────────────────────────────────
    def register_video(self, path: str, duration_sec: float = 0,
                       fps: float = 0, width: int = 0, height: int = 0) -> int:
        """
        Register a video and return its video_id.

        If ``path`` is already registered, the existing row is left untouched
        and its video_id is returned.
        """
        self.conn.execute(
            "INSERT OR IGNORE INTO videos (path, duration_sec, fps, width, height) VALUES (?, ?, ?, ?, ?)",
            (path, duration_sec, fps, width, height)
        )
        self.conn.commit()
        # Always read the id back by path. cursor.lastrowid is not reliable
        # here: when the INSERT is ignored it still reports the rowid of the
        # connection's previous successful insert (possibly into another table).
        row = self.conn.execute(
            "SELECT video_id FROM videos WHERE path = ?", (path,)
        ).fetchone()
        return row["video_id"]

    # ── Frame Storage ───────────────────────────────────────────────────────
    def add_frame(self, video_id: int, frame_idx: int,
                  timestamp_sec: float, caption: str = "") -> int:
        """
        Add a frame record and return frame_id.

        Numeric arguments are converted to built-in ``int`` / ``float`` so
        NumPy scalars are stored as numbers.
        """
        cursor = self.conn.execute(
            "INSERT INTO frames (video_id, frame_idx, timestamp_sec, caption) VALUES (?, ?, ?, ?)",
            (int(video_id), int(frame_idx), float(timestamp_sec), caption)
        )
        self.conn.commit()
        return cursor.lastrowid

    def update_caption(self, frame_id: int, caption: str):
        """Update the caption for an existing frame."""
        self.conn.execute(
            "UPDATE frames SET caption = ? WHERE frame_id = ?",
            (caption, frame_id)
        )
        self.conn.commit()

    # ── Detection Storage ───────────────────────────────────────────────────
    def add_detections(self, frame_id: int, detections: List[Dict]):
        """
        Add detection results for a frame.

        Each detection dict must contain ``"label"`` and ``"confidence"`` and
        may contain ``"bbox"`` as a 4-item sequence (x0, y0, x1, y1); a
        missing or ``None`` bbox is stored as zeros.

        Raises:
            KeyError: If a detection lacks ``"label"`` or ``"confidence"``.
            IndexError: If a bbox has fewer than four items.
        """
        # Build every row before touching the database so a malformed
        # detection cannot leave a partially inserted batch behind.
        rows = []
        for det in detections:
            bbox = det.get("bbox")
            if bbox is None:
                bbox = (0, 0, 0, 0)
            coords = [None if bbox[i] is None else float(bbox[i]) for i in range(4)]
            rows.append((int(frame_id), det["label"], float(det["confidence"]), *coords))

        self.conn.executemany(
            "INSERT INTO detections (frame_id, label, confidence, bbox_x0, bbox_y0, bbox_x1, bbox_y1) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            rows
        )
        self.conn.commit()

    # ── Vector Index ────────────────────────────────────────────────────────
    @staticmethod
    def _add_vectors(index, id_map: List[int], frame_ids, embeddings):
        """Append rows of ``embeddings`` to ``index`` and their ids to ``id_map``."""
        ids = [int(fid) for fid in frame_ids]
        # The index is fed C-contiguous float32 data.
        matrix = np.ascontiguousarray(embeddings, dtype=np.float32)
        if matrix.size == 0 and not ids:
            return
        if matrix.ndim == 1:
            matrix = matrix.reshape(1, -1)
        if matrix.ndim != 2 or matrix.shape[0] != len(ids):
            raise ValueError(
                f"got {len(ids)} frame ids for embeddings of shape {matrix.shape}"
            )
        # Add to the index first so the id map only grows if the add succeeded.
        index.add(matrix)
        id_map.extend(ids)

    def add_visual_embedding(self, frame_id: int, embedding: np.ndarray):
        """Add a visual (SigLIP2) embedding for a frame."""
        vector = np.ascontiguousarray(embedding, dtype=np.float32).reshape(1, -1)
        self._add_vectors(self.visual_index, self.visual_id_map, [frame_id], vector)

    def add_caption_embedding(self, frame_id: int, embedding: np.ndarray):
        """Add a caption (Gemini) embedding for a frame."""
        vector = np.ascontiguousarray(embedding, dtype=np.float32).reshape(1, -1)
        self._add_vectors(self.caption_index, self.caption_id_map, [frame_id], vector)

    def add_visual_embeddings_batch(self, frame_ids: List[int], embeddings: np.ndarray):
        """
        Add visual embeddings in batch (one row of ``embeddings`` per frame id).

        Raises:
            ValueError: If the number of rows differs from ``len(frame_ids)``.
        """
        self._add_vectors(self.visual_index, self.visual_id_map, frame_ids, embeddings)

    def add_caption_embeddings_batch(self, frame_ids: List[int], embeddings: np.ndarray):
        """
        Add caption embeddings in batch (one row of ``embeddings`` per frame id).

        Raises:
            ValueError: If the number of rows differs from ``len(frame_ids)``.
        """
        self._add_vectors(self.caption_index, self.caption_id_map, frame_ids, embeddings)

    # ── Search ──────────────────────────────────────────────────────────────
    @staticmethod
    def _search(index, id_map: List[int], query_embedding, top_k: int) -> List[Tuple[int, float]]:
        """Run a top-k search on ``index`` and translate rows to frame ids."""
        k = min(int(top_k), index.ntotal)
        if k <= 0:
            # Empty index or non-positive top_k: nothing to search for.
            return []
        query = np.ascontiguousarray(query_embedding, dtype=np.float32).reshape(1, -1)
        scores, indices = index.search(query, k)
        # Skip rows that have no entry in the id map (including negative ids).
        return [
            (id_map[int(row)], float(score))
            for score, row in zip(scores[0], indices[0])
            if 0 <= row < len(id_map)
        ]

    def search_visual(self, query_embedding: np.ndarray, top_k: int = 20) -> List[Tuple[int, float]]:
        """Search by visual similarity. Returns [(frame_id, score), ...]"""
        return self._search(self.visual_index, self.visual_id_map, query_embedding, top_k)

    def search_captions(self, query_embedding: np.ndarray, top_k: int = 20) -> List[Tuple[int, float]]:
        """Search by caption text similarity. Returns [(frame_id, score), ...]"""
        return self._search(self.caption_index, self.caption_id_map, query_embedding, top_k)

    def search_detections(self, label_query: str) -> List[Dict]:
        """
        Search structured detections by label (SQL LIKE substring match).

        Results are joined with their frame and ordered by confidence,
        highest first. ``%`` and ``_`` inside ``label_query`` are not
        escaped, so LIKE interprets them as wildcards.
        """
        rows = self.conn.execute("""
            SELECT d.*, f.timestamp_sec, f.frame_idx, f.video_id, f.caption
            FROM detections d
            JOIN frames f ON d.frame_id = f.frame_id
            WHERE LOWER(d.label) LIKE LOWER(?)
            ORDER BY d.confidence DESC
        """, (f"%{label_query}%",)).fetchall()
        return [dict(row) for row in rows]

    # ── Frame Retrieval ─────────────────────────────────────────────────────
    def get_frame(self, frame_id: int) -> Optional[Dict]:
        """Get frame metadata by frame_id, or None if there is no such frame."""
        row = self.conn.execute(
            "SELECT * FROM frames WHERE frame_id = ?", (frame_id,)
        ).fetchone()
        return dict(row) if row else None

    def get_frame_detections(self, frame_id: int) -> List[Dict]:
        """Get all detections for a frame."""
        rows = self.conn.execute(
            "SELECT * FROM detections WHERE frame_id = ?", (frame_id,)
        ).fetchall()
        return [dict(row) for row in rows]

    def get_all_frames(self, video_id: Optional[int] = None) -> List[Dict]:
        """Get all frames ordered by timestamp, optionally filtered by video."""
        # Compare against None explicitly so video_id=0 filters instead of
        # silently returning every frame.
        if video_id is not None:
            rows = self.conn.execute(
                "SELECT * FROM frames WHERE video_id = ? ORDER BY timestamp_sec",
                (video_id,)
            ).fetchall()
        else:
            rows = self.conn.execute(
                "SELECT * FROM frames ORDER BY timestamp_sec"
            ).fetchall()
        return [dict(row) for row in rows]

    # ── Attribute Analysis (for Akinator) ───────────────────────────────────
    def get_unique_attributes(self, frame_ids: List[int]) -> Dict[str, List[str]]:
        """
        Get unique attribute values across given frames.
        Used by the Akinator tree to find the best splitting feature.

        Returns a dict with ``"detected_objects"`` (distinct detection labels)
        and, when found, ``"location"`` and ``"time_of_day"`` (sorted lists).
        Caption attributes come from plain substring matching, so e.g. "day"
        also matches "today". An empty ``frame_ids`` yields ``{}``.
        """
        if not frame_ids:
            return {}

        ids = [int(fid) for fid in frame_ids]
        labels: Dict[str, None] = {}  # dict used as an insertion-ordered set
        found = {"location": set(), "time_of_day": set()}

        # Query in chunks so the number of bound parameters per statement
        # stays within SQLite's limit regardless of how many ids are given.
        for start in range(0, len(ids), self._SQL_PARAM_CHUNK):
            batch = ids[start:start + self._SQL_PARAM_CHUNK]
            placeholders = ",".join("?" * len(batch))

            # Get unique detection labels
            label_rows = self.conn.execute(f"""
                SELECT DISTINCT label FROM detections
                WHERE frame_id IN ({placeholders})
            """, batch).fetchall()
            for row in label_rows:
                labels.setdefault(row["label"], None)

            # Extract attributes from captions
            caption_rows = self.conn.execute(f"""
                SELECT caption FROM frames
                WHERE frame_id IN ({placeholders}) AND caption != ''
            """, batch).fetchall()
            for row in caption_rows:
                caption = row["caption"].lower()
                for attr, value, needles in self._CAPTION_KEYWORDS:
                    if any(needle in caption for needle in needles):
                        found[attr].add(value)

        attributes = {"detected_objects": list(labels)}
        for attr in ("location", "time_of_day"):
            if found[attr]:
                # Sorted so the output does not depend on set ordering.
                attributes[attr] = sorted(found[attr])
        return attributes

    # ── Persistence ─────────────────────────────────────────────────────────
    def save_faiss(self, visual_path: str = "visual_index.faiss",
                   caption_path: str = "caption_index.faiss"):
        """
        Save FAISS indices and their ID maps to disk.

        An index is written only when it holds at least one vector; the ID
        maps are always written to ``<path>.ids.npy``.
        NOTE(review): an index file left over from an earlier save is not
        removed when the current index is empty — confirm callers do not
        reuse paths across unrelated indices.
        """
        visual_path = str(visual_path)
        caption_path = str(caption_path)
        if self.visual_index.ntotal > 0:
            faiss.write_index(self.visual_index, visual_path)
        if self.caption_index.ntotal > 0:
            faiss.write_index(self.caption_index, caption_path)
        # Save ID maps
        np.save(visual_path + ".ids.npy", np.array(self.visual_id_map, dtype=np.int64))
        np.save(caption_path + ".ids.npy", np.array(self.caption_id_map, dtype=np.int64))

    def load_faiss(self, visual_path: str = "visual_index.faiss",
                   caption_path: str = "caption_index.faiss"):
        """
        Load FAISS indices from disk.

        An index whose file does not exist is left unchanged. Each index and
        its ID map are read before either is assigned, so a failed load does
        not leave an index paired with the wrong ID map.
        """
        visual_path = str(visual_path)
        caption_path = str(caption_path)
        if Path(visual_path).exists():
            index = faiss.read_index(visual_path)
            ids = [int(i) for i in np.load(visual_path + ".ids.npy").tolist()]
            self.visual_index, self.visual_id_map = index, ids
        if Path(caption_path).exists():
            index = faiss.read_index(caption_path)
            ids = [int(i) for i in np.load(caption_path + ".ids.npy").tolist()]
            self.caption_index, self.caption_id_map = index, ids

    def close(self):
        """Close database connection."""
        self.conn.close()

    def stats(self) -> Dict:
        """Get index statistics (row counts and vector counts)."""
        video_count = self.conn.execute("SELECT COUNT(*) FROM videos").fetchone()[0]
        frame_count = self.conn.execute("SELECT COUNT(*) FROM frames").fetchone()[0]
        detection_count = self.conn.execute("SELECT COUNT(*) FROM detections").fetchone()[0]
        return {
            "videos": video_count,
            "frames": frame_count,
            "detections": detection_count,
            "visual_vectors": self.visual_index.ntotal,
            "caption_vectors": self.caption_index.ntotal,
        }