notRaphael's picture
Add all remaining modules: index_store, query_engine, akinator, pipeline, app, requirements
fea499e verified
"""
Video Intelligence Platform — Indexing Pipeline
Orchestrates the full offline indexing: frame extraction → detection → captioning → embedding → storage.
"""
import io
import time
import numpy as np
from pathlib import Path
from typing import List, Optional
from PIL import Image
from .config import Config
from .frame_extractor import extract_frames, get_video_info, ExtractedFrame
from .gemini_client import GeminiClient
from .visual_encoders import SigLIPEncoder, GroundingDINODetector
from .index_store import VideoIndex
class IndexingPipeline:
"""
Full offline indexing pipeline for a video file.
Pipeline steps:
1. Extract frames at configured FPS
2. Run Grounding DINO for attribute detection on each frame
3. Generate visual embeddings with SigLIP2
4. Caption frames with Gemini 2.0 Flash
5. Generate caption embeddings with Gemini text-embedding-004
6. Store everything in SQLite + FAISS
"""
def __init__(self, config: Config):
self.config = config
config.validate()
# Initialize components
print("=" * 60)
print("🚀 Initializing Video Intelligence Pipeline")
print("=" * 60)
self.gemini = GeminiClient(
api_key=config.gemini_api_key,
vision_model=config.gemini_vision_model,
embedding_model=config.gemini_embedding_model,
)
self.siglip = SigLIPEncoder(
model_name=config.siglip_model,
device=config.device,
)
self.detector = GroundingDINODetector(
model_name=config.grounding_dino_model,
device=config.device,
box_threshold=config.detection_box_threshold,
text_threshold=config.detection_text_threshold,
)
self.index = VideoIndex(
db_path=config.db_path,
visual_dim=config.siglip_embedding_dim,
caption_dim=config.gemini_embedding_dim,
)
print("=" * 60)
print("✅ All components initialized")
print("=" * 60)
def index_video(self, video_path: str,
caption_every_n: int = 1,
detect_every_n: int = 1) -> dict:
"""
Index a video file end-to-end.
Args:
video_path: Path to video file
caption_every_n: Caption every Nth frame (1 = all, 5 = every 5th)
detect_every_n: Run detection every Nth frame
Returns:
Dict with indexing statistics
"""
start_time = time.time()
# Step 0: Get video info and register
print("\n📊 Step 0: Registering video...")
info = get_video_info(video_path)
video_id = self.index.register_video(
path=video_path,
duration_sec=info["duration_sec"],
fps=info["fps"],
width=info["width"],
height=info["height"],
)
print(f" Video ID: {video_id} | {info['duration_sec']:.1f}s | {info['width']}x{info['height']}")
# Step 1: Extract frames
print("\n🎬 Step 1: Extracting frames...")
frames = extract_frames(
video_path,
fps=self.config.extract_fps,
max_frames=self.config.max_frames,
)
# Step 2: Store frames and run detection
print(f"\n🔍 Step 2: Running object detection on {len(frames)} frames...")
frame_ids = []
pil_images = []
for i, frame in enumerate(frames):
# Store frame in DB
frame_id = self.index.add_frame(
video_id=video_id,
frame_idx=frame.frame_idx,
timestamp_sec=frame.timestamp_sec,
)
frame_ids.append(frame_id)
pil_images.append(frame.image)
# Run detection
if i % detect_every_n == 0:
try:
detections = self.detector.detect_default_attributes(frame.image)
self.index.add_detections(frame_id, [
{"label": d.label, "confidence": d.confidence, "bbox": d.bbox}
for d in detections
])
if detections:
labels = [d.label for d in detections]
print(f" Frame {i}/{len(frames)} [{frame.timestamp_sec:.1f}s]: {labels}")
except Exception as e:
print(f" ⚠️ Detection failed for frame {i}: {e}")
if (i + 1) % 50 == 0:
print(f" ... processed {i + 1}/{len(frames)} frames")
# Step 3: Generate visual embeddings (SigLIP2)
print(f"\n🧠 Step 3: Generating visual embeddings...")
visual_embeddings = self.siglip.embed_frames(pil_images, batch_size=4)
self.index.add_visual_embeddings_batch(frame_ids, visual_embeddings)
print(f" ✅ {len(visual_embeddings)} frame embeddings stored ({visual_embeddings.shape})")
# Step 4: Caption frames with Gemini
print(f"\n📝 Step 4: Captioning frames with Gemini...")
captions = []
for i, frame in enumerate(frames):
if i % caption_every_n != 0:
captions.append("")
continue
try:
# Convert PIL to JPEG bytes
buf = io.BytesIO()
frame.image.save(buf, format="JPEG", quality=85)
jpeg_bytes = buf.getvalue()
caption = self.gemini.caption_frame(jpeg_bytes)
captions.append(caption)
self.index.update_caption(frame_ids[i], caption)
if caption:
print(f" Frame {i} [{frame.timestamp_sec:.1f}s]: {caption[:100]}...")
# Rate limiting: Gemini free tier = ~15 RPM
time.sleep(4.0)
except Exception as e:
print(f" ⚠️ Captioning failed for frame {i}: {e}")
captions.append("")
time.sleep(2.0)
# Step 5: Generate caption embeddings (Gemini text-embedding-004)
print(f"\n📐 Step 5: Generating caption embeddings...")
non_empty_captions = [(i, c) for i, c in enumerate(captions) if c]
if non_empty_captions:
caption_texts = [c for _, c in non_empty_captions]
try:
caption_embeddings = self.gemini.embed_texts(caption_texts, task_type="RETRIEVAL_DOCUMENT")
emb_frame_ids = [frame_ids[i] for i, _ in non_empty_captions]
emb_array = np.array(caption_embeddings, dtype=np.float32)
self.index.add_caption_embeddings_batch(emb_frame_ids, emb_array)
print(f" ✅ {len(caption_embeddings)} caption embeddings stored")
except Exception as e:
print(f" ⚠️ Caption embedding failed: {e}")
# Step 6: Save indices
print(f"\n💾 Step 6: Saving indices...")
self.index.save_faiss(
self.config.faiss_visual_path,
self.config.faiss_caption_path,
)
elapsed = time.time() - start_time
stats = self.index.stats()
stats["elapsed_sec"] = elapsed
print(f"\n{'=' * 60}")
print(f"✅ Indexing complete!")
print(f" Frames: {stats['frames']}")
print(f" Detections: {stats['detections']}")
print(f" Visual vectors: {stats['visual_vectors']}")
print(f" Caption vectors: {stats['caption_vectors']}")
print(f" Time: {elapsed:.1f}s")
print(f"{'=' * 60}")
return stats